feat(events): migrate game-creation and bot flows to Redis Streams
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
Replace synchronous account→core game-creation HTTP call and plain pub/sub bot game-start events with Redis Streams using consumer groups, XACK, retry, and a Dead Letter Queue for at-least-once delivery and observability. - account: GameCreationStreamClient publishes game-creation requests and correlates responses via a per-instance consumer group (NCS-91) - core: GameCreationStreamListener consumes requests, calls GameCreationService, publishes response events, retries, and routes exhausted/unparseable events to the DLQ (NCS-91, NCS-93, NCS-94) - official-bots: bot game-start events migrated from pub/sub to Streams with consumer group, XACK, retry, and DLQ (NCS-92) - account EventPublisher dual-writes to the stream and legacy pub/sub channel for backward compatibility - all flows use the typed EventEnvelope (eventId/type/payload/timestamp/ correlationId) with DLQ error context (eventType, error, attempt) - register new DTOs and EventEnvelope/EventType for native reflection Closes NCS-91, NCS-92, NCS-93, NCS-94 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+131
@@ -0,0 +1,131 @@
|
||||
package de.nowchess.account.client
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.account.config.RedisConfig
|
||||
import de.nowchess.api.dto.{GameCreationRequestDto, GameCreationResponseDto, PlayerInfoDto, TimeControlDto}
|
||||
import de.nowchess.api.game.GameMode
|
||||
import de.nowchess.api.player.PlayerType
|
||||
import de.nowchess.api.event.{EventEnvelope, EventType}
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs}
|
||||
import io.quarkus.runtime.Startup
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import org.eclipse.microprofile.context.ManagedExecutor
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import java.time.Duration
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit}
|
||||
|
||||
@Startup
|
||||
@ApplicationScoped
|
||||
class GameCreationStreamClient:
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var redis: RedisDataSource = uninitialized
|
||||
@Inject var redisConfig: RedisConfig = uninitialized
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
@Inject var executor: ManagedExecutor = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val log = Logger.getLogger(classOf[GameCreationStreamClient])
|
||||
private val instanceId = UUID.randomUUID().toString
|
||||
private val groupName = s"account-game-creation-$instanceId"
|
||||
private val consumerId = instanceId
|
||||
private val maxStreamLen = 1000L
|
||||
private val timeout = Duration.ofSeconds(10)
|
||||
|
||||
private val pending = new ConcurrentHashMap[String, CompletableFuture[GameCreationResponseDto]]()
|
||||
|
||||
private def requestStream: String = s"${redisConfig.prefix}:game-creation"
|
||||
private def responseStream: String = s"${redisConfig.prefix}:game-creation-response"
|
||||
|
||||
@PostConstruct
|
||||
def start(): Unit =
|
||||
createGroupIfAbsent()
|
||||
executor.submit(
|
||||
new Runnable:
|
||||
def run(): Unit = pollLoop(),
|
||||
)
|
||||
log.infof("Game-creation response listener started (consumer=%s)", consumerId)
|
||||
|
||||
def createGame(req: CoreCreateGameRequest): GameCreationResponseDto =
|
||||
val correlationId = UUID.randomUUID().toString
|
||||
val future = new CompletableFuture[GameCreationResponseDto]()
|
||||
pending.put(correlationId, future)
|
||||
Try {
|
||||
val payload = objectMapper.valueToTree[com.fasterxml.jackson.databind.JsonNode](toDto(req))
|
||||
val envelope = EventEnvelope.of(EventType.GameCreationRequest, payload, Some(correlationId))
|
||||
publish(requestStream, envelope)
|
||||
future.get(timeout.toMillis, TimeUnit.MILLISECONDS)
|
||||
} match
|
||||
case Success(resp) =>
|
||||
pending.remove(correlationId)
|
||||
resp
|
||||
case Failure(ex) =>
|
||||
pending.remove(correlationId)
|
||||
log.errorf(ex, "Game creation request %s failed", correlationId)
|
||||
GameCreationResponseDto(None, Some("Game creation request timed out or failed"))
|
||||
|
||||
private def toDto(req: CoreCreateGameRequest): GameCreationRequestDto =
|
||||
GameCreationRequestDto(
|
||||
white = req.white.map(p => PlayerInfoDto(p.id, p.displayName, PlayerType.Human)),
|
||||
black = req.black.map(p => PlayerInfoDto(p.id, p.displayName, PlayerType.Human)),
|
||||
timeControl = req.timeControl.map(t => TimeControlDto(t.limitSeconds, t.incrementSeconds, t.daysPerMove)),
|
||||
mode = req.mode.map(_ => GameMode.Authenticated),
|
||||
)
|
||||
|
||||
private def createGroupIfAbsent(): Unit =
|
||||
Try(
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xgroupCreate(responseStream, groupName, "0", new XGroupCreateArgs().mkstream()),
|
||||
) match
|
||||
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||
case Failure(ex) => log.warnf(ex, "Failed to create response consumer group")
|
||||
case Success(_) => ()
|
||||
|
||||
private def pollLoop(): Unit =
|
||||
while true do
|
||||
Try {
|
||||
val messages = redis
|
||||
.stream(classOf[String])
|
||||
.xreadgroup(
|
||||
groupName,
|
||||
consumerId,
|
||||
responseStream,
|
||||
">",
|
||||
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
|
||||
)
|
||||
Option(messages).foreach(_.forEach(handleResponse))
|
||||
} match
|
||||
case Failure(ex) => log.warnf(ex, "Error in game-creation response poll loop")
|
||||
case Success(_) => ()
|
||||
|
||||
private def handleResponse(msg: StreamMessage[String, String, String]): Unit =
|
||||
val json = msg.payload().get("data")
|
||||
Try(objectMapper.readValue(json, classOf[EventEnvelope])) match
|
||||
case Success(envelope) =>
|
||||
envelope.correlationId.flatMap(id => Option(pending.remove(id))).foreach { future =>
|
||||
Try(objectMapper.treeToValue(envelope.payload, classOf[GameCreationResponseDto])) match
|
||||
case Success(resp) => future.complete(resp)
|
||||
case Failure(ex) => future.completeExceptionally(ex)
|
||||
}
|
||||
case Failure(ex) => log.warnf(ex, "Unparseable game-creation response: %s", json)
|
||||
ack(msg.id())
|
||||
|
||||
private def ack(id: String): Unit =
|
||||
Try(redis.stream(classOf[String]).xack(responseStream, groupName, id)) match
|
||||
case Failure(ex) => log.warnf(ex, "Failed to ack response %s", id)
|
||||
case Success(_) => ()
|
||||
|
||||
private def publish(key: String, envelope: EventEnvelope): Unit =
|
||||
val json = objectMapper.writeValueAsString(envelope)
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xadd(key, new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), Map("data" -> json).asJava)
|
||||
()
|
||||
+10
@@ -12,6 +12,12 @@ import de.nowchess.account.domain.{
|
||||
UserAccount,
|
||||
}
|
||||
import de.nowchess.account.dto.*
|
||||
import de.nowchess.api.dto.{
|
||||
GameCreationRequestDto,
|
||||
GameCreationResponseDto,
|
||||
PlayerInfoDto as ApiPlayerInfoDto,
|
||||
TimeControlDto as ApiTimeControlDto,
|
||||
}
|
||||
import de.nowchess.api.event.{EventEnvelope, EventType}
|
||||
import io.quarkus.runtime.annotations.RegisterForReflection
|
||||
|
||||
@@ -49,6 +55,10 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
||||
classOf[CoreCreateGameRequest],
|
||||
classOf[CoreGameResponse],
|
||||
classOf[OfficialChallengeResponse],
|
||||
classOf[GameCreationRequestDto],
|
||||
classOf[GameCreationResponseDto],
|
||||
classOf[ApiPlayerInfoDto],
|
||||
classOf[ApiTimeControlDto],
|
||||
),
|
||||
)
|
||||
class NativeReflectionConfig
|
||||
|
||||
+3
-5
@@ -1,6 +1,6 @@
|
||||
package de.nowchess.account.resource
|
||||
|
||||
import de.nowchess.account.client.{CoreCreateGameRequest, CoreGameClient, CorePlayerInfo}
|
||||
import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, GameCreationStreamClient}
|
||||
import de.nowchess.account.dto.{ErrorDto, OfficialChallengeResponse}
|
||||
import de.nowchess.account.service.{AccountService, EventPublisher}
|
||||
import jakarta.annotation.security.RolesAllowed
|
||||
@@ -9,7 +9,6 @@ import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.*
|
||||
import jakarta.ws.rs.core.{MediaType, Response}
|
||||
import org.eclipse.microprofile.jwt.JsonWebToken
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@@ -29,8 +28,7 @@ class OfficialChallengeResource:
|
||||
@Inject var botEventPublisher: EventPublisher = uninitialized
|
||||
|
||||
@Inject
|
||||
@RestClient
|
||||
var coreGameClient: CoreGameClient = uninitialized
|
||||
var gameCreationClient: GameCreationStreamClient = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
private val log = Logger.getLogger(classOf[OfficialChallengeResource])
|
||||
@@ -72,7 +70,7 @@ class OfficialChallengeResource:
|
||||
(CorePlayerInfo(bot.id.toString, bot.name), CorePlayerInfo(user.id.toString, user.username), "white")
|
||||
val req = CoreCreateGameRequest(Some(white), Some(black), None, Some("Authenticated"))
|
||||
val gameId =
|
||||
try Right(coreGameClient.createGame(req).gameId)
|
||||
try gameCreationClient.createGame(req).gameId.toRight("Failed to create game")
|
||||
catch case _ => Left("Failed to create game")
|
||||
gameId match
|
||||
case Left(err) =>
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
package de.nowchess.account.service
|
||||
|
||||
import de.nowchess.account.client.{
|
||||
CoreCreateGameRequest,
|
||||
CoreGameClient,
|
||||
CoreGameResponse,
|
||||
CorePlayerInfo,
|
||||
CoreTimeControl,
|
||||
}
|
||||
import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, CoreTimeControl, GameCreationStreamClient}
|
||||
import de.nowchess.account.domain.{Challenge, ChallengeColor, ChallengeStatus, DeclineReason}
|
||||
import de.nowchess.account.dto.{
|
||||
ChallengeDto,
|
||||
@@ -23,7 +17,6 @@ import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.transaction.Transactional
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@@ -45,8 +38,7 @@ class ChallengeService:
|
||||
var challengeRepository: ChallengeRepository = uninitialized
|
||||
|
||||
@Inject
|
||||
@RestClient
|
||||
var coreGameClient: CoreGameClient = uninitialized
|
||||
var gameCreationClient: GameCreationStreamClient = uninitialized
|
||||
|
||||
@Inject
|
||||
var eventPublisher: EventPublisher = uninitialized
|
||||
@@ -187,7 +179,7 @@ class ChallengeService:
|
||||
val (white, black) = assignColors(challenge)
|
||||
val tc = buildTimeControl(challenge)
|
||||
val req = CoreCreateGameRequest(Some(white), Some(black), tc, Some("Authenticated"))
|
||||
Right(coreGameClient.createGame(req).gameId)
|
||||
gameCreationClient.createGame(req).gameId.toRight(ChallengeError.GameCreationFailed)
|
||||
catch case _ => Left(ChallengeError.GameCreationFailed)
|
||||
|
||||
private def assignColors(challenge: Challenge): (CorePlayerInfo, CorePlayerInfo) =
|
||||
|
||||
@@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.account.config.RedisConfig
|
||||
import de.nowchess.api.event.{EventEnvelope, EventType}
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.stream.XAddArgs
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
|
||||
@ApplicationScoped
|
||||
class EventPublisher:
|
||||
@@ -17,13 +19,25 @@ class EventPublisher:
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val maxStreamLen = 1000L
|
||||
|
||||
def publishGameStart(botId: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit =
|
||||
val payload = objectMapper.createObjectNode()
|
||||
payload.put("gameId", gameId)
|
||||
payload.put("playingAs", playingAs)
|
||||
payload.put("difficulty", difficulty)
|
||||
payload.put("botAccountId", botAccountId)
|
||||
publish(s"${redisConfig.prefix}:bot:$botId:events", EventType.GameStart, payload)
|
||||
val envelope = EventEnvelope.of(EventType.BotGameStart, payload)
|
||||
val json = objectMapper.writeValueAsString(envelope)
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xadd(
|
||||
s"${redisConfig.prefix}:bot:$botId:events:stream",
|
||||
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||
Map("data" -> json).asJava,
|
||||
)
|
||||
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json)
|
||||
()
|
||||
|
||||
def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit =
|
||||
val payload = objectMapper.createObjectNode()
|
||||
|
||||
+6
-5
@@ -1,11 +1,11 @@
|
||||
package de.nowchess.account.resource
|
||||
|
||||
import de.nowchess.account.client.{CoreGameClient, CoreGameResponse}
|
||||
import de.nowchess.account.client.GameCreationStreamClient
|
||||
import de.nowchess.api.dto.GameCreationResponseDto
|
||||
import io.quarkus.test.InjectMock
|
||||
import io.quarkus.test.junit.QuarkusTest
|
||||
import io.restassured.RestAssured
|
||||
import io.restassured.http.ContentType
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient
|
||||
import org.hamcrest.Matchers.*
|
||||
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||
import org.mockito.{ArgumentMatchers, Mockito}
|
||||
@@ -14,14 +14,15 @@ import org.mockito.{ArgumentMatchers, Mockito}
|
||||
class ChallengeResourceTest:
|
||||
|
||||
@InjectMock
|
||||
@RestClient
|
||||
// scalafix:off DisableSyntax.var
|
||||
var coreGameClient: CoreGameClient = scala.compiletime.uninitialized
|
||||
var gameCreationClient: GameCreationStreamClient = scala.compiletime.uninitialized
|
||||
// scalafix:on
|
||||
|
||||
@BeforeEach
|
||||
def setup(): Unit =
|
||||
Mockito.when(coreGameClient.createGame(ArgumentMatchers.any())).thenReturn(CoreGameResponse("test-game-id"))
|
||||
Mockito
|
||||
.when(gameCreationClient.createGame(ArgumentMatchers.any()))
|
||||
.thenReturn(GameCreationResponseDto(Some("test-game-id")))
|
||||
|
||||
private def givenRequest() = RestAssured.`given`().contentType(ContentType.JSON)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user