feat: implement clock expiry scanning and handling for game records
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
This commit is contained in:
+104
-79
@@ -10,9 +10,11 @@ import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import io.quarkus.scheduler.Scheduled
|
||||
import org.jboss.logging.Logger
|
||||
import io.fabric8.kubernetes.client.KubernetesClientException
|
||||
import scala.jdk.CollectionConverters.*
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -37,9 +39,19 @@ class AutoScaler:
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||
private val avgLoadRef = new AtomicReference[Double](0.0)
|
||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||
private val avgLoadRef = new AtomicReference[Double](0.0)
|
||||
private val drainingForScaleDown = ConcurrentHashMap.newKeySet[String]()
|
||||
|
||||
/** Reports whether `instanceId` is currently present in the `drainingForScaleDown` set.
  *
  * @param instanceId
  *   identifier to look up
  * @return
  *   `true` when the set contains `instanceId`, `false` otherwise
  */
def isDrainingForScaleDown(instanceId: String): Boolean =
  val currentlyDraining = drainingForScaleDown.contains(instanceId)
  currentlyDraining
|
||||
|
||||
/** Removes `instanceId` from the `drainingForScaleDown` set; a no-op when it is not present.
  *
  * @param instanceId
  *   identifier to drop from the set
  */
def clearDraining(instanceId: String): Unit =
  // `remove` reports whether the id was present; that result is intentionally ignored.
  val _ = drainingForScaleDown.remove(instanceId)
  ()
|
||||
|
||||
/** Removes from `drainingForScaleDown` the first entry found that occurs as a substring of `podName`.
  *
  * At most one entry is removed per call; nothing happens when no entry matches.
  *
  * @param podName
  *   pod name to match the recorded instance ids against
  */
def clearDrainingByPodName(podName: String): Unit =
  val matchingId = drainingForScaleDown.asScala.find(id => podName.contains(id))
  matchingId.foreach(id => drainingForScaleDown.remove(id))
|
||||
|
||||
private def kubeClientOpt: Option[KubernetesClient] =
|
||||
if kubeClientInstance.isUnsatisfied then None
|
||||
@@ -125,49 +137,69 @@ class AutoScaler:
|
||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
|
||||
then scaleDown()
|
||||
|
||||
/** Reads the rollout named `config.k8sRolloutName` and, when `canScale` accepts its current `replicas` value, writes
  * back `current + delta`.
  *
  * Nothing is done when the rollout lookup returns `null`, when `rolloutSpec` yields nothing, or when `replicas` is not
  * an `Integer`. A `KubernetesClientException` with code 409 causes the whole read-modify-update to be repeated, for at
  * most `maxRetries` additional attempts; once those are used up, or on any other `Exception`, the failure counter is
  * incremented and the error is logged. No exception of type `Exception` escapes this method.
  *
  * @param kube
  *   client used for both the read and the update
  * @param direction
  *   value of the `direction` metric tag; also inserted into the log messages
  * @param delta
  *   amount added to the current replica count
  * @param canScale
  *   receives the current replica count; the update is only performed when it returns `true`
  * @param atLimit
  *   invoked with the current replica count when `canScale` returns `false`
  * @param onSuccess
  *   invoked with `(previous, updated)` replica counts after the update call and the event counter increment
  * @param maxRetries
  *   number of retries allowed after the first attempt hits a 409 conflict
  */
