Compare commits

...

3 Commits

Author SHA1 Message Date
Janis 9ca8af9bf8 feat: add configurable CPU and memory scaling thresholds for auto-scaling
Build & Test (NowChessSystems) TeamCity build failed
2026-05-16 15:04:14 +02:00
Janis fcd7c80169 fix: improve code readability in GameRecordRepository and GameRedisPublisher
Build & Test (NowChessSystems) TeamCity build failed
2026-05-16 13:22:40 +02:00
Janis 6448bec3f8 feat: implement clock expiry scanning and handling for game records
Build & Test (NowChessSystems) TeamCity build failed
2026-05-16 13:07:41 +02:00
9 changed files with 334 additions and 165 deletions
@@ -47,6 +47,8 @@ nowchess:
k8s-rollout-label-selector: "app=nowchess-core" k8s-rollout-label-selector: "app=nowchess-core"
startup-validation-timeout: 15s startup-validation-timeout: 15s
failover-wait-timeout: 30s failover-wait-timeout: 30s
scale-cpu-threshold-percent: 0.8
scale-memory-threshold-percent: 0.8
--- ---
# dev profile # dev profile
@@ -62,3 +62,9 @@ trait CoordinatorConfig:
@WithName("failover-wait-timeout") @WithName("failover-wait-timeout")
def failoverWaitTimeout: Duration def failoverWaitTimeout: Duration
@WithName("scale-cpu-threshold-percent")
def scaleCpuThresholdPercent: Double
@WithName("scale-memory-threshold-percent")
def scaleMemoryThresholdPercent: Double
@@ -10,9 +10,11 @@ import io.fabric8.kubernetes.client.KubernetesClient
import io.micrometer.core.instrument.{Gauge, MeterRegistry} import io.micrometer.core.instrument.{Gauge, MeterRegistry}
import io.quarkus.scheduler.Scheduled import io.quarkus.scheduler.Scheduled
import org.jboss.logging.Logger import org.jboss.logging.Logger
import io.fabric8.kubernetes.client.KubernetesClientException
import scala.jdk.CollectionConverters.* import scala.jdk.CollectionConverters.*
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import scala.compiletime.uninitialized import scala.compiletime.uninitialized
@ApplicationScoped @ApplicationScoped
@@ -40,6 +42,16 @@ class AutoScaler:
private val log = Logger.getLogger(classOf[AutoScaler]) private val log = Logger.getLogger(classOf[AutoScaler])
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L) private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
private val avgLoadRef = new AtomicReference[Double](0.0) private val avgLoadRef = new AtomicReference[Double](0.0)
private val drainingForScaleDown = ConcurrentHashMap.newKeySet[String]()
def isDrainingForScaleDown(instanceId: String): Boolean =
drainingForScaleDown.contains(instanceId)
def clearDraining(instanceId: String): Unit =
drainingForScaleDown.remove(instanceId)
def clearDrainingByPodName(podName: String): Unit =
drainingForScaleDown.asScala.find(podName.contains).foreach(drainingForScaleDown.remove)
private def kubeClientOpt: Option[KubernetesClient] = private def kubeClientOpt: Option[KubernetesClient] =
if kubeClientInstance.isUnsatisfied then None if kubeClientInstance.isUnsatisfied then None
@@ -72,6 +84,34 @@ class AutoScaler:
} }
// scalafix:on DisableSyntax.asInstanceOf // scalafix:on DisableSyntax.asInstanceOf
private def parseMillicores(s: String): Long =
if s.endsWith("n") then s.dropRight(1).toLongOption.map(_ / 1000000).getOrElse(0L)
else if s.endsWith("m") then s.dropRight(1).toLongOption.getOrElse(0L)
else s.toLongOption.map(_ * 1000).getOrElse(0L)
private def parseBytes(s: String): Long =
if s.endsWith("Ki") then s.dropRight(2).toLongOption.map(_ * 1024L).getOrElse(0L)
else if s.endsWith("Mi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L).getOrElse(0L)
else if s.endsWith("Gi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L * 1024L).getOrElse(0L)
else if s.endsWith("K") then s.dropRight(1).toLongOption.map(_ * 1000L).getOrElse(0L)
else if s.endsWith("M") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L).getOrElse(0L)
else if s.endsWith("G") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L * 1000L).getOrElse(0L)
else s.toLongOption.getOrElse(0L)
private def exceedsRatio(used: Long, request: Long, threshold: Double, resource: String, instanceId: String): Boolean =
if request <= 0 then false
else
val ratio = used.toDouble / request.toDouble
log.debugf(
"Instance %s %s: %d used / %d requested = %.0f%%",
instanceId,
resource,
used,
request,
ratio * 100,
)
ratio > threshold
// scalafix:off DisableSyntax.asInstanceOf // scalafix:off DisableSyntax.asInstanceOf
private def isResourceConstrained(instanceId: String): Boolean = private def isResourceConstrained(instanceId: String): Boolean =
kubeClientOpt.fold(false) { kube => kubeClientOpt.fold(false) { kube =>
@@ -80,25 +120,37 @@ class AutoScaler:
kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod => pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
try try
val requests = Option(pod.getSpec)
.flatMap(s => Option(s.getContainers))
.flatMap(cs => if cs.isEmpty then None else Option(cs.get(0)))
.flatMap(c => Option(c.getResources))
.flatMap(r => Option(r.getRequests))
val cpuRequestMillis = requests.flatMap(m => Option(m.get("cpu"))).map(q => parseMillicores(q.toString)).getOrElse(0L)
val memRequestBytes = requests.flatMap(m => Option(m.get("memory"))).map(q => parseBytes(q.toString)).getOrElse(0L)
if cpuRequestMillis <= 0 && memRequestBytes <= 0 then
log.debugf("No resource requests found for instance %s, skipping resource check", instanceId)
false
else
val metricsRes = kube val metricsRes = kube
.genericKubernetesResources(metricsApiVersion, "PodMetrics") .genericKubernetesResources(metricsApiVersion, "PodMetrics")
.inNamespace(config.k8sNamespace) .inNamespace(config.k8sNamespace)
.withName(pod.getMetadata.getName) .withName(pod.getMetadata.getName)
.get() .get()
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]] val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
Option(metricsMap.get("metrics")) val usageOpt = Option(metricsMap.get("metrics"))
.map(_.asInstanceOf[java.util.Map[String, AnyRef]]) .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]])) .flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
.filter(!_.isEmpty) .filter(!_.isEmpty)
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]]) .map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]])) .flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
.flatMap(u => Option(u.get("cpu")))
.map(_.toString) usageOpt.exists { usage =>
.exists { cpuStr => val cpuUsed = Option(usage.get("cpu")).map(v => parseMillicores(v.toString)).getOrElse(0L)
val cpuMillis = val memUsed = Option(usage.get("memory")).map(v => parseBytes(v.toString)).getOrElse(0L)
if cpuStr.endsWith("m") then cpuStr.dropRight(1).toLongOption.getOrElse(0L) exceedsRatio(cpuUsed, cpuRequestMillis, config.scaleCpuThresholdPercent, "CPU", instanceId) ||
else cpuStr.toLongOption.map(_ * 1000).getOrElse(0L) exceedsRatio(memUsed, memRequestBytes, config.scaleMemoryThresholdPercent, "memory", instanceId)
cpuMillis > 800
} }
catch case _: Exception => false catch case _: Exception => false
} }
@@ -117,20 +169,36 @@ class AutoScaler:
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY") val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
if instances.nonEmpty then if instances.nonEmpty then
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
val scaleUpLoad = config.scaleUpThreshold * config.maxGamesPerCore
val scaleDownLoad = config.scaleDownThreshold * config.maxGamesPerCore
avgLoadRef.set(avgLoad) avgLoadRef.set(avgLoad)
val hasHighCpuOrMemory = instances.exists(inst => isResourceConstrained(inst.instanceId)) val constrainedInstance = instances.find(inst => isResourceConstrained(inst.instanceId))
val hasHighCpuOrMemory = constrainedInstance.isDefined
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore || hasHighCpuOrMemory then scaleUp() log.infof(
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas "Scale check: instances=%d avgLoad=%.1f scaleUpAt=%.1f scaleDownAt=%.1f resourceConstrained=%s",
instances.size,
avgLoad,
scaleUpLoad,
scaleDownLoad,
constrainedInstance.map(_.instanceId).getOrElse("none"),
)
if avgLoad > scaleUpLoad || hasHighCpuOrMemory then scaleUp()
else if avgLoad < scaleDownLoad && instances.size > config.scaleMinReplicas
then scaleDown() then scaleDown()
def scaleUp(): Unit = private def patchRolloutReplicas(
log.info("Scaling up Argo Rollout") kube: KubernetesClient,
kubeClientOpt match direction: String,
case None => delta: Int,
log.warn("Kubernetes client not available, cannot scale") canScale: Int => Boolean,
case Some(kube) => atLimit: Int => Unit,
onSuccess: (Int, Int) => Unit,
maxRetries: Int = 3,
): Unit =
def attempt(retries: Int): Unit =
try try
Option( Option(
kube kube
@@ -141,33 +209,49 @@ class AutoScaler:
).foreach { rollout => ).foreach { rollout =>
rolloutSpec(rollout).foreach { spec => rolloutSpec(rollout).foreach { spec =>
spec.get("replicas") match spec.get("replicas") match
case replicas: Integer => case current: Integer =>
val currentReplicas = replicas.intValue() val n = current.intValue()
val maxReplicas = config.scaleMaxReplicas if !canScale(n) then atLimit(n)
else
if currentReplicas < maxReplicas then spec.put("replicas", Integer.valueOf(n + delta))
spec.put("replicas", Integer.valueOf(currentReplicas + 1))
kube kube
.genericKubernetesResources(argoApiVersion, argoKind) .genericKubernetesResources(argoApiVersion, argoKind)
.inNamespace(config.k8sNamespace) .inNamespace(config.k8sNamespace)
.resource(rollout) .resource(rollout)
.update() .update()
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment() meterRegistry.counter("nowchess.coordinator.scale.events", "direction", direction).increment()
log.infof( onSuccess(n, n + delta)
"Scaled up %s from %d to %d replicas",
config.k8sRolloutName,
currentReplicas,
currentReplicas + 1,
)
loadBalancer.rebalance
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
case _ => () case _ => ()
} }
} }
catch catch
case ex: KubernetesClientException if ex.getCode == 409 =>
if retries > 0 then
log.debugf("Conflict scaling %s %s, retrying (%d left)", direction, config.k8sRolloutName, retries - 1)
attempt(retries - 1)
else
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()
log.errorf(ex, "Failed to scale %s %s after conflict retries", direction, config.k8sRolloutName)
case ex: Exception => case ex: Exception =>
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment() meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()
log.errorf(ex, "Failed to scale up %s", config.k8sRolloutName) log.errorf(ex, "Failed to scale %s %s", direction, config.k8sRolloutName)
attempt(maxRetries)
def scaleUp(): Unit =
log.info("Scaling up Argo Rollout")
kubeClientOpt match
case None => log.warn("Kubernetes client not available, cannot scale")
case Some(kube) =>
patchRolloutReplicas(
kube,
direction = "up",
delta = 1,
canScale = _ < config.scaleMaxReplicas,
atLimit = n => log.infof("Already at max replicas %d for %s", n, config.k8sRolloutName),
onSuccess = (from, to) =>
log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, from, to)
loadBalancer.rebalance,
)
def scaleDown(): Unit = def scaleDown(): Unit =
log.info("Scaling down Argo Rollout") log.info("Scaling down Argo Rollout")
@@ -177,6 +261,7 @@ class AutoScaler:
underloadedInstance.foreach { inst => underloadedInstance.foreach { inst =>
log.infof("Marking instance %s for drain before scale-down", inst.instanceId) log.infof("Marking instance %s for drain before scale-down", inst.instanceId)
drainingForScaleDown.add(inst.instanceId)
failoverService failoverService
.onInstanceStreamDropped(inst.instanceId) .onInstanceStreamDropped(inst.instanceId)
.subscribe() .subscribe()
@@ -187,42 +272,34 @@ class AutoScaler:
} }
kubeClientOpt match kubeClientOpt match
case None => case None => log.warn("Kubernetes client not available, cannot scale")
log.warn("Kubernetes client not available, cannot scale")
case Some(kube) => case Some(kube) =>
try patchRolloutReplicas(
Option( kube,
kube direction = "down",
.genericKubernetesResources(argoApiVersion, argoKind) delta = -1,
.inNamespace(config.k8sNamespace) canScale = _ > config.scaleMinReplicas,
.withName(config.k8sRolloutName) atLimit = n => log.infof("Already at min replicas %d for %s", n, config.k8sRolloutName),
.get(), onSuccess = (from, to) =>
).foreach { rollout => log.infof("Scaled down %s from %d to %d replicas", config.k8sRolloutName, from, to)
rolloutSpec(rollout).foreach { spec => underloadedInstance.foreach(inst => forceDeletePod(inst.instanceId, kube)),
spec.get("replicas") match
case replicas: Integer =>
val currentReplicas = replicas.intValue()
val minReplicas = config.scaleMinReplicas
if currentReplicas > minReplicas then
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
kube
.genericKubernetesResources(argoApiVersion, argoKind)
.inNamespace(config.k8sNamespace)
.resource(rollout)
.update()
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
log.infof(
"Scaled down %s from %d to %d replicas",
config.k8sRolloutName,
currentReplicas,
currentReplicas - 1,
) )
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
case _ => () private def forceDeletePod(instanceId: String, kube: KubernetesClient): Unit =
} try
} val pods = kube
.pods()
.inNamespace(config.k8sNamespace)
.withLabel(config.k8sRolloutLabelSelector)
.list()
.getItems
.asScala
pods.find(_.getMetadata.getName.contains(instanceId)) match
case Some(pod) =>
kube.pods().inNamespace(config.k8sNamespace).withName(pod.getMetadata.getName).withGracePeriod(0L).delete()
log.infof("Force-deleted pod for drained instance %s", instanceId)
case None =>
log.debugf("No pod found for drained instance %s, skipping deletion", instanceId)
catch catch
case ex: Exception => case ex: Exception =>
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() log.warnf(ex, "Failed to force-delete pod for drained instance %s", instanceId)
log.errorf(ex, "Failed to scale down %s", config.k8sRolloutName)
@@ -88,7 +88,8 @@ class HealthMonitor:
if evicted.nonEmpty then if evicted.nonEmpty then
log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", ")) log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
evicted.foreach(deleteK8sPod) evicted.foreach(deleteK8sPod)
autoScaler.scaleUp() val unexpectedEvictions = evicted.filterNot(autoScaler.isDrainingForScaleDown)
if unexpectedEvictions.nonEmpty then autoScaler.scaleUp()
val instances = instanceRegistry.getAllInstances val instances = instanceRegistry.getAllInstances
val failed = instances.collect { inst => val failed = instances.collect { inst =>
val isHealthy = checkHealth(inst.instanceId) val isHealthy = checkHealth(inst.instanceId)
@@ -99,7 +100,8 @@ class HealthMonitor:
Some(inst.instanceId) Some(inst.instanceId)
else None else None
}.flatten }.flatten
if failed.nonEmpty then autoScaler.scaleUp() val unexpectedFailures = failed.filterNot(autoScaler.isDrainingForScaleDown)
if unexpectedFailures.nonEmpty then autoScaler.scaleUp()
private def checkHealth(instanceId: String): Boolean = private def checkHealth(instanceId: String): Boolean =
val redisHealthy = checkRedisHeartbeat(instanceId) val redisHealthy = checkRedisHeartbeat(instanceId)
@@ -227,12 +229,10 @@ class HealthMonitor:
} }
private def handlePodGone(pod: Pod): Unit = private def handlePodGone(pod: Pod): Unit =
val podName = pod.getMetadata.getName
autoScaler.clearDrainingByPodName(podName)
findRegisteredInstance(pod).foreach { inst => findRegisteredInstance(pod).foreach { inst =>
log.warnf( log.warnf("Pod %s deleted — triggering failover for %s", podName, inst.instanceId)
"Pod %s deleted — triggering failover for %s",
pod.getMetadata.getName,
inst.instanceId,
)
failoverService failoverService
.onInstanceStreamDropped(inst.instanceId) .onInstanceStreamDropped(inst.instanceId)
.subscribe() .subscribe()
@@ -1,13 +1,12 @@
package de.nowchess.chess.redis package de.nowchess.chess.redis
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.api.dto.{GameStateEventDto, GameWritebackEventDto} import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
import de.nowchess.api.game.{CorrespondenceClockState, LiveClockState} import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.api.game.{DrawReason, GameResult, WinReason}
import de.nowchess.api.board.Color import de.nowchess.api.board.Color
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.observer.{GameEvent, Observer} import de.nowchess.chess.observer.{GameEvent, Observer}
import de.nowchess.chess.registry.GameRegistry import de.nowchess.chess.registry.{GameEntry, GameRegistry}
import de.nowchess.chess.resource.GameDtoMapper import de.nowchess.chess.resource.GameDtoMapper
import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.RedisDataSource
import org.jboss.logging.Logger import org.jboss.logging.Logger
@@ -26,16 +25,28 @@ class GameRedisPublisher(
onGameOver: String => Unit, onGameOver: String => Unit,
) extends Observer: ) extends Observer:
def emitInitialWriteback(): Unit =
try
registry.get(gameId).foreach { entry =>
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
}
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to emit initial writeback for game %s", gameId)
def onGameEvent(event: GameEvent): Unit = def onGameEvent(event: GameEvent): Unit =
try try
GameRedisPublisher.log.debugf("Publishing game event for game %s", gameId) GameRedisPublisher.log.debugf("Publishing game event for game %s", gameId)
registry.get(gameId).foreach { entry => registry.get(gameId).foreach { entry =>
val dto = GameDtoMapper.toGameStateDto(entry, ioClient) val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
val json = objectMapper.writeValueAsString(GameStateEventDto(dto)) redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
redis.pubsub(classOf[String]).publish(s2cTopicName, json) writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
if entry.engine.context.result.isDefined then onGameOver(gameId)
}
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
val clock = entry.engine.currentClockState val clock = entry.engine.currentClockState
val wb = GameWritebackEventDto( GameWritebackEventDto(
gameId = gameId, gameId = gameId,
fen = dto.fen, fen = dto.fen,
pgn = dto.pgn, pgn = dto.pgn,
@@ -47,13 +58,13 @@ class GameRedisPublisher(
mode = entry.mode.toString, mode = entry.mode.toString,
resigned = entry.resigned, resigned = entry.resigned,
limitSeconds = entry.engine.timeControl match { limitSeconds = entry.engine.timeControl match {
case de.nowchess.api.game.TimeControl.Clock(l, _) => Some(l); case _ => None case TimeControl.Clock(l, _) => Some(l); case _ => None
}, },
incrementSeconds = entry.engine.timeControl match { incrementSeconds = entry.engine.timeControl match {
case de.nowchess.api.game.TimeControl.Clock(_, i) => Some(i); case _ => None case TimeControl.Clock(_, i) => Some(i); case _ => None
}, },
daysPerMove = entry.engine.timeControl match { daysPerMove = entry.engine.timeControl match {
case de.nowchess.api.game.TimeControl.Correspondence(d) => Some(d); case _ => None case TimeControl.Correspondence(d) => Some(d); case _ => None
}, },
whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs }, whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs },
blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs }, blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs },
@@ -80,7 +91,3 @@ class GameRedisPublisher(
redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci), redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci),
pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase), pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase),
) )
writebackEmit(objectMapper.writeValueAsString(wb))
if entry.engine.context.result.isDefined then onGameOver(gameId)
}
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
@@ -13,6 +13,7 @@ import de.nowchess.chess.service.InstanceHeartbeatService
import io.quarkus.redis.datasource.ReactiveRedisDataSource import io.quarkus.redis.datasource.ReactiveRedisDataSource
import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Instance import jakarta.enterprise.inject.Instance
@@ -45,6 +46,30 @@ class GameRedisSubscriberManager:
private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]() private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]()
private val s2cObservers = new ConcurrentHashMap[String, Observer]() private val s2cObservers = new ConcurrentHashMap[String, Observer]()
// scalafix:off DisableSyntax.var
private var clockExpireSubscriber: Option[ReactivePubSubCommands.ReactiveRedisSubscriber] = None
// scalafix:on DisableSyntax.var
private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire"
@PostConstruct
def subscribeClockExpiry(): Unit =
val handler: Consumer[String] = gameId => handleClockExpiry(gameId)
try
val subscriber = reactiveRedis
.pubsub(classOf[String])
.subscribe(clockExpireChannel, handler)
.await()
.atMost(java.time.Duration.ofSeconds(5))
clockExpireSubscriber = Some(subscriber)
log.infof("Subscribed to clock expiry channel %s", clockExpireChannel)
catch case ex: Exception => log.warnf(ex, "Failed to subscribe to clock expiry channel")
private def handleClockExpiry(gameId: String): Unit =
if !s2cObservers.containsKey(gameId) then
log.infof("Clock expired for game %s — loading engine to enforce timeout", gameId)
subscribeGame(gameId)
private def c2sTopic(gameId: String): String = private def c2sTopic(gameId: String): String =
s"${redisConfig.prefix}:game:$gameId:c2s" s"${redisConfig.prefix}:game:$gameId:c2s"
@@ -65,6 +90,7 @@ class GameRedisSubscriberManager:
) )
s2cObservers.put(gameId, obs) s2cObservers.put(gameId, obs)
registry.get(gameId).foreach(_.engine.subscribe(obs)) registry.get(gameId).foreach(_.engine.subscribe(obs))
obs.emitInitialWriteback()
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
@@ -156,5 +182,6 @@ class GameRedisSubscriberManager:
@PreDestroy @PreDestroy
def cleanup(): Unit = def cleanup(): Unit =
clockExpireSubscriber.foreach(_.unsubscribe(clockExpireChannel).await().indefinitely())
c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely()) c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely())
s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs))) s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs)))
+1
View File
@@ -60,6 +60,7 @@ dependencies {
implementation("io.quarkus:quarkus-opentelemetry") implementation("io.quarkus:quarkus-opentelemetry")
implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}") implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}")
implementation("io.quarkus:quarkus-redis-client") implementation("io.quarkus:quarkus-redis-client")
implementation("io.quarkus:quarkus-scheduler")
testImplementation(platform("org.junit:junit-bom:5.13.4")) testImplementation(platform("org.junit:junit-bom:5.13.4"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
@@ -34,6 +34,19 @@ class GameRecordRepository:
.asScala .asScala
.toList .toList
def findExpiredLiveClockGames(nowMs: Long): List[GameRecord] =
em.createQuery(
"SELECT g FROM GameRecord g WHERE g.result IS NULL AND g.clockLastTickAt IS NOT NULL AND g.whiteRemainingMs IS NOT NULL",
classOf[GameRecord],
).getResultList
.asScala
.toList
.filter { g =>
val remaining =
if g.clockActiveColor == "white" then g.whiteRemainingMs.longValue else g.blackRemainingMs.longValue
g.clockLastTickAt.longValue + remaining < nowMs
}
def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] = def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] =
em.createQuery( em.createQuery(
"SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC", "SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC",
@@ -0,0 +1,36 @@
package de.nowchess.store.service
import de.nowchess.store.config.RedisConfig
import de.nowchess.store.repository.GameRecordRepository
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.scheduler.Scheduled
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
@ApplicationScoped
class ClockExpiryScanner:
@Inject
// scalafix:off DisableSyntax.var
var repository: GameRecordRepository = uninitialized
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
// scalafix:on
private val log = Logger.getLogger(classOf[ClockExpiryScanner])
private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire"
@Scheduled(every = "30s")
def scan(): Unit =
try
val nowMs = System.currentTimeMillis()
val expired = repository.findExpiredLiveClockGames(nowMs)
if expired.nonEmpty then
log.infof("Found %d games with expired clocks", expired.size)
expired.foreach { record =>
log.infof("Publishing clock expiry for game %s", record.gameId)
redis.pubsub(classOf[String]).publish(clockExpireChannel, record.gameId)
}
catch case ex: Exception => log.warnf(ex, "Clock expiry scan failed")