diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala
index d838b85..6ca506e 100644
--- a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala
+++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala
@@ -24,8 +24,9 @@ import scala.jdk.CollectionConverters.*
 import scala.util.{Failure, Success, Try}
 import java.time.Duration
 import java.util.UUID
+import io.quarkus.redis.datasource.pubsub.PubSubCommands
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.function.Consumer
-import java.util.concurrent.TimeUnit
 
 @ApplicationScoped
 class OfficialBotService:
@@ -48,14 +49,19 @@ class OfficialBotService:
 
   private val terminalStatuses = Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
 
-  private val groupName    = "official-bot"
-  private val consumerId   = UUID.randomUUID().toString
-  private val maxRetries   = 3
-  private val maxStreamLen = 1000L
+  private val groupName     = "official-bot"
+  private val gameOverGroup = "official-bots-game-over"
+  private val consumerId    = UUID.randomUUID().toString
+  private val maxRetries    = 3
+  private val maxStreamLen  = 1000L
 
   private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
+  private def gameOverStream: String               = s"${redisConfig.prefix}:game-over"
   private def dlqStream: String                    = s"${redisConfig.prefix}:dlq"
 
+  // Active s2c subscriptions keyed by gameId, each stored with the name of the bot playing that game.
+  private val gameWatches = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
+
   @PostConstruct
   def initializeMetrics(): Unit =
     BotController.listBots.foreach { bot =>
@@ -68,6 +74,7 @@ class OfficialBotService:
     try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots))
     catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service")
     bots.foreach(subscribeToEventChannel)
+    subscribeToGameOverStream()
 
   private def subscribeToEventChannel(botName: String): Unit =
     createGroupIfAbsent(botName)
@@ -165,9 +172,126 @@ class OfficialBotService:
       botAccountId: String,
   ): Unit =
     val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
-    redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
+    val topic      = s"${redisConfig.prefix}:game:$gameId:s2c"
+    val subscriber = redis.pubsub(classOf[String]).subscribe(topic, handler)
+    // Re-subscribing to a game replaces its tracked entry; unsubscribe the previous subscriber so it
+    // is not left running with no handle through which game-over cleanup could reach it.
+    Option(gameWatches.put(gameId, (botName, subscriber))).foreach { case (_, previous) =>
+      Try(previous.unsubscribe(topic)) match
+        case Failure(ex) => log.warnf(ex, "Failed to drop stale subscription for game %s", gameId)
+        case Success(_)  => ()
+    }
     ()
 
