diff --git a/modules/account/src/main/scala/de/nowchess/account/client/CoreGameClient.scala b/modules/account/src/main/scala/de/nowchess/account/client/CoreGameClient.scala deleted file mode 100644 index e75a5bd..0000000 --- a/modules/account/src/main/scala/de/nowchess/account/client/CoreGameClient.scala +++ /dev/null @@ -1,28 +0,0 @@ -package de.nowchess.account.client - -import de.nowchess.security.{InternalClientHeadersFactory, InternalSecretClientFilter} -import jakarta.ws.rs.* -import jakarta.ws.rs.core.MediaType -import org.eclipse.microprofile.rest.client.annotation.{RegisterClientHeaders, RegisterProvider} -import org.eclipse.microprofile.rest.client.inject.RegisterRestClient - -case class CorePlayerInfo(id: String, displayName: String) -case class CoreTimeControl(limitSeconds: Option[Int], incrementSeconds: Option[Int], daysPerMove: Option[Int]) -case class CoreCreateGameRequest( - white: Option[CorePlayerInfo], - black: Option[CorePlayerInfo], - timeControl: Option[CoreTimeControl], - mode: Option[String], -) -case class CoreGameResponse(gameId: String) - -@Path("/api/board/game") -@RegisterRestClient(configKey = "core-service") -@RegisterProvider(classOf[InternalSecretClientFilter]) -@RegisterClientHeaders(classOf[InternalClientHeadersFactory]) -trait CoreGameClient: - - @POST - @Consumes(Array(MediaType.APPLICATION_JSON)) - @Produces(Array(MediaType.APPLICATION_JSON)) - def createGame(req: CoreCreateGameRequest): CoreGameResponse diff --git a/modules/account/src/main/scala/de/nowchess/account/client/CoreGameDtos.scala b/modules/account/src/main/scala/de/nowchess/account/client/CoreGameDtos.scala new file mode 100644 index 0000000..c3b48c1 --- /dev/null +++ b/modules/account/src/main/scala/de/nowchess/account/client/CoreGameDtos.scala @@ -0,0 +1,10 @@ +package de.nowchess.account.client + +case class CorePlayerInfo(id: String, displayName: String) +case class CoreTimeControl(limitSeconds: Option[Int], incrementSeconds: Option[Int], daysPerMove: Option[Int]) +case class CoreCreateGameRequest( + white: Option[CorePlayerInfo], + black: Option[CorePlayerInfo], + timeControl: Option[CoreTimeControl], + mode: Option[String], +) diff --git a/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala b/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala index ca76bcb..8b72c7e 100644 --- a/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala +++ b/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala @@ -1,6 +1,6 @@ package de.nowchess.account.config -import de.nowchess.account.client.{CoreCreateGameRequest, CoreGameResponse, CorePlayerInfo, CoreTimeControl} +import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, CoreTimeControl} import de.nowchess.account.domain.{ BotAccount, Challenge, @@ -53,7 +53,6 @@ import io.quarkus.runtime.annotations.RegisterForReflection classOf[CorePlayerInfo], classOf[CoreTimeControl], classOf[CoreCreateGameRequest], - classOf[CoreGameResponse], classOf[OfficialChallengeResponse], classOf[GameCreationRequestDto], classOf[GameCreationResponseDto], diff --git a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala index 0f7cbaa..9b71f6b 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala @@ -36,26 +36,32 @@ class EventPublisher: new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), Map("data" -> json).asJava, ) - redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json) () def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit = val payload = objectMapper.createObjectNode() payload.put("challengeId", challengeId) payload.put("challengerName", challengerName) - publish(s"${redisConfig.prefix}:user:$destUserId:events", EventType.ChallengeCreated, payload) + publishToUserStream(destUserId, EventType.ChallengeCreated, payload) def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit = val payload = objectMapper.createObjectNode() payload.put("challengeId", challengeId) payload.put("gameId", gameId) - publish(s"${redisConfig.prefix}:user:$challengerId:events", EventType.ChallengeAccepted, payload) + publishToUserStream(challengerId, EventType.ChallengeAccepted, payload) - private def publish( - channel: String, + private def publishToUserStream( + userId: String, eventType: EventType, payload: com.fasterxml.jackson.databind.node.ObjectNode, ): Unit = val envelope = EventEnvelope.of(eventType, payload) - redis.pubsub(classOf[String]).publish(channel, objectMapper.writeValueAsString(envelope)) + val json = objectMapper.writeValueAsString(envelope) + redis + .stream(classOf[String]) + .xadd( + s"${redisConfig.prefix}:user:$userId:events:stream", + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json).asJava, + ) () diff --git a/modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala b/modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala new file mode 100644 index 0000000..1e94893 --- /dev/null +++ b/modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala @@ -0,0 +1,56 @@ +package de.nowchess.account.service + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import de.nowchess.account.config.RedisConfig +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs} +import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.* +import org.mockito.Mockito.* +import scala.compiletime.uninitialized + +class EventPublisherTest: + + // scalafix:off DisableSyntax.var + private var redis: RedisDataSource = uninitialized + private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized + private var redisConfig: RedisConfig = uninitialized + // scalafix:on DisableSyntax.var + + private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) + + @BeforeEach + def setup(): Unit = + redis = mock(classOf[RedisDataSource]) + streamCmds = mock(classOf[StreamCommands[String, String, Nothing]]) + redisConfig = mock(classOf[RedisConfig]) + when(redis.stream(classOf[String])).thenReturn(streamCmds) + when(redisConfig.prefix).thenReturn("nowchess") + + private def publisher: EventPublisher = + val p = new EventPublisher + p.redis = redis + p.redisConfig = redisConfig + p.objectMapper = objectMapper + p + + @Test + def publishChallengeCreatedWritesToUserStream(): Unit = + publisher.publishChallengeCreated("user1", "ch1", "Alice") + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:user:user1:events:stream"), + any(classOf[XAddArgs]), + any(), + ) + verify(redis, never()).pubsub(any(classOf[Class[?]])) + + @Test + def publishChallengeAcceptedWritesToUserStream(): Unit = + publisher.publishChallengeAccepted("user2", "ch1", "game42") + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:user:user2:events:stream"), + any(classOf[XAddArgs]), + any(), + ) + verify(redis, never()).pubsub(any(classOf[Class[?]])) diff --git a/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala b/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala index f07f424..97666b7 100644 --- a/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala +++ b/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala @@ -1,4 +1,4 @@ package de.nowchess.api.event enum EventType: - case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted + case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted, GameOver diff --git a/modules/api/src/main/scala/de/nowchess/api/event/GameOverPayload.scala b/modules/api/src/main/scala/de/nowchess/api/event/GameOverPayload.scala new file mode 100644 index 0000000..16680c3 --- /dev/null +++ b/modules/api/src/main/scala/de/nowchess/api/event/GameOverPayload.scala @@ -0,0 +1,9 @@ +package de.nowchess.api.event + +final case class GameOverPayload( + gameId: String, + result: String, + terminationReason: String, + whiteId: String, + blackId: String, +) diff --git a/modules/bot-platform/build.gradle.kts b/modules/bot-platform/build.gradle.kts index 77885be..1b5ecec 100644 --- a/modules/bot-platform/build.gradle.kts +++ b/modules/bot-platform/build.gradle.kts @@ -73,6 +73,7 @@ dependencies { testImplementation(platform("org.junit:junit-bom:5.13.4")) testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("io.quarkus:quarkus-junit") + testImplementation("io.quarkus:quarkus-junit5-mockito") testImplementation("io.rest-assured:rest-assured") testImplementation("io.quarkus:quarkus-test-security") diff --git a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala index 195364c..13ae429 100644 --- a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala +++ b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala @@ -2,14 +2,18 @@ package de.nowchess.botplatform.registry import de.nowchess.botplatform.config.RedisConfig import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.stream.{XGroupCreateArgs, XReadGroupArgs} import io.smallrye.mutiny.subscription.MultiEmitter import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject +import org.eclipse.microprofile.context.ManagedExecutor import org.jboss.logging.Logger import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import java.util.function.Consumer @ApplicationScoped class BotRegistry: @@ -17,31 +21,68 @@ class BotRegistry: private val log = Logger.getLogger(classOf[BotRegistry]) // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var executor: ManagedExecutor = uninitialized // scalafix:on DisableSyntax.var - private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]() + private val groupName = "bot-platform-consumer" + private val consumerId = UUID.randomUUID().toString + + private val emitters = ConcurrentHashMap[String, MultiEmitter[? >: String]]() def register(botId: String, emitter: MultiEmitter[? >: String]): Unit = - val channel = s"${redisConfig.prefix}:bot:$botId:events" - val handler: Consumer[String] = msg => emitter.emit(msg) - val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler) - connections.put(botId, (emitter, subscriber)) - log.infof("Bot %s registered", botId) + createGroupIfAbsent(botId) + emitters.put(botId, emitter) + executor.submit( + new Runnable: + def run(): Unit = pollLoop(botId, emitter), + ) + log.infof("Bot %s registered on stream consumer group", botId) () def unregister(botId: String): Unit = - Option(connections.remove(botId)).foreach { (_, subscriber) => - subscriber.unsubscribe(s"${redisConfig.prefix}:bot:$botId:events") - } + emitters.remove(botId) log.infof("Bot %s unregistered", botId) - def dispatch(botId: String, event: String): Unit = - log.debugf("Dispatching event to bot %s", botId) - redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event) - () - def registeredBots: List[String] = - import scala.jdk.CollectionConverters.* - connections.keys().asScala.toList + emitters.keys().asScala.toList + + private def streamKey(botId: String): String = + s"${redisConfig.prefix}:bot:$botId:events:stream" + + private def createGroupIfAbsent(botId: String): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(streamKey(botId), groupName, "$", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create consumer group for bot %s", botId) + case Success(_) => () + + private def pollLoop(botId: String, myEmitter: MultiEmitter[? >: String]): Unit = + while emitters.get(botId) eq myEmitter do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + groupName, + consumerId, + streamKey(botId), + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach { msg => + if emitters.get(botId) eq myEmitter then + myEmitter.emit(msg.payload().get("data")) + ack(botId, msg.id()) + }) + } match + case Failure(ex) => log.warnf(ex, "Error in poll loop for bot %s", botId) + case Success(_) => () + + private def ack(botId: String, id: String): Unit = + Try(redis.stream(classOf[String]).xack(streamKey(botId), groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack message %s for bot %s", id, botId) + case Success(_) => () diff --git a/modules/bot-platform/src/test/scala/de/nowchess/botplatform/registry/BotRegistryTest.scala b/modules/bot-platform/src/test/scala/de/nowchess/botplatform/registry/BotRegistryTest.scala new file mode 100644 index 0000000..95f58db --- /dev/null +++ b/modules/bot-platform/src/test/scala/de/nowchess/botplatform/registry/BotRegistryTest.scala @@ -0,0 +1,83 @@ +package de.nowchess.botplatform.registry + +import de.nowchess.botplatform.config.RedisConfig +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamCommands, XGroupCreateArgs} +import io.smallrye.mutiny.subscription.MultiEmitter +import org.eclipse.microprofile.context.ManagedExecutor +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.* +import org.mockito.Mockito.* + +class BotRegistryTest: + + // scalafix:off DisableSyntax.var + private var registry: BotRegistry = scala.compiletime.uninitialized + private var redis: RedisDataSource = scala.compiletime.uninitialized + private var streamCmds: StreamCommands[String, String, Nothing] = + scala.compiletime.uninitialized + private var redisConfig: RedisConfig = scala.compiletime.uninitialized + private var executor: ManagedExecutor = scala.compiletime.uninitialized + // scalafix:on DisableSyntax.var + + @BeforeEach + def setup(): Unit = + redis = mock(classOf[RedisDataSource]) + streamCmds = mock(classOf[StreamCommands[String, String, Nothing]]) + redisConfig = mock(classOf[RedisConfig]) + executor = mock(classOf[ManagedExecutor]) + + when(redis.stream(classOf[String])).thenReturn(streamCmds) + when(redisConfig.prefix).thenReturn("nowchess") + + registry = new BotRegistry + registry.redis = redis + registry.redisConfig = redisConfig + registry.executor = executor + + @Test + def registerStartsPollThread(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("bot1", emitter) + verify(executor).submit(any(classOf[Runnable])) + + @Test + def registerCreatesConsumerGroupWithMkstream(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("bot1", emitter) + verify(streamCmds) + .xgroupCreate( + org.mockito.ArgumentMatchers.eq("nowchess:bot:bot1:events:stream"), + org.mockito.ArgumentMatchers.eq("bot-platform-consumer"), + org.mockito.ArgumentMatchers.eq("$"), + any(classOf[XGroupCreateArgs]), + ) + + @Test + def registerTracksBot(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("bot42", emitter) + assertTrue(registry.registeredBots.contains("bot42")) + + @Test + def unregisterRemovesBot(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("botX", emitter) + registry.unregister("botX") + assertFalse(registry.registeredBots.contains("botX")) + + @Test + def busyGroupExceptionIsIgnoredOnRegister(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + when(streamCmds.xgroupCreate(any(), any(), any(), any())) + .thenThrow(new RuntimeException("BUSYGROUP Consumer Group name already exists")) + val exec: Executable = () => registry.register("botBusy", emitter) + assertDoesNotThrow(exec) + + @Test + def registerDoesNotInteractWithPubSub(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("botNoPubSub", emitter) + verify(redis, never()).pubsub(any(classOf[Class[?]])) diff --git a/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala b/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala index b982851..8ef0476 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala @@ -2,7 +2,7 @@ package de.nowchess.chess.config import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square} import de.nowchess.api.dto.* -import de.nowchess.api.event.{EventEnvelope, EventType} +import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload} import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult} import de.nowchess.api.move.{Move, MoveType, PromotionPiece} import de.nowchess.chess.registry.GameCacheDto @@ -18,6 +18,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection classOf[GameCreationResponseDto], classOf[EventEnvelope], classOf[EventType], + classOf[GameOverPayload], classOf[ErrorEventDto], classOf[GameWritebackEventDto], classOf[GameFullDto], diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala index 48fa7f3..735dba0 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala @@ -1,15 +1,18 @@ package de.nowchess.chess.redis -import com.fasterxml.jackson.databind.ObjectMapper -import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto} -import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason} +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import de.nowchess.api.board.Color +import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto} +import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload} +import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason} import de.nowchess.chess.grpc.IoGrpcClientWrapper import de.nowchess.chess.observer.{GameEvent, Observer} import de.nowchess.chess.registry.{GameEntry, GameRegistry} import de.nowchess.chess.resource.GameDtoMapper import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.XAddArgs import org.jboss.logging.Logger +import scala.jdk.CollectionConverters.* object GameRedisPublisher: private val log = Logger.getLogger(classOf[GameRedisPublisher]) @@ -23,8 +26,11 @@ class GameRedisPublisher( writebackEmit: String => Unit, ioClient: IoGrpcClientWrapper, onGameOver: String => Unit, + redisPrefix: String, ) extends Observer: + private val maxStreamLen = 1000L + def emitInitialWriteback(): Unit = try registry.get(gameId).foreach { entry => @@ -40,10 +46,39 @@ class GameRedisPublisher( val dto = GameDtoMapper.toGameStateDto(entry, ioClient) redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto))) writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto))) - if entry.engine.context.result.isDefined then onGameOver(gameId) + entry.engine.context.result.foreach { result => + publishGameOver(entry, result) + onGameOver(gameId) + } } catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId) + private def publishGameOver(entry: GameEntry, result: GameResult): Unit = + val resultStr = result match + case GameResult.Win(Color.White, _) => "white" + case GameResult.Win(Color.Black, _) => "black" + case GameResult.Draw(_) => "draw" + val terminationReason = result match + case GameResult.Win(_, WinReason.Checkmate) => "checkmate" + case GameResult.Win(_, WinReason.Resignation) => "resignation" + case GameResult.Win(_, WinReason.TimeControl) => "timeout" + case GameResult.Draw(DrawReason.Stalemate) => "stalemate" + case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material" + case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move" + case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition" + case GameResult.Draw(DrawReason.Agreement) => "agreement" + val payload = objectMapper.valueToTree[JsonNode]( + GameOverPayload(gameId, resultStr, terminationReason, entry.white.id.value, entry.black.id.value), + ) + val envelope = EventEnvelope.of(EventType.GameOver, payload) + redis + .stream(classOf[String]) + .xadd( + s"$redisPrefix:game-over", + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> objectMapper.writeValueAsString(envelope)).asJava, + ) + private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto = val clock = entry.engine.currentClockState GameWritebackEventDto( diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index f21e162..0d14ae7 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -93,6 +93,7 @@ class GameRedisSubscriberManager: writebackFn, ioClient, unsubscribeGame, + redisConfig.prefix, ) s2cObservers.put(gameId, obs) registry.get(gameId).foreach(_.engine.subscribe(obs)) diff --git a/modules/core/src/test/scala/de/nowchess/chess/redis/GameRedisPublisherTest.scala b/modules/core/src/test/scala/de/nowchess/chess/redis/GameRedisPublisherTest.scala new file mode 100644 index 0000000..63cb240 --- /dev/null +++ b/modules/core/src/test/scala/de/nowchess/chess/redis/GameRedisPublisherTest.scala @@ -0,0 +1,123 @@ +package de.nowchess.chess.redis + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import de.nowchess.api.board.Color +import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason} +import de.nowchess.api.player.{PlayerId, PlayerInfo} +import de.nowchess.chess.client.CombinedExportResponse +import de.nowchess.chess.engine.GameEngine +import de.nowchess.chess.grpc.IoGrpcClientWrapper +import de.nowchess.chess.observer.GameEvent +import de.nowchess.chess.registry.{GameEntry, GameRegistry} +import de.nowchess.rules.sets.DefaultRules +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs} +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.* +import org.mockito.Mockito.* +import scala.compiletime.uninitialized + +class GameRedisPublisherTest: + + // scalafix:off DisableSyntax.var + private var redis: RedisDataSource = uninitialized + private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized + private var pubsubCmds: PubSubCommands[String] = uninitialized + private var registry: GameRegistry = uninitialized + private var ioClient: IoGrpcClientWrapper = uninitialized + private var onGameOverCalled: Boolean = false + // scalafix:on DisableSyntax.var + + private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) + private val gameId = "game1" + private val whitePlayer = PlayerInfo(PlayerId("white1"), "Alice") + private val blackPlayer = PlayerInfo(PlayerId("black1"), "Bob") + + @BeforeEach + def setup(): Unit = + redis = mock(classOf[RedisDataSource]) + streamCmds = mock(classOf[StreamCommands[String, String, Nothing]]) + pubsubCmds = mock(classOf[PubSubCommands[String]]) + registry = mock(classOf[GameRegistry]) + ioClient = mock(classOf[IoGrpcClientWrapper]) + when(redis.stream(classOf[String])).thenReturn(streamCmds) + when(redis.pubsub(classOf[String])).thenReturn(pubsubCmds) + when(ioClient.exportCombined(any())) + .thenReturn(CombinedExportResponse("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", "")) + onGameOverCalled = false + + private def publisherWithResult(result: GameResult): GameRedisPublisher = + val ctx = GameContext.initial.copy(result = Some(result)) + val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules) + val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer) + when(registry.get(gameId)).thenReturn(Some(entry)) + new GameRedisPublisher( + gameId, + registry, + redis, + objectMapper, + s"nowchess:game:$gameId:s2c", + _ => (), + ioClient, + _ => onGameOverCalled = true, + "nowchess", + ) + + @Test + def publishesGameOverOnCheckmate(): Unit = + val publisher = publisherWithResult(GameResult.Win(Color.White, WinReason.Checkmate)) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + assertTrue(onGameOverCalled) + + @Test + def publishesGameOverOnResignation(): Unit = + val publisher = publisherWithResult(GameResult.Win(Color.Black, WinReason.Resignation)) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + + @Test + def publishesGameOverOnDraw(): Unit = + val publisher = publisherWithResult(GameResult.Draw(DrawReason.Agreement)) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + + @Test + def doesNotPublishGameOverWhenNoResult(): Unit = + val ctx = GameContext.initial + val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules) + val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer) + when(registry.get(gameId)).thenReturn(Some(entry)) + val publisher = new GameRedisPublisher( + gameId, + registry, + redis, + objectMapper, + s"nowchess:game:$gameId:s2c", + _ => (), + ioClient, + _ => onGameOverCalled = true, + "nowchess", + ) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds, never()).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + assertFalse(onGameOverCalled) diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala index d838b85..6ca506e 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala @@ -24,8 +24,9 @@ import scala.jdk.CollectionConverters.* import scala.util.{Failure, Success, Try} import java.time.Duration import java.util.UUID +import io.quarkus.redis.datasource.pubsub.PubSubCommands +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.function.Consumer -import java.util.concurrent.TimeUnit @ApplicationScoped class OfficialBotService: @@ -48,14 +49,18 @@ class OfficialBotService: private val terminalStatuses = Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw") - private val groupName = "official-bot" - private val consumerId = UUID.randomUUID().toString - private val maxRetries = 3 - private val maxStreamLen = 1000L + private val groupName = "official-bot" + private val gameOverGroup = "official-bots-game-over" + private val consumerId = UUID.randomUUID().toString + private val maxRetries = 3 + private val maxStreamLen = 1000L private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream" + private def gameOverStream: String = s"${redisConfig.prefix}:game-over" private def dlqStream: String = s"${redisConfig.prefix}:dlq" + private val gameWatches = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]() + @PostConstruct def initializeMetrics(): Unit = BotController.listBots.foreach { bot => @@ -68,6 +73,7 @@ class OfficialBotService: try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots)) catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service") bots.foreach(subscribeToEventChannel) + subscribeToGameOverStream() private def subscribeToEventChannel(botName: String): Unit = createGroupIfAbsent(botName) @@ -165,9 +171,80 @@ class OfficialBotService: botAccountId: String, ): Unit = val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg) - redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler) + val subscriber = redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler) + gameWatches.put(gameId, (botName, subscriber)) () + private def subscribeToGameOverStream(): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(gameOverStream, gameOverGroup, "$", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create game-over consumer group") + case Success(_) => () + executor.submit( + new Runnable: + def run(): Unit = gameOverPollLoop(), + ) + log.infof("Listening to game-over stream (consumer=%s)", consumerId) + + private def gameOverPollLoop(): Unit = + while true do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + gameOverGroup, + consumerId, + gameOverStream, + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach(msg => handleGameOverMessage(msg))) + } match + case Failure(ex) => log.warnf(ex, "Error in game-over poll loop") + case Success(_) => () + + private def handleGameOverMessage(msg: StreamMessage[String, String, String]): Unit = + val json = msg.payload().get("data") + val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0) + Try { + val node = objectMapper.readTree(json) + val gameId = node.path("payload").path("gameId").asText() + if gameId.nonEmpty then + Option(gameWatches.remove(gameId)).foreach { (botName, subscriber) => + val topic = s"${redisConfig.prefix}:game:$gameId:s2c" + Try(subscriber.unsubscribe(topic)) match + case Failure(ex) => log.warnf(ex, "Failed to unsubscribe from game %s", gameId) + case Success(_) => log.infof("Bot %s cleaned up game %s after GameOver", botName, gameId) + } + } match + case Success(_) => + ackGameOver(msg.id()) + case Failure(ex) if attempt + 1 < maxRetries => + log.warnf(ex, "GameOver handling failed (attempt %d), retrying", attempt) + xadd(gameOverStream, Map("data" -> json, "attempt" -> (attempt + 1).toString)) + ackGameOver(msg.id()) + case Failure(ex) => + log.errorf(ex, "GameOver handling failed after %d attempts, sending to DLQ", maxRetries) + xadd( + dlqStream, + Map( + "data" -> json, + "eventType" -> "GameOver", + "error" -> Option(ex.getMessage).getOrElse(ex.getClass.getName), + "attempt" -> attempt.toString, + ), + ) + ackGameOver(msg.id()) + + private def ackGameOver(id: String): Unit = + Try(redis.stream(classOf[String]).xack(gameOverStream, gameOverGroup, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack game-over message %s", id) + case Success(_) => () + private def handleGameEvent( botName: String, gameId: String, diff --git a/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala b/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala index 89434f5..13c30ec 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala @@ -2,15 +2,18 @@ package de.nowchess.ws.resource import de.nowchess.ws.config.RedisConfig import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.stream.{XAddArgs, XGroupCreateArgs, XReadGroupArgs} import io.quarkus.websockets.next.* import io.smallrye.jwt.auth.principal.JWTParser import jakarta.inject.Inject +import org.eclipse.microprofile.context.ManagedExecutor import org.jboss.logging.Logger import scala.compiletime.uninitialized -import scala.util.Try +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import java.util.function.Consumer @WebSocket(path = "/api/user/ws") class UserWebSocketResource: @@ -18,20 +21,22 @@ class UserWebSocketResource: private val log = Logger.getLogger(classOf[UserWebSocketResource]) // scalafix:off DisableSyntax.var - @Inject - var redis: RedisDataSource = uninitialized - - @Inject - var redisConfig: RedisConfig = uninitialized - - @Inject - var jwtParser: JWTParser = uninitialized + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var jwtParser: JWTParser = uninitialized + @Inject var executor: ManagedExecutor = uninitialized // scalafix:on DisableSyntax.var - private val connections = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]() + private val consumerId = UUID.randomUUID().toString + private val maxRetries = 3 + private val maxStreamLen = 1000L - private def userTopic(userId: String): String = - s"${redisConfig.prefix}:user:$userId:events" + private val connections = new ConcurrentHashMap[String, (String, WebSocketConnection)]() + + private def userStreamKey(userId: String): String = + s"${redisConfig.prefix}:user:$userId:events:stream" + + private def dlqKey: String = s"${redisConfig.prefix}:dlq" @OnOpen def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit = @@ -45,16 +50,76 @@ class UserWebSocketResource: log.warn("WebSocket opened with no valid JWT — closing connection") connection.close().subscribe().`with`(_ => (), _ => ()) case Some(userId) => - log.infof("User WebSocket opened — userId=%s", userId) - val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ()) - val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler) - connections.put(connection.id(), (userId, subscriber)) + log.infof("User WebSocket opened — userId=%s connId=%s", userId, connection.id()) + createGroupIfAbsent(userId, connection.id()) + connections.put(connection.id(), (userId, connection)) + executor.submit( + new Runnable: + def run(): Unit = pollLoop(connection.id(), userId, connection), + ) val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}""" connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ()) @OnClose def onClose(connection: WebSocketConnection): Unit = log.infof("User WebSocket closed — connectionId=%s", connection.id()) - Option(connections.remove(connection.id())).foreach { (userId, subscriber) => - subscriber.unsubscribe(userTopic(userId)) - } + connections.remove(connection.id()) + + private def createGroupIfAbsent(userId: String, groupName: String): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(userStreamKey(userId), groupName, "$", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create consumer group for userId=%s", userId) + case Success(_) => () + + private def pollLoop(connectionId: String, userId: String, myConnection: WebSocketConnection): Unit = + while Option(connections.get(connectionId)).exists(_._2 eq myConnection) do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + connectionId, + consumerId, + userStreamKey(userId), + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach { msg => + if Option(connections.get(connectionId)).exists(_._2 eq myConnection) then + val json = msg.payload().get("data") + val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0) + Try(myConnection.sendText(json).await().atMost(Duration.ofSeconds(5))) match + case Success(_) => + ack(connectionId, userId, msg.id()) + case Failure(_) if attempt + 1 < maxRetries => + xadd(userStreamKey(userId), json, attempt + 1) + ack(connectionId, userId, msg.id()) + case Failure(ex) => + log.warnf(ex, "Delivery failed for userId=%s after %d attempts, sending to DLQ", userId, maxRetries) + xadd(dlqKey, json, attempt) + ack(connectionId, userId, msg.id()) + }) + } match + case Failure(ex) => log.warnf(ex, "Error in poll loop for userId=%s", userId) + case Success(_) => () + + private def ack(groupName: String, userId: String, id: String): Unit = + Try(redis.stream(classOf[String]).xack(userStreamKey(userId), groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack message %s for userId=%s", id, userId) + case Success(_) => () + + private def xadd(key: String, json: String, attempt: Int): Unit = + Try( + redis + .stream(classOf[String]) + .xadd( + key, + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json, "attempt" -> attempt.toString).asJava, + ), + ) match + case Failure(ex) => log.warnf(ex, "Failed to publish to stream %s", key) + case Success(_) => ()