feat: implement clock expiry scanning and handling for game records (#53)
Build & Test (NowChessSystems) TeamCity build finished

Reviewed-on: #53
This commit was merged in pull request #53.
This commit is contained in:
2026-05-16 13:24:48 +02:00
parent 5d5fffa812
commit 8f9eb12f66
7 changed files with 251 additions and 142 deletions
@@ -10,9 +10,11 @@ import io.fabric8.kubernetes.client.KubernetesClient
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
import io.quarkus.scheduler.Scheduled
import org.jboss.logging.Logger
import io.fabric8.kubernetes.client.KubernetesClientException
import scala.jdk.CollectionConverters.*
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import scala.compiletime.uninitialized
@ApplicationScoped
@@ -37,9 +39,19 @@ class AutoScaler:
private var meterRegistry: MeterRegistry = uninitialized
// scalafix:on DisableSyntax.var
private val log = Logger.getLogger(classOf[AutoScaler])
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
private val avgLoadRef = new AtomicReference[Double](0.0)
private val log = Logger.getLogger(classOf[AutoScaler])
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
private val avgLoadRef = new AtomicReference[Double](0.0)
private val drainingForScaleDown = ConcurrentHashMap.newKeySet[String]()
/** True while `instanceId` has been deliberately marked for drain ahead of a
  * scale-down (see scaleDown), letting health checks distinguish expected
  * evictions from real failures.
  */
def isDrainingForScaleDown(instanceId: String): Boolean =
  drainingForScaleDown.contains(instanceId)
/** Removes the drain marker for `instanceId`; no-op if it was never marked. */
def clearDraining(instanceId: String): Unit =
  drainingForScaleDown.remove(instanceId)
/** Clears the drain marker whose instance id occurs as a substring of
  * `podName`. Assumes pod names embed the instance id — the same matching
  * used by forceDeletePod; NOTE(review): confirm this naming invariant holds
  * for all rollout pods.
  */
def clearDrainingByPodName(podName: String): Unit =
  drainingForScaleDown.asScala.find(podName.contains).foreach(drainingForScaleDown.remove)
private def kubeClientOpt: Option[KubernetesClient] =
if kubeClientInstance.isUnsatisfied then None
@@ -125,49 +137,69 @@ class AutoScaler:
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
then scaleDown()
/** Adjusts the Argo Rollout's `spec.replicas` by `delta` via a
  * read-modify-update cycle, retrying the whole cycle on optimistic-lock
  * conflicts (HTTP 409).
  *
  * @param kube       Kubernetes client used for the get and update calls
  * @param direction  tag for metrics/logs ("up"/"down"); not used in arithmetic
  * @param delta      signed replica delta applied when `canScale` permits
  * @param canScale   predicate over the current replica count gating the patch
  * @param atLimit    called with the current count when `canScale` rejects it
  * @param onSuccess  called with (oldCount, newCount) after a successful update
  * @param maxRetries additional attempts allowed after a 409 conflict
  */
private def patchRolloutReplicas(
  kube: KubernetesClient,
  direction: String,
  delta: Int,
  canScale: Int => Boolean,
  atLimit: Int => Unit,
  onSuccess: (Int, Int) => Unit,
  maxRetries: Int = 3,
): Unit =
  def attempt(retries: Int): Unit =
    try
      // Re-fetch on every attempt so a retry patches a fresh resourceVersion.
      Option(
        kube
          .genericKubernetesResources(argoApiVersion, argoKind)
          .inNamespace(config.k8sNamespace)
          .withName(config.k8sRolloutName)
          .get(),
      ).foreach { rollout =>
        rolloutSpec(rollout).foreach { spec =>
          spec.get("replicas") match
            case current: Integer =>
              val n = current.intValue()
              if !canScale(n) then atLimit(n)
              else
                // Mutate the fetched object in place, then push it back.
                spec.put("replicas", Integer.valueOf(n + delta))
                kube
                  .genericKubernetesResources(argoApiVersion, argoKind)
                  .inNamespace(config.k8sNamespace)
                  .resource(rollout)
                  .update()
                meterRegistry.counter("nowchess.coordinator.scale.events", "direction", direction).increment()
                onSuccess(n, n + delta)
            // Missing/non-Integer replicas field: nothing sensible to patch.
            case _ => ()
        }
      }
    catch
      // 409 = concurrent modification of the rollout; retry the full cycle.
      case ex: KubernetesClientException if ex.getCode == 409 =>
        if retries > 0 then
          log.debugf("Conflict scaling %s %s, retrying (%d left)", direction, config.k8sRolloutName, retries - 1)
          attempt(retries - 1)
        else
          meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()
          log.errorf(ex, "Failed to scale %s %s after conflict retries", direction, config.k8sRolloutName)
      // Any other failure is terminal for this scaling pass.
      case ex: Exception =>
        meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()
        log.errorf(ex, "Failed to scale %s %s", direction, config.k8sRolloutName)
  attempt(maxRetries)
/** Scales the Argo Rollout up by one replica, bounded by
  * `config.scaleMaxReplicas`, and rebalances game load onto the enlarged
  * instance set on success. (Removes the superseded inline implementation
  * left over in the diff; the logic now lives in patchRolloutReplicas.)
  */
def scaleUp(): Unit =
  log.info("Scaling up Argo Rollout")
  kubeClientOpt match
    case None => log.warn("Kubernetes client not available, cannot scale")
    case Some(kube) =>
      patchRolloutReplicas(
        kube,
        direction = "up",
        delta = 1,
        canScale = _ < config.scaleMaxReplicas,
        atLimit = n => log.infof("Already at max replicas %d for %s", n, config.k8sRolloutName),
        onSuccess = (from, to) =>
          log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, from, to)
          loadBalancer.rebalance,
      )
