From 3f12f695f132b92f634d98df2c037292498b6e86 Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 13 May 2026 22:08:22 +0200 Subject: [PATCH] feat: implement periodic scaling checks and enhance instance management in AutoScaler --- .../coordinator/service/AutoScaler.scala | 101 ++++++++++++------ .../service/InstanceRegistry.scala | 30 +++--- .../de/nowchess/chess/engine/GameEngine.scala | 2 +- .../service/InstanceHeartbeatService.scala | 2 +- 4 files changed, 85 insertions(+), 50 deletions(-) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala index 9883c97..de6300c 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala @@ -8,6 +8,7 @@ import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.api.model.GenericKubernetesResource import io.fabric8.kubernetes.client.KubernetesClient import io.micrometer.core.instrument.{Gauge, MeterRegistry} +import io.quarkus.scheduler.Scheduled import org.jboss.logging.Logger import java.util.concurrent.atomic.AtomicReference @@ -25,6 +26,12 @@ class AutoScaler: @Inject private var instanceRegistry: InstanceRegistry = uninitialized + @Inject + private var loadBalancer: LoadBalancer = uninitialized + + @Inject + private var failoverService: FailoverService = uninitialized + @Inject private var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var @@ -51,6 +58,11 @@ class AutoScaler: meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment(0) () + @Scheduled(every = "10s") + def periodicScaleCheck(): Unit = + try checkAndScale + catch case ex: Exception => log.warnf(ex, "Auto-scale check failed") + // scalafix:off DisableSyntax.asInstanceOf private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] = Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] => @@ -105,6 +117,7 @@ class AutoScaler: currentReplicas, currentReplicas + 1, ) + loadBalancer.rebalance else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName) case _ => () } @@ -116,43 +129,61 @@ class AutoScaler: def scaleDown(): Unit = log.info("Scaling down Argo Rollout") - kubeClientOpt match - case None => - log.warn("Kubernetes client not available, cannot scale") - case Some(kube) => - try - Option( - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get(), - ).foreach { rollout => - rolloutSpec(rollout).foreach { spec => - spec.get("replicas") match - case replicas: Integer => - val currentReplicas = replicas.intValue() - val minReplicas = config.scaleMinReplicas + val underloadedInstance = instanceRegistry.getAllInstances + .filter(_.state == "HEALTHY") + .minByOption(_.subscriptionCount) - if currentReplicas > minReplicas then - spec.put("replicas", Integer.valueOf(currentReplicas - 1)) + underloadedInstance.foreach { inst => + log.infof("Draining instance %s before scale-down", inst.instanceId) + failoverService + .onInstanceStreamDropped(inst.instanceId) + .subscribe() + .`with`( + _ => + kubeClientOpt match + case None => + log.warn("Kubernetes client not available, cannot scale") + case Some(kube) => + try + Option( kube .genericKubernetesResources(argoApiVersion, argoKind) .inNamespace(config.k8sNamespace) - .resource(rollout) - .update() - meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment() - log.infof( - "Scaled down %s from %d to %d replicas", - config.k8sRolloutName, - currentReplicas, - currentReplicas - 1, - ) - else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) - case _ => () - } - } - catch - case ex: Exception => + .withName(config.k8sRolloutName) + .get(), + ).foreach { rollout => + rolloutSpec(rollout).foreach { spec => + spec.get("replicas") match + case replicas: Integer => + val currentReplicas = replicas.intValue() + val minReplicas = config.scaleMinReplicas + + if currentReplicas > minReplicas then + spec.put("replicas", Integer.valueOf(currentReplicas - 1)) + kube + .genericKubernetesResources(argoApiVersion, argoKind) + .inNamespace(config.k8sNamespace) + .resource(rollout) + .update() + meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment() + log.infof( + "Scaled down %s from %d to %d replicas", + config.k8sRolloutName, + currentReplicas, + currentReplicas - 1, + ) + else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) + case _ => () + } + } + catch + case ex: Exception => + meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() + log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName), + ex => meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() - log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName) + log.warnf(ex, "Failed to drain instance %s before scale-down", inst.instanceId), + ) + } + + if underloadedInstance.isEmpty then log.warn("No healthy instances found for scale-down") diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala index 6f074dd..2474646 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala @@ -76,20 +76,24 @@ class InstanceRegistry: .onItem() .transformToUni { value => try - val metadata = mapper.readValue(value, classOf[InstanceMetadata]) - val isNew = !instances.containsKey(instanceId) - instances.put(instanceId, metadata) - if isNew then - meterRegistry.counter("nowchess.coordinator.instances.joined").increment() - log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount) + if value == null then + log.debugf("Instance %s metadata missing from Redis (may have expired)", instanceId) + Uni.createFrom().item(()) else - log.debugf( - "Instance %s updated (subscriptions=%d state=%s)", - instanceId, - metadata.subscriptionCount, - metadata.state, - ) - Uni.createFrom().item(()) + val metadata = mapper.readValue(value, classOf[InstanceMetadata]) + val isNew = !instances.containsKey(instanceId) + instances.put(instanceId, metadata) + if isNew then + meterRegistry.counter("nowchess.coordinator.instances.joined").increment() + log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount) + else + log.debugf( + "Instance %s updated (subscriptions=%d state=%s)", + instanceId, + metadata.subscriptionCount, + metadata.state, + ) + Uni.createFrom().item(()) catch case ex: Exception => log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId) diff --git a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala index 9becc0a..074ed74 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala @@ -60,8 +60,8 @@ class GameEngine( @SuppressWarnings(Array("DisableSyntax.var")) private var pendingTakebackRequest: Option[Color] = initialTakebackRequest + GameEngine.activeGamesCount.incrementAndGet() meterRegistry.foreach { reg => - GameEngine.activeGamesCount.incrementAndGet() reg.counter("nowchess.games.started").increment() } private def gamesCompletedCounter(result: String): Counter = diff --git a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala index 097aa16..a1db68f 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala @@ -199,7 +199,7 @@ class InstanceHeartbeatService: val json = mapper.writeValueAsString(metadata) reactiveRedis .value(classOf[String]) - .setex(key, 5L, json) + .setex(key, 15L, json) .subscribe() .`with`( _ => redisHeartbeatPending.set(false),