From a24924c23057db3d700a75dbc4333557789cd991 Mon Sep 17 00:00:00 2001 From: Janis Date: Tue, 9 Jun 2026 10:31:32 +0200 Subject: [PATCH] feat(events): migrate game-creation and bot flows to Redis Streams NCS-89 (#62) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace synchronous account→core game-creation HTTP call and plain pub/sub bot game-start events with Redis Streams using consumer groups, XACK, retry, and a Dead Letter Queue for at-least-once delivery and observability. - account: GameCreationStreamClient publishes game-creation requests and correlates responses via a per-instance consumer group (NCS-91) - core: GameCreationStreamListener consumes requests, calls GameCreationService, publishes response events, retries, and routes exhausted/unparseable events to the DLQ (NCS-91, NCS-93, NCS-94) - official-bots: bot game-start events migrated from pub/sub to Streams with consumer group, XACK, retry, and DLQ (NCS-92) - account EventPublisher dual-writes to the stream and legacy pub/sub channel for backward compatibility - all flows use the typed EventEnvelope (eventId/type/payload/timestamp/ correlationId) with DLQ error context (eventType, error, attempt) - register new DTOs and EventEnvelope/EventType for native reflection Closes NCS-91, NCS-92, NCS-93, NCS-94 Co-Authored-By: Claude Opus 4.8 --------- Co-authored-by: Janis Eccarius Reviewed-on: https://git.janis-eccarius.de/NowChess/NowChessSystems/pulls/62 --- .../client/GameCreationStreamClient.scala | 133 ++++++++++++++++ .../config/NativeReflectionConfig.scala | 10 ++ .../resource/OfficialChallengeResource.scala | 8 +- .../account/service/ChallengeService.scala | 14 +- .../account/service/EventPublisher.scala | 16 +- .../src/test/resources/application.yml | 2 + .../resource/ChallengeResourceTest.scala | 11 +- .../api/dto/GameCreationRequestDto.scala | 10 ++ .../api/dto/GameCreationResponseDto.scala | 6 + modules/core/build.gradle.kts | 2 + .../chess/config/NativeReflectionConfig.scala | 5 + .../redis/GameCreationStreamListener.scala | 146 ++++++++++++++++++ .../chess/service/GameCreationService.scala | 71 +++++++++ .../core/src/test/resources/application.yml | 2 + .../service/GameCreationServiceTest.scala | 68 ++++++++ .../bot/service/OfficialBotService.scala | 114 ++++++++++++-- 16 files changed, 583 insertions(+), 35 deletions(-) create mode 100644 modules/account/src/main/scala/de/nowchess/account/client/GameCreationStreamClient.scala create mode 100644 modules/api/src/main/scala/de/nowchess/api/dto/GameCreationRequestDto.scala create mode 100644 modules/api/src/main/scala/de/nowchess/api/dto/GameCreationResponseDto.scala create mode 100644 modules/core/src/main/scala/de/nowchess/chess/redis/GameCreationStreamListener.scala create mode 100644 modules/core/src/main/scala/de/nowchess/chess/service/GameCreationService.scala create mode 100644 modules/core/src/test/scala/de/nowchess/chess/service/GameCreationServiceTest.scala diff --git a/modules/account/src/main/scala/de/nowchess/account/client/GameCreationStreamClient.scala b/modules/account/src/main/scala/de/nowchess/account/client/GameCreationStreamClient.scala new file mode 100644 index 0000000..137ee57 --- /dev/null +++ b/modules/account/src/main/scala/de/nowchess/account/client/GameCreationStreamClient.scala @@ -0,0 +1,133 @@ +package de.nowchess.account.client + +import com.fasterxml.jackson.databind.ObjectMapper +import de.nowchess.account.config.RedisConfig +import de.nowchess.api.dto.{GameCreationRequestDto, GameCreationResponseDto, PlayerInfoDto, TimeControlDto} +import de.nowchess.api.game.GameMode +import de.nowchess.api.player.PlayerType +import de.nowchess.api.event.{EventEnvelope, EventType} +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs} +import io.quarkus.runtime.StartupEvent +import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.event.Observes +import jakarta.inject.Inject +import org.eclipse.microprofile.config.inject.ConfigProperty +import org.eclipse.microprofile.context.ManagedExecutor +import org.jboss.logging.Logger +import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID +import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit} + +@ApplicationScoped +class GameCreationStreamClient: + + // scalafix:off DisableSyntax.var + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var executor: ManagedExecutor = uninitialized + @ConfigProperty(name = "nowchess.game-creation-stream.enabled", defaultValue = "true") + private var streamEnabled: Boolean = true + // scalafix:on DisableSyntax.var + + private val log = Logger.getLogger(classOf[GameCreationStreamClient]) + private val instanceId = UUID.randomUUID().toString + private val groupName = s"account-game-creation-$instanceId" + private val consumerId = instanceId + private val maxStreamLen = 1000L + private val timeout = Duration.ofSeconds(10) + + private val pending = new ConcurrentHashMap[String, CompletableFuture[GameCreationResponseDto]]() + + private def requestStream: String = s"${redisConfig.prefix}:game-creation" + private def responseStream: String = s"${redisConfig.prefix}:game-creation-response" + + def start(@Observes _ev: StartupEvent): Unit = + if streamEnabled then + createGroupIfAbsent() + executor.submit( + new Runnable: + def run(): Unit = pollLoop(), + ) + log.infof("Game-creation response listener started (consumer=%s)", consumerId) + + def createGame(req: CoreCreateGameRequest): GameCreationResponseDto = + val correlationId = UUID.randomUUID().toString + val future = new CompletableFuture[GameCreationResponseDto]() + pending.put(correlationId, future) + Try { + val payload = objectMapper.valueToTree[com.fasterxml.jackson.databind.JsonNode](toDto(req)) + val envelope = EventEnvelope.of(EventType.GameCreationRequest, payload, Some(correlationId)) + publish(requestStream, envelope) + future.get(timeout.toMillis, TimeUnit.MILLISECONDS) + } match + case Success(resp) => + pending.remove(correlationId) + resp + case Failure(ex) => + pending.remove(correlationId) + log.errorf(ex, "Game creation request %s failed", correlationId) + GameCreationResponseDto(None, Some("Game creation request timed out or failed")) + + private def toDto(req: CoreCreateGameRequest): GameCreationRequestDto = + GameCreationRequestDto( + white = req.white.map(p => PlayerInfoDto(p.id, p.displayName, PlayerType.Human)), + black = req.black.map(p => PlayerInfoDto(p.id, p.displayName, PlayerType.Human)), + timeControl = req.timeControl.map(t => TimeControlDto(t.limitSeconds, t.incrementSeconds, t.daysPerMove)), + mode = req.mode.map(_ => GameMode.Authenticated), + ) + + private def createGroupIfAbsent(): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(responseStream, groupName, "0", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create response consumer group") + case Success(_) => () + + private def pollLoop(): Unit = + while true do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + groupName, + consumerId, + responseStream, + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach(handleResponse)) + } match + case Failure(ex) => log.warnf(ex, "Error in game-creation response poll loop") + case Success(_) => () + + private def handleResponse(msg: StreamMessage[String, String, String]): Unit = + val json = msg.payload().get("data") + Try(objectMapper.readValue(json, classOf[EventEnvelope])) match + case Success(envelope) => + envelope.correlationId.flatMap(id => Option(pending.remove(id))).foreach { future => + Try(objectMapper.treeToValue(envelope.payload, classOf[GameCreationResponseDto])) match + case Success(resp) => future.complete(resp) + case Failure(ex) => future.completeExceptionally(ex) + } + case Failure(ex) => log.warnf(ex, "Unparseable game-creation response: %s", json) + ack(msg.id()) + + private def ack(id: String): Unit = + Try(redis.stream(classOf[String]).xack(responseStream, groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack response %s", id) + case Success(_) => () + + private def publish(key: String, envelope: EventEnvelope): Unit = + val json = objectMapper.writeValueAsString(envelope) + redis + .stream(classOf[String]) + .xadd(key, new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), Map("data" -> json).asJava) + () diff --git a/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala b/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala index fafdbfa..ca76bcb 100644 --- a/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala +++ b/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala @@ -12,6 +12,12 @@ import de.nowchess.account.domain.{ UserAccount, } import de.nowchess.account.dto.* +import de.nowchess.api.dto.{ + GameCreationRequestDto, + GameCreationResponseDto, + PlayerInfoDto as ApiPlayerInfoDto, + TimeControlDto as ApiTimeControlDto, +} import de.nowchess.api.event.{EventEnvelope, EventType} import io.quarkus.runtime.annotations.RegisterForReflection @@ -49,6 +55,10 @@ import io.quarkus.runtime.annotations.RegisterForReflection classOf[CoreCreateGameRequest], classOf[CoreGameResponse], classOf[OfficialChallengeResponse], + classOf[GameCreationRequestDto], + classOf[GameCreationResponseDto], + classOf[ApiPlayerInfoDto], + classOf[ApiTimeControlDto], ), ) class NativeReflectionConfig diff --git a/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala b/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala index e82582c..f91b69c 100644 --- a/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala +++ b/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala @@ -1,6 +1,6 @@ package de.nowchess.account.resource -import de.nowchess.account.client.{CoreCreateGameRequest, CoreGameClient, CorePlayerInfo} +import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, GameCreationStreamClient} import de.nowchess.account.dto.{ErrorDto, OfficialChallengeResponse} import de.nowchess.account.service.{AccountService, EventPublisher} import jakarta.annotation.security.RolesAllowed @@ -9,7 +9,6 @@ import jakarta.inject.Inject import jakarta.ws.rs.* import jakarta.ws.rs.core.{MediaType, Response} import org.eclipse.microprofile.jwt.JsonWebToken -import org.eclipse.microprofile.rest.client.inject.RestClient import org.jboss.logging.Logger import scala.compiletime.uninitialized @@ -29,8 +28,7 @@ class OfficialChallengeResource: @Inject var botEventPublisher: EventPublisher = uninitialized @Inject - @RestClient - var coreGameClient: CoreGameClient = uninitialized + var gameCreationClient: GameCreationStreamClient = uninitialized // scalafix:on private val log = Logger.getLogger(classOf[OfficialChallengeResource]) @@ -72,7 +70,7 @@ class OfficialChallengeResource: (CorePlayerInfo(bot.id.toString, bot.name), CorePlayerInfo(user.id.toString, user.username), "white") val req = CoreCreateGameRequest(Some(white), Some(black), None, Some("Authenticated")) val gameId = - try Right(coreGameClient.createGame(req).gameId) + try gameCreationClient.createGame(req).gameId.toRight("Failed to create game") catch case _ => Left("Failed to create game") gameId match case Left(err) => diff --git a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala index c5e8650..0442640 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala @@ -1,12 +1,6 @@ package de.nowchess.account.service -import de.nowchess.account.client.{ - CoreCreateGameRequest, - CoreGameClient, - CoreGameResponse, - CorePlayerInfo, - CoreTimeControl, -} +import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, CoreTimeControl, GameCreationStreamClient} import de.nowchess.account.domain.{Challenge, ChallengeColor, ChallengeStatus, DeclineReason} import de.nowchess.account.dto.{ ChallengeDto, @@ -23,7 +17,6 @@ import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import jakarta.transaction.Transactional -import org.eclipse.microprofile.rest.client.inject.RestClient import org.jboss.logging.Logger import scala.compiletime.uninitialized @@ -45,8 +38,7 @@ class ChallengeService: var challengeRepository: ChallengeRepository = uninitialized @Inject - @RestClient - var coreGameClient: CoreGameClient = uninitialized + var gameCreationClient: GameCreationStreamClient = uninitialized @Inject var eventPublisher: EventPublisher = uninitialized @@ -187,7 +179,7 @@ class ChallengeService: val (white, black) = assignColors(challenge) val tc = buildTimeControl(challenge) val req = CoreCreateGameRequest(Some(white), Some(black), tc, Some("Authenticated")) - Right(coreGameClient.createGame(req).gameId) + gameCreationClient.createGame(req).gameId.toRight(ChallengeError.GameCreationFailed) catch case _ => Left(ChallengeError.GameCreationFailed) private def assignColors(challenge: Challenge): (CorePlayerInfo, CorePlayerInfo) = diff --git a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala index da20e4c..0f7cbaa 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala @@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.nowchess.account.config.RedisConfig import de.nowchess.api.event.{EventEnvelope, EventType} import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.XAddArgs import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* @ApplicationScoped class EventPublisher: @@ -17,13 +19,25 @@ class EventPublisher: @Inject var objectMapper: ObjectMapper = uninitialized // scalafix:on DisableSyntax.var + private val maxStreamLen = 1000L + def publishGameStart(botId: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit = val payload = objectMapper.createObjectNode() payload.put("gameId", gameId) payload.put("playingAs", playingAs) payload.put("difficulty", difficulty) payload.put("botAccountId", botAccountId) - publish(s"${redisConfig.prefix}:bot:$botId:events", EventType.GameStart, payload) + val envelope = EventEnvelope.of(EventType.BotGameStart, payload) + val json = objectMapper.writeValueAsString(envelope) + redis + .stream(classOf[String]) + .xadd( + s"${redisConfig.prefix}:bot:$botId:events:stream", + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json).asJava, + ) + redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json) + () def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit = val payload = objectMapper.createObjectNode() diff --git a/modules/account/src/test/resources/application.yml b/modules/account/src/test/resources/application.yml index 82fe950..fc197a6 100644 --- a/modules/account/src/test/resources/application.yml +++ b/modules/account/src/test/resources/application.yml @@ -34,3 +34,5 @@ nowchess: secret: test-secret auth: enabled: false + game-creation-stream: + enabled: false diff --git a/modules/account/src/test/scala/de/nowchess/account/resource/ChallengeResourceTest.scala b/modules/account/src/test/scala/de/nowchess/account/resource/ChallengeResourceTest.scala index 5345fc7..2d66fb6 100644 --- a/modules/account/src/test/scala/de/nowchess/account/resource/ChallengeResourceTest.scala +++ b/modules/account/src/test/scala/de/nowchess/account/resource/ChallengeResourceTest.scala @@ -1,11 +1,11 @@ package de.nowchess.account.resource -import de.nowchess.account.client.{CoreGameClient, CoreGameResponse} +import de.nowchess.account.client.GameCreationStreamClient +import de.nowchess.api.dto.GameCreationResponseDto import io.quarkus.test.InjectMock import io.quarkus.test.junit.QuarkusTest import io.restassured.RestAssured import io.restassured.http.ContentType -import org.eclipse.microprofile.rest.client.inject.RestClient import org.hamcrest.Matchers.* import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.{ArgumentMatchers, Mockito} @@ -14,14 +14,15 @@ import org.mockito.{ArgumentMatchers, Mockito} class ChallengeResourceTest: @InjectMock - @RestClient // scalafix:off DisableSyntax.var - var coreGameClient: CoreGameClient = scala.compiletime.uninitialized + var gameCreationClient: GameCreationStreamClient = scala.compiletime.uninitialized // scalafix:on @BeforeEach def setup(): Unit = - Mockito.when(coreGameClient.createGame(ArgumentMatchers.any())).thenReturn(CoreGameResponse("test-game-id")) + Mockito + .when(gameCreationClient.createGame(ArgumentMatchers.any())) + .thenReturn(GameCreationResponseDto(Some("test-game-id"))) private def givenRequest() = RestAssured.`given`().contentType(ContentType.JSON) diff --git a/modules/api/src/main/scala/de/nowchess/api/dto/GameCreationRequestDto.scala b/modules/api/src/main/scala/de/nowchess/api/dto/GameCreationRequestDto.scala new file mode 100644 index 0000000..efc61c0 --- /dev/null +++ b/modules/api/src/main/scala/de/nowchess/api/dto/GameCreationRequestDto.scala @@ -0,0 +1,10 @@ +package de.nowchess.api.dto + +import de.nowchess.api.game.GameMode + +final case class GameCreationRequestDto( + white: Option[PlayerInfoDto], + black: Option[PlayerInfoDto], + timeControl: Option[TimeControlDto], + mode: Option[GameMode] = None, +) diff --git a/modules/api/src/main/scala/de/nowchess/api/dto/GameCreationResponseDto.scala b/modules/api/src/main/scala/de/nowchess/api/dto/GameCreationResponseDto.scala new file mode 100644 index 0000000..eb52124 --- /dev/null +++ b/modules/api/src/main/scala/de/nowchess/api/dto/GameCreationResponseDto.scala @@ -0,0 +1,6 @@ +package de.nowchess.api.dto + +final case class GameCreationResponseDto( + gameId: Option[String], + error: Option[String] = None, +) diff --git a/modules/core/build.gradle.kts b/modules/core/build.gradle.kts index aeba709..5373aa6 100644 --- a/modules/core/build.gradle.kts +++ b/modules/core/build.gradle.kts @@ -138,6 +138,8 @@ tasks.withType(org.gradle.api.tasks.scala.ScalaCompile::class).configureEach { exclude("**/resource/GameDtoMapper.scala") exclude("**/resource/GameResource.scala") exclude("**/redis/GameRedis*.scala") + exclude("**/redis/GameCreationStreamListener.scala") + exclude("**/service/GameCreationService.scala") } } } diff --git a/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala b/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala index 618daef..b982851 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala @@ -2,6 +2,7 @@ package de.nowchess.chess.config import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square} import de.nowchess.api.dto.* +import de.nowchess.api.event.{EventEnvelope, EventType} import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult} import de.nowchess.api.move.{Move, MoveType, PromotionPiece} import de.nowchess.chess.registry.GameCacheDto @@ -13,6 +14,10 @@ import io.quarkus.runtime.annotations.RegisterForReflection classOf[GameCacheDto], classOf[ClockDto], classOf[CreateGameRequestDto], + classOf[GameCreationRequestDto], + classOf[GameCreationResponseDto], + classOf[EventEnvelope], + classOf[EventType], classOf[ErrorEventDto], classOf[GameWritebackEventDto], classOf[GameFullDto], diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameCreationStreamListener.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameCreationStreamListener.scala new file mode 100644 index 0000000..fea7b7b --- /dev/null +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameCreationStreamListener.scala @@ -0,0 +1,146 @@ +package de.nowchess.chess.redis + +import com.fasterxml.jackson.databind.ObjectMapper +import de.nowchess.api.dto.{GameCreationRequestDto, GameCreationResponseDto} +import de.nowchess.api.event.{EventEnvelope, EventType} +import de.nowchess.chess.config.RedisConfig +import de.nowchess.chess.service.GameCreationService +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs} +import io.quarkus.runtime.StartupEvent +import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.event.Observes +import jakarta.inject.Inject +import org.eclipse.microprofile.config.inject.ConfigProperty +import org.eclipse.microprofile.context.ManagedExecutor +import org.jboss.logging.Logger +import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID + +@ApplicationScoped +class GameCreationStreamListener: + + // scalafix:off DisableSyntax.var + @Inject var redis: RedisDataSource = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var creationService: GameCreationService = uninitialized + @Inject var executor: ManagedExecutor = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @ConfigProperty(name = "nowchess.game-creation-stream.enabled", defaultValue = "true") + private var streamEnabled: Boolean = true + // scalafix:on DisableSyntax.var + + private val log = Logger.getLogger(classOf[GameCreationStreamListener]) + private val groupName = "core-game-creation" + private val consumerId = UUID.randomUUID().toString + private val maxRetries = 3 + private val maxStreamLen = 1000L + + private def requestStream: String = s"${redisConfig.prefix}:game-creation" + private def responseStream: String = s"${redisConfig.prefix}:game-creation-response" + private def dlqStream: String = s"${redisConfig.prefix}:dlq" + + def start(@Observes _ev: StartupEvent): Unit = + if streamEnabled then + createGroupIfAbsent() + executor.submit( + new Runnable: + def run(): Unit = pollLoop(), + ) + log.infof("Game-creation request listener started (consumer=%s)", consumerId) + + private def createGroupIfAbsent(): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(requestStream, groupName, "0", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create game-creation consumer group") + case Success(_) => () + + private def pollLoop(): Unit = + while true do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + groupName, + consumerId, + requestStream, + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach(handleMessage)) + } match + case Failure(ex) => log.warnf(ex, "Error in game-creation poll loop") + case Success(_) => () + + private def handleMessage(msg: StreamMessage[String, String, String]): Unit = + val json = msg.payload().get("data") + val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0) + Try(objectMapper.readValue(json, classOf[EventEnvelope])) match + case Failure(ex) => + log.errorf(ex, "Unparseable game-creation event, sending to DLQ: %s", json) + toDlq(EventType.GameCreationRequest.toString, json, ex, attempt) + ack(msg.id()) + case Success(envelope) => + processEnvelope(msg, envelope, json, attempt) + + private def processEnvelope( + msg: StreamMessage[String, String, String], + envelope: EventEnvelope, + json: String, + attempt: Int, + ): Unit = + Try { + val req = objectMapper.treeToValue(envelope.payload, classOf[GameCreationRequestDto]) + val entry = creationService.createGame(req) + publishResponse(envelope.correlationId, GameCreationResponseDto(Some(entry.gameId))) + } match + case Success(_) => ack(msg.id()) + case Failure(ex) if attempt + 1 < maxRetries => + log.warnf(ex, "Game creation failed (attempt %d), retrying", attempt) + retry(json, attempt + 1) + ack(msg.id()) + case Failure(ex) => + log.errorf(ex, "Game creation failed after %d attempts, sending to DLQ", maxRetries) + publishResponse(envelope.correlationId, GameCreationResponseDto(None, Some("Game creation failed"))) + toDlq(envelope.`type`.toString, json, ex, attempt) + ack(msg.id()) + + private def publishResponse(correlationId: Option[String], resp: GameCreationResponseDto): Unit = + val payload = objectMapper.valueToTree[com.fasterxml.jackson.databind.JsonNode](resp) + val envelope = EventEnvelope.of(EventType.GameCreationResponse, payload, correlationId) + xadd(responseStream, Map("data" -> objectMapper.writeValueAsString(envelope))) + + private def retry(json: String, attempt: Int): Unit = + xadd(requestStream, Map("data" -> json, "attempt" -> attempt.toString)) + + private def toDlq(eventType: String, json: String, error: Throwable, attempt: Int): Unit = + xadd( + dlqStream, + Map( + "data" -> json, + "eventType" -> eventType, + "error" -> Option(error.getMessage).getOrElse(error.getClass.getName), + "attempt" -> (attempt + 1).toString, + ), + ) + + private def ack(id: String): Unit = + Try(redis.stream(classOf[String]).xack(requestStream, groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack message %s", id) + case Success(_) => () + + private def xadd(key: String, fields: Map[String, String]): Unit = + Try( + redis + .stream(classOf[String]) + .xadd(key, new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), fields.asJava), + ) match + case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key) + case Success(_) => () diff --git a/modules/core/src/main/scala/de/nowchess/chess/service/GameCreationService.scala b/modules/core/src/main/scala/de/nowchess/chess/service/GameCreationService.scala new file mode 100644 index 0000000..479fea3 --- /dev/null +++ b/modules/core/src/main/scala/de/nowchess/chess/service/GameCreationService.scala @@ -0,0 +1,71 @@ +package de.nowchess.chess.service + +import de.nowchess.api.dto.{GameCreationRequestDto, PlayerInfoDto, TimeControlDto} +import de.nowchess.api.game.{GameContext, GameMode, TimeControl} +import de.nowchess.api.player.{PlayerId, PlayerInfo} +import de.nowchess.chess.engine.GameEngine +import de.nowchess.chess.grpc.RuleSetGrpcAdapter +import de.nowchess.chess.redis.GameRedisSubscriberManager +import de.nowchess.chess.registry.{GameEntry, GameRegistry} +import jakarta.enterprise.context.ApplicationScoped +import jakarta.inject.Inject +import org.jboss.logging.Logger +import scala.compiletime.uninitialized + +@ApplicationScoped +class GameCreationService: + + private val log = Logger.getLogger(classOf[GameCreationService]) + + // scalafix:off DisableSyntax.var + @Inject var registry: GameRegistry = uninitialized + @Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized + @Inject var subscriberManager: GameRedisSubscriberManager = uninitialized + // scalafix:on DisableSyntax.var + + private val DefaultWhite = PlayerInfo(PlayerId("p1"), "Player 1") + private val DefaultBlack = PlayerInfo(PlayerId("p2"), "Player 2") + + def createGame(req: GameCreationRequestDto): GameEntry = + val white = playerInfoFrom(req.white, DefaultWhite) + val black = playerInfoFrom(req.black, DefaultBlack) + val tc = toTimeControl(req.timeControl) + val mode = req.mode.getOrElse(GameMode.Open) + val entry = newEntry(GameContext.initial, white, black, tc, mode) + registry.store(entry) + subscriberManager.subscribeGame(entry.gameId) + log.infof( + "Game %s created — white=%s black=%s mode=%s", + entry.gameId, + white.displayName, + black.displayName, + mode.toString, + ) + entry + + private def playerInfoFrom(dto: Option[PlayerInfoDto], default: PlayerInfo): PlayerInfo = + dto.fold(default)(d => PlayerInfo(PlayerId(d.id), d.displayName)) + + private def toTimeControl(dto: Option[TimeControlDto]): TimeControl = + dto match + case None => TimeControl.Unlimited + case Some(tc) => + tc.daysPerMove match + case Some(d) => TimeControl.Correspondence(d) + case None => + tc.limitSeconds.fold(TimeControl.Unlimited)(l => TimeControl.Clock(l, tc.incrementSeconds.getOrElse(0))) + + private def newEntry( + ctx: GameContext, + white: PlayerInfo, + black: PlayerInfo, + tc: TimeControl, + mode: GameMode, + ): GameEntry = + GameEntry( + registry.generateId(), + GameEngine(initialContext = ctx, ruleSet = ruleSetAdapter, timeControl = tc), + white, + black, + mode = mode, + ) diff --git a/modules/core/src/test/resources/application.yml b/modules/core/src/test/resources/application.yml index af2dd01..e182fce 100644 --- a/modules/core/src/test/resources/application.yml +++ b/modules/core/src/test/resources/application.yml @@ -18,6 +18,8 @@ nowchess: enabled: false coordinator: enabled: false + game-creation-stream: + enabled: false redis: host: localhost port: 6379 diff --git a/modules/core/src/test/scala/de/nowchess/chess/service/GameCreationServiceTest.scala b/modules/core/src/test/scala/de/nowchess/chess/service/GameCreationServiceTest.scala new file mode 100644 index 0000000..2fbd6e2 --- /dev/null +++ b/modules/core/src/test/scala/de/nowchess/chess/service/GameCreationServiceTest.scala @@ -0,0 +1,68 @@ +package de.nowchess.chess.service + +import de.nowchess.api.dto.{GameCreationRequestDto, PlayerInfoDto, TimeControlDto} +import de.nowchess.api.game.{GameMode, TimeControl} +import de.nowchess.api.player.PlayerType +import de.nowchess.chess.client.CombinedExportResponse +import de.nowchess.chess.grpc.IoGrpcClientWrapper +import de.nowchess.chess.redis.GameRedisSubscriberManager +import io.quarkus.test.InjectMock +import io.quarkus.test.junit.QuarkusTest +import jakarta.inject.Inject +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.{BeforeEach, DisplayName, Test} +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{verify, when} +import scala.compiletime.uninitialized + +// scalafix:off +@QuarkusTest +@DisplayName("GameCreationService") +class GameCreationServiceTest: + + @Inject + var service: GameCreationService = uninitialized + + @InjectMock + var subscriberManager: GameRedisSubscriberManager = uninitialized + + @InjectMock + var ioWrapper: IoGrpcClientWrapper = uninitialized + + @BeforeEach + def setup(): Unit = + when(ioWrapper.exportCombined(any())) + .thenReturn(CombinedExportResponse("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", "")) + + private def player(id: String, name: String): PlayerInfoDto = + PlayerInfoDto(id, name, PlayerType.Human) + + @Test + def createsGameAndSubscribes(): Unit = + val req = + GameCreationRequestDto(Some(player("w", "White")), Some(player("b", "Black")), None, Some(GameMode.Authenticated)) + val entry = service.createGame(req) + assertNotNull(entry.gameId) + assertEquals("White", entry.white.displayName) + assertEquals("Black", entry.black.displayName) + assertEquals(GameMode.Authenticated, entry.mode) + verify(subscriberManager).subscribeGame(entry.gameId) + + @Test + def defaultsToOpenModeAndDefaultPlayers(): Unit = + val entry = service.createGame(GameCreationRequestDto(None, None, None, None)) + assertEquals(GameMode.Open, entry.mode) + assertEquals("Player 1", entry.white.displayName) + assertEquals("Player 2", entry.black.displayName) + + @Test + def mapsClockTimeControl(): Unit = + val tc = TimeControlDto(Some(300), Some(5), None) + val entry = service.createGame(GameCreationRequestDto(None, None, Some(tc), None)) + assertEquals(TimeControl.Clock(300, 5), entry.engine.timeControl) + + @Test + def mapsCorrespondenceTimeControl(): Unit = + val tc = TimeControlDto(None, None, Some(3)) + val entry = service.createGame(GameCreationRequestDto(None, None, Some(tc), None)) + assertEquals(TimeControl.Correspondence(3), entry.engine.timeControl) diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala index 958f568..d838b85 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala @@ -1,6 +1,7 @@ package de.nowchess.bot.service import com.fasterxml.jackson.databind.ObjectMapper +import de.nowchess.api.event.EventEnvelope import de.nowchess.api.move.{Move, MoveType, PromotionPiece} import de.nowchess.bot.BotController import de.nowchess.bot.BotDifficulty @@ -9,14 +10,20 @@ import de.nowchess.bot.config.RedisConfig import de.nowchess.io.fen.FenParser import io.micrometer.core.instrument.MeterRegistry import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs} import io.quarkus.runtime.StartupEvent import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.event.Observes import jakarta.inject.Inject +import org.eclipse.microprofile.context.ManagedExecutor import org.eclipse.microprofile.rest.client.inject.RestClient import org.jboss.logging.Logger import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID import java.util.function.Consumer import java.util.concurrent.TimeUnit @@ -31,6 +38,7 @@ class OfficialBotService: @Inject var objectMapper: ObjectMapper = uninitialized @Inject var botController: BotController = uninitialized @Inject var meterRegistry: MeterRegistry = uninitialized + @Inject var executor: ManagedExecutor = uninitialized @Inject @RestClient @@ -40,6 +48,14 @@ class OfficialBotService: private val terminalStatuses = Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw") + private val groupName = "official-bot" + private val consumerId = UUID.randomUUID().toString + private val maxRetries = 3 + private val maxStreamLen = 1000L + + private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream" + private def dlqStream: String = s"${redisConfig.prefix}:dlq" + @PostConstruct def initializeMetrics(): Unit = BotController.listBots.foreach { bot => @@ -54,20 +70,92 @@ class OfficialBotService: bots.foreach(subscribeToEventChannel) private def subscribeToEventChannel(botName: String): Unit = - val handler: Consumer[String] = msg => handleBotEvent(botName, msg) - redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:bot:$botName:events", handler) - () + createGroupIfAbsent(botName) + executor.submit( + new Runnable: + def run(): Unit = pollLoop(botName), + ) + log.infof("Listening to bot event stream for %s (consumer=%s)", botName, consumerId) - private def handleBotEvent(botName: String, msg: String): Unit = - try - val node = objectMapper.readTree(msg) - if node.path("type").asText() == "gameStart" then - val gameId = node.path("gameId").asText() - val playingAs = node.path("playingAs").asText() - val difficulty = node.path("difficulty").asInt(1400) - val botAccountId = node.path("botAccountId").asText() - watchGame(botName, gameId, playingAs, difficulty, botAccountId) - catch case _: Exception => () + private def createGroupIfAbsent(botName: String): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(eventStream(botName), groupName, "0", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create bot event consumer group for %s", botName) + case Success(_) => () + + private def pollLoop(botName: String): Unit = + while true do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + groupName, + consumerId, + eventStream(botName), + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach(msg => handleStreamMessage(botName, msg))) + } match + case Failure(ex) => log.warnf(ex, "Error in bot event poll loop for %s", botName) + case Success(_) => () + + private def handleStreamMessage(botName: String, msg: StreamMessage[String, String, String]): Unit = + val json = msg.payload().get("data") + val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0) + Try { + val envelope = objectMapper.readValue(json, classOf[EventEnvelope]) + handleBotEvent(botName, envelope) + } match + case Success(_) => ack(botName, msg.id()) + case Failure(ex) if attempt + 1 < maxRetries => + log.warnf(ex, "Bot event handling failed for %s (attempt %d), retrying", botName, attempt) + retry(botName, json, attempt + 1) + ack(botName, msg.id()) + case Failure(ex) => + log.errorf(ex, "Bot event handling failed for %s after %d attempts, sending to DLQ", botName, maxRetries) + toDlq(json, ex, attempt) + ack(botName, msg.id()) + + private def handleBotEvent(botName: String, envelope: EventEnvelope): Unit = + val payload = envelope.payload + val gameId = payload.path("gameId").asText() + val playingAs = payload.path("playingAs").asText() + val difficulty = payload.path("difficulty").asInt(1400) + val botAccountId = payload.path("botAccountId").asText() + watchGame(botName, gameId, playingAs, difficulty, botAccountId) + + private def ack(botName: String, id: String): Unit = + Try(redis.stream(classOf[String]).xack(eventStream(botName), groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack bot event %s", id) + case Success(_) => () + + private def retry(botName: String, json: String, attempt: Int): Unit = + xadd(eventStream(botName), Map("data" -> json, "attempt" -> attempt.toString)) + + private def toDlq(json: String, error: Throwable, attempt: Int): Unit = + xadd( + dlqStream, + Map( + "data" -> json, + "eventType" -> "BotGameStart", + "error" -> Option(error.getMessage).getOrElse(error.getClass.getName), + "attempt" -> (attempt + 1).toString, + ), + ) + + private def xadd(key: String, fields: Map[String, String]): Unit = + Try( + redis + .stream(classOf[String]) + .xadd(key, new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), fields.asJava), + ) match + case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key) + case Success(_) => () private def watchGame( botName: String,