feat(official-bots): consume GameOver stream for bot cleanup (#67)
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
Add consumer group official-bots-game-over on {prefix}:game-over stream.
Track pub/sub subscribers per gameId in gameWatches map. On GameOver event,
unsubscribe from the game s2c channel and remove from watch map.
XACK after cleanup; DLQ after maxRetries failures.
Closes NCS-103
https://knockoutwhist.youtrack.cloud/issue/NCS-103
Reviewed-on: #67
This commit was merged in pull request #67.
This commit is contained in:
+83
-6
@@ -24,8 +24,9 @@ import scala.jdk.CollectionConverters.*
|
|||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Failure, Success, Try}
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||||
|
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
|
||||||
import java.util.function.Consumer
|
import java.util.function.Consumer
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class OfficialBotService:
|
class OfficialBotService:
|
||||||
@@ -48,14 +49,18 @@ class OfficialBotService:
|
|||||||
private val terminalStatuses =
|
private val terminalStatuses =
|
||||||
Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
|
Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
|
||||||
|
|
||||||
private val groupName = "official-bot"
|
private val groupName = "official-bot"
|
||||||
private val consumerId = UUID.randomUUID().toString
|
private val gameOverGroup = "official-bots-game-over"
|
||||||
private val maxRetries = 3
|
private val consumerId = UUID.randomUUID().toString
|
||||||
private val maxStreamLen = 1000L
|
private val maxRetries = 3
|
||||||
|
private val maxStreamLen = 1000L
|
||||||
|
|
||||||
private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
|
private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
|
||||||
|
private def gameOverStream: String = s"${redisConfig.prefix}:game-over"
|
||||||
private def dlqStream: String = s"${redisConfig.prefix}:dlq"
|
private def dlqStream: String = s"${redisConfig.prefix}:dlq"
|
||||||
|
|
||||||
|
private val gameWatches = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
def initializeMetrics(): Unit =
|
def initializeMetrics(): Unit =
|
||||||
BotController.listBots.foreach { bot =>
|
BotController.listBots.foreach { bot =>
|
||||||
@@ -68,6 +73,7 @@ class OfficialBotService:
|
|||||||
try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots))
|
try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots))
|
||||||
catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service")
|
catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service")
|
||||||
bots.foreach(subscribeToEventChannel)
|
bots.foreach(subscribeToEventChannel)
|
||||||
|
subscribeToGameOverStream()
|
||||||
|
|
||||||
private def subscribeToEventChannel(botName: String): Unit =
|
private def subscribeToEventChannel(botName: String): Unit =
|
||||||
createGroupIfAbsent(botName)
|
createGroupIfAbsent(botName)
|
||||||
@@ -165,9 +171,80 @@ class OfficialBotService:
|
|||||||
botAccountId: String,
|
botAccountId: String,
|
||||||
): Unit =
|
): Unit =
|
||||||
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
|
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
|
||||||
redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
|
||||||
|
gameWatches.put(gameId, (botName, subscriber))
|
||||||
()
|
()
|
||||||
|
|
||||||
|
private def subscribeToGameOverStream(): Unit =
|
||||||
|
Try(
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xgroupCreate(gameOverStream, gameOverGroup, "$", new XGroupCreateArgs().mkstream()),
|
||||||
|
) match
|
||||||
|
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to create game-over consumer group")
|
||||||
|
case Success(_) => ()
|
||||||
|
executor.submit(
|
||||||
|
new Runnable:
|
||||||
|
def run(): Unit = gameOverPollLoop(),
|
||||||
|
)
|
||||||
|
log.infof("Listening to game-over stream (consumer=%s)", consumerId)
|
||||||
|
|
||||||
|
private def gameOverPollLoop(): Unit =
|
||||||
|
while true do
|
||||||
|
Try {
|
||||||
|
val messages = redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xreadgroup(
|
||||||
|
gameOverGroup,
|
||||||
|
consumerId,
|
||||||
|
gameOverStream,
|
||||||
|
">",
|
||||||
|
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
|
||||||
|
)
|
||||||
|
Option(messages).foreach(_.forEach(msg => handleGameOverMessage(msg)))
|
||||||
|
} match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Error in game-over poll loop")
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def handleGameOverMessage(msg: StreamMessage[String, String, String]): Unit =
|
||||||
|
val json = msg.payload().get("data")
|
||||||
|
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
|
||||||
|
Try {
|
||||||
|
val node = objectMapper.readTree(json)
|
||||||
|
val gameId = node.path("payload").path("gameId").asText()
|
||||||
|
if gameId.nonEmpty then
|
||||||
|
Option(gameWatches.remove(gameId)).foreach { (botName, subscriber) =>
|
||||||
|
val topic = s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||||
|
Try(subscriber.unsubscribe(topic)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to unsubscribe from game %s", gameId)
|
||||||
|
case Success(_) => log.infof("Bot %s cleaned up game %s after GameOver", botName, gameId)
|
||||||
|
}
|
||||||
|
} match
|
||||||
|
case Success(_) =>
|
||||||
|
ackGameOver(msg.id())
|
||||||
|
case Failure(ex) if attempt + 1 < maxRetries =>
|
||||||
|
log.warnf(ex, "GameOver handling failed (attempt %d), retrying", attempt)
|
||||||
|
xadd(gameOverStream, Map("data" -> json, "attempt" -> (attempt + 1).toString))
|
||||||
|
ackGameOver(msg.id())
|
||||||
|
case Failure(ex) =>
|
||||||
|
log.errorf(ex, "GameOver handling failed after %d attempts, sending to DLQ", maxRetries)
|
||||||
|
xadd(
|
||||||
|
dlqStream,
|
||||||
|
Map(
|
||||||
|
"data" -> json,
|
||||||
|
"eventType" -> "GameOver",
|
||||||
|
"error" -> Option(ex.getMessage).getOrElse(ex.getClass.getName),
|
||||||
|
"attempt" -> attempt.toString,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
ackGameOver(msg.id())
|
||||||
|
|
||||||
|
private def ackGameOver(id: String): Unit =
|
||||||
|
Try(redis.stream(classOf[String]).xack(gameOverStream, gameOverGroup, id)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to ack game-over message %s", id)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
private def handleGameEvent(
|
private def handleGameEvent(
|
||||||
botName: String,
|
botName: String,
|
||||||
gameId: String,
|
gameId: String,
|
||||||
|
|||||||
Reference in New Issue
Block a user