diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala index 474fca1..90e8464 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala @@ -10,9 +10,11 @@ import io.fabric8.kubernetes.client.KubernetesClient import io.micrometer.core.instrument.{Gauge, MeterRegistry} import io.quarkus.scheduler.Scheduled import org.jboss.logging.Logger +import io.fabric8.kubernetes.client.KubernetesClientException import scala.jdk.CollectionConverters.* import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.ConcurrentHashMap import scala.compiletime.uninitialized @ApplicationScoped @@ -37,9 +39,19 @@ class AutoScaler: private var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var - private val log = Logger.getLogger(classOf[AutoScaler]) - private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L) - private val avgLoadRef = new AtomicReference[Double](0.0) + private val log = Logger.getLogger(classOf[AutoScaler]) + private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L) + private val avgLoadRef = new AtomicReference[Double](0.0) + private val drainingForScaleDown = ConcurrentHashMap.newKeySet[String]() + + def isDrainingForScaleDown(instanceId: String): Boolean = + drainingForScaleDown.contains(instanceId) + + def clearDraining(instanceId: String): Unit = + drainingForScaleDown.remove(instanceId) + + def clearDrainingByPodName(podName: String): Unit = + drainingForScaleDown.asScala.find(podName.contains).foreach(drainingForScaleDown.remove) private def kubeClientOpt: Option[KubernetesClient] = if kubeClientInstance.isUnsatisfied then None @@ -125,49 +137,69 @@ class AutoScaler: else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas then scaleDown() + private def patchRolloutReplicas( + kube: KubernetesClient, + direction: String, + delta: Int, + canScale: Int => Boolean, + atLimit: Int => Unit, + onSuccess: (Int, Int) => Unit, + maxRetries: Int = 3, + ): Unit = + def attempt(retries: Int): Unit = + try + Option( + kube + .genericKubernetesResources(argoApiVersion, argoKind) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .get(), + ).foreach { rollout => + rolloutSpec(rollout).foreach { spec => + spec.get("replicas") match + case current: Integer => + val n = current.intValue() + if !canScale(n) then atLimit(n) + else + spec.put("replicas", Integer.valueOf(n + delta)) + kube + .genericKubernetesResources(argoApiVersion, argoKind) + .inNamespace(config.k8sNamespace) + .resource(rollout) + .update() + meterRegistry.counter("nowchess.coordinator.scale.events", "direction", direction).increment() + onSuccess(n, n + delta) + case _ => () + } + } + catch + case ex: KubernetesClientException if ex.getCode == 409 => + if retries > 0 then + log.debugf("Conflict scaling %s %s, retrying (%d left)", direction, config.k8sRolloutName, retries - 1) + attempt(retries - 1) + else + meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment() + log.errorf(ex, "Failed to scale %s %s after conflict retries", direction, config.k8sRolloutName) + case ex: Exception => + meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment() + log.errorf(ex, "Failed to scale %s %s", direction, config.k8sRolloutName) + attempt(maxRetries) + def scaleUp(): Unit = log.info("Scaling up Argo Rollout") kubeClientOpt match - case None => - log.warn("Kubernetes client not available, cannot scale") + case None => log.warn("Kubernetes client not available, cannot scale") case Some(kube) => - try - Option( - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get(), - ).foreach { rollout => - rolloutSpec(rollout).foreach { spec => - spec.get("replicas") match - case replicas: Integer => - val currentReplicas = replicas.intValue() - val maxReplicas = config.scaleMaxReplicas - - if currentReplicas < maxReplicas then - spec.put("replicas", Integer.valueOf(currentReplicas + 1)) - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .resource(rollout) - .update() - meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment() - log.infof( - "Scaled up %s from %d to %d replicas", - config.k8sRolloutName, - currentReplicas, - currentReplicas + 1, - ) - loadBalancer.rebalance - else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName) - case _ => () - } - } - catch - case ex: Exception => - meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment() - log.errorf(ex, "Failed to scale up %s", config.k8sRolloutName) + patchRolloutReplicas( + kube, + direction = "up", + delta = 1, + canScale = _ < config.scaleMaxReplicas, + atLimit = n => log.infof("Already at max replicas %d for %s", n, config.k8sRolloutName), + onSuccess = (from, to) => + log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, from, to) + loadBalancer.rebalance, + ) def scaleDown(): Unit = log.info("Scaling down Argo Rollout") @@ -177,6 +209,7 @@ class AutoScaler: underloadedInstance.foreach { inst => log.infof("Marking instance %s for drain before scale-down", inst.instanceId) + drainingForScaleDown.add(inst.instanceId) failoverService .onInstanceStreamDropped(inst.instanceId) .subscribe() @@ -187,42 +220,34 @@ class AutoScaler: } kubeClientOpt match - case None => - log.warn("Kubernetes client not available, cannot scale") + case None => log.warn("Kubernetes client not available, cannot scale") case Some(kube) => - try - Option( - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get(), - ).foreach { rollout => - rolloutSpec(rollout).foreach { spec => - spec.get("replicas") match - case replicas: Integer => - val currentReplicas = replicas.intValue() - val minReplicas = config.scaleMinReplicas + patchRolloutReplicas( + kube, + direction = "down", + delta = -1, + canScale = _ > config.scaleMinReplicas, + atLimit = n => log.infof("Already at min replicas %d for %s", n, config.k8sRolloutName), + onSuccess = (from, to) => + log.infof("Scaled down %s from %d to %d replicas", config.k8sRolloutName, from, to) + underloadedInstance.foreach(inst => forceDeletePod(inst.instanceId, kube)), + ) - if currentReplicas > minReplicas then - spec.put("replicas", Integer.valueOf(currentReplicas - 1)) - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .resource(rollout) - .update() - meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment() - log.infof( - "Scaled down %s from %d to %d replicas", - config.k8sRolloutName, - currentReplicas, - currentReplicas - 1, - ) - else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) - case _ => () - } - } - catch - case ex: Exception => - meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() - log.errorf(ex, "Failed to scale down %s", config.k8sRolloutName) + private def forceDeletePod(instanceId: String, kube: KubernetesClient): Unit = + try + val pods = kube + .pods() + .inNamespace(config.k8sNamespace) + .withLabel(config.k8sRolloutLabelSelector) + .list() + .getItems + .asScala + pods.find(_.getMetadata.getName.contains(instanceId)) match + case Some(pod) => + kube.pods().inNamespace(config.k8sNamespace).withName(pod.getMetadata.getName).withGracePeriod(0L).delete() + log.infof("Force-deleted pod for drained instance %s", instanceId) + case None => + log.debugf("No pod found for drained instance %s, skipping deletion", instanceId) + catch + case ex: Exception => + log.warnf(ex, "Failed to force-delete pod for drained instance %s", instanceId) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala index c23ddb8..959ae69 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala @@ -88,7 +88,8 @@ class HealthMonitor: if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", ")) evicted.foreach(deleteK8sPod) - autoScaler.scaleUp() + val unexpectedEvictions = evicted.filterNot(autoScaler.isDrainingForScaleDown) + if unexpectedEvictions.nonEmpty then autoScaler.scaleUp() val instances = instanceRegistry.getAllInstances val failed = instances.collect { inst => val isHealthy = checkHealth(inst.instanceId) @@ -99,7 +100,8 @@ class HealthMonitor: Some(inst.instanceId) else None }.flatten - if failed.nonEmpty then autoScaler.scaleUp() + val unexpectedFailures = failed.filterNot(autoScaler.isDrainingForScaleDown) + if unexpectedFailures.nonEmpty then autoScaler.scaleUp() private def checkHealth(instanceId: String): Boolean = val redisHealthy = checkRedisHeartbeat(instanceId) @@ -227,12 +229,10 @@ class HealthMonitor: } private def handlePodGone(pod: Pod): Unit = + val podName = pod.getMetadata.getName + autoScaler.clearDrainingByPodName(podName) findRegisteredInstance(pod).foreach { inst => - log.warnf( - "Pod %s deleted — triggering failover for %s", - pod.getMetadata.getName, - inst.instanceId, - ) + log.warnf("Pod %s deleted — triggering failover for %s", podName, inst.instanceId) failoverService .onInstanceStreamDropped(inst.instanceId) .subscribe() diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala index 322d6e6..48fa7f3 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala @@ -1,13 +1,12 @@ package de.nowchess.chess.redis import com.fasterxml.jackson.databind.ObjectMapper -import de.nowchess.api.dto.{GameStateEventDto, GameWritebackEventDto} -import de.nowchess.api.game.{CorrespondenceClockState, LiveClockState} -import de.nowchess.chess.grpc.IoGrpcClientWrapper -import de.nowchess.api.game.{DrawReason, GameResult, WinReason} +import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto} +import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason} import de.nowchess.api.board.Color +import de.nowchess.chess.grpc.IoGrpcClientWrapper import de.nowchess.chess.observer.{GameEvent, Observer} -import de.nowchess.chess.registry.GameRegistry +import de.nowchess.chess.registry.{GameEntry, GameRegistry} import de.nowchess.chess.resource.GameDtoMapper import io.quarkus.redis.datasource.RedisDataSource import org.jboss.logging.Logger @@ -26,61 +25,69 @@ class GameRedisPublisher( onGameOver: String => Unit, ) extends Observer: + def emitInitialWriteback(): Unit = + try + registry.get(gameId).foreach { entry => + val dto = GameDtoMapper.toGameStateDto(entry, ioClient) + writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto))) + } + catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to emit initial writeback for game %s", gameId) + def onGameEvent(event: GameEvent): Unit = try GameRedisPublisher.log.debugf("Publishing game event for game %s", gameId) registry.get(gameId).foreach { entry => - val dto = GameDtoMapper.toGameStateDto(entry, ioClient) - val json = objectMapper.writeValueAsString(GameStateEventDto(dto)) - redis.pubsub(classOf[String]).publish(s2cTopicName, json) - - val clock = entry.engine.currentClockState - val wb = GameWritebackEventDto( - gameId = gameId, - fen = dto.fen, - pgn = dto.pgn, - moveCount = entry.engine.context.moves.size, - whiteId = entry.white.id.value, - whiteName = entry.white.displayName, - blackId = entry.black.id.value, - blackName = entry.black.displayName, - mode = entry.mode.toString, - resigned = entry.resigned, - limitSeconds = entry.engine.timeControl match { - case de.nowchess.api.game.TimeControl.Clock(l, _) => Some(l); case _ => None - }, - incrementSeconds = entry.engine.timeControl match { - case de.nowchess.api.game.TimeControl.Clock(_, i) => Some(i); case _ => None - }, - daysPerMove = entry.engine.timeControl match { - case de.nowchess.api.game.TimeControl.Correspondence(d) => Some(d); case _ => None - }, - whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs }, - blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs }, - incrementMs = clock.collect { case c: LiveClockState => c.incrementMs }, - clockLastTickAt = clock.collect { case c: LiveClockState => c.lastTickAt.toEpochMilli }, - clockMoveDeadline = clock.collect { case c: CorrespondenceClockState => c.moveDeadline.toEpochMilli }, - clockActiveColor = clock.map(_.activeColor.label.toLowerCase), - pendingDrawOffer = entry.engine.pendingDrawOfferBy.map(_.label.toLowerCase), - result = entry.engine.context.result.map { - case GameResult.Win(Color.White, _) => "white" - case GameResult.Win(Color.Black, _) => "black" - case GameResult.Draw(_) => "draw" - }, - terminationReason = entry.engine.context.result.map { - case GameResult.Win(_, WinReason.Checkmate) => "checkmate" - case GameResult.Win(_, WinReason.Resignation) => "resignation" - case GameResult.Win(_, WinReason.TimeControl) => "timeout" - case GameResult.Draw(DrawReason.Stalemate) => "stalemate" - case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material" - case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move" - case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition" - case GameResult.Draw(DrawReason.Agreement) => "agreement" - }, - redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci), - pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase), - ) - writebackEmit(objectMapper.writeValueAsString(wb)) + val dto = GameDtoMapper.toGameStateDto(entry, ioClient) + redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto))) + writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto))) if entry.engine.context.result.isDefined then onGameOver(gameId) } catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId) + + private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto = + val clock = entry.engine.currentClockState + GameWritebackEventDto( + gameId = gameId, + fen = dto.fen, + pgn = dto.pgn, + moveCount = entry.engine.context.moves.size, + whiteId = entry.white.id.value, + whiteName = entry.white.displayName, + blackId = entry.black.id.value, + blackName = entry.black.displayName, + mode = entry.mode.toString, + resigned = entry.resigned, + limitSeconds = entry.engine.timeControl match { + case TimeControl.Clock(l, _) => Some(l); case _ => None + }, + incrementSeconds = entry.engine.timeControl match { + case TimeControl.Clock(_, i) => Some(i); case _ => None + }, + daysPerMove = entry.engine.timeControl match { + case TimeControl.Correspondence(d) => Some(d); case _ => None + }, + whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs }, + blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs }, + incrementMs = clock.collect { case c: LiveClockState => c.incrementMs }, + clockLastTickAt = clock.collect { case c: LiveClockState => c.lastTickAt.toEpochMilli }, + clockMoveDeadline = clock.collect { case c: CorrespondenceClockState => c.moveDeadline.toEpochMilli }, + clockActiveColor = clock.map(_.activeColor.label.toLowerCase), + pendingDrawOffer = entry.engine.pendingDrawOfferBy.map(_.label.toLowerCase), + result = entry.engine.context.result.map { + case GameResult.Win(Color.White, _) => "white" + case GameResult.Win(Color.Black, _) => "black" + case GameResult.Draw(_) => "draw" + }, + terminationReason = entry.engine.context.result.map { + case GameResult.Win(_, WinReason.Checkmate) => "checkmate" + case GameResult.Win(_, WinReason.Resignation) => "resignation" + case GameResult.Win(_, WinReason.TimeControl) => "timeout" + case GameResult.Draw(DrawReason.Stalemate) => "stalemate" + case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material" + case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move" + case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition" + case GameResult.Draw(DrawReason.Agreement) => "agreement" + }, + redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci), + pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase), + ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index 05e8f91..c7843b1 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -13,6 +13,7 @@ import de.nowchess.chess.service.InstanceHeartbeatService import io.quarkus.redis.datasource.ReactiveRedisDataSource import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands +import jakarta.annotation.PostConstruct import jakarta.annotation.PreDestroy import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.inject.Instance @@ -45,6 +46,30 @@ class GameRedisSubscriberManager: private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]() private val s2cObservers = new ConcurrentHashMap[String, Observer]() + // scalafix:off DisableSyntax.var + private var clockExpireSubscriber: Option[ReactivePubSubCommands.ReactiveRedisSubscriber] = None + // scalafix:on DisableSyntax.var + + private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire" + + @PostConstruct + def subscribeClockExpiry(): Unit = + val handler: Consumer[String] = gameId => handleClockExpiry(gameId) + try + val subscriber = reactiveRedis + .pubsub(classOf[String]) + .subscribe(clockExpireChannel, handler) + .await() + .atMost(java.time.Duration.ofSeconds(5)) + clockExpireSubscriber = Some(subscriber) + log.infof("Subscribed to clock expiry channel %s", clockExpireChannel) + catch case ex: Exception => log.warnf(ex, "Failed to subscribe to clock expiry channel") + + private def handleClockExpiry(gameId: String): Unit = + if !s2cObservers.containsKey(gameId) then + log.infof("Clock expired for game %s — loading engine to enforce timeout", gameId) + subscribeGame(gameId) + private def c2sTopic(gameId: String): String = s"${redisConfig.prefix}:game:$gameId:c2s" @@ -65,6 +90,7 @@ class GameRedisSubscriberManager: ) s2cObservers.put(gameId, obs) registry.get(gameId).foreach(_.engine.subscribe(obs)) + obs.emitInitialWriteback() heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) @@ -156,5 +182,6 @@ class GameRedisSubscriberManager: @PreDestroy def cleanup(): Unit = + clockExpireSubscriber.foreach(_.unsubscribe(clockExpireChannel).await().indefinitely()) c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely()) s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs))) diff --git a/modules/store/build.gradle.kts b/modules/store/build.gradle.kts index 8486ea1..204a5e8 100644 --- a/modules/store/build.gradle.kts +++ b/modules/store/build.gradle.kts @@ -60,6 +60,7 @@ dependencies { implementation("io.quarkus:quarkus-opentelemetry") implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}") implementation("io.quarkus:quarkus-redis-client") + implementation("io.quarkus:quarkus-scheduler") testImplementation(platform("org.junit:junit-bom:5.13.4")) testImplementation("org.junit.jupiter:junit-jupiter") diff --git a/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala b/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala index 2e42b39..12b6283 100644 --- a/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala +++ b/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala @@ -34,6 +34,19 @@ class GameRecordRepository: .asScala .toList + def findExpiredLiveClockGames(nowMs: Long): List[GameRecord] = + em.createQuery( + "SELECT g FROM GameRecord g WHERE g.result IS NULL AND g.clockLastTickAt IS NOT NULL AND g.whiteRemainingMs IS NOT NULL", + classOf[GameRecord], + ).getResultList + .asScala + .toList + .filter { g => + val remaining = + if g.clockActiveColor == "white" then g.whiteRemainingMs.longValue else g.blackRemainingMs.longValue + g.clockLastTickAt.longValue + remaining < nowMs + } + def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] = em.createQuery( "SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC", diff --git a/modules/store/src/main/scala/de/nowchess/store/service/ClockExpiryScanner.scala b/modules/store/src/main/scala/de/nowchess/store/service/ClockExpiryScanner.scala new file mode 100644 index 0000000..07181f2 --- /dev/null +++ b/modules/store/src/main/scala/de/nowchess/store/service/ClockExpiryScanner.scala @@ -0,0 +1,36 @@ +package de.nowchess.store.service + +import de.nowchess.store.config.RedisConfig +import de.nowchess.store.repository.GameRecordRepository +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.scheduler.Scheduled +import jakarta.enterprise.context.ApplicationScoped +import jakarta.inject.Inject +import org.jboss.logging.Logger +import scala.compiletime.uninitialized + +@ApplicationScoped +class ClockExpiryScanner: + @Inject + // scalafix:off DisableSyntax.var + var repository: GameRecordRepository = uninitialized + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + // scalafix:on + + private val log = Logger.getLogger(classOf[ClockExpiryScanner]) + + private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire" + + @Scheduled(every = "30s") + def scan(): Unit = + try + val nowMs = System.currentTimeMillis() + val expired = repository.findExpiredLiveClockGames(nowMs) + if expired.nonEmpty then + log.infof("Found %d games with expired clocks", expired.size) + expired.foreach { record => + log.infof("Publishing clock expiry for game %s", record.gameId) + redis.pubsub(classOf[String]).publish(clockExpireChannel, record.gameId) + } + catch case ex: Exception => log.warnf(ex, "Clock expiry scan failed")