feat: implement periodic scaling checks and enhance instance management in AutoScaler
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
This commit is contained in:
+66
-35
@@ -8,6 +8,7 @@ import de.nowchess.coordinator.config.CoordinatorConfig
|
|||||||
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
||||||
import io.fabric8.kubernetes.client.KubernetesClient
|
import io.fabric8.kubernetes.client.KubernetesClient
|
||||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||||
|
import io.quarkus.scheduler.Scheduled
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
@@ -25,6 +26,12 @@ class AutoScaler:
|
|||||||
@Inject
|
@Inject
|
||||||
private var instanceRegistry: InstanceRegistry = uninitialized
|
private var instanceRegistry: InstanceRegistry = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var loadBalancer: LoadBalancer = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var failoverService: FailoverService = uninitialized
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private var meterRegistry: MeterRegistry = uninitialized
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
@@ -51,6 +58,11 @@ class AutoScaler:
|
|||||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment(0)
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment(0)
|
||||||
()
|
()
|
||||||
|
|
||||||
|
@Scheduled(every = "10s")
|
||||||
|
def periodicScaleCheck(): Unit =
|
||||||
|
try checkAndScale
|
||||||
|
catch case ex: Exception => log.warnf(ex, "Auto-scale check failed")
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.asInstanceOf
|
// scalafix:off DisableSyntax.asInstanceOf
|
||||||
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
||||||
Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] =>
|
Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] =>
|
||||||
@@ -105,6 +117,7 @@ class AutoScaler:
|
|||||||
currentReplicas,
|
currentReplicas,
|
||||||
currentReplicas + 1,
|
currentReplicas + 1,
|
||||||
)
|
)
|
||||||
|
loadBalancer.rebalance
|
||||||
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
|
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
|
||||||
case _ => ()
|
case _ => ()
|
||||||
}
|
}
|
||||||
@@ -116,43 +129,61 @@ class AutoScaler:
|
|||||||
|
|
||||||
def scaleDown(): Unit =
|
def scaleDown(): Unit =
|
||||||
log.info("Scaling down Argo Rollout")
|
log.info("Scaling down Argo Rollout")
|
||||||
kubeClientOpt match
|
val underloadedInstance = instanceRegistry.getAllInstances
|
||||||
case None =>
|
.filter(_.state == "HEALTHY")
|
||||||
log.warn("Kubernetes client not available, cannot scale")
|
.minByOption(_.subscriptionCount)
|
||||||
case Some(kube) =>
|
|
||||||
try
|
|
||||||
Option(
|
|
||||||
kube
|
|
||||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
|
||||||
.inNamespace(config.k8sNamespace)
|
|
||||||
.withName(config.k8sRolloutName)
|
|
||||||
.get(),
|
|
||||||
).foreach { rollout =>
|
|
||||||
rolloutSpec(rollout).foreach { spec =>
|
|
||||||
spec.get("replicas") match
|
|
||||||
case replicas: Integer =>
|
|
||||||
val currentReplicas = replicas.intValue()
|
|
||||||
val minReplicas = config.scaleMinReplicas
|
|
||||||
|
|
||||||
if currentReplicas > minReplicas then
|
underloadedInstance.foreach { inst =>
|
||||||
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
log.infof("Draining instance %s before scale-down", inst.instanceId)
|
||||||
|
failoverService
|
||||||
|
.onInstanceStreamDropped(inst.instanceId)
|
||||||
|
.subscribe()
|
||||||
|
.`with`(
|
||||||
|
_ =>
|
||||||
|
kubeClientOpt match
|
||||||
|
case None =>
|
||||||
|
log.warn("Kubernetes client not available, cannot scale")
|
||||||
|
case Some(kube) =>
|
||||||
|
try
|
||||||
|
Option(
|
||||||
kube
|
kube
|
||||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||||
.inNamespace(config.k8sNamespace)
|
.inNamespace(config.k8sNamespace)
|
||||||
.resource(rollout)
|
.withName(config.k8sRolloutName)
|
||||||
.update()
|
.get(),
|
||||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
).foreach { rollout =>
|
||||||
log.infof(
|
rolloutSpec(rollout).foreach { spec =>
|
||||||
"Scaled down %s from %d to %d replicas",
|
spec.get("replicas") match
|
||||||
config.k8sRolloutName,
|
case replicas: Integer =>
|
||||||
currentReplicas,
|
val currentReplicas = replicas.intValue()
|
||||||
currentReplicas - 1,
|
val minReplicas = config.scaleMinReplicas
|
||||||
)
|
|
||||||
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
if currentReplicas > minReplicas then
|
||||||
case _ => ()
|
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
||||||
}
|
kube
|
||||||
}
|
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||||
catch
|
.inNamespace(config.k8sNamespace)
|
||||||
case ex: Exception =>
|
.resource(rollout)
|
||||||
|
.update()
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||||
|
log.infof(
|
||||||
|
"Scaled down %s from %d to %d replicas",
|
||||||
|
config.k8sRolloutName,
|
||||||
|
currentReplicas,
|
||||||
|
currentReplicas - 1,
|
||||||
|
)
|
||||||
|
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
||||||
|
case _ => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
case ex: Exception =>
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||||
|
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName),
|
||||||
|
ex =>
|
||||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
log.warnf(ex, "Failed to drain instance %s before scale-down", inst.instanceId),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if underloadedInstance.isEmpty then log.warn("No healthy instances found for scale-down")
|
||||||
|
|||||||
+17
-13
@@ -76,20 +76,24 @@ class InstanceRegistry:
|
|||||||
.onItem()
|
.onItem()
|
||||||
.transformToUni { value =>
|
.transformToUni { value =>
|
||||||
try
|
try
|
||||||
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
if value == null then
|
||||||
val isNew = !instances.containsKey(instanceId)
|
log.debugf("Instance %s metadata missing from Redis (may have expired)", instanceId)
|
||||||
instances.put(instanceId, metadata)
|
Uni.createFrom().item(())
|
||||||
if isNew then
|
|
||||||
meterRegistry.counter("nowchess.coordinator.instances.joined").increment()
|
|
||||||
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
|
||||||
else
|
else
|
||||||
log.debugf(
|
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
||||||
"Instance %s updated (subscriptions=%d state=%s)",
|
val isNew = !instances.containsKey(instanceId)
|
||||||
instanceId,
|
instances.put(instanceId, metadata)
|
||||||
metadata.subscriptionCount,
|
if isNew then
|
||||||
metadata.state,
|
meterRegistry.counter("nowchess.coordinator.instances.joined").increment()
|
||||||
)
|
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
||||||
Uni.createFrom().item(())
|
else
|
||||||
|
log.debugf(
|
||||||
|
"Instance %s updated (subscriptions=%d state=%s)",
|
||||||
|
instanceId,
|
||||||
|
metadata.subscriptionCount,
|
||||||
|
metadata.state,
|
||||||
|
)
|
||||||
|
Uni.createFrom().item(())
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId)
|
log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId)
|
||||||
|
|||||||
@@ -60,8 +60,8 @@ class GameEngine(
|
|||||||
@SuppressWarnings(Array("DisableSyntax.var"))
|
@SuppressWarnings(Array("DisableSyntax.var"))
|
||||||
private var pendingTakebackRequest: Option[Color] = initialTakebackRequest
|
private var pendingTakebackRequest: Option[Color] = initialTakebackRequest
|
||||||
|
|
||||||
|
GameEngine.activeGamesCount.incrementAndGet()
|
||||||
meterRegistry.foreach { reg =>
|
meterRegistry.foreach { reg =>
|
||||||
GameEngine.activeGamesCount.incrementAndGet()
|
|
||||||
reg.counter("nowchess.games.started").increment()
|
reg.counter("nowchess.games.started").increment()
|
||||||
}
|
}
|
||||||
private def gamesCompletedCounter(result: String): Counter =
|
private def gamesCompletedCounter(result: String): Counter =
|
||||||
|
|||||||
+1
-1
@@ -199,7 +199,7 @@ class InstanceHeartbeatService:
|
|||||||
val json = mapper.writeValueAsString(metadata)
|
val json = mapper.writeValueAsString(metadata)
|
||||||
reactiveRedis
|
reactiveRedis
|
||||||
.value(classOf[String])
|
.value(classOf[String])
|
||||||
.setex(key, 5L, json)
|
.setex(key, 15L, json)
|
||||||
.subscribe()
|
.subscribe()
|
||||||
.`with`(
|
.`with`(
|
||||||
_ => redisHeartbeatPending.set(false),
|
_ => redisHeartbeatPending.set(false),
|
||||||
|
|||||||
Reference in New Issue
Block a user