def scaleDown(): Unit =
log.info("Scaling down Argo Rollout")
@@ -177,6 +209,7 @@ class AutoScaler:
underloadedInstance.foreach { inst =>
log.infof("Marking instance %s for drain before scale-down", inst.instanceId)
drainingForScaleDown.add(inst.instanceId)
failoverService
.onInstanceStreamDropped(inst.instanceId)
.subscribe()
@@ -187,42 +220,34 @@ class AutoScaler:
}
kubeClientOpt match
case None =>
log.warn("Kubernetes client not available, cannot scale")
case None => log.warn("Kubernetes client not available, cannot scale")
case Some(kube) =>
try
Option(
kube
.genericKubernetesResources(argoApiVersion, argoKind)
.inNamespace(config.k8sNamespace)
.withName(config.k8sRolloutName)
.get(),
).foreach { rollout =>
rolloutSpec(rollout).foreach { spec =>
spec.get("replicas") match
case replicas: Integer =>
val currentReplicas = replicas.intValue()
val minReplicas = config.scaleMinReplicas
patchRolloutReplicas(
kube,
direction = "down",
delta = -1,
canScale = _ > config.scaleMinReplicas,
atLimit = n => log.infof("Already at min replicas %d for %s", n, config.k8sRolloutName),
onSuccess = (from, to) =>
log.infof("Scaled down %s from %d to %d replicas", config.k8sRolloutName, from, to)
underloadedInstance.foreach(inst => forceDeletePod(inst.instanceId, kube)),
)
if currentReplicas > minReplicas then
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
kube
.genericKubernetesResources(argoApiVersion, argoKind)
.inNamespace(config.k8sNamespace)
.resource(rollout)
.update()
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
log.infof(
"Scaled down %s from %d to %d replicas",
config.k8sRolloutName,
currentReplicas,
currentReplicas - 1,
)
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
case _ => ()
}
}
catch
case ex: Exception =>
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
log.errorf(ex, "Failed to scale down %s", config.k8sRolloutName)
/** Immediately deletes (grace period 0) the rollout pod whose name contains
  * `instanceId`, after that instance has been drained for scale-down.
  * A missing pod or a deletion error is only logged, never thrown.
  */