private def patchRolloutReplicas(
    kube: KubernetesClient,
    direction: String,
    delta: Int,
    canScale: Int => Boolean,
    atLimit: Int => Unit,
    onSuccess: (Int, Int) => Unit,
    maxRetries: Int = 3,
): Unit =
  // Namespaced handle on the rollout resources; re-evaluated on every use, like the original inline chains.
  def rollouts = kube.genericKubernetesResources(argoApiVersion, argoKind).inNamespace(config.k8sNamespace)

  def countFailure(): Unit =
    meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()

  @scala.annotation.tailrec
  def attempt(retries: Int): Unit =
    // `true` only when a 409 conflict occurred and another retry is still allowed.
    val retryAfterConflict =
      try
        for
          rollout <- Option(rollouts.withName(config.k8sRolloutName).get())
          spec <- rolloutSpec(rollout)
        do
          spec.get("replicas") match
            case current: Integer =>
              val before = current.intValue()
              if canScale(before) then
                val after = before + delta
                spec.put("replicas", Integer.valueOf(after))
                rollouts.resource(rollout).update()
                meterRegistry.counter("nowchess.coordinator.scale.events", "direction", direction).increment()
                onSuccess(before, after)
              else atLimit(before)
            case _ => ()
        false
      catch
        case ex: KubernetesClientException if ex.getCode == 409 =>
          if retries > 0 then
            log.debugf("Conflict scaling %s %s, retrying (%d left)", direction, config.k8sRolloutName, retries - 1)
            true
          else
            countFailure()
            log.errorf(ex, "Failed to scale %s %s after conflict retries", direction, config.k8sRolloutName)
            false
        case ex: Exception =>
          countFailure()
          log.errorf(ex, "Failed to scale %s %s", direction, config.k8sRolloutName)
          false
    // Retrying re-reads the rollout, so the next update is based on the latest fetched object.
    if retryAfterConflict then attempt(retries - 1)

  attempt(maxRetries)
