feat(analytics): add Spark batch analytics module (#70)
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com> Reviewed-on: #70
This commit was merged in pull request #70.
This commit is contained in:
+7
@@ -0,0 +1,7 @@
|
||||
package de.nowchess.bot.resource
|
||||
|
||||
case class JoinTournamentRequest(
|
||||
tournamentId: String,
|
||||
difficulty: String,
|
||||
serverUrl: Option[String],
|
||||
)
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package de.nowchess.bot.resource
|
||||
|
||||
case class JoinTournamentResponse(
|
||||
botId: String,
|
||||
difficulty: String,
|
||||
status: String,
|
||||
)
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
package de.nowchess.bot.resource
|
||||
|
||||
import de.nowchess.bot.service.TournamentBotGamePlayer
|
||||
import jakarta.annotation.security.RolesAllowed
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.*
|
||||
import jakarta.ws.rs.core.{MediaType, Response}
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@Path("/api/bots/official")
|
||||
@ApplicationScoped
|
||||
@RolesAllowed(Array("**"))
|
||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
||||
@Consumes(Array(MediaType.APPLICATION_JSON))
|
||||
class TournamentJoinResource:
|
||||
|
||||
private val log = Logger.getLogger(classOf[TournamentJoinResource])
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var player: TournamentBotGamePlayer = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
@POST
|
||||
@Path("/join-tournament")
|
||||
def joinTournament(req: JoinTournamentRequest): Response =
|
||||
val serverUrl = req.serverUrl.filter(_.nonEmpty).getOrElse(player.defaultServerUrl)
|
||||
val difficulty = if req.difficulty.nonEmpty then req.difficulty else "medium"
|
||||
log.infof(
|
||||
"Official bot join requested — tournament=%s difficulty=%s server=%s",
|
||||
req.tournamentId,
|
||||
difficulty,
|
||||
serverUrl,
|
||||
)
|
||||
player.joinTournament(req.tournamentId, difficulty, serverUrl) match
|
||||
case Right(botId) =>
|
||||
val resp = JoinTournamentResponse(botId, difficulty, "joining")
|
||||
Response.ok(resp).build()
|
||||
case Left(err) =>
|
||||
Response
|
||||
.status(Response.Status.BAD_GATEWAY)
|
||||
.entity(s"""{"error":"$err"}""")
|
||||
.build()
|
||||
+70
-46
@@ -38,6 +38,9 @@ class TournamentBotGamePlayer:
|
||||
@volatile private var running = true
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
val defaultServerUrl: String =
|
||||
System.getenv().asScala.getOrElse("TOURNAMENT_SERVER_URL", "http://localhost:8089")
|
||||
|
||||
@PostConstruct
|
||||
def initialize(): Unit =
|
||||
config match
|
||||
@@ -45,9 +48,42 @@ class TournamentBotGamePlayer:
|
||||
log.info("Tournament bot disabled — set TOURNAMENT_ID and TOURNAMENT_BOT_TOKEN to enable")
|
||||
case Some(cfg) =>
|
||||
log.infof("Tournament bot enabled — server=%s tournament=%s bot=%s", cfg.serverUrl, cfg.tournamentId, cfg.botId)
|
||||
val thread = new Thread(() => connect(cfg), s"TournamentBot-${cfg.tournamentId}")
|
||||
thread.setDaemon(true)
|
||||
thread.start()
|
||||
startAsync(cfg)
|
||||
|
||||
def joinTournament(tournamentId: String, difficulty: String, serverUrl: String): Either[String, String] =
|
||||
registerBot(serverUrl, difficulty) match
|
||||
case None => Left("Failed to register bot with tournament server")
|
||||
case Some((botId, token)) =>
|
||||
val cfg = TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty)
|
||||
if join(cfg) then
|
||||
startAsync(cfg)
|
||||
Right(botId)
|
||||
else Left("Failed to join tournament")
|
||||
|
||||
private def startAsync(cfg: TournamentBotConfig): Unit =
|
||||
val thread = new Thread(() => streamLoop(cfg), s"TournamentBot-${cfg.tournamentId}")
|
||||
thread.setDaemon(true)
|
||||
thread.start()
|
||||
|
||||
private def registerBot(serverUrl: String, difficulty: String): Option[(String, String)] =
|
||||
Try {
|
||||
val name = s"NowChess ${difficulty.capitalize}"
|
||||
val body = s"""{"name":"$name","isBot":true}"""
|
||||
val response = client
|
||||
.target(serverUrl)
|
||||
.path("api")
|
||||
.path("auth")
|
||||
.path("register")
|
||||
.request(MediaType.APPLICATION_JSON)
|
||||
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
|
||||
if response.getStatus == 201 then
|
||||
val node = objectMapper.readTree(response.readEntity(classOf[String]))
|
||||
val id = node.path("id").asText()
|
||||
val token = node.path("token").asText()
|
||||
response.close()
|
||||
if id.nonEmpty && token.nonEmpty then Some((id, token)) else None
|
||||
else { log.warnf("Bot registration returned status %d", response.getStatus); response.close(); None }
|
||||
}.getOrElse(None)
|
||||
|
||||
@PreDestroy
|
||||
def cleanup(): Unit =
|
||||
@@ -56,12 +92,11 @@ class TournamentBotGamePlayer:
|
||||
Try(client.close())
|
||||
log.info("Tournament bot stopped")
|
||||
|
||||
private def connect(cfg: TournamentBotConfig): Unit =
|
||||
if join(cfg) then
|
||||
while running do
|
||||
Try(streamEvents(cfg)) match
|
||||
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
|
||||
case Success(_) => sleep(2000)
|
||||
private def streamLoop(cfg: TournamentBotConfig): Unit =
|
||||
while running do
|
||||
Try(streamEvents(cfg)) match
|
||||
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
|
||||
case Success(_) => sleep(2000)
|
||||
|
||||
private def join(cfg: TournamentBotConfig): Boolean =
|
||||
Try {
|
||||
@@ -86,41 +121,23 @@ class TournamentBotGamePlayer:
|
||||
log.infof("Listening to tournament %s event stream", cfg.tournamentId)
|
||||
forEachLine(response.readEntity(classOf[InputStream])): line =>
|
||||
parse(line).foreach: node =>
|
||||
if node.path("type").asText() == "gameStart" then onGameStart(cfg, node.path("gameId").asText())
|
||||
if node.path("type").asText() == "gameStart" then
|
||||
onGameStart(cfg, node.path("gameId").asText(), node.path("color").asText())
|
||||
|
||||
private def onGameStart(cfg: TournamentBotConfig, gameId: String): Unit =
|
||||
if gameId.nonEmpty && activeGames.add(gameId) then
|
||||
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId) })
|
||||
private def onGameStart(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
|
||||
if gameId.nonEmpty && color.nonEmpty && activeGames.add(gameId) then
|
||||
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId, color) })
|
||||
()
|
||||
|
||||
private def playGame(cfg: TournamentBotConfig, gameId: String): Unit =
|
||||
private def playGame(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
|
||||
Try {
|
||||
colorFor(cfg, gameId) match
|
||||
case None =>
|
||||
log.debugf("Game %s is not ours — ignoring", gameId)
|
||||
activeGames.remove(gameId)
|
||||
case Some(color) =>
|
||||
log.infof("Playing game %s as %s", gameId, color)
|
||||
val stream = openGameStream(cfg, gameId)
|
||||
maybeMoveFromCurrentState(cfg, gameId, color)
|
||||
stream.foreach(consumeGameStream(cfg, gameId, color, _))
|
||||
activeGames.remove(gameId)
|
||||
log.infof("Playing game %s as %s", gameId, color)
|
||||
openGameStream(cfg, gameId).foreach(consumeGameStream(cfg, gameId, color, _))
|
||||
activeGames.remove(gameId)
|
||||
} match
|
||||
case Failure(ex) => log.errorf(ex, "Game %s crashed", gameId); activeGames.remove(gameId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def colorFor(cfg: TournamentBotConfig, gameId: String): Option[String] =
|
||||
fetchGame(cfg, gameId).flatMap: game =>
|
||||
val white = game.path("white").path("id").asText()
|
||||
val black = game.path("black").path("id").asText()
|
||||
if white == cfg.botId then Some("white")
|
||||
else if black == cfg.botId then Some("black")
|
||||
else None
|
||||
|
||||
private def maybeMoveFromCurrentState(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
|
||||
fetchGame(cfg, gameId).foreach: game =>
|
||||
maybeMove(cfg, gameId, color, game.path("turn").asText(), game.path("status").asText(), game.path("fen").asText())
|
||||
|
||||
private def consumeGameStream(cfg: TournamentBotConfig, gameId: String, color: String, stream: InputStream): Unit =
|
||||
val reader = new BufferedReader(new InputStreamReader(stream))
|
||||
// scalafix:off DisableSyntax.var
|
||||
@@ -134,10 +151,25 @@ class TournamentBotGamePlayer:
|
||||
.foreach { line =>
|
||||
parse(line).foreach: node =>
|
||||
node.path("type").asText() match
|
||||
case "gameState" =>
|
||||
maybeMove(
|
||||
cfg,
|
||||
gameId,
|
||||
color,
|
||||
node.path("turn").asText(),
|
||||
node.path("status").asText(),
|
||||
node.path("fen").asText(),
|
||||
)
|
||||
case "move" =>
|
||||
maybeMove(cfg, gameId, color, node.path("turn").asText(), "ongoing", node.path("fen").asText())
|
||||
case "gameEnd" => log.infof("Game %s ended — status=%s", gameId, node.path("status").asText()); done = true
|
||||
case _ => ()
|
||||
case "gameEnd" =>
|
||||
log.infof(
|
||||
"Game %s ended — status=%s winner=%s",
|
||||
gameId,
|
||||
node.path("status").asText(),
|
||||
node.path("winner").asText(),
|
||||
); done = true
|
||||
case _ => ()
|
||||
}
|
||||
|
||||
private def maybeMove(
|
||||
@@ -169,14 +201,6 @@ class TournamentBotGamePlayer:
|
||||
case Failure(ex) => log.errorf(ex, "Error submitting move %s in game %s", uci, gameId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def fetchGame(cfg: TournamentBotConfig, gameId: String): Option[JsonNode] =
|
||||
Try {
|
||||
val response = target(cfg).path("game").path(gameId).request(MediaType.APPLICATION_JSON).get()
|
||||
val node = if response.getStatus == 200 then Some(response.readEntity(classOf[JsonNode])) else None
|
||||
response.close()
|
||||
node
|
||||
}.getOrElse(None)
|
||||
|
||||
private def openGameStream(cfg: TournamentBotConfig, gameId: String): Option[InputStream] =
|
||||
Try {
|
||||
val response = authed(cfg, target(cfg).path("game").path(gameId).path("stream"))
|
||||
|
||||
Reference in New Issue
Block a user