fix: enhance AutoScaler and InstanceRegistry for replica management and stale instance eviction
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
This commit is contained in:
@@ -74,11 +74,11 @@ class AutoScaler:
|
||||
val maxReplicas = config.scaleMaxReplicas
|
||||
|
||||
if currentReplicas < maxReplicas then
|
||||
spec.put("replicas", String.valueOf(currentReplicas + 1))
|
||||
spec.put("replicas", Integer.valueOf(currentReplicas + 1))
|
||||
kube
|
||||
.resources(classOf[GenericKubernetesResource])
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(config.k8sRolloutName)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
log.infof(
|
||||
"Scaled up %s from %d to %d replicas",
|
||||
@@ -115,11 +115,11 @@ class AutoScaler:
|
||||
val minReplicas = config.scaleMinReplicas
|
||||
|
||||
if currentReplicas > minReplicas then
|
||||
spec.put("replicas", String.valueOf(currentReplicas - 1))
|
||||
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
||||
kube
|
||||
.resources(classOf[GenericKubernetesResource])
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(config.k8sRolloutName)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
log.infof(
|
||||
"Scaled down %s from %d to %d replicas",
|
||||
|
||||
@@ -39,6 +39,8 @@ class HealthMonitor:
|
||||
redisPrefix = prefix
|
||||
|
||||
def checkInstanceHealth: Unit =
|
||||
val evicted = instanceRegistry.evictStaleInstances(config.heartbeatTtl)
|
||||
if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||
val instances = instanceRegistry.getAllInstances
|
||||
instances.foreach { inst =>
|
||||
val isHealthy = checkHealth(inst.instanceId)
|
||||
|
||||
+18
@@ -8,6 +8,7 @@ import scala.compiletime.uninitialized
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.time.{Duration, Instant}
|
||||
import io.smallrye.mutiny.Uni
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
@@ -68,3 +69,20 @@ class InstanceRegistry:
|
||||
def removeInstance(instanceId: String): Unit =
|
||||
instances.remove(instanceId)
|
||||
log.infof("Instance %s removed from registry", instanceId)
|
||||
|
||||
def evictStaleInstances(maxAge: Duration): List[String] =
|
||||
val cutoff = Instant.now().minus(maxAge)
|
||||
val stale = instances.asScala
|
||||
.collect { case (id, inst) =>
|
||||
try
|
||||
if Instant.parse(inst.lastHeartbeat).isBefore(cutoff) then Some(id)
|
||||
else None
|
||||
catch case _: Exception => None
|
||||
}
|
||||
.flatten
|
||||
.toList
|
||||
stale.foreach { id =>
|
||||
instances.remove(id)
|
||||
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
||||
}
|
||||
stale
|
||||
|
||||
Reference in New Issue
Block a user