diff --git a/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala b/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala index f07f424..97666b7 100644 --- a/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala +++ b/modules/api/src/main/scala/de/nowchess/api/event/EventType.scala @@ -1,4 +1,4 @@ package de.nowchess.api.event enum EventType: - case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted + case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted, GameOver diff --git a/modules/api/src/main/scala/de/nowchess/api/event/GameOverPayload.scala b/modules/api/src/main/scala/de/nowchess/api/event/GameOverPayload.scala new file mode 100644 index 0000000..16680c3 --- /dev/null +++ b/modules/api/src/main/scala/de/nowchess/api/event/GameOverPayload.scala @@ -0,0 +1,9 @@ +package de.nowchess.api.event + +final case class GameOverPayload( + gameId: String, + result: String, + terminationReason: String, + whiteId: String, + blackId: String, +) diff --git a/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala b/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala index b982851..8ef0476 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/config/NativeReflectionConfig.scala @@ -2,7 +2,7 @@ package de.nowchess.chess.config import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square} import de.nowchess.api.dto.* -import de.nowchess.api.event.{EventEnvelope, EventType} +import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload} import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult} import de.nowchess.api.move.{Move, MoveType, PromotionPiece} import de.nowchess.chess.registry.GameCacheDto @@ -18,6 +18,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection classOf[GameCreationResponseDto], classOf[EventEnvelope], classOf[EventType], + classOf[GameOverPayload], classOf[ErrorEventDto], classOf[GameWritebackEventDto], classOf[GameFullDto], diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala index 48fa7f3..735dba0 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala @@ -1,15 +1,18 @@ package de.nowchess.chess.redis -import com.fasterxml.jackson.databind.ObjectMapper -import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto} -import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason} +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import de.nowchess.api.board.Color +import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto} +import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload} +import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason} import de.nowchess.chess.grpc.IoGrpcClientWrapper import de.nowchess.chess.observer.{GameEvent, Observer} import de.nowchess.chess.registry.{GameEntry, GameRegistry} import de.nowchess.chess.resource.GameDtoMapper import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.XAddArgs import org.jboss.logging.Logger +import scala.jdk.CollectionConverters.* object GameRedisPublisher: private val log = Logger.getLogger(classOf[GameRedisPublisher]) @@ -23,8 +26,11 @@ class GameRedisPublisher( writebackEmit: String => Unit, ioClient: IoGrpcClientWrapper, onGameOver: String => Unit, + redisPrefix: String, ) extends Observer: + private val maxStreamLen = 1000L + def emitInitialWriteback(): Unit = try registry.get(gameId).foreach { entry => @@ -40,10 +46,39 @@ class GameRedisPublisher( val dto = GameDtoMapper.toGameStateDto(entry, ioClient) redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto))) writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto))) - if entry.engine.context.result.isDefined then onGameOver(gameId) + entry.engine.context.result.foreach { result => + publishGameOver(entry, result) + onGameOver(gameId) + } } catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId) + private def publishGameOver(entry: GameEntry, result: GameResult): Unit = + val resultStr = result match + case GameResult.Win(Color.White, _) => "white" + case GameResult.Win(Color.Black, _) => "black" + case GameResult.Draw(_) => "draw" + val terminationReason = result match + case GameResult.Win(_, WinReason.Checkmate) => "checkmate" + case GameResult.Win(_, WinReason.Resignation) => "resignation" + case GameResult.Win(_, WinReason.TimeControl) => "timeout" + case GameResult.Draw(DrawReason.Stalemate) => "stalemate" + case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material" + case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move" + case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition" + case GameResult.Draw(DrawReason.Agreement) => "agreement" + val payload = objectMapper.valueToTree[JsonNode]( + GameOverPayload(gameId, resultStr, terminationReason, entry.white.id.value, entry.black.id.value), + ) + val envelope = EventEnvelope.of(EventType.GameOver, payload) + redis + .stream(classOf[String]) + .xadd( + s"$redisPrefix:game-over", + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> objectMapper.writeValueAsString(envelope)).asJava, + ) + private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto = val clock = entry.engine.currentClockState GameWritebackEventDto( diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index 04f0477..d1f6f29 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -89,6 +89,7 @@ class GameRedisSubscriberManager: writebackFn, ioClient, unsubscribeGame, + redisConfig.prefix, ) s2cObservers.put(gameId, obs) registry.get(gameId).foreach(_.engine.subscribe(obs)) diff --git a/modules/core/src/test/scala/de/nowchess/chess/redis/GameRedisPublisherTest.scala b/modules/core/src/test/scala/de/nowchess/chess/redis/GameRedisPublisherTest.scala new file mode 100644 index 0000000..63cb240 --- /dev/null +++ b/modules/core/src/test/scala/de/nowchess/chess/redis/GameRedisPublisherTest.scala @@ -0,0 +1,123 @@ +package de.nowchess.chess.redis + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import de.nowchess.api.board.Color +import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason} +import de.nowchess.api.player.{PlayerId, PlayerInfo} +import de.nowchess.chess.client.CombinedExportResponse +import de.nowchess.chess.engine.GameEngine +import de.nowchess.chess.grpc.IoGrpcClientWrapper +import de.nowchess.chess.observer.GameEvent +import de.nowchess.chess.registry.{GameEntry, GameRegistry} +import de.nowchess.rules.sets.DefaultRules +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs} +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.* +import org.mockito.Mockito.* +import scala.compiletime.uninitialized + +class GameRedisPublisherTest: + + // scalafix:off DisableSyntax.var + private var redis: RedisDataSource = uninitialized + private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized + private var pubsubCmds: PubSubCommands[String] = uninitialized + private var registry: GameRegistry = uninitialized + private var ioClient: IoGrpcClientWrapper = uninitialized + private var onGameOverCalled: Boolean = false + // scalafix:on DisableSyntax.var + + private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) + private val gameId = "game1" + private val whitePlayer = PlayerInfo(PlayerId("white1"), "Alice") + private val blackPlayer = PlayerInfo(PlayerId("black1"), "Bob") + + @BeforeEach + def setup(): Unit = + redis = mock(classOf[RedisDataSource]) + streamCmds = mock(classOf[StreamCommands[String, String, Nothing]]) + pubsubCmds = mock(classOf[PubSubCommands[String]]) + registry = mock(classOf[GameRegistry]) + ioClient = mock(classOf[IoGrpcClientWrapper]) + when(redis.stream(classOf[String])).thenReturn(streamCmds) + when(redis.pubsub(classOf[String])).thenReturn(pubsubCmds) + when(ioClient.exportCombined(any())) + .thenReturn(CombinedExportResponse("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", "")) + onGameOverCalled = false + + private def publisherWithResult(result: GameResult): GameRedisPublisher = + val ctx = GameContext.initial.copy(result = Some(result)) + val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules) + val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer) + when(registry.get(gameId)).thenReturn(Some(entry)) + new GameRedisPublisher( + gameId, + registry, + redis, + objectMapper, + s"nowchess:game:$gameId:s2c", + _ => (), + ioClient, + _ => onGameOverCalled = true, + "nowchess", + ) + + @Test + def publishesGameOverOnCheckmate(): Unit = + val publisher = publisherWithResult(GameResult.Win(Color.White, WinReason.Checkmate)) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + assertTrue(onGameOverCalled) + + @Test + def publishesGameOverOnResignation(): Unit = + val publisher = publisherWithResult(GameResult.Win(Color.Black, WinReason.Resignation)) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + + @Test + def publishesGameOverOnDraw(): Unit = + val publisher = publisherWithResult(GameResult.Draw(DrawReason.Agreement)) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + + @Test + def doesNotPublishGameOverWhenNoResult(): Unit = + val ctx = GameContext.initial + val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules) + val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer) + when(registry.get(gameId)).thenReturn(Some(entry)) + val publisher = new GameRedisPublisher( + gameId, + registry, + redis, + objectMapper, + s"nowchess:game:$gameId:s2c", + _ => (), + ioClient, + _ => onGameOverCalled = true, + "nowchess", + ) + publisher.onGameEvent(mock(classOf[GameEvent])) + verify(streamCmds, never()).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:game-over"), + any(classOf[XAddArgs]), + any(), + ) + assertFalse(onGameOverCalled)