feat(events): migrate game-creation and bot flows to Redis Streams NCS-89 (#62)
Build & Test (NowChessSystems) TeamCity build finished

Replace synchronous account→core game-creation HTTP call and plain
pub/sub bot game-start events with Redis Streams using consumer groups,
XACK, retry, and a Dead Letter Queue for at-least-once delivery and
observability.

- account: GameCreationStreamClient publishes game-creation requests and
  correlates responses via a per-instance consumer group (NCS-91)
- core: GameCreationStreamListener consumes requests, calls
  GameCreationService, publishes response events, retries, and routes
  exhausted/unparseable events to the DLQ (NCS-91, NCS-93, NCS-94)
- official-bots: bot game-start events migrated from pub/sub to Streams
  with consumer group, XACK, retry, and DLQ (NCS-92)
- account EventPublisher dual-writes to the stream and legacy pub/sub
  channel for backward compatibility
- all flows use the typed EventEnvelope (eventId/type/payload/timestamp/
  correlationId) with DLQ error context (eventType, error, attempt)
- register new DTOs and EventEnvelope/EventType for native reflection

Closes NCS-91, NCS-92, NCS-93, NCS-94

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com>
Reviewed-on: #62
This commit was merged in pull request #62.
This commit is contained in:
2026-06-09 10:31:32 +02:00
parent 11826356c1
commit a24924c230
16 changed files with 583 additions and 35 deletions
@@ -1,6 +1,7 @@
package de.nowchess.bot.service
import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.api.event.EventEnvelope
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
import de.nowchess.bot.BotController
import de.nowchess.bot.BotDifficulty
@@ -9,14 +10,20 @@ import de.nowchess.bot.config.RedisConfig
import de.nowchess.io.fen.FenParser
import io.micrometer.core.instrument.MeterRegistry
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs}
import io.quarkus.runtime.StartupEvent
import jakarta.annotation.PostConstruct
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.event.Observes
import jakarta.inject.Inject
import org.eclipse.microprofile.context.ManagedExecutor
import org.eclipse.microprofile.rest.client.inject.RestClient
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success, Try}
import java.time.Duration
import java.util.UUID
import java.util.function.Consumer
import java.util.concurrent.TimeUnit
@@ -31,6 +38,7 @@ class OfficialBotService:
@Inject var objectMapper: ObjectMapper = uninitialized
@Inject var botController: BotController = uninitialized
@Inject var meterRegistry: MeterRegistry = uninitialized
@Inject var executor: ManagedExecutor = uninitialized
@Inject
@RestClient
@@ -40,6 +48,14 @@ class OfficialBotService:
private val terminalStatuses =
Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
private val groupName = "official-bot"
private val consumerId = UUID.randomUUID().toString
private val maxRetries = 3
private val maxStreamLen = 1000L
private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
private def dlqStream: String = s"${redisConfig.prefix}:dlq"
@PostConstruct
def initializeMetrics(): Unit =
BotController.listBots.foreach { bot =>
@@ -54,20 +70,92 @@ class OfficialBotService:
bots.foreach(subscribeToEventChannel)
private def subscribeToEventChannel(botName: String): Unit =
val handler: Consumer[String] = msg => handleBotEvent(botName, msg)
redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:bot:$botName:events", handler)
()
createGroupIfAbsent(botName)
executor.submit(
new Runnable:
def run(): Unit = pollLoop(botName),
)
log.infof("Listening to bot event stream for %s (consumer=%s)", botName, consumerId)
private def handleBotEvent(botName: String, msg: String): Unit =
try
val node = objectMapper.readTree(msg)
if node.path("type").asText() == "gameStart" then
val gameId = node.path("gameId").asText()
val playingAs = node.path("playingAs").asText()
val difficulty = node.path("difficulty").asInt(1400)
val botAccountId = node.path("botAccountId").asText()
watchGame(botName, gameId, playingAs, difficulty, botAccountId)
catch case _: Exception => ()
private def createGroupIfAbsent(botName: String): Unit =
Try(
redis
.stream(classOf[String])
.xgroupCreate(eventStream(botName), groupName, "0", new XGroupCreateArgs().mkstream()),
) match
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
case Failure(ex) => log.warnf(ex, "Failed to create bot event consumer group for %s", botName)
case Success(_) => ()
private def pollLoop(botName: String): Unit =
while true do
Try {
val messages = redis
.stream(classOf[String])
.xreadgroup(
groupName,
consumerId,
eventStream(botName),
">",
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
)
Option(messages).foreach(_.forEach(msg => handleStreamMessage(botName, msg)))
} match
case Failure(ex) => log.warnf(ex, "Error in bot event poll loop for %s", botName)
case Success(_) => ()
private def handleStreamMessage(botName: String, msg: StreamMessage[String, String, String]): Unit =
val json = msg.payload().get("data")
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
Try {
val envelope = objectMapper.readValue(json, classOf[EventEnvelope])
handleBotEvent(botName, envelope)
} match
case Success(_) => ack(botName, msg.id())
case Failure(ex) if attempt + 1 < maxRetries =>
log.warnf(ex, "Bot event handling failed for %s (attempt %d), retrying", botName, attempt)
retry(botName, json, attempt + 1)
ack(botName, msg.id())
case Failure(ex) =>
log.errorf(ex, "Bot event handling failed for %s after %d attempts, sending to DLQ", botName, maxRetries)
toDlq(json, ex, attempt)
ack(botName, msg.id())
private def handleBotEvent(botName: String, envelope: EventEnvelope): Unit =
val payload = envelope.payload
val gameId = payload.path("gameId").asText()
val playingAs = payload.path("playingAs").asText()
val difficulty = payload.path("difficulty").asInt(1400)
val botAccountId = payload.path("botAccountId").asText()
watchGame(botName, gameId, playingAs, difficulty, botAccountId)
private def ack(botName: String, id: String): Unit =
Try(redis.stream(classOf[String]).xack(eventStream(botName), groupName, id)) match
case Failure(ex) => log.warnf(ex, "Failed to ack bot event %s", id)
case Success(_) => ()
private def retry(botName: String, json: String, attempt: Int): Unit =
xadd(eventStream(botName), Map("data" -> json, "attempt" -> attempt.toString))
private def toDlq(json: String, error: Throwable, attempt: Int): Unit =
xadd(
dlqStream,
Map(
"data" -> json,
"eventType" -> "BotGameStart",
"error" -> Option(error.getMessage).getOrElse(error.getClass.getName),
"attempt" -> (attempt + 1).toString,
),
)
private def xadd(key: String, fields: Map[String, String]): Unit =
Try(
redis
.stream(classOf[String])
.xadd(key, new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), fields.asJava),
) match
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
case Success(_) => ()
private def watchGame(
botName: String,