feat(coordinator): add configurable coordinator settings and enhance WebSocket connection handling
Build & Test (NowChessSystems) TeamCity build failed

This commit is contained in:
2026-04-26 22:33:16 +02:00
parent 64d5afa4d1
commit 6b59e68e04
27 changed files with 873 additions and 617 deletions
@@ -24,6 +24,7 @@ nowchess:
prefix: nowchess
coordinator:
enabled: ${NOWCHESS_COORDINATOR_ENABLED:false}
host: localhost
grpc-port: 9086
stream-heartbeat-interval: 200ms
@@ -99,6 +100,7 @@ nowchess:
prefix: ${REDIS_PREFIX:nowchess}
coordinator:
enabled: ${NOWCHESS_COORDINATOR_ENABLED:true}
host: ${COORDINATOR_SERVICE_HOST:localhost}
grpc-port: ${COORDINATOR_SERVICE_GRPC_PORT:9086}
stream-heartbeat-interval: 200ms
@@ -3,6 +3,6 @@ package de.nowchess.chess.redis
sealed trait C2sMessage
object C2sMessage:
case object Connected extends C2sMessage
case class Move(uci: String) extends C2sMessage
case object Ping extends C2sMessage
case object Connected extends C2sMessage
case class Move(uci: String, playerId: Option[String] = None) extends C2sMessage
case object Ping extends C2sMessage
@@ -1,7 +1,9 @@
package de.nowchess.chess.redis
import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.api.board.Color
import de.nowchess.api.dto.GameFullEventDto
import de.nowchess.api.game.GameMode
import de.nowchess.chess.config.RedisConfig
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.observer.Observer
@@ -82,10 +84,10 @@ class GameRedisSubscriberManager:
private def handleC2sMessage(gameId: String, msg: String): Unit =
parseC2sMessage(msg) match
case Some(C2sMessage.Connected) => handleConnected(gameId)
case Some(C2sMessage.Move(uci)) => handleMove(gameId, uci)
case Some(C2sMessage.Ping) => ()
case None => ()
case Some(C2sMessage.Connected) => handleConnected(gameId)
case Some(C2sMessage.Move(uci, playerId)) => handleMove(gameId, uci, playerId)
case Some(C2sMessage.Ping) => ()
case None => ()
private def handleConnected(gameId: String): Unit =
registry.get(gameId).foreach { entry =>
@@ -94,16 +96,30 @@ class GameRedisSubscriberManager:
redisson.getTopic(s2cTopicName(gameId)).publish(json)
}
private def handleMove(gameId: String, uci: String): Unit =
private def handleMove(gameId: String, uci: String, playerId: Option[String]): Unit =
registry.get(gameId).foreach { entry =>
entry.engine.processUserInput(uci)
entry.mode match
case GameMode.Open => entry.engine.processUserInput(uci)
case GameMode.Authenticated =>
playerId match
case None => ()
case Some(pid) =>
val turn = entry.engine.context.turn
val authorised =
(entry.white.id.value == pid && turn == Color.White) ||
(entry.black.id.value == pid && turn == Color.Black)
if authorised then entry.engine.processUserInput(uci)
}
private def parseC2sMessage(msg: String): Option[C2sMessage] =
Try(objectMapper.readTree(msg)).toOption.flatMap { node =>
Option(node.get("type")).map(_.asText()).flatMap {
case "CONNECTED" => Some(C2sMessage.Connected)
case "MOVE" => Option(node.get("uci")).map(u => C2sMessage.Move(u.asText()))
case "MOVE" =>
Option(node.get("uci")).map { u =>
val pid = Option(node.get("playerId")).map(_.asText()).filter(_.nonEmpty)
C2sMessage.Move(u.asText(), pid)
}
case "PING" => Some(C2sMessage.Ping)
case _ => None
}
@@ -33,6 +33,9 @@ class InstanceHeartbeatService:
@ConfigProperty(name = "quarkus.grpc.server.port", defaultValue = "9000")
private var grpcPort: Int = 0
@ConfigProperty(name = "nowchess.coordinator.enabled", defaultValue = "true")
private var coordinatorEnabled: Boolean = true
private var coordinatorStub: CoordinatorServiceStub = uninitialized
private val log = Logger.getLogger(classOf[InstanceHeartbeatService])
@@ -45,20 +48,36 @@ class InstanceHeartbeatService:
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
private var subscriptionCount = 0
private var localCacheSize = 0
private var serviceActive = false
private var shuttingDown = false
def onStart(@Observes event: StartupEvent): Unit =
if !coordinatorEnabled then
log.info("Coordinator support disabled via config; skipping heartbeat service startup")
return
try
shuttingDown = false
generateInstanceId()
initializeHeartbeatStream()
scheduleHeartbeats()
serviceActive = true
log.infof("Instance heartbeat service started with ID: %s", instanceId)
catch
case ex: Exception =>
serviceActive = false
log.errorf(ex, "Failed to start instance heartbeat service")
def onShutdown(@Observes event: ShutdownEvent): Unit =
shuttingDown = true
if !serviceActive then
log.info("Instance heartbeat service stopped")
return
try
cleanup()
serviceActive = false
log.info("Instance heartbeat service stopped")
catch
case ex: Exception =>
@@ -74,12 +93,16 @@ class InstanceHeartbeatService:
localCacheSize = count
def addGameSubscription(gameId: String): Unit =
if !coordinatorEnabled then return
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.add(gameId)
subscriptionCount += 1
def removeGameSubscription(gameId: String): Unit =
if !coordinatorEnabled then return
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.remove(gameId)
@@ -103,7 +126,8 @@ class InstanceHeartbeatService:
override def onError(t: Throwable): Unit =
log.warnf(t, "Heartbeat stream error")
streamObserver = None
heartbeatExecutor.schedule((() => initializeHeartbeatStream()): Runnable, 5, TimeUnit.SECONDS)
if !shuttingDown then
heartbeatExecutor.schedule((() => initializeHeartbeatStream()): Runnable, 5, TimeUnit.SECONDS)
override def onCompleted: Unit =
log.info("Heartbeat stream completed")
@@ -182,11 +206,12 @@ class InstanceHeartbeatService:
streamObserver.foreach(_.onCompleted())
streamObserver = None
val key = s"$redisPrefix:instances:$instanceId"
redissonClient.getBucket[String](key).delete()
if instanceId.nonEmpty then
val key = s"$redisPrefix:instances:$instanceId"
redissonClient.getBucket[String](key).delete()
val setKey = s"$redisPrefix:instance:$instanceId:games"
redissonClient.getSet[String](setKey).delete()
val setKey = s"$redisPrefix:instance:$instanceId:games"
redissonClient.getSet[String](setKey).delete()
heartbeatExecutor.shutdown()
redisHeartbeatExecutor.shutdown()
@@ -12,6 +12,8 @@ quarkus:
url: http://localhost:8085
nowchess:
coordinator:
enabled: false
redis:
host: localhost
port: 6379
@@ -0,0 +1,17 @@
package de.nowchess.chess.config
import jakarta.annotation.Priority
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Alternative
import jakarta.enterprise.inject.Produces
import org.mockito.Mockito
import org.redisson.api.RedissonClient
@Alternative
@Priority(1)
@ApplicationScoped
class MockRedissonProducer:
@Produces
@ApplicationScoped
def produceRedissonClient(): RedissonClient =
Mockito.mock(classOf[RedissonClient], Mockito.RETURNS_DEEP_STUBS)