feat(core): publish GameOver event to Redis Streams #64
@@ -1,4 +1,4 @@
|
|||||||
package de.nowchess.api.event
|
package de.nowchess.api.event
|
||||||
|
|
||||||
enum EventType:
|
enum EventType:
|
||||||
case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted
|
case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted, GameOver
|
||||||
|
|||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package de.nowchess.api.event
|
||||||
|
|
||||||
|
final case class GameOverPayload(
|
||||||
|
gameId: String,
|
||||||
|
result: String,
|
||||||
|
terminationReason: String,
|
||||||
|
whiteId: String,
|
||||||
|
blackId: String,
|
||||||
|
)
|
||||||
@@ -2,7 +2,7 @@ package de.nowchess.chess.config
|
|||||||
|
|
||||||
import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square}
|
import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square}
|
||||||
import de.nowchess.api.dto.*
|
import de.nowchess.api.dto.*
|
||||||
import de.nowchess.api.event.{EventEnvelope, EventType}
|
import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload}
|
||||||
import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult}
|
import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult}
|
||||||
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
|
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
|
||||||
import de.nowchess.chess.registry.GameCacheDto
|
import de.nowchess.chess.registry.GameCacheDto
|
||||||
@@ -18,6 +18,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
|||||||
classOf[GameCreationResponseDto],
|
classOf[GameCreationResponseDto],
|
||||||
classOf[EventEnvelope],
|
classOf[EventEnvelope],
|
||||||
classOf[EventType],
|
classOf[EventType],
|
||||||
|
classOf[GameOverPayload],
|
||||||
classOf[ErrorEventDto],
|
classOf[ErrorEventDto],
|
||||||
classOf[GameWritebackEventDto],
|
classOf[GameWritebackEventDto],
|
||||||
classOf[GameFullDto],
|
classOf[GameFullDto],
|
||||||
|
|||||||
@@ -1,15 +1,18 @@
|
|||||||
package de.nowchess.chess.redis
|
package de.nowchess.chess.redis
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
|
||||||
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
|
|
||||||
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
|
|
||||||
import de.nowchess.api.board.Color
|
import de.nowchess.api.board.Color
|
||||||
|
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
|
||||||
|
import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload}
|
||||||
|
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
|
||||||
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||||
import de.nowchess.chess.observer.{GameEvent, Observer}
|
import de.nowchess.chess.observer.{GameEvent, Observer}
|
||||||
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
||||||
import de.nowchess.chess.resource.GameDtoMapper
|
import de.nowchess.chess.resource.GameDtoMapper
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.stream.XAddArgs
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
|
|
||||||
object GameRedisPublisher:
|
object GameRedisPublisher:
|
||||||
private val log = Logger.getLogger(classOf[GameRedisPublisher])
|
private val log = Logger.getLogger(classOf[GameRedisPublisher])
|
||||||
@@ -23,8 +26,11 @@ class GameRedisPublisher(
|
|||||||
writebackEmit: String => Unit,
|
writebackEmit: String => Unit,
|
||||||
ioClient: IoGrpcClientWrapper,
|
ioClient: IoGrpcClientWrapper,
|
||||||
onGameOver: String => Unit,
|
onGameOver: String => Unit,
|
||||||
|
redisPrefix: String,
|
||||||
) extends Observer:
|
) extends Observer:
|
||||||
|
|
||||||
|
private val maxStreamLen = 1000L
|
||||||
|
|
||||||
def emitInitialWriteback(): Unit =
|
def emitInitialWriteback(): Unit =
|
||||||
try
|
try
|
||||||
registry.get(gameId).foreach { entry =>
|
registry.get(gameId).foreach { entry =>
|
||||||
@@ -40,10 +46,39 @@ class GameRedisPublisher(
|
|||||||
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
||||||
redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
|
redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
|
||||||
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
|
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
|
||||||
if entry.engine.context.result.isDefined then onGameOver(gameId)
|
entry.engine.context.result.foreach { result =>
|
||||||
|
publishGameOver(entry, result)
|
||||||
|
onGameOver(gameId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
|
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
|
||||||
|
|
||||||
|
private def publishGameOver(entry: GameEntry, result: GameResult): Unit =
|
||||||
|
val resultStr = result match
|
||||||
|
case GameResult.Win(Color.White, _) => "white"
|
||||||
|
case GameResult.Win(Color.Black, _) => "black"
|
||||||
|
case GameResult.Draw(_) => "draw"
|
||||||
|
val terminationReason = result match
|
||||||
|
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
|
||||||
|
case GameResult.Win(_, WinReason.Resignation) => "resignation"
|
||||||
|
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
|
||||||
|
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
|
||||||
|
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
|
||||||
|
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
|
||||||
|
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
|
||||||
|
case GameResult.Draw(DrawReason.Agreement) => "agreement"
|
||||||
|
val payload = objectMapper.valueToTree[JsonNode](
|
||||||
|
GameOverPayload(gameId, resultStr, terminationReason, entry.white.id.value, entry.black.id.value),
|
||||||
|
)
|
||||||
|
val envelope = EventEnvelope.of(EventType.GameOver, payload)
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xadd(
|
||||||
|
s"$redisPrefix:game-over",
|
||||||
|
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||||
|
Map("data" -> objectMapper.writeValueAsString(envelope)).asJava,
|
||||||
|
)
|
||||||
|
|
||||||
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
|
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
|
||||||
val clock = entry.engine.currentClockState
|
val clock = entry.engine.currentClockState
|
||||||
GameWritebackEventDto(
|
GameWritebackEventDto(
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ class GameRedisSubscriberManager:
|
|||||||
writebackFn,
|
writebackFn,
|
||||||
ioClient,
|
ioClient,
|
||||||
unsubscribeGame,
|
unsubscribeGame,
|
||||||
|
redisConfig.prefix,
|
||||||
)
|
)
|
||||||
s2cObservers.put(gameId, obs)
|
s2cObservers.put(gameId, obs)
|
||||||
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
||||||
|
|||||||
@@ -0,0 +1,123 @@
|
|||||||
|
package de.nowchess.chess.redis
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
|
||||||
|
import de.nowchess.api.board.Color
|
||||||
|
import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason}
|
||||||
|
import de.nowchess.api.player.{PlayerId, PlayerInfo}
|
||||||
|
import de.nowchess.chess.client.CombinedExportResponse
|
||||||
|
import de.nowchess.chess.engine.GameEngine
|
||||||
|
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||||
|
import de.nowchess.chess.observer.GameEvent
|
||||||
|
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
||||||
|
import de.nowchess.rules.sets.DefaultRules
|
||||||
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||||
|
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
|
||||||
|
import org.junit.jupiter.api.Assertions.*
|
||||||
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
|
import org.mockito.ArgumentMatchers.*
|
||||||
|
import org.mockito.Mockito.*
|
||||||
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
|
class GameRedisPublisherTest:
|
||||||
|
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
|
||||||
|
private var pubsubCmds: PubSubCommands[String] = uninitialized
|
||||||
|
private var registry: GameRegistry = uninitialized
|
||||||
|
private var ioClient: IoGrpcClientWrapper = uninitialized
|
||||||
|
private var onGameOverCalled: Boolean = false
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
|
||||||
|
private val gameId = "game1"
|
||||||
|
private val whitePlayer = PlayerInfo(PlayerId("white1"), "Alice")
|
||||||
|
private val blackPlayer = PlayerInfo(PlayerId("black1"), "Bob")
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
def setup(): Unit =
|
||||||
|
redis = mock(classOf[RedisDataSource])
|
||||||
|
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
|
||||||
|
pubsubCmds = mock(classOf[PubSubCommands[String]])
|
||||||
|
registry = mock(classOf[GameRegistry])
|
||||||
|
ioClient = mock(classOf[IoGrpcClientWrapper])
|
||||||
|
when(redis.stream(classOf[String])).thenReturn(streamCmds)
|
||||||
|
when(redis.pubsub(classOf[String])).thenReturn(pubsubCmds)
|
||||||
|
when(ioClient.exportCombined(any()))
|
||||||
|
.thenReturn(CombinedExportResponse("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", ""))
|
||||||
|
onGameOverCalled = false
|
||||||
|
|
||||||
|
private def publisherWithResult(result: GameResult): GameRedisPublisher =
|
||||||
|
val ctx = GameContext.initial.copy(result = Some(result))
|
||||||
|
val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules)
|
||||||
|
val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer)
|
||||||
|
when(registry.get(gameId)).thenReturn(Some(entry))
|
||||||
|
new GameRedisPublisher(
|
||||||
|
gameId,
|
||||||
|
registry,
|
||||||
|
redis,
|
||||||
|
objectMapper,
|
||||||
|
s"nowchess:game:$gameId:s2c",
|
||||||
|
_ => (),
|
||||||
|
ioClient,
|
||||||
|
_ => onGameOverCalled = true,
|
||||||
|
"nowchess",
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishesGameOverOnCheckmate(): Unit =
|
||||||
|
val publisher = publisherWithResult(GameResult.Win(Color.White, WinReason.Checkmate))
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
assertTrue(onGameOverCalled)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishesGameOverOnResignation(): Unit =
|
||||||
|
val publisher = publisherWithResult(GameResult.Win(Color.Black, WinReason.Resignation))
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishesGameOverOnDraw(): Unit =
|
||||||
|
val publisher = publisherWithResult(GameResult.Draw(DrawReason.Agreement))
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def doesNotPublishGameOverWhenNoResult(): Unit =
|
||||||
|
val ctx = GameContext.initial
|
||||||
|
val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules)
|
||||||
|
val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer)
|
||||||
|
when(registry.get(gameId)).thenReturn(Some(entry))
|
||||||
|
val publisher = new GameRedisPublisher(
|
||||||
|
gameId,
|
||||||
|
registry,
|
||||||
|
redis,
|
||||||
|
objectMapper,
|
||||||
|
s"nowchess:game:$gameId:s2c",
|
||||||
|
_ => (),
|
||||||
|
ioClient,
|
||||||
|
_ => onGameOverCalled = true,
|
||||||
|
"nowchess",
|
||||||
|
)
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds, never()).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
assertFalse(onGameOverCalled)
|
||||||
Reference in New Issue
Block a user