|
||||
|
||||
def scaleUp(): Unit =
|
||||
log.info("Scaling up Argo Rollout")
|
||||
kubeClientOpt match
|
||||
case None =>
|
||||
log.warn("Kubernetes client not available, cannot scale")
|
||||
case None => log.warn("Kubernetes client not available, cannot scale")
|
||||
case Some(kube) =>
|
||||
try
|
||||
Option(
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(config.k8sRolloutName)
|
||||
.get(),
|
||||
).foreach { rollout =>
|
||||
rolloutSpec(rollout).foreach { spec =>
|
||||
spec.get("replicas") match
|
||||
case replicas: Integer =>
|
||||
val currentReplicas = replicas.intValue()
|
||||
val maxReplicas = config.scaleMaxReplicas
|
||||
|
||||
if currentReplicas < maxReplicas then
|
||||
spec.put("replicas", Integer.valueOf(currentReplicas + 1))
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment()
|
||||
log.infof(
|
||||
"Scaled up %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
currentReplicas,
|
||||
currentReplicas + 1,
|
||||
)
|
||||
loadBalancer.rebalance
|
||||
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
|
||||
case _ => ()
|
||||
}
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment()
|
||||
log.errorf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
||||
patchRolloutReplicas(
|
||||
kube,
|
||||
direction = "up",
|
||||
delta = 1,
|
||||
canScale = _ < config.scaleMaxReplicas,
|
||||
atLimit = n => log.infof("Already at max replicas %d for %s", n, config.k8sRolloutName),
|
||||
onSuccess = (from, to) =>
|
||||
log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, from, to)
|
||||
loadBalancer.rebalance,
|
||||
)
|
||||
|
||||
def scaleDown(): Unit =
|
||||
log.info("Scaling down Argo Rollout")
|
||||
@@ -177,6 +209,7 @@ class AutoScaler:
|
||||
|
||||
underloadedInstance.foreach { inst =>
|
||||
log.infof("Marking instance %s for drain before scale-down", inst.instanceId)
|
||||
drainingForScaleDown.add(inst.instanceId)
|
||||
failoverService
|
||||
.onInstanceStreamDropped(inst.instanceId)
|
||||
.subscribe()
|
||||
@@ -187,42 +220,34 @@ class AutoScaler:
|
||||
}
|
||||
|
||||
kubeClientOpt match
|
||||
case None =>
|
||||
log.warn("Kubernetes client not available, cannot scale")
|
||||
case None => log.warn("Kubernetes client not available, cannot scale")
|
||||
case Some(kube) =>
|
||||
try
|
||||
Option(
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(config.k8sRolloutName)
|
||||
.get(),
|
||||
).foreach { rollout =>
|
||||
rolloutSpec(rollout).foreach { spec =>
|
||||
spec.get("replicas") match
|
||||
case replicas: Integer =>
|
||||
val currentReplicas = replicas.intValue()
|
||||
val minReplicas = config.scaleMinReplicas
|
||||
patchRolloutReplicas(
|
||||
kube,
|
||||
direction = "down",
|
||||
delta = -1,
|
||||
canScale = _ > config.scaleMinReplicas,
|
||||
atLimit = n => log.infof("Already at min replicas %d for %s", n, config.k8sRolloutName),
|
||||
onSuccess = (from, to) =>
|
||||
log.infof("Scaled down %s from %d to %d replicas", config.k8sRolloutName, from, to)
|
||||
underloadedInstance.foreach(inst => forceDeletePod(inst.instanceId, kube)),
|
||||
)
|
||||
|
||||
if currentReplicas > minReplicas then
|
||||
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||
log.infof(
|
||||
"Scaled down %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
currentReplicas,
|
||||
currentReplicas - 1,
|
||||
)
|
||||
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
||||
case _ => ()
|
||||
}
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||
log.errorf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||
/** Deletes, with a grace period of zero, the first listed pod whose name contains `instanceId`.
  *
  * Pods are listed in `config.k8sNamespace`, filtered with `withLabel(config.k8sRolloutLabelSelector)`. When no pod
  * name contains `instanceId`, a debug message is logged and nothing is deleted. Any `Exception` raised while listing
  * or deleting is logged at WARN level and not rethrown.
  *
  * @param instanceId
  *   instance identifier searched for as a substring of the pod names
  * @param kube
  *   client used to list and delete pods
  */
private def forceDeletePod(instanceId: String, kube: KubernetesClient): Unit =
  try
    // NOTE(review): fabric8's single-argument `withLabel` appears to match on a label key — confirm that
    // `k8sRolloutLabelSelector` holds a key and not a `key=value` selector.
    val candidates =
      kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
    candidates.find(candidate => candidate.getMetadata.getName.contains(instanceId)) match
      case None =>
        log.debugf("No pod found for drained instance %s, skipping deletion", instanceId)
      case Some(pod) =>
        kube.pods().inNamespace(config.k8sNamespace).withName(pod.getMetadata.getName).withGracePeriod(0L).delete()
        log.infof("Force-deleted pod for drained instance %s", instanceId)
  catch
    case ex: Exception =>
      log.warnf(ex, "Failed to force-delete pod for drained instance %s", instanceId)
|
||||
|
||||
+7
-7
@@ -88,7 +88,8 @@ class HealthMonitor:
|
||||
if evicted.nonEmpty then
|
||||
log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||
evicted.foreach(deleteK8sPod)
|
||||
autoScaler.scaleUp()
|
||||
val unexpectedEvictions = evicted.filterNot(autoScaler.isDrainingForScaleDown)
|
||||
if unexpectedEvictions.nonEmpty then autoScaler.scaleUp()
|
||||
val instances = instanceRegistry.getAllInstances
|
||||
val failed = instances.collect { inst =>
|
||||
val isHealthy = checkHealth(inst.instanceId)
|
||||
@@ -99,7 +100,8 @@ class HealthMonitor:
|
||||
Some(inst.instanceId)
|
||||
else None
|
||||
}.flatten
|
||||
if failed.nonEmpty then autoScaler.scaleUp()
|
||||
val unexpectedFailures = failed.filterNot(autoScaler.isDrainingForScaleDown)
|
||||
if unexpectedFailures.nonEmpty then autoScaler.scaleUp()
|
||||
|
||||
private def checkHealth(instanceId: String): Boolean =
|
||||
val redisHealthy = checkRedisHeartbeat(instanceId)
|
||||
@@ -227,12 +229,10 @@ class HealthMonitor:
|
||||
}
|
||||
|
||||
private def handlePodGone(pod: Pod): Unit =
|
||||
val podName = pod.getMetadata.getName
|
||||
autoScaler.clearDrainingByPodName(podName)
|
||||
findRegisteredInstance(pod).foreach { inst =>
|
||||
log.warnf(
|
||||
"Pod %s deleted — triggering failover for %s",
|
||||
pod.getMetadata.getName,
|
||||
inst.instanceId,
|
||||
)
|
||||
log.warnf("Pod %s deleted — triggering failover for %s", podName, inst.instanceId)
|
||||
failoverService
|
||||
.onInstanceStreamDropped(inst.instanceId)
|
||||
.subscribe()
|
||||
|
||||
Reference in New Issue
Block a user