feat: Merge branch 'main' of git.janis-eccarius.de:NowChess/NowChessSystems into feat/NCS-121
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
This commit is contained in:
@@ -1,28 +0,0 @@
|
|||||||
package de.nowchess.account.client
|
|
||||||
|
|
||||||
import de.nowchess.security.{InternalClientHeadersFactory, InternalSecretClientFilter}
|
|
||||||
import jakarta.ws.rs.*
|
|
||||||
import jakarta.ws.rs.core.MediaType
|
|
||||||
import org.eclipse.microprofile.rest.client.annotation.{RegisterClientHeaders, RegisterProvider}
|
|
||||||
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient
|
|
||||||
|
|
||||||
case class CorePlayerInfo(id: String, displayName: String)
|
|
||||||
case class CoreTimeControl(limitSeconds: Option[Int], incrementSeconds: Option[Int], daysPerMove: Option[Int])
|
|
||||||
case class CoreCreateGameRequest(
|
|
||||||
white: Option[CorePlayerInfo],
|
|
||||||
black: Option[CorePlayerInfo],
|
|
||||||
timeControl: Option[CoreTimeControl],
|
|
||||||
mode: Option[String],
|
|
||||||
)
|
|
||||||
case class CoreGameResponse(gameId: String)
|
|
||||||
|
|
||||||
@Path("/api/board/game")
|
|
||||||
@RegisterRestClient(configKey = "core-service")
|
|
||||||
@RegisterProvider(classOf[InternalSecretClientFilter])
|
|
||||||
@RegisterClientHeaders(classOf[InternalClientHeadersFactory])
|
|
||||||
trait CoreGameClient:
|
|
||||||
|
|
||||||
@POST
|
|
||||||
@Consumes(Array(MediaType.APPLICATION_JSON))
|
|
||||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
|
||||||
def createGame(req: CoreCreateGameRequest): CoreGameResponse
|
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package de.nowchess.account.client
|
||||||
|
|
||||||
|
case class CorePlayerInfo(id: String, displayName: String)
|
||||||
|
case class CoreTimeControl(limitSeconds: Option[Int], incrementSeconds: Option[Int], daysPerMove: Option[Int])
|
||||||
|
case class CoreCreateGameRequest(
|
||||||
|
white: Option[CorePlayerInfo],
|
||||||
|
black: Option[CorePlayerInfo],
|
||||||
|
timeControl: Option[CoreTimeControl],
|
||||||
|
mode: Option[String],
|
||||||
|
)
|
||||||
+1
-2
@@ -1,6 +1,6 @@
|
|||||||
package de.nowchess.account.config
|
package de.nowchess.account.config
|
||||||
|
|
||||||
import de.nowchess.account.client.{CoreCreateGameRequest, CoreGameResponse, CorePlayerInfo, CoreTimeControl}
|
import de.nowchess.account.client.{CoreCreateGameRequest, CorePlayerInfo, CoreTimeControl}
|
||||||
import de.nowchess.account.domain.{
|
import de.nowchess.account.domain.{
|
||||||
BotAccount,
|
BotAccount,
|
||||||
Challenge,
|
Challenge,
|
||||||
@@ -53,7 +53,6 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
|||||||
classOf[CorePlayerInfo],
|
classOf[CorePlayerInfo],
|
||||||
classOf[CoreTimeControl],
|
classOf[CoreTimeControl],
|
||||||
classOf[CoreCreateGameRequest],
|
classOf[CoreCreateGameRequest],
|
||||||
classOf[CoreGameResponse],
|
|
||||||
classOf[OfficialChallengeResponse],
|
classOf[OfficialChallengeResponse],
|
||||||
classOf[GameCreationRequestDto],
|
classOf[GameCreationRequestDto],
|
||||||
classOf[GameCreationResponseDto],
|
classOf[GameCreationResponseDto],
|
||||||
|
|||||||
@@ -36,26 +36,32 @@ class EventPublisher:
|
|||||||
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||||
Map("data" -> json).asJava,
|
Map("data" -> json).asJava,
|
||||||
)
|
)
|
||||||
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json)
|
|
||||||
()
|
()
|
||||||
|
|
||||||
def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit =
|
def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit =
|
||||||
val payload = objectMapper.createObjectNode()
|
val payload = objectMapper.createObjectNode()
|
||||||
payload.put("challengeId", challengeId)
|
payload.put("challengeId", challengeId)
|
||||||
payload.put("challengerName", challengerName)
|
payload.put("challengerName", challengerName)
|
||||||
publish(s"${redisConfig.prefix}:user:$destUserId:events", EventType.ChallengeCreated, payload)
|
publishToUserStream(destUserId, EventType.ChallengeCreated, payload)
|
||||||
|
|
||||||
def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit =
|
def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit =
|
||||||
val payload = objectMapper.createObjectNode()
|
val payload = objectMapper.createObjectNode()
|
||||||
payload.put("challengeId", challengeId)
|
payload.put("challengeId", challengeId)
|
||||||
payload.put("gameId", gameId)
|
payload.put("gameId", gameId)
|
||||||
publish(s"${redisConfig.prefix}:user:$challengerId:events", EventType.ChallengeAccepted, payload)
|
publishToUserStream(challengerId, EventType.ChallengeAccepted, payload)
|
||||||
|
|
||||||
private def publish(
|
private def publishToUserStream(
|
||||||
channel: String,
|
userId: String,
|
||||||
eventType: EventType,
|
eventType: EventType,
|
||||||
payload: com.fasterxml.jackson.databind.node.ObjectNode,
|
payload: com.fasterxml.jackson.databind.node.ObjectNode,
|
||||||
): Unit =
|
): Unit =
|
||||||
val envelope = EventEnvelope.of(eventType, payload)
|
val envelope = EventEnvelope.of(eventType, payload)
|
||||||
redis.pubsub(classOf[String]).publish(channel, objectMapper.writeValueAsString(envelope))
|
val json = objectMapper.writeValueAsString(envelope)
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xadd(
|
||||||
|
s"${redisConfig.prefix}:user:$userId:events:stream",
|
||||||
|
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||||
|
Map("data" -> json).asJava,
|
||||||
|
)
|
||||||
()
|
()
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package de.nowchess.account.service
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
|
||||||
|
import de.nowchess.account.config.RedisConfig
|
||||||
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
|
||||||
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
|
import org.mockito.ArgumentMatchers.*
|
||||||
|
import org.mockito.Mockito.*
|
||||||
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
|
class EventPublisherTest:
|
||||||
|
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
|
||||||
|
private var redisConfig: RedisConfig = uninitialized
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
def setup(): Unit =
|
||||||
|
redis = mock(classOf[RedisDataSource])
|
||||||
|
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
|
||||||
|
redisConfig = mock(classOf[RedisConfig])
|
||||||
|
when(redis.stream(classOf[String])).thenReturn(streamCmds)
|
||||||
|
when(redisConfig.prefix).thenReturn("nowchess")
|
||||||
|
|
||||||
|
private def publisher: EventPublisher =
|
||||||
|
val p = new EventPublisher
|
||||||
|
p.redis = redis
|
||||||
|
p.redisConfig = redisConfig
|
||||||
|
p.objectMapper = objectMapper
|
||||||
|
p
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishChallengeCreatedWritesToUserStream(): Unit =
|
||||||
|
publisher.publishChallengeCreated("user1", "ch1", "Alice")
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:user:user1:events:stream"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
verify(redis, never()).pubsub(any(classOf[Class[?]]))
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishChallengeAcceptedWritesToUserStream(): Unit =
|
||||||
|
publisher.publishChallengeAccepted("user2", "ch1", "game42")
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:user:user2:events:stream"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
verify(redis, never()).pubsub(any(classOf[Class[?]]))
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package de.nowchess.api.event
|
package de.nowchess.api.event
|
||||||
|
|
||||||
enum EventType:
|
enum EventType:
|
||||||
case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted
|
case GameStart, GameCreationRequest, GameCreationResponse, BotGameStart, ChallengeCreated, ChallengeAccepted, GameOver
|
||||||
|
|||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package de.nowchess.api.event
|
||||||
|
|
||||||
|
final case class GameOverPayload(
|
||||||
|
gameId: String,
|
||||||
|
result: String,
|
||||||
|
terminationReason: String,
|
||||||
|
whiteId: String,
|
||||||
|
blackId: String,
|
||||||
|
)
|
||||||
@@ -73,6 +73,7 @@ dependencies {
|
|||||||
testImplementation(platform("org.junit:junit-bom:5.13.4"))
|
testImplementation(platform("org.junit:junit-bom:5.13.4"))
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter")
|
testImplementation("org.junit.jupiter:junit-jupiter")
|
||||||
testImplementation("io.quarkus:quarkus-junit")
|
testImplementation("io.quarkus:quarkus-junit")
|
||||||
|
testImplementation("io.quarkus:quarkus-junit5-mockito")
|
||||||
testImplementation("io.rest-assured:rest-assured")
|
testImplementation("io.rest-assured:rest-assured")
|
||||||
testImplementation("io.quarkus:quarkus-test-security")
|
testImplementation("io.quarkus:quarkus-test-security")
|
||||||
|
|
||||||
|
|||||||
+61
-20
@@ -2,14 +2,18 @@ package de.nowchess.botplatform.registry
|
|||||||
|
|
||||||
import de.nowchess.botplatform.config.RedisConfig
|
import de.nowchess.botplatform.config.RedisConfig
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
import io.quarkus.redis.datasource.stream.{XGroupCreateArgs, XReadGroupArgs}
|
||||||
import io.smallrye.mutiny.subscription.MultiEmitter
|
import io.smallrye.mutiny.subscription.MultiEmitter
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
|
import org.eclipse.microprofile.context.ManagedExecutor
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
|
import scala.util.{Failure, Success, Try}
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util.UUID
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.function.Consumer
|
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class BotRegistry:
|
class BotRegistry:
|
||||||
@@ -17,31 +21,68 @@ class BotRegistry:
|
|||||||
private val log = Logger.getLogger(classOf[BotRegistry])
|
private val log = Logger.getLogger(classOf[BotRegistry])
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject var redis: RedisDataSource = uninitialized
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
@Inject var redisConfig: RedisConfig = uninitialized
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
|
@Inject var executor: ManagedExecutor = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]()
|
private val groupName = "bot-platform-consumer"
|
||||||
|
private val consumerId = UUID.randomUUID().toString
|
||||||
|
|
||||||
|
private val emitters = ConcurrentHashMap[String, MultiEmitter[? >: String]]()
|
||||||
|
|
||||||
def register(botId: String, emitter: MultiEmitter[? >: String]): Unit =
|
def register(botId: String, emitter: MultiEmitter[? >: String]): Unit =
|
||||||
val channel = s"${redisConfig.prefix}:bot:$botId:events"
|
createGroupIfAbsent(botId)
|
||||||
val handler: Consumer[String] = msg => emitter.emit(msg)
|
emitters.put(botId, emitter)
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler)
|
executor.submit(
|
||||||
connections.put(botId, (emitter, subscriber))
|
new Runnable:
|
||||||
log.infof("Bot %s registered", botId)
|
def run(): Unit = pollLoop(botId, emitter),
|
||||||
|
)
|
||||||
|
log.infof("Bot %s registered on stream consumer group", botId)
|
||||||
()
|
()
|
||||||
|
|
||||||
def unregister(botId: String): Unit =
|
def unregister(botId: String): Unit =
|
||||||
Option(connections.remove(botId)).foreach { (_, subscriber) =>
|
emitters.remove(botId)
|
||||||
subscriber.unsubscribe(s"${redisConfig.prefix}:bot:$botId:events")
|
|
||||||
}
|
|
||||||
log.infof("Bot %s unregistered", botId)
|
log.infof("Bot %s unregistered", botId)
|
||||||
|
|
||||||
def dispatch(botId: String, event: String): Unit =
|
|
||||||
log.debugf("Dispatching event to bot %s", botId)
|
|
||||||
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event)
|
|
||||||
()
|
|
||||||
|
|
||||||
def registeredBots: List[String] =
|
def registeredBots: List[String] =
|
||||||
import scala.jdk.CollectionConverters.*
|
emitters.keys().asScala.toList
|
||||||
connections.keys().asScala.toList
|
|
||||||
|
private def streamKey(botId: String): String =
|
||||||
|
s"${redisConfig.prefix}:bot:$botId:events:stream"
|
||||||
|
|
||||||
|
private def createGroupIfAbsent(botId: String): Unit =
|
||||||
|
Try(
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xgroupCreate(streamKey(botId), groupName, "$", new XGroupCreateArgs().mkstream()),
|
||||||
|
) match
|
||||||
|
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for bot %s", botId)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def pollLoop(botId: String, myEmitter: MultiEmitter[? >: String]): Unit =
|
||||||
|
while emitters.get(botId) eq myEmitter do
|
||||||
|
Try {
|
||||||
|
val messages = redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xreadgroup(
|
||||||
|
groupName,
|
||||||
|
consumerId,
|
||||||
|
streamKey(botId),
|
||||||
|
">",
|
||||||
|
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
|
||||||
|
)
|
||||||
|
Option(messages).foreach(_.forEach { msg =>
|
||||||
|
if emitters.get(botId) eq myEmitter then
|
||||||
|
myEmitter.emit(msg.payload().get("data"))
|
||||||
|
ack(botId, msg.id())
|
||||||
|
})
|
||||||
|
} match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Error in poll loop for bot %s", botId)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def ack(botId: String, id: String): Unit =
|
||||||
|
Try(redis.stream(classOf[String]).xack(streamKey(botId), groupName, id)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for bot %s", id, botId)
|
||||||
|
case Success(_) => ()
|
||||||
|
|||||||
+83
@@ -0,0 +1,83 @@
|
|||||||
|
package de.nowchess.botplatform.registry
|
||||||
|
|
||||||
|
import de.nowchess.botplatform.config.RedisConfig
|
||||||
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.stream.{StreamCommands, XGroupCreateArgs}
|
||||||
|
import io.smallrye.mutiny.subscription.MultiEmitter
|
||||||
|
import org.eclipse.microprofile.context.ManagedExecutor
|
||||||
|
import org.junit.jupiter.api.Assertions.*
|
||||||
|
import org.junit.jupiter.api.function.Executable
|
||||||
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
|
import org.mockito.ArgumentMatchers.*
|
||||||
|
import org.mockito.Mockito.*
|
||||||
|
|
||||||
|
class BotRegistryTest:
|
||||||
|
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
|
private var registry: BotRegistry = scala.compiletime.uninitialized
|
||||||
|
private var redis: RedisDataSource = scala.compiletime.uninitialized
|
||||||
|
private var streamCmds: StreamCommands[String, String, Nothing] =
|
||||||
|
scala.compiletime.uninitialized
|
||||||
|
private var redisConfig: RedisConfig = scala.compiletime.uninitialized
|
||||||
|
private var executor: ManagedExecutor = scala.compiletime.uninitialized
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
def setup(): Unit =
|
||||||
|
redis = mock(classOf[RedisDataSource])
|
||||||
|
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
|
||||||
|
redisConfig = mock(classOf[RedisConfig])
|
||||||
|
executor = mock(classOf[ManagedExecutor])
|
||||||
|
|
||||||
|
when(redis.stream(classOf[String])).thenReturn(streamCmds)
|
||||||
|
when(redisConfig.prefix).thenReturn("nowchess")
|
||||||
|
|
||||||
|
registry = new BotRegistry
|
||||||
|
registry.redis = redis
|
||||||
|
registry.redisConfig = redisConfig
|
||||||
|
registry.executor = executor
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def registerStartsPollThread(): Unit =
|
||||||
|
val emitter = mock(classOf[MultiEmitter[String]])
|
||||||
|
registry.register("bot1", emitter)
|
||||||
|
verify(executor).submit(any(classOf[Runnable]))
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def registerCreatesConsumerGroupWithMkstream(): Unit =
|
||||||
|
val emitter = mock(classOf[MultiEmitter[String]])
|
||||||
|
registry.register("bot1", emitter)
|
||||||
|
verify(streamCmds)
|
||||||
|
.xgroupCreate(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:bot:bot1:events:stream"),
|
||||||
|
org.mockito.ArgumentMatchers.eq("bot-platform-consumer"),
|
||||||
|
org.mockito.ArgumentMatchers.eq("$"),
|
||||||
|
any(classOf[XGroupCreateArgs]),
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def registerTracksBot(): Unit =
|
||||||
|
val emitter = mock(classOf[MultiEmitter[String]])
|
||||||
|
registry.register("bot42", emitter)
|
||||||
|
assertTrue(registry.registeredBots.contains("bot42"))
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def unregisterRemovesBot(): Unit =
|
||||||
|
val emitter = mock(classOf[MultiEmitter[String]])
|
||||||
|
registry.register("botX", emitter)
|
||||||
|
registry.unregister("botX")
|
||||||
|
assertFalse(registry.registeredBots.contains("botX"))
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def busyGroupExceptionIsIgnoredOnRegister(): Unit =
|
||||||
|
val emitter = mock(classOf[MultiEmitter[String]])
|
||||||
|
when(streamCmds.xgroupCreate(any(), any(), any(), any()))
|
||||||
|
.thenThrow(new RuntimeException("BUSYGROUP Consumer Group name already exists"))
|
||||||
|
val exec: Executable = () => registry.register("botBusy", emitter)
|
||||||
|
assertDoesNotThrow(exec)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def registerDoesNotInteractWithPubSub(): Unit =
|
||||||
|
val emitter = mock(classOf[MultiEmitter[String]])
|
||||||
|
registry.register("botNoPubSub", emitter)
|
||||||
|
verify(redis, never()).pubsub(any(classOf[Class[?]]))
|
||||||
@@ -2,7 +2,7 @@ package de.nowchess.chess.config
|
|||||||
|
|
||||||
import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square}
|
import de.nowchess.api.board.{CastlingRights, Color, File, Piece, PieceType, Rank, Square}
|
||||||
import de.nowchess.api.dto.*
|
import de.nowchess.api.dto.*
|
||||||
import de.nowchess.api.event.{EventEnvelope, EventType}
|
import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload}
|
||||||
import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult}
|
import de.nowchess.api.game.{DrawReason, GameContext, GameMode, GameResult}
|
||||||
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
|
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
|
||||||
import de.nowchess.chess.registry.GameCacheDto
|
import de.nowchess.chess.registry.GameCacheDto
|
||||||
@@ -18,6 +18,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
|||||||
classOf[GameCreationResponseDto],
|
classOf[GameCreationResponseDto],
|
||||||
classOf[EventEnvelope],
|
classOf[EventEnvelope],
|
||||||
classOf[EventType],
|
classOf[EventType],
|
||||||
|
classOf[GameOverPayload],
|
||||||
classOf[ErrorEventDto],
|
classOf[ErrorEventDto],
|
||||||
classOf[GameWritebackEventDto],
|
classOf[GameWritebackEventDto],
|
||||||
classOf[GameFullDto],
|
classOf[GameFullDto],
|
||||||
|
|||||||
@@ -1,15 +1,18 @@
|
|||||||
package de.nowchess.chess.redis
|
package de.nowchess.chess.redis
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
|
||||||
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
|
|
||||||
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
|
|
||||||
import de.nowchess.api.board.Color
|
import de.nowchess.api.board.Color
|
||||||
|
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
|
||||||
|
import de.nowchess.api.event.{EventEnvelope, EventType, GameOverPayload}
|
||||||
|
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
|
||||||
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||||
import de.nowchess.chess.observer.{GameEvent, Observer}
|
import de.nowchess.chess.observer.{GameEvent, Observer}
|
||||||
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
||||||
import de.nowchess.chess.resource.GameDtoMapper
|
import de.nowchess.chess.resource.GameDtoMapper
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.stream.XAddArgs
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
|
|
||||||
object GameRedisPublisher:
|
object GameRedisPublisher:
|
||||||
private val log = Logger.getLogger(classOf[GameRedisPublisher])
|
private val log = Logger.getLogger(classOf[GameRedisPublisher])
|
||||||
@@ -23,8 +26,11 @@ class GameRedisPublisher(
|
|||||||
writebackEmit: String => Unit,
|
writebackEmit: String => Unit,
|
||||||
ioClient: IoGrpcClientWrapper,
|
ioClient: IoGrpcClientWrapper,
|
||||||
onGameOver: String => Unit,
|
onGameOver: String => Unit,
|
||||||
|
redisPrefix: String,
|
||||||
) extends Observer:
|
) extends Observer:
|
||||||
|
|
||||||
|
private val maxStreamLen = 1000L
|
||||||
|
|
||||||
def emitInitialWriteback(): Unit =
|
def emitInitialWriteback(): Unit =
|
||||||
try
|
try
|
||||||
registry.get(gameId).foreach { entry =>
|
registry.get(gameId).foreach { entry =>
|
||||||
@@ -40,10 +46,39 @@ class GameRedisPublisher(
|
|||||||
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
||||||
redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
|
redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
|
||||||
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
|
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
|
||||||
if entry.engine.context.result.isDefined then onGameOver(gameId)
|
entry.engine.context.result.foreach { result =>
|
||||||
|
publishGameOver(entry, result)
|
||||||
|
onGameOver(gameId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
|
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
|
||||||
|
|
||||||
|
private def publishGameOver(entry: GameEntry, result: GameResult): Unit =
|
||||||
|
val resultStr = result match
|
||||||
|
case GameResult.Win(Color.White, _) => "white"
|
||||||
|
case GameResult.Win(Color.Black, _) => "black"
|
||||||
|
case GameResult.Draw(_) => "draw"
|
||||||
|
val terminationReason = result match
|
||||||
|
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
|
||||||
|
case GameResult.Win(_, WinReason.Resignation) => "resignation"
|
||||||
|
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
|
||||||
|
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
|
||||||
|
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
|
||||||
|
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
|
||||||
|
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
|
||||||
|
case GameResult.Draw(DrawReason.Agreement) => "agreement"
|
||||||
|
val payload = objectMapper.valueToTree[JsonNode](
|
||||||
|
GameOverPayload(gameId, resultStr, terminationReason, entry.white.id.value, entry.black.id.value),
|
||||||
|
)
|
||||||
|
val envelope = EventEnvelope.of(EventType.GameOver, payload)
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xadd(
|
||||||
|
s"$redisPrefix:game-over",
|
||||||
|
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||||
|
Map("data" -> objectMapper.writeValueAsString(envelope)).asJava,
|
||||||
|
)
|
||||||
|
|
||||||
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
|
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
|
||||||
val clock = entry.engine.currentClockState
|
val clock = entry.engine.currentClockState
|
||||||
GameWritebackEventDto(
|
GameWritebackEventDto(
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ class GameRedisSubscriberManager:
|
|||||||
writebackFn,
|
writebackFn,
|
||||||
ioClient,
|
ioClient,
|
||||||
unsubscribeGame,
|
unsubscribeGame,
|
||||||
|
redisConfig.prefix,
|
||||||
)
|
)
|
||||||
s2cObservers.put(gameId, obs)
|
s2cObservers.put(gameId, obs)
|
||||||
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
||||||
|
|||||||
@@ -0,0 +1,123 @@
|
|||||||
|
package de.nowchess.chess.redis
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
|
||||||
|
import de.nowchess.api.board.Color
|
||||||
|
import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason}
|
||||||
|
import de.nowchess.api.player.{PlayerId, PlayerInfo}
|
||||||
|
import de.nowchess.chess.client.CombinedExportResponse
|
||||||
|
import de.nowchess.chess.engine.GameEngine
|
||||||
|
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||||
|
import de.nowchess.chess.observer.GameEvent
|
||||||
|
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
||||||
|
import de.nowchess.rules.sets.DefaultRules
|
||||||
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||||
|
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
|
||||||
|
import org.junit.jupiter.api.Assertions.*
|
||||||
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
|
import org.mockito.ArgumentMatchers.*
|
||||||
|
import org.mockito.Mockito.*
|
||||||
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
|
class GameRedisPublisherTest:
|
||||||
|
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
|
||||||
|
private var pubsubCmds: PubSubCommands[String] = uninitialized
|
||||||
|
private var registry: GameRegistry = uninitialized
|
||||||
|
private var ioClient: IoGrpcClientWrapper = uninitialized
|
||||||
|
private var onGameOverCalled: Boolean = false
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
|
||||||
|
private val gameId = "game1"
|
||||||
|
private val whitePlayer = PlayerInfo(PlayerId("white1"), "Alice")
|
||||||
|
private val blackPlayer = PlayerInfo(PlayerId("black1"), "Bob")
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
def setup(): Unit =
|
||||||
|
redis = mock(classOf[RedisDataSource])
|
||||||
|
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
|
||||||
|
pubsubCmds = mock(classOf[PubSubCommands[String]])
|
||||||
|
registry = mock(classOf[GameRegistry])
|
||||||
|
ioClient = mock(classOf[IoGrpcClientWrapper])
|
||||||
|
when(redis.stream(classOf[String])).thenReturn(streamCmds)
|
||||||
|
when(redis.pubsub(classOf[String])).thenReturn(pubsubCmds)
|
||||||
|
when(ioClient.exportCombined(any()))
|
||||||
|
.thenReturn(CombinedExportResponse("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", ""))
|
||||||
|
onGameOverCalled = false
|
||||||
|
|
||||||
|
private def publisherWithResult(result: GameResult): GameRedisPublisher =
|
||||||
|
val ctx = GameContext.initial.copy(result = Some(result))
|
||||||
|
val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules)
|
||||||
|
val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer)
|
||||||
|
when(registry.get(gameId)).thenReturn(Some(entry))
|
||||||
|
new GameRedisPublisher(
|
||||||
|
gameId,
|
||||||
|
registry,
|
||||||
|
redis,
|
||||||
|
objectMapper,
|
||||||
|
s"nowchess:game:$gameId:s2c",
|
||||||
|
_ => (),
|
||||||
|
ioClient,
|
||||||
|
_ => onGameOverCalled = true,
|
||||||
|
"nowchess",
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishesGameOverOnCheckmate(): Unit =
|
||||||
|
val publisher = publisherWithResult(GameResult.Win(Color.White, WinReason.Checkmate))
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
assertTrue(onGameOverCalled)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishesGameOverOnResignation(): Unit =
|
||||||
|
val publisher = publisherWithResult(GameResult.Win(Color.Black, WinReason.Resignation))
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def publishesGameOverOnDraw(): Unit =
|
||||||
|
val publisher = publisherWithResult(GameResult.Draw(DrawReason.Agreement))
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def doesNotPublishGameOverWhenNoResult(): Unit =
|
||||||
|
val ctx = GameContext.initial
|
||||||
|
val engine = new GameEngine(initialContext = ctx, ruleSet = DefaultRules)
|
||||||
|
val entry = GameEntry(gameId, engine, whitePlayer, blackPlayer)
|
||||||
|
when(registry.get(gameId)).thenReturn(Some(entry))
|
||||||
|
val publisher = new GameRedisPublisher(
|
||||||
|
gameId,
|
||||||
|
registry,
|
||||||
|
redis,
|
||||||
|
objectMapper,
|
||||||
|
s"nowchess:game:$gameId:s2c",
|
||||||
|
_ => (),
|
||||||
|
ioClient,
|
||||||
|
_ => onGameOverCalled = true,
|
||||||
|
"nowchess",
|
||||||
|
)
|
||||||
|
publisher.onGameEvent(mock(classOf[GameEvent]))
|
||||||
|
verify(streamCmds, never()).xadd(
|
||||||
|
org.mockito.ArgumentMatchers.eq("nowchess:game-over"),
|
||||||
|
any(classOf[XAddArgs]),
|
||||||
|
any(),
|
||||||
|
)
|
||||||
|
assertFalse(onGameOverCalled)
|
||||||
+83
-6
@@ -24,8 +24,9 @@ import scala.jdk.CollectionConverters.*
|
|||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Failure, Success, Try}
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||||
|
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
|
||||||
import java.util.function.Consumer
|
import java.util.function.Consumer
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class OfficialBotService:
|
class OfficialBotService:
|
||||||
@@ -48,14 +49,18 @@ class OfficialBotService:
|
|||||||
private val terminalStatuses =
|
private val terminalStatuses =
|
||||||
Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
|
Set("checkmate", "resign", "timeout", "stalemate", "insufficientMaterial", "draw")
|
||||||
|
|
||||||
private val groupName = "official-bot"
|
private val groupName = "official-bot"
|
||||||
private val consumerId = UUID.randomUUID().toString
|
private val gameOverGroup = "official-bots-game-over"
|
||||||
private val maxRetries = 3
|
private val consumerId = UUID.randomUUID().toString
|
||||||
private val maxStreamLen = 1000L
|
private val maxRetries = 3
|
||||||
|
private val maxStreamLen = 1000L
|
||||||
|
|
||||||
private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
|
private def eventStream(botName: String): String = s"${redisConfig.prefix}:bot:$botName:events:stream"
|
||||||
|
private def gameOverStream: String = s"${redisConfig.prefix}:game-over"
|
||||||
private def dlqStream: String = s"${redisConfig.prefix}:dlq"
|
private def dlqStream: String = s"${redisConfig.prefix}:dlq"
|
||||||
|
|
||||||
|
private val gameWatches = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
def initializeMetrics(): Unit =
|
def initializeMetrics(): Unit =
|
||||||
BotController.listBots.foreach { bot =>
|
BotController.listBots.foreach { bot =>
|
||||||
@@ -68,6 +73,7 @@ class OfficialBotService:
|
|||||||
try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots))
|
try accountServiceClient.syncBots(SyncOfficialBotsRequest(bots))
|
||||||
catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service")
|
catch case ex: Exception => log.errorf(ex, "Failed to auto-register official bots with account service")
|
||||||
bots.foreach(subscribeToEventChannel)
|
bots.foreach(subscribeToEventChannel)
|
||||||
|
subscribeToGameOverStream()
|
||||||
|
|
||||||
private def subscribeToEventChannel(botName: String): Unit =
|
private def subscribeToEventChannel(botName: String): Unit =
|
||||||
createGroupIfAbsent(botName)
|
createGroupIfAbsent(botName)
|
||||||
@@ -165,9 +171,80 @@ class OfficialBotService:
|
|||||||
botAccountId: String,
|
botAccountId: String,
|
||||||
): Unit =
|
): Unit =
|
||||||
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
|
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
|
||||||
redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
|
||||||
|
gameWatches.put(gameId, (botName, subscriber))
|
||||||
()
|
()
|
||||||
|
|
||||||
|
private def subscribeToGameOverStream(): Unit =
|
||||||
|
Try(
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xgroupCreate(gameOverStream, gameOverGroup, "$", new XGroupCreateArgs().mkstream()),
|
||||||
|
) match
|
||||||
|
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to create game-over consumer group")
|
||||||
|
case Success(_) => ()
|
||||||
|
executor.submit(
|
||||||
|
new Runnable:
|
||||||
|
def run(): Unit = gameOverPollLoop(),
|
||||||
|
)
|
||||||
|
log.infof("Listening to game-over stream (consumer=%s)", consumerId)
|
||||||
|
|
||||||
|
private def gameOverPollLoop(): Unit =
|
||||||
|
while true do
|
||||||
|
Try {
|
||||||
|
val messages = redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xreadgroup(
|
||||||
|
gameOverGroup,
|
||||||
|
consumerId,
|
||||||
|
gameOverStream,
|
||||||
|
">",
|
||||||
|
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
|
||||||
|
)
|
||||||
|
Option(messages).foreach(_.forEach(msg => handleGameOverMessage(msg)))
|
||||||
|
} match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Error in game-over poll loop")
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def handleGameOverMessage(msg: StreamMessage[String, String, String]): Unit =
|
||||||
|
val json = msg.payload().get("data")
|
||||||
|
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
|
||||||
|
Try {
|
||||||
|
val node = objectMapper.readTree(json)
|
||||||
|
val gameId = node.path("payload").path("gameId").asText()
|
||||||
|
if gameId.nonEmpty then
|
||||||
|
Option(gameWatches.remove(gameId)).foreach { (botName, subscriber) =>
|
||||||
|
val topic = s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||||
|
Try(subscriber.unsubscribe(topic)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to unsubscribe from game %s", gameId)
|
||||||
|
case Success(_) => log.infof("Bot %s cleaned up game %s after GameOver", botName, gameId)
|
||||||
|
}
|
||||||
|
} match
|
||||||
|
case Success(_) =>
|
||||||
|
ackGameOver(msg.id())
|
||||||
|
case Failure(ex) if attempt + 1 < maxRetries =>
|
||||||
|
log.warnf(ex, "GameOver handling failed (attempt %d), retrying", attempt)
|
||||||
|
xadd(gameOverStream, Map("data" -> json, "attempt" -> (attempt + 1).toString))
|
||||||
|
ackGameOver(msg.id())
|
||||||
|
case Failure(ex) =>
|
||||||
|
log.errorf(ex, "GameOver handling failed after %d attempts, sending to DLQ", maxRetries)
|
||||||
|
xadd(
|
||||||
|
dlqStream,
|
||||||
|
Map(
|
||||||
|
"data" -> json,
|
||||||
|
"eventType" -> "GameOver",
|
||||||
|
"error" -> Option(ex.getMessage).getOrElse(ex.getClass.getName),
|
||||||
|
"attempt" -> attempt.toString,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
ackGameOver(msg.id())
|
||||||
|
|
||||||
|
private def ackGameOver(id: String): Unit =
|
||||||
|
Try(redis.stream(classOf[String]).xack(gameOverStream, gameOverGroup, id)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to ack game-over message %s", id)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
private def handleGameEvent(
|
private def handleGameEvent(
|
||||||
botName: String,
|
botName: String,
|
||||||
gameId: String,
|
gameId: String,
|
||||||
|
|||||||
@@ -2,15 +2,18 @@ package de.nowchess.ws.resource
|
|||||||
|
|
||||||
import de.nowchess.ws.config.RedisConfig
|
import de.nowchess.ws.config.RedisConfig
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
import io.quarkus.redis.datasource.stream.{XAddArgs, XGroupCreateArgs, XReadGroupArgs}
|
||||||
import io.quarkus.websockets.next.*
|
import io.quarkus.websockets.next.*
|
||||||
import io.smallrye.jwt.auth.principal.JWTParser
|
import io.smallrye.jwt.auth.principal.JWTParser
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
|
import org.eclipse.microprofile.context.ManagedExecutor
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
import scala.util.Try
|
import scala.jdk.CollectionConverters.*
|
||||||
|
import scala.util.{Failure, Success, Try}
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util.UUID
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.function.Consumer
|
|
||||||
|
|
||||||
@WebSocket(path = "/api/user/ws")
|
@WebSocket(path = "/api/user/ws")
|
||||||
class UserWebSocketResource:
|
class UserWebSocketResource:
|
||||||
@@ -18,20 +21,22 @@ class UserWebSocketResource:
|
|||||||
private val log = Logger.getLogger(classOf[UserWebSocketResource])
|
private val log = Logger.getLogger(classOf[UserWebSocketResource])
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
var redis: RedisDataSource = uninitialized
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
|
@Inject var jwtParser: JWTParser = uninitialized
|
||||||
@Inject
|
@Inject var executor: ManagedExecutor = uninitialized
|
||||||
var redisConfig: RedisConfig = uninitialized
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
var jwtParser: JWTParser = uninitialized
|
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val connections = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
|
private val consumerId = UUID.randomUUID().toString
|
||||||
|
private val maxRetries = 3
|
||||||
|
private val maxStreamLen = 1000L
|
||||||
|
|
||||||
private def userTopic(userId: String): String =
|
private val connections = new ConcurrentHashMap[String, (String, WebSocketConnection)]()
|
||||||
s"${redisConfig.prefix}:user:$userId:events"
|
|
||||||
|
private def userStreamKey(userId: String): String =
|
||||||
|
s"${redisConfig.prefix}:user:$userId:events:stream"
|
||||||
|
|
||||||
|
private def dlqKey: String = s"${redisConfig.prefix}:dlq"
|
||||||
|
|
||||||
@OnOpen
|
@OnOpen
|
||||||
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
||||||
@@ -45,16 +50,76 @@ class UserWebSocketResource:
|
|||||||
log.warn("WebSocket opened with no valid JWT — closing connection")
|
log.warn("WebSocket opened with no valid JWT — closing connection")
|
||||||
connection.close().subscribe().`with`(_ => (), _ => ())
|
connection.close().subscribe().`with`(_ => (), _ => ())
|
||||||
case Some(userId) =>
|
case Some(userId) =>
|
||||||
log.infof("User WebSocket opened — userId=%s", userId)
|
log.infof("User WebSocket opened — userId=%s connId=%s", userId, connection.id())
|
||||||
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
createGroupIfAbsent(userId, connection.id())
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler)
|
connections.put(connection.id(), (userId, connection))
|
||||||
connections.put(connection.id(), (userId, subscriber))
|
executor.submit(
|
||||||
|
new Runnable:
|
||||||
|
def run(): Unit = pollLoop(connection.id(), userId, connection),
|
||||||
|
)
|
||||||
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
|
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
|
||||||
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
|
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
|
||||||
|
|
||||||
@OnClose
|
@OnClose
|
||||||
def onClose(connection: WebSocketConnection): Unit =
|
def onClose(connection: WebSocketConnection): Unit =
|
||||||
log.infof("User WebSocket closed — connectionId=%s", connection.id())
|
log.infof("User WebSocket closed — connectionId=%s", connection.id())
|
||||||
Option(connections.remove(connection.id())).foreach { (userId, subscriber) =>
|
connections.remove(connection.id())
|
||||||
subscriber.unsubscribe(userTopic(userId))
|
|
||||||
}
|
private def createGroupIfAbsent(userId: String, groupName: String): Unit =
|
||||||
|
Try(
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xgroupCreate(userStreamKey(userId), groupName, "$", new XGroupCreateArgs().mkstream()),
|
||||||
|
) match
|
||||||
|
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for userId=%s", userId)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def pollLoop(connectionId: String, userId: String, myConnection: WebSocketConnection): Unit =
|
||||||
|
while Option(connections.get(connectionId)).exists(_._2 eq myConnection) do
|
||||||
|
Try {
|
||||||
|
val messages = redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xreadgroup(
|
||||||
|
connectionId,
|
||||||
|
consumerId,
|
||||||
|
userStreamKey(userId),
|
||||||
|
">",
|
||||||
|
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
|
||||||
|
)
|
||||||
|
Option(messages).foreach(_.forEach { msg =>
|
||||||
|
if Option(connections.get(connectionId)).exists(_._2 eq myConnection) then
|
||||||
|
val json = msg.payload().get("data")
|
||||||
|
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
|
||||||
|
Try(myConnection.sendText(json).await().atMost(Duration.ofSeconds(5))) match
|
||||||
|
case Success(_) =>
|
||||||
|
ack(connectionId, userId, msg.id())
|
||||||
|
case Failure(_) if attempt + 1 < maxRetries =>
|
||||||
|
xadd(userStreamKey(userId), json, attempt + 1)
|
||||||
|
ack(connectionId, userId, msg.id())
|
||||||
|
case Failure(ex) =>
|
||||||
|
log.warnf(ex, "Delivery failed for userId=%s after %d attempts, sending to DLQ", userId, maxRetries)
|
||||||
|
xadd(dlqKey, json, attempt)
|
||||||
|
ack(connectionId, userId, msg.id())
|
||||||
|
})
|
||||||
|
} match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Error in poll loop for userId=%s", userId)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def ack(groupName: String, userId: String, id: String): Unit =
|
||||||
|
Try(redis.stream(classOf[String]).xack(userStreamKey(userId), groupName, id)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for userId=%s", id, userId)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def xadd(key: String, json: String, attempt: Int): Unit =
|
||||||
|
Try(
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xadd(
|
||||||
|
key,
|
||||||
|
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||||
|
Map("data" -> json, "attempt" -> attempt.toString).asJava,
|
||||||
|
),
|
||||||
|
) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to publish to stream %s", key)
|
||||||
|
case Success(_) => ()
|
||||||
|
|||||||
Reference in New Issue
Block a user