private def forceDeletePod(instanceId: String, kube: KubernetesClient): Unit =
  try
    kube
      .pods()
      .inNamespace(config.k8sNamespace)
      .withLabel(config.k8sRolloutLabelSelector)
      .list()
      .getItems
      .asScala
      .find(_.getMetadata.getName.contains(instanceId)) match
      case None =>
        log.debugf("No pod found for drained instance %s, skipping deletion", instanceId)
      case Some(pod) =>
        val podName = pod.getMetadata.getName
        kube.pods().inNamespace(config.k8sNamespace).withName(podName).withGracePeriod(0L).delete()
        log.infof("Force-deleted pod for drained instance %s", instanceId)
  catch
    case ex: Exception =>
      log.warnf(ex, "Failed to force-delete pod for drained instance %s", instanceId)
@@ -88,7 +88,8 @@ class HealthMonitor:
if evicted.nonEmpty then
log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
evicted.foreach(deleteK8sPod)
autoScaler.scaleUp()
val unexpectedEvictions = evicted.filterNot(autoScaler.isDrainingForScaleDown)
if unexpectedEvictions.nonEmpty then autoScaler.scaleUp()
val instances = instanceRegistry.getAllInstances
val failed = instances.collect { inst =>
val isHealthy = checkHealth(inst.instanceId)
@@ -99,7 +100,8 @@ class HealthMonitor:
Some(inst.instanceId)
else None
}.flatten
if failed.nonEmpty then autoScaler.scaleUp()
val unexpectedFailures = failed.filterNot(autoScaler.isDrainingForScaleDown)
if unexpectedFailures.nonEmpty then autoScaler.scaleUp()
private def checkHealth(instanceId: String): Boolean =
val redisHealthy = checkRedisHeartbeat(instanceId)
@@ -227,12 +229,10 @@ class HealthMonitor:
}
private def handlePodGone(pod: Pod): Unit =
val podName = pod.getMetadata.getName
autoScaler.clearDrainingByPodName(podName)
findRegisteredInstance(pod).foreach { inst =>
log.warnf(
"Pod %s deleted — triggering failover for %s",
pod.getMetadata.getName,
inst.instanceId,
)
log.warnf("Pod %s deleted — triggering failover for %s", podName, inst.instanceId)
failoverService
.onInstanceStreamDropped(inst.instanceId)
.subscribe()
@@ -1,13 +1,12 @@
package de.nowchess.chess.redis
import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.api.dto.{GameStateEventDto, GameWritebackEventDto}
import de.nowchess.api.game.{CorrespondenceClockState, LiveClockState}
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.api.game.{DrawReason, GameResult, WinReason}
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
import de.nowchess.api.board.Color
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.observer.{GameEvent, Observer}
import de.nowchess.chess.registry.GameRegistry
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
import de.nowchess.chess.resource.GameDtoMapper
import io.quarkus.redis.datasource.RedisDataSource
import org.jboss.logging.Logger
@@ -26,61 +25,69 @@ class GameRedisPublisher(
onGameOver: String => Unit,
) extends Observer:
/** Emits one writeback snapshot for the game's current state (used right
  * after an observer is attached — see subscribeGame). Any failure is
  * swallowed with a warning so subscription setup is never aborted.
  */
def emitInitialWriteback(): Unit =
  try
    for entry <- registry.get(gameId) do
      val snapshot = GameDtoMapper.toGameStateDto(entry, ioClient)
      val payload = objectMapper.writeValueAsString(buildWriteback(entry, snapshot))
      writebackEmit(payload)
  catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to emit initial writeback for game %s", gameId)
/** Handles an engine event for this game: publishes the new state to the
  * server-to-client Redis topic, emits a persistence writeback, and fires the
  * game-over callback once a result exists. (Removes the superseded inline
  * writeback construction left over in the diff; it is now buildWriteback.)
  */
def onGameEvent(event: GameEvent): Unit =
  try
    GameRedisPublisher.log.debugf("Publishing game event for game %s", gameId)
    registry.get(gameId).foreach { entry =>
      val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
      redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
      writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
      // A defined result means the game just ended; let the manager clean up.
      if entry.engine.context.result.isDefined then onGameOver(gameId)
    }
  catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
/** Assembles the persistence writeback DTO from the in-memory game entry and
  * the already-mapped state DTO (fen/pgn are reused from `dto` to avoid
  * mapping twice). Clock fields are None for variants that don't carry them.
  */
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
  val clock = entry.engine.currentClockState
  // Narrow the clock variant once instead of repeating the type test per field.
  val liveClock = clock.collect { case c: LiveClockState => c }
  val correspondenceClock = clock.collect { case c: CorrespondenceClockState => c }
  // One match over the time control instead of three separate partial matches.
  val (limitSeconds, incrementSeconds, daysPerMove) = entry.engine.timeControl match
    case TimeControl.Clock(limit, increment) => (Some(limit), Some(increment), None)
    case TimeControl.Correspondence(days) => (None, None, Some(days))
    case _ => (None, None, None)
  GameWritebackEventDto(
    gameId = gameId,
    fen = dto.fen,
    pgn = dto.pgn,
    moveCount = entry.engine.context.moves.size,
    whiteId = entry.white.id.value,
    whiteName = entry.white.displayName,
    blackId = entry.black.id.value,
    blackName = entry.black.displayName,
    mode = entry.mode.toString,
    resigned = entry.resigned,
    limitSeconds = limitSeconds,
    incrementSeconds = incrementSeconds,
    daysPerMove = daysPerMove,
    whiteRemainingMs = liveClock.map(_.whiteRemainingMs),
    blackRemainingMs = liveClock.map(_.blackRemainingMs),
    incrementMs = liveClock.map(_.incrementMs),
    clockLastTickAt = liveClock.map(_.lastTickAt.toEpochMilli),
    clockMoveDeadline = correspondenceClock.map(_.moveDeadline.toEpochMilli),
    clockActiveColor = clock.map(_.activeColor.label.toLowerCase),
    pendingDrawOffer = entry.engine.pendingDrawOfferBy.map(_.label.toLowerCase),
    result = entry.engine.context.result.map {
      case GameResult.Win(Color.White, _) => "white"
      case GameResult.Win(Color.Black, _) => "black"
      case GameResult.Draw(_) => "draw"
    },
    terminationReason = entry.engine.context.result.map {
      case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
      case GameResult.Win(_, WinReason.Resignation) => "resignation"
      case GameResult.Win(_, WinReason.TimeControl) => "timeout"
      case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
      case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
      case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
      case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
      case GameResult.Draw(DrawReason.Agreement) => "agreement"
    },
    redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci),
    pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase),
  )
