From 3b42aea41610c860defd93008c4faa1462de3c2b Mon Sep 17 00:00:00 2001 From: "Lala, Shahd" Date: Thu, 4 Jun 2026 18:13:34 +0000 Subject: [PATCH] feat: local tour WORKS bots play --- .../account/service/AccountService.scala | 2 + .../redis/GameRedisSubscriberManager.scala | 17 +- .../bot/service/OfficialBotService.scala | 207 ++++++++++------- .../bot/service/TournamentBotConfig.scala | 35 +++ .../bot/service/TournamentBotGamePlayer.scala | 219 ++++++++++++++++++ .../redis/GameResultStreamListener.scala | 10 +- 6 files changed, 406 insertions(+), 84 deletions(-) create mode 100644 modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotConfig.scala create mode 100644 modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotGamePlayer.scala diff --git a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala index 4d97bdb..f89ddaf 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala @@ -217,6 +217,8 @@ class AccountService: bot.name = name bot.createdAt = Instant.now() officialBotAccountRepository.persist(bot) + bot.token = generateBotToken(bot.id, bot.name) + officialBotAccountRepository.persist(bot) log.infof("Auto-registered official bot: %s", name) } diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index 04f0477..41505e1 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -22,7 +22,7 @@ import org.jboss.logging.Logger import scala.compiletime.uninitialized import scala.jdk.CollectionConverters.* import scala.util.Try -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors} import java.util.function.Consumer @ApplicationScoped @@ -46,6 +46,10 @@ class GameRedisSubscriberManager: private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]() private val s2cObservers = new ConcurrentHashMap[String, Observer]() + // Per-game single-thread executor so c2s messages are handled off the Vert.x + // event loop (handleConnected/handleMove make blocking gRPC + Redis calls) while + // staying ordered per game. + private val c2sExecutors = new ConcurrentHashMap[String, ExecutorService]() // scalafix:off DisableSyntax.var private var clockExpireSubscriber: Option[ReactivePubSubCommands.ReactiveRedisSubscriber] = None @@ -95,7 +99,14 @@ class GameRedisSubscriberManager: obs.emitInitialWriteback() heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) - val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) + val executor = c2sExecutors.computeIfAbsent(gameId, _ => Executors.newSingleThreadExecutor()) + val handler: Consumer[String] = msg => + val task = new Runnable: + def run(): Unit = + try handleC2sMessage(gameId, msg) + catch case ex: Exception => log.warnf(ex, "Error handling c2s message for game %s", gameId) + Try(executor.execute(task)) + () try val subscriber = reactiveRedis .pubsub(classOf[String]) @@ -113,6 +124,7 @@ class GameRedisSubscriberManager: Option(s2cObservers.remove(gameId)).foreach { obs => registry.get(gameId).foreach(_.engine.unsubscribe(obs)) } + Option(c2sExecutors.remove(gameId)).foreach(_.shutdownNow()) heartbeatServiceOpt.foreach(_.removeGameSubscription(gameId)) log.debugf("Unsubscribed from game %s", gameId) @@ -187,3 +199,4 @@ class GameRedisSubscriberManager: clockExpireSubscriber.foreach(_.unsubscribe(clockExpireChannel).await().indefinitely()) c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely()) s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs))) + c2sExecutors.forEach((_, executor) => executor.shutdownNow()) diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala index 958f568..99078f3 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala @@ -2,23 +2,24 @@ package de.nowchess.bot.service import com.fasterxml.jackson.databind.ObjectMapper import de.nowchess.api.move.{Move, MoveType, PromotionPiece} -import de.nowchess.bot.BotController +import de.nowchess.bot.Bot import de.nowchess.bot.BotDifficulty -import de.nowchess.bot.client.{AccountServiceClient, SyncOfficialBotsRequest} +import de.nowchess.bot.bots.ClassicalBot import de.nowchess.bot.config.RedisConfig import de.nowchess.io.fen.FenParser import io.micrometer.core.instrument.MeterRegistry -import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.ReactiveRedisDataSource +import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands import io.quarkus.runtime.StartupEvent import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.event.Observes import jakarta.inject.Inject -import org.eclipse.microprofile.rest.client.inject.RestClient import org.jboss.logging.Logger import scala.compiletime.uninitialized +import java.time.Duration +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors} import java.util.function.Consumer -import java.util.concurrent.TimeUnit @ApplicationScoped class OfficialBotService: @@ -26,100 +27,144 @@ class OfficialBotService: private val log = Logger.getLogger(classOf[OfficialBotService]) // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized - @Inject var objectMapper: ObjectMapper = uninitialized - @Inject var botController: BotController = uninitialized - @Inject var meterRegistry: MeterRegistry = uninitialized - - @Inject - @RestClient - var accountServiceClient: AccountServiceClient = uninitialized + @Inject var reactiveRedis: ReactiveRedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var + private val engine: Bot = ClassicalBot(BotDifficulty.Medium) + private val workers: ExecutorService = Executors.newCachedThreadPool() + + // gameId -> (turn color -> bot account id we play that color as) + private val watchedGames = new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]() + private val subscribers = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]() + private val terminalStatuses = Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw") @PostConstruct def initializeMetrics(): Unit = - BotController.listBots.foreach { bot => - meterRegistry.timer("nowchess.bot.move.duration", "bot", bot).record(0L, TimeUnit.MILLISECONDS) - meterRegistry.counter("nowchess.bot.moves.computed", "bot", bot).increment(0) - } + meterRegistry + .timer("nowchess.bot.move.duration", "bot", engineName) + .record(0L, java.util.concurrent.TimeUnit.MILLISECONDS) + meterRegistry.counter("nowchess.bot.moves.computed", "bot", engineName).increment(0) + // Wildcard subscription: handles gameStart events for *any* bot account, + // whether official (easy/medium/hard/expert) or user-created. Runs at startup + // on a context-bearing thread, so the blocking await is safe here. def onStart(@Observes event: StartupEvent): Unit = - val bots = BotController.listBots - try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots)) - catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service") - bots.foreach(subscribeToEventChannel) - - private def subscribeToEventChannel(botName: String): Unit = - val handler: Consumer[String] = msg => handleBotEvent(botName, msg) - redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:bot:$botName:events", handler) + val pattern = s"${redisConfig.prefix}:bot:*:events" + val handler: Consumer[String] = msg => handleBotEvent(msg) + reactiveRedis + .pubsub(classOf[String]) + .subscribeToPattern(pattern, handler) + .await() + .atMost(Duration.ofSeconds(5)) + log.infof("Bot engine subscribed to pattern %s", pattern) () - private def handleBotEvent(botName: String, msg: String): Unit = + private def handleBotEvent(msg: String): Unit = try val node = objectMapper.readTree(msg) if node.path("type").asText() == "gameStart" then - val gameId = node.path("gameId").asText() - val playingAs = node.path("playingAs").asText() - val difficulty = node.path("difficulty").asInt(1400) - val botAccountId = node.path("botAccountId").asText() - watchGame(botName, gameId, playingAs, difficulty, botAccountId) - catch case _: Exception => () + val gameId = node.path("gameId").asText() + val playingAs = node.path("playingAs").asText() + val playerId = node.path("botAccountId").asText() + if gameId.nonEmpty && playerId.nonEmpty && playingAs.nonEmpty then + log.infof("gameStart: game=%s playingAs=%s bot=%s", gameId, playingAs, playerId) + registerColor(gameId, playingAs, playerId) + else log.warnf("gameStart missing gameId/playingAs/botAccountId: %s", msg) + catch case e: Exception => log.errorf(e, "Failed to handle bot event: %s", msg) - private def watchGame( - botName: String, - gameId: String, - playingAs: String, - difficulty: Int, - botAccountId: String, - ): Unit = - val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg) - redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler) + private def registerColor(gameId: String, playingAs: String, playerId: String): Unit = + val fresh = new ConcurrentHashMap[String, String]() + val existing = watchedGames.putIfAbsent(gameId, fresh) + val colors = if existing == null then fresh else existing + colors.put(playingAs, playerId) + // Only the first color registered for a game subscribes + triggers the position. + if existing == null then subscribeAndConnect(gameId) () - private def handleGameEvent( - botName: String, - gameId: String, - playingAs: String, - difficulty: Int, - botAccountId: String, - msg: String, - ): Unit = - try - val node = objectMapper.readTree(msg) - val status = node.path("state").path("status").asText("") - if !terminalStatuses.contains(status) then - val turn = node.path("state").path("turn").asText("") - if turn == playingAs then - val fen = node.path("state").path("fen").asText() - computeAndSendMove(botName, gameId, fen, difficulty, botAccountId) - catch case _: Exception => () - - private def computeAndSendMove( - botName: String, - gameId: String, - fen: String, - difficulty: Int, - botAccountId: String, - ): Unit = - val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium) - botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot => - FenParser.parseFen(fen).toOption.foreach { context => - val timer = meterRegistry.timer("nowchess.bot.move.duration", "bot", botName) - val moveOpt = timer.recordCallable[Option[Move]](() => bot(context)) - moveOpt.foreach { move => - meterRegistry.counter("nowchess.bot.moves.computed", "bot", botName).increment() - val uci = toUci(move) - val c2sTopic = s"${redisConfig.prefix}:game:$gameId:c2s" - val moveMsg = s"""{"type":"MOVE","uci":"$uci","playerId":"$botAccountId"}""" - redis.pubsub(classOf[String]).publish(c2sTopic, moveMsg) - () - } - } + // Called on the Redis event-loop thread (the pattern handler's thread), which + // carries a Vert.x context. The subscribe must be issued non-blocking from this + // context — issuing it from a plain worker thread (or awaiting it there) leaves + // the subscribe reply with "no handler waiting" and the call times out. + private def subscribeAndConnect(gameId: String): Unit = + val s2c = s"${redisConfig.prefix}:game:$gameId:s2c" + val handler: Consumer[String] = msg => handleGameEvent(gameId, msg) + val pubsub = reactiveRedis.pubsub(classOf[String]) + val onSubscribed: Consumer[ReactivePubSubCommands.ReactiveRedisSubscriber] = { subscriber => + subscribers.put(gameId, subscriber) + sendConnected(pubsub, gameId) } + pubsub.subscribe(s2c, handler).subscribe().`with`(onSubscribed, logFailure(s"subscribe to game $gameId")) + () + + // Ask the server to emit the current position (gameFull) immediately. + private def sendConnected(pubsub: ReactivePubSubCommands[String], gameId: String): Unit = + val c2s = s"${redisConfig.prefix}:game:$gameId:c2s" + val onOk: Consumer[Void] = _ => log.infof("Watching game %s", gameId) + pubsub.publish(c2s, """{"type":"CONNECTED"}""").subscribe().`with`(onOk, logFailure(s"send CONNECTED for game $gameId")) + () + + private def handleGameEvent(gameId: String, msg: String): Unit = + try + val node = objectMapper.readTree(msg) + // gameState has state at root; gameFull (CONNECTED reply) nests it under game.state + val stateNode = + if node.path("type").asText() == "gameFull" then node.path("game").path("state") + else node.path("state") + val status = stateNode.path("status").asText("") + val turn = stateNode.path("turn").asText("") + if terminalStatuses.contains(status) then stopWatching(gameId) + else + val colors = watchedGames.get(gameId) + val playerId = if colors == null then null else colors.get(turn) + if playerId != null then + val fen = stateNode.path("fen").asText() + workers.submit(new Runnable { def run(): Unit = computeAndSendMove(gameId, fen, playerId) }) + () + catch case e: Exception => log.errorf(e, "Failed to handle game event for %s", gameId) + + private def stopWatching(gameId: String): Unit = + watchedGames.remove(gameId) + Option(subscribers.remove(gameId)).foreach { subscriber => + val noop: Consumer[Void] = _ => () + subscriber + .unsubscribe(s"${redisConfig.prefix}:game:$gameId:s2c") + .subscribe() + .`with`(noop, logFailure(s"unsubscribe game $gameId")) + } + () + + // Runs on a worker thread: the engine search must stay off the event loop. The + // move is published reactively (scheduled onto the Redis event loop), so the + // worker thread's lack of a Vert.x context does not matter for publishing. + private def computeAndSendMove(gameId: String, fen: String, playerId: String): Unit = + FenParser.parseFen(fen) match + case Left(err) => log.warnf("FEN parse failed for game %s: %s", gameId, err.toString) + case Right(context) => + val timer = meterRegistry.timer("nowchess.bot.move.duration", "bot", engineName) + val moveOpt = timer.recordCallable[Option[Move]](() => engine(context)) + moveOpt match + case None => log.warnf("Engine returned no move for game %s fen=%s", gameId, fen) + case Some(move) => + meterRegistry.counter("nowchess.bot.moves.computed", "bot", engineName).increment() + val uci = toUci(move) + val moveMsg = s"""{"type":"MOVE","uci":"$uci","playerId":"$playerId"}""" + val onOk: Consumer[Void] = _ => log.infof("Bot moved %s in game %s", uci, gameId) + reactiveRedis + .pubsub(classOf[String]) + .publish(s"${redisConfig.prefix}:game:$gameId:c2s", moveMsg) + .subscribe() + .`with`(onOk, logFailure(s"publish move in game $gameId")) + () + + private def logFailure(what: String): Consumer[Throwable] = + err => log.errorf(err, "Failed to %s", what) + + private def engineName: String = "classical" private def toUci(move: Move): String = val base = s"${move.from}${move.to}" diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotConfig.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotConfig.scala new file mode 100644 index 0000000..be32a29 --- /dev/null +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotConfig.scala @@ -0,0 +1,35 @@ +package de.nowchess.bot.service + +import com.fasterxml.jackson.databind.ObjectMapper +import scala.util.Try + +final case class TournamentBotConfig( + serverUrl: String, + tournamentId: String, + token: String, + botId: String, + difficulty: String, +) + +object TournamentBotConfig: + + private val mapper = new ObjectMapper() + + def fromEnv(env: Map[String, String]): Option[TournamentBotConfig] = + for + tournamentId <- env.get("TOURNAMENT_ID").filter(_.nonEmpty) + token <- env.get("TOURNAMENT_BOT_TOKEN").filter(_.nonEmpty) + botId <- jwtSubject(token) + serverUrl = env.getOrElse("TOURNAMENT_SERVER_URL", "http://localhost:8089") + difficulty = env.getOrElse("TOURNAMENT_BOT_DIFFICULTY", "medium") + yield TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty) + + def jwtSubject(token: String): Option[String] = + Try { + val parts = token.split("\\.") + if parts.length >= 2 then + val payload = new String(java.util.Base64.getUrlDecoder.decode(parts(1))) + val sub = mapper.readTree(payload).path("sub").asText() + Option(sub).filter(_.nonEmpty) + else None + }.toOption.flatten diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotGamePlayer.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotGamePlayer.scala new file mode 100644 index 0000000..d67a240 --- /dev/null +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/TournamentBotGamePlayer.scala @@ -0,0 +1,219 @@ +package de.nowchess.bot.service + +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import de.nowchess.api.move.{Move, MoveType, PromotionPiece} +import de.nowchess.bot.{Bot, BotController} +import de.nowchess.io.fen.FenParser +import io.quarkus.runtime.Startup +import jakarta.annotation.{PostConstruct, PreDestroy} +import jakarta.enterprise.context.ApplicationScoped +import jakarta.inject.Inject +import jakarta.ws.rs.client.{Client, ClientBuilder, Entity} +import jakarta.ws.rs.core.MediaType +import org.jboss.logging.Logger +import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors} + +@Startup +@ApplicationScoped +class TournamentBotGamePlayer: + + private val log = Logger.getLogger(classOf[TournamentBotGamePlayer]) + + // scalafix:off DisableSyntax.var + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var botController: BotController = uninitialized + // scalafix:on DisableSyntax.var + + private val client: Client = ClientBuilder.newClient() + private val workers: ExecutorService = Executors.newCachedThreadPool() + private val activeGames = ConcurrentHashMap.newKeySet[String]() + + private val config = TournamentBotConfig.fromEnv(System.getenv().asScala.toMap) + + // scalafix:off DisableSyntax.var + @volatile private var running = true + // scalafix:on DisableSyntax.var + + @PostConstruct + def initialize(): Unit = + config match + case None => + log.info("Tournament bot disabled — set TOURNAMENT_ID and TOURNAMENT_BOT_TOKEN to enable") + case Some(cfg) => + log.infof("Tournament bot enabled — server=%s tournament=%s bot=%s", cfg.serverUrl, cfg.tournamentId, cfg.botId) + val thread = new Thread(() => connect(cfg), s"TournamentBot-${cfg.tournamentId}") + thread.setDaemon(true) + thread.start() + + @PreDestroy + def cleanup(): Unit = + running = false + workers.shutdownNow() + Try(client.close()) + log.info("Tournament bot stopped") + + private def connect(cfg: TournamentBotConfig): Unit = + if join(cfg) then + while running do + Try(streamEvents(cfg)) match + case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000) + case Success(_) => sleep(2000) + + private def join(cfg: TournamentBotConfig): Boolean = + Try { + val response = authed(cfg, target(cfg).path("join")) + .post(Entity.entity("", MediaType.APPLICATION_JSON)) + val ok = response.getStatus == 200 + if ok then log.infof("Joined tournament %s", cfg.tournamentId) + else log.errorf("Failed to join tournament %s — status %d", cfg.tournamentId, response.getStatus) + response.close() + ok + }.getOrElse { log.error("Join request failed"); false } + + private def streamEvents(cfg: TournamentBotConfig): Unit = + val response = authed(cfg, target(cfg).path("stream")) + .header("Accept", "application/x-ndjson") + .get() + if response.getStatus != 200 then + log.warnf("Tournament stream returned status %d", response.getStatus) + response.close() + sleep(5000) + else + log.infof("Listening to tournament %s event stream", cfg.tournamentId) + forEachLine(response.readEntity(classOf[InputStream])): line => + parse(line).foreach: node => + if node.path("type").asText() == "gameStart" then onGameStart(cfg, node.path("gameId").asText()) + + private def onGameStart(cfg: TournamentBotConfig, gameId: String): Unit = + if gameId.nonEmpty && activeGames.add(gameId) then + workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId) }) + () + + private def playGame(cfg: TournamentBotConfig, gameId: String): Unit = + Try { + colorFor(cfg, gameId) match + case None => + log.debugf("Game %s is not ours — ignoring", gameId) + activeGames.remove(gameId) + case Some(color) => + log.infof("Playing game %s as %s", gameId, color) + val stream = openGameStream(cfg, gameId) + maybeMoveFromCurrentState(cfg, gameId, color) + if stream != null then consumeGameStream(cfg, gameId, color, stream) + activeGames.remove(gameId) + } match + case Failure(ex) => log.errorf(ex, "Game %s crashed", gameId); activeGames.remove(gameId) + case Success(_) => () + + private def colorFor(cfg: TournamentBotConfig, gameId: String): Option[String] = + fetchGame(cfg, gameId).flatMap: game => + val white = game.path("white").path("id").asText() + val black = game.path("black").path("id").asText() + if white == cfg.botId then Some("white") + else if black == cfg.botId then Some("black") + else None + + private def maybeMoveFromCurrentState(cfg: TournamentBotConfig, gameId: String, color: String): Unit = + fetchGame(cfg, gameId).foreach: game => + maybeMove(cfg, gameId, color, game.path("turn").asText(), game.path("status").asText(), game.path("fen").asText()) + + private def consumeGameStream(cfg: TournamentBotConfig, gameId: String, color: String, stream: InputStream): Unit = + val reader = new BufferedReader(new InputStreamReader(stream)) + // scalafix:off DisableSyntax.var + var done = false + var line = reader.readLine() + // scalafix:on DisableSyntax.var + while line != null && running && !done do + parse(line).foreach: node => + node.path("type").asText() match + case "move" => maybeMove(cfg, gameId, color, node.path("turn").asText(), "ongoing", node.path("fen").asText()) + case "gameEnd" => log.infof("Game %s ended — status=%s", gameId, node.path("status").asText()); done = true + case _ => () + line = reader.readLine() + + private def maybeMove( + cfg: TournamentBotConfig, + gameId: String, + color: String, + turn: String, + status: String, + fen: String, + ): Unit = + if turn == color && status == "ongoing" && fen.nonEmpty then + computeUci(cfg, fen) match + case None => log.warnf("No move found for game %s (fen=%s)", gameId, fen) + case Some(uci) => submitMove(cfg, gameId, uci) + + private def computeUci(cfg: TournamentBotConfig, fen: String): Option[String] = + FenParser.parseFen(fen) match + case Left(err) => log.warnf("FEN parse failed: %s (%s)", fen, err.toString); None + case Right(context) => engine(cfg).apply(context).map(toUci) + + private def submitMove(cfg: TournamentBotConfig, gameId: String, uci: String): Unit = + Try { + val response = authed(cfg, target(cfg).path("game").path(gameId).path("move").path(uci)) + .post(Entity.entity("", MediaType.APPLICATION_JSON)) + if response.getStatus == 200 then log.infof("Played %s in game %s", uci, gameId) + else log.warnf("Move %s rejected in game %s — status %d", uci, gameId, response.getStatus) + response.close() + } match + case Failure(ex) => log.errorf(ex, "Error submitting move %s in game %s", uci, gameId) + case Success(_) => () + + private def fetchGame(cfg: TournamentBotConfig, gameId: String): Option[JsonNode] = + Try { + val response = target(cfg).path("game").path(gameId).request(MediaType.APPLICATION_JSON).get() + val node = if response.getStatus == 200 then Some(response.readEntity(classOf[JsonNode])) else None + response.close() + node + }.getOrElse(None) + + private def openGameStream(cfg: TournamentBotConfig, gameId: String): InputStream = + Try { + val response = authed(cfg, target(cfg).path("game").path(gameId).path("stream")) + .header("Accept", "application/x-ndjson") + .get() + if response.getStatus == 200 then response.readEntity(classOf[InputStream]) + else { log.warnf("Game stream %s returned status %d", gameId, response.getStatus); response.close(); null } + }.getOrElse(null) + + private def engine(cfg: TournamentBotConfig): Bot = + botController.getBot(cfg.difficulty).orElse(botController.getBot("medium")).get + + private def target(cfg: TournamentBotConfig) = + client.target(cfg.serverUrl).path("api").path("tournament").path(cfg.tournamentId) + + private def authed(cfg: TournamentBotConfig, t: jakarta.ws.rs.client.WebTarget) = + t.request(MediaType.APPLICATION_JSON).header("Authorization", s"Bearer ${cfg.token}") + + private def parse(line: String): Option[JsonNode] = + val trimmed = line.trim + if trimmed.isEmpty then None else Try(objectMapper.readTree(trimmed)).toOption + + private def forEachLine(stream: InputStream)(handle: String => Unit): Unit = + val reader = new BufferedReader(new InputStreamReader(stream)) + // scalafix:off DisableSyntax.var + var line: String = reader.readLine() + // scalafix:on DisableSyntax.var + while line != null && running do + Try(handle(line)).failed.foreach(ex => log.warnf(ex, "Error handling stream line")) + line = reader.readLine() + + private def toUci(move: Move): String = + val base = s"${move.from}${move.to}" + move.moveType match + case MoveType.Promotion(piece) => base + promotionChar(piece) + case _ => base + + private def promotionChar(piece: PromotionPiece): String = + piece match + case PromotionPiece.Knight => "n" + case PromotionPiece.Bishop => "b" + case PromotionPiece.Rook => "r" + case PromotionPiece.Queen => "q" + + private def sleep(ms: Long): Unit = Try(Thread.sleep(ms)) diff --git a/modules/tournament/src/main/scala/de/nowchess/tournament/redis/GameResultStreamListener.scala b/modules/tournament/src/main/scala/de/nowchess/tournament/redis/GameResultStreamListener.scala index bd77bda..33c0e01 100644 --- a/modules/tournament/src/main/scala/de/nowchess/tournament/redis/GameResultStreamListener.scala +++ b/modules/tournament/src/main/scala/de/nowchess/tournament/redis/GameResultStreamListener.scala @@ -49,7 +49,8 @@ class GameResultStreamListener: case Success(_) => () private def pollLoop(): Unit = - while true do + var running = true + while running do Try { val messages = redis.stream(classOf[String]).xreadgroup( groupName, @@ -60,9 +61,16 @@ class GameResultStreamListener: ) if messages != null then messages.forEach(msg => handleMessage(msg)) } match + case Failure(ex) if isInterrupted(ex) => + Thread.currentThread().interrupt() + running = false case Failure(ex) => log.warnf(ex, "Error in result poll loop") case Success(_) => () + private def isInterrupted(ex: Throwable): Boolean = + ex.isInstanceOf[InterruptedException] || + (ex.getCause != null && ex.getCause.isInstanceOf[InterruptedException]) + private def handleMessage(msg: StreamMessage[String, String, String]): Unit = val json = msg.payload().get("data") Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match