From b4920d3817e58bda94d7764e608b856ce9a909f7 Mon Sep 17 00:00:00 2001
From: Janis
Date: Fri, 8 May 2026 12:37:23 +0200
Subject: [PATCH] fix: enhance AutoScaler and InstanceRegistry for replica management and stale instance eviction

---
Notes (ignored by git am):
- Line breaks and indentation were reconstructed from a whitespace-collapsed copy;
  context-line indentation may need adjusting before the patch applies.
- The evictStaleInstances hunk was revised after generation, so the post-image blob id
  on the InstanceRegistry.scala index line no longer matches its content.

 .../coordinator/service/AutoScaler.scala       | 12 ++++++------
 .../coordinator/service/HealthMonitor.scala    |  2 ++
 .../coordinator/service/InstanceRegistry.scala | 28 ++++++++++++++++++++++++++++
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala
index 9a93de5..7bfbc9e 100644
--- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala
+++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala
@@ -74,11 +74,11 @@ class AutoScaler:
       val maxReplicas = config.scaleMaxReplicas
 
       if currentReplicas < maxReplicas then
-        spec.put("replicas", String.valueOf(currentReplicas + 1))
+        spec.put("replicas", Integer.valueOf(currentReplicas + 1))
         kube
-          .resources(classOf[GenericKubernetesResource])
+          .genericKubernetesResources(argoApiVersion, argoKind)
           .inNamespace(config.k8sNamespace)
-          .withName(config.k8sRolloutName)
+          .resource(rollout)
           .update()
         log.infof(
           "Scaled up %s from %d to %d replicas",
@@ -115,11 +115,11 @@ class AutoScaler:
       val minReplicas = config.scaleMinReplicas
 
       if currentReplicas > minReplicas then
-        spec.put("replicas", String.valueOf(currentReplicas - 1))
+        spec.put("replicas", Integer.valueOf(currentReplicas - 1))
         kube
-          .resources(classOf[GenericKubernetesResource])
+          .genericKubernetesResources(argoApiVersion, argoKind)
           .inNamespace(config.k8sNamespace)
-          .withName(config.k8sRolloutName)
+          .resource(rollout)
           .update()
         log.infof(
           "Scaled down %s from %d to %d replicas",
diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala
index 4a031a7..3382bf3 100644
--- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala
+++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala
@@ -39,6 +39,8 @@ class HealthMonitor:
     redisPrefix = prefix
 
   def checkInstanceHealth: Unit =
+    val evicted = instanceRegistry.evictStaleInstances(config.heartbeatTtl)
+    if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
     val instances = instanceRegistry.getAllInstances
     instances.foreach { inst =>
       val isHealthy = checkHealth(inst.instanceId)
diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala
index 0e2db31..2d13f92 100644
--- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala
+++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala
@@ -8,6 +8,7 @@ import scala.compiletime.uninitialized
 import com.fasterxml.jackson.databind.ObjectMapper
 import de.nowchess.coordinator.dto.InstanceMetadata
 import java.util.concurrent.ConcurrentHashMap
+import java.time.{Duration, Instant}
 import io.smallrye.mutiny.Uni
 import org.jboss.logging.Logger
 
@@ -68,3 +69,30 @@ class InstanceRegistry:
   def removeInstance(instanceId: String): Unit =
     instances.remove(instanceId)
     log.infof("Instance %s removed from registry", instanceId)
+
+  /** Removes every instance whose last heartbeat is older than `maxAge`.
+    *
+    * Instances whose `lastHeartbeat` cannot be parsed are kept, but a warning is logged
+    * so the malformed entry is visible instead of being skipped silently.
+    *
+    * @param maxAge maximum tolerated age of the last heartbeat
+    * @return ids of the instances that were actually removed from the registry
+    */
+  def evictStaleInstances(maxAge: Duration): List[String] =
+    val cutoff = Instant.now().minus(maxAge)
+    val stale = instances.asScala.toList.filter { case (id, inst) =>
+      try Instant.parse(inst.lastHeartbeat).isBefore(cutoff)
+      catch
+        case scala.util.control.NonFatal(_) =>
+          log.warnf("Skipping instance %s: unparseable heartbeat '%s'", id, inst.lastHeartbeat)
+          false
+    }
+    // Two-argument remove only deletes the entry if it still maps to the value inspected
+    // above, so an entry replaced since the scan is left alone and not reported.
+    val evicted = stale.flatMap { case (id, inst) =>
+      Option.when(instances.remove(id, inst))(id)
+    }
+    evicted.foreach { id =>
+      log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
+    }
+    evicted