@@ -13,6 +13,7 @@ import de.nowchess.chess.service.InstanceHeartbeatService
import io.quarkus.redis.datasource.ReactiveRedisDataSource
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Instance
@@ -45,6 +46,30 @@ class GameRedisSubscriberManager:
private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]()
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
// scalafix:off DisableSyntax.var
private var clockExpireSubscriber: Option[ReactivePubSubCommands.ReactiveRedisSubscriber] = None
// scalafix:on DisableSyntax.var
/** Shared Redis channel on which the store-side ClockExpiryScanner publishes ids of games whose clocks expired. */
private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire"
/** Startup hook: attaches a listener to the shared clock-expiry channel,
  * blocking at most 5s for the subscription to establish. A failure is only
  * logged so application startup is not aborted.
  */
@PostConstruct
def subscribeClockExpiry(): Unit =
  val onExpiry: Consumer[String] = expiredGameId => handleClockExpiry(expiredGameId)
  try
    val sub = reactiveRedis
      .pubsub(classOf[String])
      .subscribe(clockExpireChannel, onExpiry)
      .await()
      .atMost(java.time.Duration.ofSeconds(5))
    clockExpireSubscriber = Some(sub)
    log.infof("Subscribed to clock expiry channel %s", clockExpireChannel)
  catch case ex: Exception => log.warnf(ex, "Failed to subscribe to clock expiry channel")
/** Reacts to a clock-expiry notification: when this instance has no observer
  * for the game yet, subscribing loads the engine so the timeout can be
  * enforced; already-loaded games are left alone.
  */
private def handleClockExpiry(gameId: String): Unit =
  val alreadyLoaded = s2cObservers.containsKey(gameId)
  if alreadyLoaded then ()
  else
    log.infof("Clock expired for game %s — loading engine to enforce timeout", gameId)
    subscribeGame(gameId)
/** Redis channel carrying client-to-server messages for one game. */
private def c2sTopic(gameId: String): String =
  s"${redisConfig.prefix}:game:$gameId:c2s"