+  /** Ensures the game-over consumer group exists, then starts `gameOverPollLoop` on the executor.
+    *
+    * The group is created at `$` with `mkstream`, so the stream is created when absent and only
+    * entries added afterwards are delivered. A `BUSYGROUP` failure means the group already exists
+    * and is ignored; any other failure is logged and polling is started regardless.
+    *
+    * NOTE(review): every instance joins the same group under its own random `consumerId`, and a
+    * consumer group hands each entry to a single consumer. If several instances run, the one that
+    * receives a game-over entry is presumably not always the one holding the watch - confirm.
+    */
+  private def subscribeToGameOverStream(): Unit =
+    Try(
+      redis
+        .stream(classOf[String])
+        .xgroupCreate(gameOverStream, gameOverGroup, "$", new XGroupCreateArgs().mkstream()),
+    ) match
+      case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
+      case Failure(ex) => log.warnf(ex, "Failed to create game-over consumer group")
+      case Success(_) => ()
+    executor.submit(
+      new Runnable:
+        def run(): Unit = gameOverPollLoop(),
+    )
+    log.infof("Listening to game-over stream (consumer=%s)", consumerId)
+
+  /** Reads the game-over stream as `consumerId` in batches of up to 10, blocking up to 2 seconds per
+    * read, until the polling thread's interrupt flag is set.
+    *
+    * A failed iteration is logged and followed by a one-second pause, so a persistent failure does
+    * not turn the loop into a busy spin that floods the log.
+    */
+  private def gameOverPollLoop(): Unit =
+    while !Thread.currentThread().isInterrupted do
+      Try {
+        val messages = redis
+          .stream(classOf[String])
+          .xreadgroup(
+            gameOverGroup,
+            consumerId,
+            gameOverStream,
+            ">",
+            new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
+          )
+        Option(messages).foreach(_.forEach(msg => handleGameOverMessage(msg)))
+      } match
+        case Failure(ex) =>
+          log.warnf(ex, "Error in game-over poll loop")
+          // Restore the flag when the pause is interrupted so the loop condition ends the thread.
+          try TimeUnit.SECONDS.sleep(1)
+          catch case _: InterruptedException => Thread.currentThread().interrupt()
+        case Success(_) => ()
+
+  /** Handles one game-over stream entry: releases the watch for the game it names, then acks it.
+    *
+    * The `data` field is parsed as JSON and the game id read from `payload.gameId`. When that fails,
+    * the entry is re-added to the stream with an incremented `attempt` until `maxRetries` is reached,
+    * after which it is written to the DLQ stream. In each case the original entry is acked afterwards.
+    *
+    * @param msg
+    *   stream entry whose payload carries `data` and, on retries, `attempt`
+    */
+  private def handleGameOverMessage(msg: StreamMessage[String, String, String]): Unit =
+    Option(msg.payload().get("data")) match
+      case None =>
+        // Without a data field there is nothing to parse or re-queue; ack so the entry does not
+        // stay pending for this consumer.
+        log.warnf("Ignoring game-over message %s without a data field", msg.id())
+        ackGameOver(msg.id())
+      case Some(json) =>
+        val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
+        Try(releaseGameWatch(json)) match
+          case Success(_) =>
+            ackGameOver(msg.id())
+          case Failure(ex) if attempt + 1 < maxRetries =>
+            log.warnf(ex, "GameOver handling failed (attempt %d), retrying", attempt)
+            xadd(gameOverStream, Map("data" -> json, "attempt" -> (attempt + 1).toString))
+            ackGameOver(msg.id())
+          case Failure(ex) =>
+            log.errorf(ex, "GameOver handling failed after %d attempts, sending to DLQ", maxRetries)
+            xadd(
+              dlqStream,
+              Map(
+                "data"      -> json,
+                "eventType" -> "GameOver",
+                "error"     -> Option(ex.getMessage).getOrElse(ex.getClass.getName),
+                "attempt"   -> attempt.toString,
+              ),
+            )
+            ackGameOver(msg.id())
+
+  /** Parses a game-over payload and, if this instance tracks the game, unsubscribes from its s2c
+    * channel. An unsubscribe failure is logged, not propagated; a parse failure is thrown.
+    */
+  private def releaseGameWatch(json: String): Unit =
+    val gameId = objectMapper.readTree(json).path("payload").path("gameId").asText()
+    if gameId.nonEmpty then
+      Option(gameWatches.remove(gameId)).foreach { (botName, subscriber) =>
+        val topic = s"${redisConfig.prefix}:game:$gameId:s2c"
+        Try(subscriber.unsubscribe(topic)) match
+          case Failure(ex) => log.warnf(ex, "Failed to unsubscribe from game %s", gameId)
+          case Success(_)  => log.infof("Bot %s cleaned up game %s after GameOver", botName, gameId)
+      }
+
+  /** Acks a game-over stream entry for `gameOverGroup`; a failure is logged and otherwise ignored. */
+  private def ackGameOver(id: String): Unit =
+    Try(redis.stream(classOf[String]).xack(gameOverStream, gameOverGroup, id)) match
+      case Failure(ex) => log.warnf(ex, "Failed to ack game-over message %s", id)
+      case Success(_)  => ()
+
   private def handleGameEvent(
       botName: String,
       gameId: String,