Compare commits

..

6 Commits

Author SHA1 Message Date
shosho996 145f467648 feat: NCS-121 pipeline for tournament (#68)
Build & Test (NowChessSystems) TeamCity build finished
Image for Tournment

---------

Co-authored-by: Lala, Shahd <Shahd.Lala@sybit.de>
Reviewed-on: #68
2026-06-09 23:50:51 +02:00
Janis db9d153391 feat(official-bots): consume GameOver stream for bot cleanup (#67)
Build & Test (NowChessSystems) TeamCity build finished
Add consumer group official-bots-game-over on {prefix}:game-over stream.
Track pub/sub subscribers per gameId in gameWatches map. On GameOver event,
unsubscribe from the game s2c channel and remove from watch map.
XACK after cleanup; DLQ after maxRetries failures.

Closes NCS-103
https://knockoutwhist.youtrack.cloud/issue/NCS-103

Reviewed-on: #67
2026-06-09 21:49:42 +02:00
Janis 55f102cbaa feat(ws): migrate challenge notifications to Redis Streams (#66)
Replace pub/sub publish in EventPublisher with XADD to user event stream.
UserWebSocketResource subscribes via XREADGROUP consumer group (per-connection
group, '$' offset). DLQ after maxRetries=3 on delivery failure. Poll loop
uses connection identity to prevent thread leak on reconnect.

Closes NCS-104
https://knockoutwhist.youtrack.cloud/issue/NCS-104

Reviewed-on: #66
2026-06-09 21:49:21 +02:00
Janis d66b6fa471 chore(account): remove dead CoreGameClient REST trait (#65)
Move CoreCreateGameRequest, CorePlayerInfo, CoreTimeControl to CoreGameDtos.
Delete CoreGameClient trait (replaced by GameCreationStreamClient) and
CoreGameResponse (unused after stream migration). Remove from reflection config.

Closes NCS-105
https://knockoutwhist.youtrack.cloud/issue/NCS-105

Reviewed-on: #65
2026-06-09 21:49:05 +02:00
Janis 676e4110c0 feat(core): publish GameOver event to Redis Streams (#64)
Add GameOver to EventType enum and GameOverPayload DTO.
GameRedisPublisher publishes to {prefix}:game-over stream (MAXLEN ~1000)
on game completion. NativeReflectionConfig updated for core module.

Closes NCS-102
https://knockoutwhist.youtrack.cloud/issue/NCS-102

Reviewed-on: #64
2026-06-09 21:48:41 +02:00
Janis 0ad2e10999 feat(bot-platform): migrate BotRegistry to Redis Streams consumer group (#63)
Replace pub/sub subscribe with XREADGROUP on bot game-start stream.
Remove dual-write from EventPublisher.publishGameStart.
Consumer group: bot-platform-consumer, XACK after forwarding.
Poll loop uses emitter identity to prevent thread leak on re-registration.
Group created with '$' offset — no historical replay on first connect.

Closes NCS-101
https://knockoutwhist.youtrack.cloud/issue/NCS-101

Reviewed-on: #63
2026-06-09 21:48:21 +02:00
23 changed files with 706 additions and 89 deletions
+1
View File
@@ -56,6 +56,7 @@ jobs:
- official-bots
- rule
- store
- tournament
- ws
arch:
- name: default
+4
View File
@@ -54,3 +54,7 @@ modules/account/src/main/resources/keys/dev-private.pem
modules/account/src/main/resources/keys/dev-public.pem
modules/core/src/main/resources/keys/dev-public.pem
*.hprof
### Embedded repos (not submodules) ###
GitOps/
frontend/
@@ -1,28 +0,0 @@
package de.nowchess.account.client
import de.nowchess.security.{InternalClientHeadersFactory, InternalSecretClientFilter}
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import org.eclipse.microprofile.rest.client.annotation.{RegisterClientHeaders, RegisterProvider}
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient
case class CorePlayerInfo(id: String, displayName: String)
case class CoreTimeControl(limitSeconds: Option[Int], incrementSeconds: Option[Int], daysPerMove: Option[Int])
case class CoreCreateGameRequest(
white: Option[CorePlayerInfo],
black: Option[CorePlayerInfo],
timeControl: Option[CoreTimeControl],
mode: Option[String],
)
case class CoreGameResponse(gameId: String)
@Path("/api/board/game")
@RegisterRestClient(configKey = "core-service")
@RegisterProvider(classOf[InternalSecretClientFilter])
@RegisterClientHeaders(classOf[InternalClientHeadersFactory])
trait CoreGameClient:
@POST
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
def createGame(req: CoreCreateGameRequest): CoreGameResponse
@@ -0,0 +1,10 @@
package de.nowchess.account.client
case class CorePlayerInfo(id: String, displayName: String)
case class CoreTimeControl(limitSeconds: Option[Int], incrementSeconds: Option[Int], daysPerMove: Option[Int])
case class CoreCreateGameRequest(
white: Option[CorePlayerInfo],
black: Option[CorePlayerInfo],
timeControl: Option[CoreTimeControl],
mode: Option[String],
)
@@ -1,6 +1,6 @@
package de.nowchess.account.config
import de.nowchess.account.client.{CoreCreateGameRequest, CoreGameResponse, CorePlayerInfo, CoreTimeControl}
import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, CoreTimeControl}
import de.nowchess.account.domain.{
BotAccount,
Challenge,
@@ -53,7 +53,6 @@ import io.quarkus.runtime.annotations.RegisterForReflection
classOf[CorePlayerInfo],
classOf[CoreTimeControl],
classOf[CoreCreateGameRequest],
classOf[CoreGameResponse],
classOf[OfficialChallengeResponse],
classOf[GameCreationRequestDto],
classOf[GameCreationResponseDto],
@@ -36,26 +36,32 @@ class EventPublisher:
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json).asJava,
)
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json)
()
def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit =
val payload = objectMapper.createObjectNode()
payload.put("challengeId", challengeId)
payload.put("challengerName", challengerName)
publish(s"${redisConfig.prefix}:user:$destUserId:events", EventType.ChallengeCreated, payload)
publishToUserStream(destUserId, EventType.ChallengeCreated, payload)
def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit =
val payload = objectMapper.createObjectNode()
payload.put("challengeId", challengeId)
payload.put("gameId", gameId)
publish(s"${redisConfig.prefix}:user:$challengerId:events", EventType.ChallengeAccepted, payload)
publishToUserStream(challengerId, EventType.ChallengeAccepted, payload)
private def publish(
channel: String,
private def publishToUserStream(
userId: String,
eventType: EventType,
payload: com.fasterxml.jackson.databind.node.ObjectNode,
): Unit =
val envelope = EventEnvelope.of(eventType, payload)
redis.pubsub(classOf[String]).publish(channel, objectMapper.writeValueAsString(envelope))
val json = objectMapper.writeValueAsString(envelope)
redis
.stream(classOf[String])
.xadd(
s"${redisConfig.prefix}:user:$userId:events:stream",
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json).asJava,
)
()
@@ -0,0 +1,56 @@
package de.nowchess.account.service
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import de.nowchess.account.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.*
import org.mockito.Mockito.*
import scala.compiletime.uninitialized
class EventPublisherTest:
// scalafix:off DisableSyntax.var
private var redis: RedisDataSource = uninitialized
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
private var redisConfig: RedisConfig = uninitialized
// scalafix:on DisableSyntax.var
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
@BeforeEach
def setup(): Unit =
redis = mock(classOf[RedisDataSource])
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
redisConfig = mock(classOf[RedisConfig])
when(redis.stream(classOf[String])).thenReturn(streamCmds)
when(redisConfig.prefix).thenReturn("nowchess")
private def publisher: EventPublisher =
val p = new EventPublisher
p.redis = redis
p.redisConfig = redisConfig
p.objectMapper = objectMapper
p
@Test
def publishChallengeCreatedWritesToUserStream(): Unit =
publisher.publishChallengeCreated("user1", "ch1", "Alice")
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:user:user1:events:stream"),
any(classOf[XAddArgs]),
any(),
)
verify(redis, never()).pubsub(any(classOf[Class[?]]))
@Test
def publishChallengeAcceptedWritesToUserStream(): Unit =
publisher.publishChallengeAccepted("user2", "ch1", "game42")
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:user:user2:events:stream"),
any(classOf[XAddArgs]),
any(),
)
verify(redis, never()).pubsub(any(classOf[Class[?]]))
@@ -1,4 +1,4 @@
package de.nowchess.api.event
enum EventType:
case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted
case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted, GameOver
@@ -0,0 +1,9 @@
package de.nowchess.api.event
final case class GameOverPayload(
gameId: String,
result: String,
terminationReason: String,
whiteId: String,
blackId: String,
)
+1
View File
@@ -73,6 +73,7 @@ dependencies {
testImplementation(platform("org.junit:junit-bom:5.13.4"))
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("io.quarkus:quarkus-junit")
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.rest-assured:rest-assured")
testImplementation("io.quarkus:quarkus-test-security")
@@ -2,14 +2,18 @@ package de.nowchess.botplatform.registry
import de.nowchess.botplatform.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import io.quarkus.redis.datasource.stream.{XGroupCreateArgs, XReadGroupArgs}
import io.smallrye.mutiny.subscription.MultiEmitter
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.eclipse.microprofile.context.ManagedExecutor
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success, Try}
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
@ApplicationScoped
class BotRegistry:
@@ -17,31 +21,68 @@ class BotRegistry:
private val log = Logger.getLogger(classOf[BotRegistry])
// scalafix:off DisableSyntax.var
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var executor: ManagedExecutor = uninitialized
// scalafix:on DisableSyntax.var
private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]()
private val groupName = "bot-platform-consumer"
private val consumerId = UUID.randomUUID().toString
private val emitters = ConcurrentHashMap[String, MultiEmitter[? >: String]]()
def register(botId: String, emitter: MultiEmitter[? >: String]): Unit =
val channel = s"${redisConfig.prefix}:bot:$botId:events"
val handler: Consumer[String] = msg => emitter.emit(msg)
val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler)
connections.put(botId, (emitter, subscriber))
log.infof("Bot %s registered", botId)
createGroupIfAbsent(botId)
emitters.put(botId, emitter)
executor.submit(
new Runnable:
def run(): Unit = pollLoop(botId, emitter),
)
log.infof("Bot %s registered on stream consumer group", botId)
()
def unregister(botId: String): Unit =
Option(connections.remove(botId)).foreach { (_, subscriber) =>
subscriber.unsubscribe(s"${redisConfig.prefix}:bot:$botId:events")
}
emitters.remove(botId)
log.infof("Bot %s unregistered", botId)
def dispatch(botId: String, event: String): Unit =
log.debugf("Dispatching event to bot %s", botId)
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event)
()
def registeredBots: List[String] =
import scala.jdk.CollectionConverters.*
connections.keys().asScala.toList
emitters.keys().asScala.toList
private def streamKey(botId: String): String =
s"${redisConfig.prefix}:bot:$botId:events:stream"
private def createGroupIfAbsent(botId: String): Unit =
Try(
redis
.stream(classOf[String])
.xgroupCreate(streamKey(botId), groupName, "$", new XGroupCreateArgs().mkstream()),
) match
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for bot %s", botId)
case Success(_) => ()
private def pollLoop(botId: String, myEmitter: MultiEmitter[? >: String]): Unit =
while emitters.get(botId) eq myEmitter do
Try {
val messages = redis
.stream(classOf[String])
.xreadgroup(
groupName,
consumerId,
streamKey(botId),
">",
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
)
Option(messages).foreach(_.forEach { msg =>
if emitters.get(botId) eq myEmitter then
myEmitter.emit(msg.payload().get("data"))
ack(botId, msg.id())
})
} match
case Failure(ex) => log.warnf(ex, "Error in poll loop for bot %s", botId)
case Success(_) => ()
private def ack(botId: String, id: String): Unit =
Try(redis.stream(classOf[String]).xack(streamKey(botId), groupName, id)) match
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for bot %s", id, botId)
case Success(_) => ()
@@ -0,0 +1,83 @@
package de.nowchess.botplatform.registry
import de.nowchess.botplatform.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.{StreamCommands, XGroupCreateArgs}
import io.smallrye.mutiny.subscription.MultiEmitter
import org.eclipse.microprofile.context.ManagedExecutor
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.*
import org.mockito.Mockito.*
class BotRegistryTest:
// scalafix:off DisableSyntax.var
private var registry: BotRegistry = scala.compiletime.uninitialized
private var redis: RedisDataSource = scala.compiletime.uninitialized
private var streamCmds: StreamCommands[String, String, Nothing] =
scala.compiletime.uninitialized
private var redisConfig: RedisConfig = scala.compiletime.uninitialized
private var executor: ManagedExecutor = scala.compiletime.uninitialized
// scalafix:on DisableSyntax.var
@BeforeEach
def setup(): Unit =
redis = mock(classOf[RedisDataSource])
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
redisConfig = mock(classOf[RedisConfig])
executor = mock(classOf[ManagedExecutor])
when(redis.stream(classOf[String])).thenReturn(streamCmds)
when(redisConfig.prefix).thenReturn("nowchess")
registry = new BotRegistry
registry.redis = redis
registry.redisConfig = redisConfig
registry.executor = executor
@Test
def registerStartsPollThread(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("bot1", emitter)
verify(executor).submit(any(classOf[Runnable]))
@Test
def registerCreatesConsumerGroupWithMkstream(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("bot1", emitter)
verify(streamCmds)
.xgroupCreate(
org.mockito.ArgumentMatchers.eq("nowchess:bot:bot1:events:stream"),
org.mockito.ArgumentMatchers.eq("bot-platform-consumer"),
org.mockito.ArgumentMatchers.eq("$"),
any(classOf[XGroupCreateArgs]),
)
@Test
def registerTracksBot(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("bot42", emitter)
assertTrue(registry.registeredBots.contains("bot42"))
@Test
def unregisterRemovesBot(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("botX", emitter)
registry.unregister("botX")
assertFalse(registry.registeredBots.contains("botX"))
@Test
def busyGroupExceptionIsIgnoredOnRegister(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
when(streamCmds.xgroupCreate(any(), any(), any(), any()))
.thenThrow(new RuntimeException("BUSYGROUP Consumer Group name already exists"))
val exec: Executable = () => registry.register("botBusy", emitter)
assertDoesNotThrow(exec)
@Test
def registerDoesNotInteractWithPubSub(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("botNoPubSub", emitter)
verify(redis, never()).pubsub(any(classOf[Class[?]]))
@@ -2,7 +2,7 @@ package de.nowchess.chess.config
import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square}
import de.nowchess.api.dto.*
import de.nowchess.api.event.{EventEnvelope, EventType}
import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload}
import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult}
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
import de.nowchess.chess.registry.GameCacheDto
@@ -18,6 +18,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
classOf[GameCreationResponseDto],
classOf[EventEnvelope],
classOf[EventType],
classOf[GameOverPayload],
classOf[ErrorEventDto],
classOf[GameWritebackEventDto],
classOf[GameFullDto],
@@ -1,15 +1,18 @@
package de.nowchess.chess.redis
import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import de.nowchess.api.board.Color
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload}
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.observer.{GameEvent, Observer}
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
import de.nowchess.chess.resource.GameDtoMapper
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.XAddArgs
import org.jboss.logging.Logger
import scala.jdk.CollectionConverters.*
object GameRedisPublisher:
private val log = Logger.getLogger(classOf[GameRedisPublisher])
@@ -23,8 +26,11 @@ class GameRedisPublisher(
writebackEmit: String => Unit,
ioClient: IoGrpcClientWrapper,
onGameOver: String => Unit,
redisPrefix: String,
) extends Observer:
private val maxStreamLen = 1000L
def emitInitialWriteback(): Unit =
try
registry.get(gameId).foreach { entry =>
@@ -40,10 +46,39 @@ class GameRedisPublisher(
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
if entry.engine.context.result.isDefined then onGameOver(gameId)
entry.engine.context.result.foreach { result =>
publishGameOver(entry, result)
onGameOver(gameId)
}
}
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
private def publishGameOver(entry: GameEntry, result: GameResult): Unit =
val resultStr = result match
case GameResult.Win(Color.White, _) => "white"
case GameResult.Win(Color.Black, _) => "black"
case GameResult.Draw(_) => "draw"
val terminationReason = result match
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
case GameResult.Win(_, WinReason.Resignation) => "resignation"
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
case GameResult.Draw(DrawReason.Agreement) => "agreement"
val payload = objectMapper.valueToTree[JsonNode](
GameOverPayload(gameId, resultStr, terminationReason, entry.white.id.value, entry.black.id.value),
)
val envelope = EventEnvelope.of(EventType.GameOver, payload)
redis
.stream(classOf[String])
.xadd(
s"$redisPrefix:game-over",
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> objectMapper.writeValueAsString(envelope)).asJava,
)
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
val clock = entry.engine.currentClockState
GameWritebackEventDto(
@@ -93,6 +93,7 @@ class GameRedisSubscriberManager:
writebackFn,
ioClient,
unsubscribeGame,
redisConfig.prefix,
)
s2cObservers.put(gameId, obs)
registry.get(gameId).foreach(_.engine.subscribe(obs))
@@ -0,0 +1,123 @@
package de.nowchess.chess.redis
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import de.nowchess.api.board.Color
import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason}
import de.nowchess.api.player.{PlayerId, PlayerInfo}
import de.nowchess.chess.client.CombinedExportResponse
import de.nowchess.chess.engine.GameEngine
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.observer.GameEvent
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
import de.nowchess.rules.sets.DefaultRules
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.*
import org.mockito.Mockito.*
import scala.compiletime.uninitialized
class GameRedisPublisherTest:
// scalafix:off DisableSyntax.var
private var redis: RedisDataSource = uninitialized
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
private var pubsubCmds: PubSubCommands[String] = uninitialized
private var registry: GameRegistry = uninitialized
private var ioClient: IoGrpcClientWrapper = uninitialized
private var onGameOverCalled: Boolean = false
// scalafix:on DisableSyntax.var
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
private val gameId = "game1"
private val whitePlayer = PlayerInfo(PlayerId("white1"), "Alice")
private val blackPlayer = PlayerInfo(PlayerId("black1"), "Bob")
@BeforeEach
def setup(): Unit =
redis = mock(classOf[RedisDataSource])
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
pubsubCmds = mock(classOf[PubSubCommands[String]])
registry = mock(classOf[GameRegistry])
ioClient = mock(classOf[IoGrpcClientWrapper])
when(redis.stream(classOf[String])).thenReturn(streamCmds)
when(redis.pubsub(classOf[String])).thenReturn(pubsubCmds)
when(ioClient.exportCombined(any()))
.thenReturn(CombinedExportResponse("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", ""))
onGameOverCalled = false
private def publisherWithResult(result: GameResult): GameRedisPublisher =
val ctx = GameContext.initial.copy(result = Some(result))
val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules)
val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer)
when(registry.get(gameId)).thenReturn(Some(entry))
new GameRedisPublisher(
gameId,
registry,
redis,
objectMapper,
s"nowchess:game:$gameId:s2c",
_ => (),
ioClient,
_ => onGameOverCalled = true,
"nowchess",
)
@Test
def publishesGameOverOnCheckmate(): Unit =
val publisher = publisherWithResult(GameResult.Win(Color.White, WinReason.Checkmate))
publisher.onGameEvent(mock(classOf[GameEvent]))
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
any(classOf[XAddArgs]),
any(),
)
assertTrue(onGameOverCalled)
@Test
def publishesGameOverOnResignation(): Unit =
val publisher = publisherWithResult(GameResult.Win(Color.Black, WinReason.Resignation))
publisher.onGameEvent(mock(classOf[GameEvent]))
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
any(classOf[XAddArgs]),
any(),
)
@Test
def publishesGameOverOnDraw(): Unit =
val publisher = publisherWithResult(GameResult.Draw(DrawReason.Agreement))
publisher.onGameEvent(mock(classOf[GameEvent]))
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
any(classOf[XAddArgs]),
any(),
)
@Test
def doesNotPublishGameOverWhenNoResult(): Unit =
val ctx = GameContext.initial
val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules)
val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer)
when(registry.get(gameId)).thenReturn(Some(entry))
val publisher = new GameRedisPublisher(
gameId,
registry,
redis,
objectMapper,
s"nowchess:game:$gameId:s2c",
_ => (),
ioClient,
_ => onGameOverCalled = true,
"nowchess",
)
publisher.onGameEvent(mock(classOf[GameEvent]))
verify(streamCmds, never()).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
any(classOf[XAddArgs]),
any(),
)
assertFalse(onGameOverCalled)
@@ -24,8 +24,9 @@ import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success, Try}
import java.time.Duration
import java.util.UUID
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.function.Consumer
import java.util.concurrent.TimeUnit
@ApplicationScoped
class OfficialBotService:
@@ -48,14 +49,18 @@ class OfficialBotService:
private val terminalStatuses =
Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
private val groupName = "official-bot"
private val consumerId = UUID.randomUUID().toString
private val maxRetries = 3
private val maxStreamLen = 1000L
private val groupName = "official-bot"
private val gameOverGroup = "official-bots-game-over"
private val consumerId = UUID.randomUUID().toString
private val maxRetries = 3
private val maxStreamLen = 1000L
private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
private def gameOverStream: String = s"${redisConfig.prefix}:game-over"
private def dlqStream: String = s"${redisConfig.prefix}:dlq"
private val gameWatches = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
@PostConstruct
def initializeMetrics(): Unit =
BotController.listBots.foreach { bot =>
@@ -68,6 +73,7 @@ class OfficialBotService:
try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots))
catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service")
bots.foreach(subscribeToEventChannel)
subscribeToGameOverStream()
private def subscribeToEventChannel(botName: String): Unit =
createGroupIfAbsent(botName)
@@ -165,9 +171,80 @@ class OfficialBotService:
botAccountId: String,
): Unit =
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
val subscriber = redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
gameWatches.put(gameId, (botName, subscriber))
()
private def subscribeToGameOverStream(): Unit =
Try(
redis
.stream(classOf[String])
.xgroupCreate(gameOverStream, gameOverGroup, "$", new XGroupCreateArgs().mkstream()),
) match
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
case Failure(ex) => log.warnf(ex, "Failed to create game-over consumer group")
case Success(_) => ()
executor.submit(
new Runnable:
def run(): Unit = gameOverPollLoop(),
)
log.infof("Listening to game-over stream (consumer=%s)", consumerId)
private def gameOverPollLoop(): Unit =
while true do
Try {
val messages = redis
.stream(classOf[String])
.xreadgroup(
gameOverGroup,
consumerId,
gameOverStream,
">",
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
)
Option(messages).foreach(_.forEach(msg => handleGameOverMessage(msg)))
} match
case Failure(ex) => log.warnf(ex, "Error in game-over poll loop")
case Success(_) => ()
private def handleGameOverMessage(msg: StreamMessage[String, String, String]): Unit =
val json = msg.payload().get("data")
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
Try {
val node = objectMapper.readTree(json)
val gameId = node.path("payload").path("gameId").asText()
if gameId.nonEmpty then
Option(gameWatches.remove(gameId)).foreach { (botName, subscriber) =>
val topic = s"${redisConfig.prefix}:game:$gameId:s2c"
Try(subscriber.unsubscribe(topic)) match
case Failure(ex) => log.warnf(ex, "Failed to unsubscribe from game %s", gameId)
case Success(_) => log.infof("Bot %s cleaned up game %s after GameOver", botName, gameId)
}
} match
case Success(_) =>
ackGameOver(msg.id())
case Failure(ex) if attempt + 1 < maxRetries =>
log.warnf(ex, "GameOver handling failed (attempt %d), retrying", attempt)
xadd(gameOverStream, Map("data" -> json, "attempt" -> (attempt + 1).toString))
ackGameOver(msg.id())
case Failure(ex) =>
log.errorf(ex, "GameOver handling failed after %d attempts, sending to DLQ", maxRetries)
xadd(
dlqStream,
Map(
"data" -> json,
"eventType" -> "GameOver",
"error" -> Option(ex.getMessage).getOrElse(ex.getClass.getName),
"attempt" -> attempt.toString,
),
)
ackGameOver(msg.id())
private def ackGameOver(id: String): Unit =
Try(redis.stream(classOf[String]).xack(gameOverStream, gameOverGroup, id)) match
case Failure(ex) => log.warnf(ex, "Failed to ack game-over message %s", id)
case Success(_) => ()
private def handleGameEvent(
botName: String,
gameId: String,
@@ -0,0 +1,36 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
#
# Before building the container image run:
#
# ./gradlew build
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/tournament-jvm .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/tournament-jvm
#
# This image uses the `run-java.sh` script to run the application.
# You can find more information about the UBI base runtime images and their configuration here:
# https://rh-openjdk.github.io/redhat-openjdk-containers/
###
FROM registry.access.redhat.com/ubi9/openjdk-21-runtime:1.24
ENV LANGUAGE='en_US:en'
# We make four distinct layers so if there are application changes the library layers can be re-used
COPY --chown=185 build/quarkus-app/lib/ /deployments/lib/
COPY --chown=185 build/quarkus-app/*.jar /deployments/
COPY --chown=185 build/quarkus-app/app/ /deployments/app/
COPY --chown=185 build/quarkus-app/quarkus/ /deployments/quarkus/
EXPOSE 8080
USER 185
ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
ENV JAVA_APP_JAR="/deployments/quarkus-run.jar"
ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ]
@@ -0,0 +1,33 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
#
# Before building the container image run:
#
# ./gradlew build -Dquarkus.package.jar.type=legacy-jar
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.legacy-jar -t quarkus/tournament-legacy-jar .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/tournament-legacy-jar
#
# This image uses the `run-java.sh` script to run the application.
# You can find more information about the UBI base runtime images and their configuration here:
# https://rh-openjdk.github.io/redhat-openjdk-containers/
###
FROM registry.access.redhat.com/ubi9/openjdk-21-runtime:1.24
ENV LANGUAGE='en_US:en'
COPY build/lib/* /deployments/lib/
COPY build/*-runner.jar /deployments/quarkus-run.jar
EXPOSE 8080
USER 185
ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
ENV JAVA_APP_JAR="/deployments/quarkus-run.jar"
ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ]
@@ -0,0 +1,29 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode.
#
# Before building the container image run:
#
# ./gradlew :modules:tournament:build -Dquarkus.native.enabled=true
#
# Then, build the image with:
#
# docker build -f modules/tournament/src/main/docker/Dockerfile.native -t quarkus/tournament .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/tournament
#
# The `registry.access.redhat.com/ubi9/ubi-minimal:9.7` base image is based on UBI 9.
# To use UBI 8, switch to `quay.io/ubi8/ubi-minimal:8.10`.
###
FROM registry.access.redhat.com/ubi9/ubi-minimal:9.7
WORKDIR /work/
RUN chown 1001 /work \
&& chmod "g+rwX" /work \
&& chown 1001:root /work
COPY --chown=1001:root --chmod=0755 modules/tournament/build/*-runner /work/application
EXPOSE 8080
USER 1001
ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"]
@@ -0,0 +1,32 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode.
# It uses a micro base image, tuned for Quarkus native executables.
# It reduces the size of the resulting container image.
# Check https://quarkus.io/guides/quarkus-runtime-base-image for further information about this image.
#
# Before building the container image run:
#
# ./gradlew build -Dquarkus.native.enabled=true
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.native-micro -t quarkus/tournament .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/tournament
#
# The `quay.io/quarkus/ubi9-quarkus-micro-image:2.0` base image is based on UBI 9.
# To use UBI 8, switch to `quay.io/quarkus/quarkus-micro-image:2.0`.
###
FROM quay.io/quarkus/ubi9-quarkus-micro-image:2.0
WORKDIR /work/
RUN chown 1001 /work \
&& chmod "g+rwX" /work \
&& chown 1001:root /work
COPY --chown=1001:root --chmod=0755 build/*-runner /work/application
EXPOSE 8080
USER 1001
ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"]
+3
View File
@@ -0,0 +1,3 @@
MAJOR=0
MINOR=1
PATCH=0
@@ -2,15 +2,18 @@ package de.nowchess.ws.resource
import de.nowchess.ws.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import io.quarkus.redis.datasource.stream.{XAddArgs, XGroupCreateArgs, XReadGroupArgs}
import io.quarkus.websockets.next.*
import io.smallrye.jwt.auth.principal.JWTParser
import jakarta.inject.Inject
import org.eclipse.microprofile.context.ManagedExecutor
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.util.Try
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success, Try}
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
@WebSocket(path = "/api/user/ws")
class UserWebSocketResource:
@@ -18,20 +21,22 @@ class UserWebSocketResource:
private val log = Logger.getLogger(classOf[UserWebSocketResource])
// scalafix:off DisableSyntax.var
@Inject
var redis: RedisDataSource = uninitialized
@Inject
var redisConfig: RedisConfig = uninitialized
@Inject
var jwtParser: JWTParser = uninitialized
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var jwtParser: JWTParser = uninitialized
@Inject var executor: ManagedExecutor = uninitialized
// scalafix:on DisableSyntax.var
private val connections = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
private val consumerId = UUID.randomUUID().toString
private val maxRetries = 3
private val maxStreamLen = 1000L
private def userTopic(userId: String): String =
s"${redisConfig.prefix}:user:$userId:events"
private val connections = new ConcurrentHashMap[String, (String, WebSocketConnection)]()
private def userStreamKey(userId: String): String =
s"${redisConfig.prefix}:user:$userId:events:stream"
private def dlqKey: String = s"${redisConfig.prefix}:dlq"
@OnOpen
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
@@ -45,16 +50,76 @@ class UserWebSocketResource:
log.warn("WebSocket opened with no valid JWT — closing connection")
connection.close().subscribe().`with`(_ => (), _ => ())
case Some(userId) =>
log.infof("User WebSocket opened — userId=%s", userId)
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler)
connections.put(connection.id(), (userId, subscriber))
log.infof("User WebSocket opened — userId=%s connId=%s", userId, connection.id())
createGroupIfAbsent(userId, connection.id())
connections.put(connection.id(), (userId, connection))
executor.submit(
new Runnable:
def run(): Unit = pollLoop(connection.id(), userId, connection),
)
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
@OnClose
def onClose(connection: WebSocketConnection): Unit =
log.infof("User WebSocket closed — connectionId=%s", connection.id())
Option(connections.remove(connection.id())).foreach { (userId, subscriber) =>
subscriber.unsubscribe(userTopic(userId))
}
connections.remove(connection.id())
private def createGroupIfAbsent(userId: String, groupName: String): Unit =
Try(
redis
.stream(classOf[String])
.xgroupCreate(userStreamKey(userId), groupName, "$", new XGroupCreateArgs().mkstream()),
) match
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for userId=%s", userId)
case Success(_) => ()
private def pollLoop(connectionId: String, userId: String, myConnection: WebSocketConnection): Unit =
while Option(connections.get(connectionId)).exists(_._2 eq myConnection) do
Try {
val messages = redis
.stream(classOf[String])
.xreadgroup(
connectionId,
consumerId,
userStreamKey(userId),
">",
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
)
Option(messages).foreach(_.forEach { msg =>
if Option(connections.get(connectionId)).exists(_._2 eq myConnection) then
val json = msg.payload().get("data")
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
Try(myConnection.sendText(json).await().atMost(Duration.ofSeconds(5))) match
case Success(_) =>
ack(connectionId, userId, msg.id())
case Failure(_) if attempt + 1 < maxRetries =>
xadd(userStreamKey(userId), json, attempt + 1)
ack(connectionId, userId, msg.id())
case Failure(ex) =>
log.warnf(ex, "Delivery failed for userId=%s after %d attempts, sending to DLQ", userId, maxRetries)
xadd(dlqKey, json, attempt)
ack(connectionId, userId, msg.id())
})
} match
case Failure(ex) => log.warnf(ex, "Error in poll loop for userId=%s", userId)
case Success(_) => ()
private def ack(groupName: String, userId: String, id: String): Unit =
Try(redis.stream(classOf[String]).xack(userStreamKey(userId), groupName, id)) match
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for userId=%s", id, userId)
case Success(_) => ()
private def xadd(key: String, json: String, attempt: Int): Unit =
Try(
redis
.stream(classOf[String])
.xadd(
key,
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json, "attempt" -> attempt.toString).asJava,
),
) match
case Failure(ex) => log.warnf(ex, "Failed to publish to stream %s", key)
case Success(_) => ()