@@ -65,6 +90,7 @@ class GameRedisSubscriberManager:
)
s2cObservers.put(gameId, obs)
registry.get(gameId).foreach(_.engine.subscribe(obs))
obs.emitInitialWriteback()
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
@@ -156,5 +182,6 @@ class GameRedisSubscriberManager:
/** Shutdown hook: blocks until the clock-expiry subscription and every
  * per-game c2s subscription are released, then detaches each observer from
  * its game engine.
  */
@PreDestroy
def cleanup(): Unit =
  for sub <- clockExpireSubscriber do sub.unsubscribe(clockExpireChannel).await().indefinitely()
  c2sListeners.forEach { (gameId, sub) =>
    sub.unsubscribe(c2sTopic(gameId)).await().indefinitely()
  }
  s2cObservers.forEach { (gameId, observer) =>
    registry.get(gameId).foreach(_.engine.unsubscribe(observer))
  }
+1
View File
@@ -60,6 +60,7 @@ dependencies {
implementation("io.quarkus:quarkus-opentelemetry")
implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}")
implementation("io.quarkus:quarkus-redis-client")
implementation("io.quarkus:quarkus-scheduler")
testImplementation(platform("org.junit:junit-bom:5.13.4"))
testImplementation("org.junit.jupiter:junit-jupiter")
@@ -34,6 +34,19 @@ class GameRecordRepository:
.asScala
.toList
/** Returns unfinished live-clock games whose active player's clock has run
  * out as of `nowMs`.
  *
  * Fix: the JPQL now requires BOTH remaining-time columns and the active
  * color to be non-null — the original only checked `whiteRemainingMs`, so
  * the in-memory filter could NPE on `blackRemainingMs.longValue` (or on a
  * null `clockActiveColor`) for black-to-move rows. The expiry arithmetic
  * stays in Scala because it mixes several columns with a caller-supplied
  * timestamp.
  */
def findExpiredLiveClockGames(nowMs: Long): List[GameRecord] =
  em.createQuery(
    "SELECT g FROM GameRecord g WHERE g.result IS NULL AND g.clockLastTickAt IS NOT NULL" +
      " AND g.whiteRemainingMs IS NOT NULL AND g.blackRemainingMs IS NOT NULL AND g.clockActiveColor IS NOT NULL",
    classOf[GameRecord],
  ).getResultList
    .asScala
    .toList
    .filter { g =>
      // Only the side to move is burning time; the opponent's clock is frozen.
      val remaining =
        if g.clockActiveColor == "white" then g.whiteRemainingMs.longValue else g.blackRemainingMs.longValue
      g.clockLastTickAt.longValue + remaining < nowMs
    }
def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] =
em.createQuery(
"SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC",
@@ -0,0 +1,36 @@
package de.nowchess.store.service
import de.nowchess.store.config.RedisConfig
import de.nowchess.store.repository.GameRecordRepository
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.scheduler.Scheduled
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
/** Periodically scans persisted game records for expired live clocks and
  * publishes each expired game's id to the shared clock-expiry Redis channel,
  * where a coordinator-side subscriber loads the engine to enforce timeout.
  */
@ApplicationScoped
class ClockExpiryScanner:
  @Inject
  // scalafix:off DisableSyntax.var
  var repository: GameRecordRepository = uninitialized
  @Inject var redis: RedisDataSource = uninitialized
  @Inject var redisConfig: RedisConfig = uninitialized
  // scalafix:on
  private val log = Logger.getLogger(classOf[ClockExpiryScanner])

  /** Channel name shared with the coordinator's subscriber. */
  private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire"

  /** Runs every 30s; a failing sweep is logged and retried on the next tick. */
  @Scheduled(every = "30s")
  def scan(): Unit =
    try
      val now = System.currentTimeMillis()
      repository.findExpiredLiveClockGames(now) match
        case Nil => ()
        case expiredGames =>
          log.infof("Found %d games with expired clocks", expiredGames.size)
          for record <- expiredGames do
            log.infof("Publishing clock expiry for game %s", record.gameId)
            redis.pubsub(classOf[String]).publish(clockExpireChannel, record.gameId)
    catch case ex: Exception => log.warnf(ex, "Clock expiry scan failed")