Compare commits
3 Commits
main
...
9ca8af9bf8
| Author | SHA1 | Date | |
|---|---|---|---|
| 9ca8af9bf8 | |||
| fcd7c80169 | |||
| 6448bec3f8 |
@@ -47,6 +47,8 @@ nowchess:
|
|||||||
k8s-rollout-label-selector: "app=nowchess-core"
|
k8s-rollout-label-selector: "app=nowchess-core"
|
||||||
startup-validation-timeout: 15s
|
startup-validation-timeout: 15s
|
||||||
failover-wait-timeout: 30s
|
failover-wait-timeout: 30s
|
||||||
|
scale-cpu-threshold-percent: 0.8
|
||||||
|
scale-memory-threshold-percent: 0.8
|
||||||
|
|
||||||
---
|
---
|
||||||
# dev profile
|
# dev profile
|
||||||
|
|||||||
+6
@@ -62,3 +62,9 @@ trait CoordinatorConfig:
|
|||||||
|
|
||||||
@WithName("failover-wait-timeout")
|
@WithName("failover-wait-timeout")
|
||||||
def failoverWaitTimeout: Duration
|
def failoverWaitTimeout: Duration
|
||||||
|
|
||||||
|
@WithName("scale-cpu-threshold-percent")
|
||||||
|
def scaleCpuThresholdPercent: Double
|
||||||
|
|
||||||
|
@WithName("scale-memory-threshold-percent")
|
||||||
|
def scaleMemoryThresholdPercent: Double
|
||||||
|
|||||||
+179
-102
@@ -10,9 +10,11 @@ import io.fabric8.kubernetes.client.KubernetesClient
|
|||||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||||
import io.quarkus.scheduler.Scheduled
|
import io.quarkus.scheduler.Scheduled
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
import io.fabric8.kubernetes.client.KubernetesClientException
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
@@ -37,9 +39,19 @@ class AutoScaler:
|
|||||||
private var meterRegistry: MeterRegistry = uninitialized
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||||
private val avgLoadRef = new AtomicReference[Double](0.0)
|
private val avgLoadRef = new AtomicReference[Double](0.0)
|
||||||
|
private val drainingForScaleDown = ConcurrentHashMap.newKeySet[String]()
|
||||||
|
|
||||||
|
def isDrainingForScaleDown(instanceId: String): Boolean =
|
||||||
|
drainingForScaleDown.contains(instanceId)
|
||||||
|
|
||||||
|
def clearDraining(instanceId: String): Unit =
|
||||||
|
drainingForScaleDown.remove(instanceId)
|
||||||
|
|
||||||
|
def clearDrainingByPodName(podName: String): Unit =
|
||||||
|
drainingForScaleDown.asScala.find(podName.contains).foreach(drainingForScaleDown.remove)
|
||||||
|
|
||||||
private def kubeClientOpt: Option[KubernetesClient] =
|
private def kubeClientOpt: Option[KubernetesClient] =
|
||||||
if kubeClientInstance.isUnsatisfied then None
|
if kubeClientInstance.isUnsatisfied then None
|
||||||
@@ -72,6 +84,34 @@ class AutoScaler:
|
|||||||
}
|
}
|
||||||
// scalafix:on DisableSyntax.asInstanceOf
|
// scalafix:on DisableSyntax.asInstanceOf
|
||||||
|
|
||||||
|
private def parseMillicores(s: String): Long =
|
||||||
|
if s.endsWith("n") then s.dropRight(1).toLongOption.map(_ / 1000000).getOrElse(0L)
|
||||||
|
else if s.endsWith("m") then s.dropRight(1).toLongOption.getOrElse(0L)
|
||||||
|
else s.toLongOption.map(_ * 1000).getOrElse(0L)
|
||||||
|
|
||||||
|
private def parseBytes(s: String): Long =
|
||||||
|
if s.endsWith("Ki") then s.dropRight(2).toLongOption.map(_ * 1024L).getOrElse(0L)
|
||||||
|
else if s.endsWith("Mi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L).getOrElse(0L)
|
||||||
|
else if s.endsWith("Gi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L * 1024L).getOrElse(0L)
|
||||||
|
else if s.endsWith("K") then s.dropRight(1).toLongOption.map(_ * 1000L).getOrElse(0L)
|
||||||
|
else if s.endsWith("M") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L).getOrElse(0L)
|
||||||
|
else if s.endsWith("G") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L * 1000L).getOrElse(0L)
|
||||||
|
else s.toLongOption.getOrElse(0L)
|
||||||
|
|
||||||
|
private def exceedsRatio(used: Long, request: Long, threshold: Double, resource: String, instanceId: String): Boolean =
|
||||||
|
if request <= 0 then false
|
||||||
|
else
|
||||||
|
val ratio = used.toDouble / request.toDouble
|
||||||
|
log.debugf(
|
||||||
|
"Instance %s %s: %d used / %d requested = %.0f%%",
|
||||||
|
instanceId,
|
||||||
|
resource,
|
||||||
|
used,
|
||||||
|
request,
|
||||||
|
ratio * 100,
|
||||||
|
)
|
||||||
|
ratio > threshold
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.asInstanceOf
|
// scalafix:off DisableSyntax.asInstanceOf
|
||||||
private def isResourceConstrained(instanceId: String): Boolean =
|
private def isResourceConstrained(instanceId: String): Boolean =
|
||||||
kubeClientOpt.fold(false) { kube =>
|
kubeClientOpt.fold(false) { kube =>
|
||||||
@@ -80,25 +120,37 @@ class AutoScaler:
|
|||||||
kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
|
kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
|
||||||
pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
|
pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
|
||||||
try
|
try
|
||||||
val metricsRes = kube
|
val requests = Option(pod.getSpec)
|
||||||
.genericKubernetesResources(metricsApiVersion, "PodMetrics")
|
.flatMap(s => Option(s.getContainers))
|
||||||
.inNamespace(config.k8sNamespace)
|
.flatMap(cs => if cs.isEmpty then None else Option(cs.get(0)))
|
||||||
.withName(pod.getMetadata.getName)
|
.flatMap(c => Option(c.getResources))
|
||||||
.get()
|
.flatMap(r => Option(r.getRequests))
|
||||||
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
|
|
||||||
Option(metricsMap.get("metrics"))
|
val cpuRequestMillis = requests.flatMap(m => Option(m.get("cpu"))).map(q => parseMillicores(q.toString)).getOrElse(0L)
|
||||||
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
|
val memRequestBytes = requests.flatMap(m => Option(m.get("memory"))).map(q => parseBytes(q.toString)).getOrElse(0L)
|
||||||
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
|
|
||||||
.filter(!_.isEmpty)
|
if cpuRequestMillis <= 0 && memRequestBytes <= 0 then
|
||||||
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
|
log.debugf("No resource requests found for instance %s, skipping resource check", instanceId)
|
||||||
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
|
false
|
||||||
.flatMap(u => Option(u.get("cpu")))
|
else
|
||||||
.map(_.toString)
|
val metricsRes = kube
|
||||||
.exists { cpuStr =>
|
.genericKubernetesResources(metricsApiVersion, "PodMetrics")
|
||||||
val cpuMillis =
|
.inNamespace(config.k8sNamespace)
|
||||||
if cpuStr.endsWith("m") then cpuStr.dropRight(1).toLongOption.getOrElse(0L)
|
.withName(pod.getMetadata.getName)
|
||||||
else cpuStr.toLongOption.map(_ * 1000).getOrElse(0L)
|
.get()
|
||||||
cpuMillis > 800
|
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
|
||||||
|
val usageOpt = Option(metricsMap.get("metrics"))
|
||||||
|
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
|
||||||
|
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
|
||||||
|
.filter(!_.isEmpty)
|
||||||
|
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
|
||||||
|
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
|
||||||
|
|
||||||
|
usageOpt.exists { usage =>
|
||||||
|
val cpuUsed = Option(usage.get("cpu")).map(v => parseMillicores(v.toString)).getOrElse(0L)
|
||||||
|
val memUsed = Option(usage.get("memory")).map(v => parseBytes(v.toString)).getOrElse(0L)
|
||||||
|
exceedsRatio(cpuUsed, cpuRequestMillis, config.scaleCpuThresholdPercent, "CPU", instanceId) ||
|
||||||
|
exceedsRatio(memUsed, memRequestBytes, config.scaleMemoryThresholdPercent, "memory", instanceId)
|
||||||
}
|
}
|
||||||
catch case _: Exception => false
|
catch case _: Exception => false
|
||||||
}
|
}
|
||||||
@@ -116,58 +168,90 @@ class AutoScaler:
|
|||||||
if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
|
if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
|
||||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||||
if instances.nonEmpty then
|
if instances.nonEmpty then
|
||||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||||
|
val scaleUpLoad = config.scaleUpThreshold * config.maxGamesPerCore
|
||||||
|
val scaleDownLoad = config.scaleDownThreshold * config.maxGamesPerCore
|
||||||
avgLoadRef.set(avgLoad)
|
avgLoadRef.set(avgLoad)
|
||||||
|
|
||||||
val hasHighCpuOrMemory = instances.exists(inst => isResourceConstrained(inst.instanceId))
|
val constrainedInstance = instances.find(inst => isResourceConstrained(inst.instanceId))
|
||||||
|
val hasHighCpuOrMemory = constrainedInstance.isDefined
|
||||||
|
|
||||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore || hasHighCpuOrMemory then scaleUp()
|
log.infof(
|
||||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
|
"Scale check: instances=%d avgLoad=%.1f scaleUpAt=%.1f scaleDownAt=%.1f resourceConstrained=%s",
|
||||||
|
instances.size,
|
||||||
|
avgLoad,
|
||||||
|
scaleUpLoad,
|
||||||
|
scaleDownLoad,
|
||||||
|
constrainedInstance.map(_.instanceId).getOrElse("none"),
|
||||||
|
)
|
||||||
|
|
||||||
|
if avgLoad > scaleUpLoad || hasHighCpuOrMemory then scaleUp()
|
||||||
|
else if avgLoad < scaleDownLoad && instances.size > config.scaleMinReplicas
|
||||||
then scaleDown()
|
then scaleDown()
|
||||||
|
|
||||||
|
private def patchRolloutReplicas(
|
||||||
|
kube: KubernetesClient,
|
||||||
|
direction: String,
|
||||||
|
delta: Int,
|
||||||
|
canScale: Int => Boolean,
|
||||||
|
atLimit: Int => Unit,
|
||||||
|
onSuccess: (Int, Int) => Unit,
|
||||||
|
maxRetries: Int = 3,
|
||||||
|
): Unit =
|
||||||
|
def attempt(retries: Int): Unit =
|
||||||
|
try
|
||||||
|
Option(
|
||||||
|
kube
|
||||||
|
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||||
|
.inNamespace(config.k8sNamespace)
|
||||||
|
.withName(config.k8sRolloutName)
|
||||||
|
.get(),
|
||||||
|
).foreach { rollout =>
|
||||||
|
rolloutSpec(rollout).foreach { spec =>
|
||||||
|
spec.get("replicas") match
|
||||||
|
case current: Integer =>
|
||||||
|
val n = current.intValue()
|
||||||
|
if !canScale(n) then atLimit(n)
|
||||||
|
else
|
||||||
|
spec.put("replicas", Integer.valueOf(n + delta))
|
||||||
|
kube
|
||||||
|
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||||
|
.inNamespace(config.k8sNamespace)
|
||||||
|
.resource(rollout)
|
||||||
|
.update()
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", direction).increment()
|
||||||
|
onSuccess(n, n + delta)
|
||||||
|
case _ => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
case ex: KubernetesClientException if ex.getCode == 409 =>
|
||||||
|
if retries > 0 then
|
||||||
|
log.debugf("Conflict scaling %s %s, retrying (%d left)", direction, config.k8sRolloutName, retries - 1)
|
||||||
|
attempt(retries - 1)
|
||||||
|
else
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()
|
||||||
|
log.errorf(ex, "Failed to scale %s %s after conflict retries", direction, config.k8sRolloutName)
|
||||||
|
case ex: Exception =>
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", direction).increment()
|
||||||
|
log.errorf(ex, "Failed to scale %s %s", direction, config.k8sRolloutName)
|
||||||
|
attempt(maxRetries)
|
||||||
|
|
||||||
def scaleUp(): Unit =
|
def scaleUp(): Unit =
|
||||||
log.info("Scaling up Argo Rollout")
|
log.info("Scaling up Argo Rollout")
|
||||||
kubeClientOpt match
|
kubeClientOpt match
|
||||||
case None =>
|
case None => log.warn("Kubernetes client not available, cannot scale")
|
||||||
log.warn("Kubernetes client not available, cannot scale")
|
|
||||||
case Some(kube) =>
|
case Some(kube) =>
|
||||||
try
|
patchRolloutReplicas(
|
||||||
Option(
|
kube,
|
||||||
kube
|
direction = "up",
|
||||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
delta = 1,
|
||||||
.inNamespace(config.k8sNamespace)
|
canScale = _ < config.scaleMaxReplicas,
|
||||||
.withName(config.k8sRolloutName)
|
atLimit = n => log.infof("Already at max replicas %d for %s", n, config.k8sRolloutName),
|
||||||
.get(),
|
onSuccess = (from, to) =>
|
||||||
).foreach { rollout =>
|
log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, from, to)
|
||||||
rolloutSpec(rollout).foreach { spec =>
|
loadBalancer.rebalance,
|
||||||
spec.get("replicas") match
|
)
|
||||||
case replicas: Integer =>
|
|
||||||
val currentReplicas = replicas.intValue()
|
|
||||||
val maxReplicas = config.scaleMaxReplicas
|
|
||||||
|
|
||||||
if currentReplicas < maxReplicas then
|
|
||||||
spec.put("replicas", Integer.valueOf(currentReplicas + 1))
|
|
||||||
kube
|
|
||||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
|
||||||
.inNamespace(config.k8sNamespace)
|
|
||||||
.resource(rollout)
|
|
||||||
.update()
|
|
||||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment()
|
|
||||||
log.infof(
|
|
||||||
"Scaled up %s from %d to %d replicas",
|
|
||||||
config.k8sRolloutName,
|
|
||||||
currentReplicas,
|
|
||||||
currentReplicas + 1,
|
|
||||||
)
|
|
||||||
loadBalancer.rebalance
|
|
||||||
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
|
|
||||||
case _ => ()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
case ex: Exception =>
|
|
||||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment()
|
|
||||||
log.errorf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
|
||||||
|
|
||||||
def scaleDown(): Unit =
|
def scaleDown(): Unit =
|
||||||
log.info("Scaling down Argo Rollout")
|
log.info("Scaling down Argo Rollout")
|
||||||
@@ -177,6 +261,7 @@ class AutoScaler:
|
|||||||
|
|
||||||
underloadedInstance.foreach { inst =>
|
underloadedInstance.foreach { inst =>
|
||||||
log.infof("Marking instance %s for drain before scale-down", inst.instanceId)
|
log.infof("Marking instance %s for drain before scale-down", inst.instanceId)
|
||||||
|
drainingForScaleDown.add(inst.instanceId)
|
||||||
failoverService
|
failoverService
|
||||||
.onInstanceStreamDropped(inst.instanceId)
|
.onInstanceStreamDropped(inst.instanceId)
|
||||||
.subscribe()
|
.subscribe()
|
||||||
@@ -187,42 +272,34 @@ class AutoScaler:
|
|||||||
}
|
}
|
||||||
|
|
||||||
kubeClientOpt match
|
kubeClientOpt match
|
||||||
case None =>
|
case None => log.warn("Kubernetes client not available, cannot scale")
|
||||||
log.warn("Kubernetes client not available, cannot scale")
|
|
||||||
case Some(kube) =>
|
case Some(kube) =>
|
||||||
try
|
patchRolloutReplicas(
|
||||||
Option(
|
kube,
|
||||||
kube
|
direction = "down",
|
||||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
delta = -1,
|
||||||
.inNamespace(config.k8sNamespace)
|
canScale = _ > config.scaleMinReplicas,
|
||||||
.withName(config.k8sRolloutName)
|
atLimit = n => log.infof("Already at min replicas %d for %s", n, config.k8sRolloutName),
|
||||||
.get(),
|
onSuccess = (from, to) =>
|
||||||
).foreach { rollout =>
|
log.infof("Scaled down %s from %d to %d replicas", config.k8sRolloutName, from, to)
|
||||||
rolloutSpec(rollout).foreach { spec =>
|
underloadedInstance.foreach(inst => forceDeletePod(inst.instanceId, kube)),
|
||||||
spec.get("replicas") match
|
)
|
||||||
case replicas: Integer =>
|
|
||||||
val currentReplicas = replicas.intValue()
|
|
||||||
val minReplicas = config.scaleMinReplicas
|
|
||||||
|
|
||||||
if currentReplicas > minReplicas then
|
private def forceDeletePod(instanceId: String, kube: KubernetesClient): Unit =
|
||||||
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
try
|
||||||
kube
|
val pods = kube
|
||||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
.pods()
|
||||||
.inNamespace(config.k8sNamespace)
|
.inNamespace(config.k8sNamespace)
|
||||||
.resource(rollout)
|
.withLabel(config.k8sRolloutLabelSelector)
|
||||||
.update()
|
.list()
|
||||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
.getItems
|
||||||
log.infof(
|
.asScala
|
||||||
"Scaled down %s from %d to %d replicas",
|
pods.find(_.getMetadata.getName.contains(instanceId)) match
|
||||||
config.k8sRolloutName,
|
case Some(pod) =>
|
||||||
currentReplicas,
|
kube.pods().inNamespace(config.k8sNamespace).withName(pod.getMetadata.getName).withGracePeriod(0L).delete()
|
||||||
currentReplicas - 1,
|
log.infof("Force-deleted pod for drained instance %s", instanceId)
|
||||||
)
|
case None =>
|
||||||
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
log.debugf("No pod found for drained instance %s, skipping deletion", instanceId)
|
||||||
case _ => ()
|
catch
|
||||||
}
|
case ex: Exception =>
|
||||||
}
|
log.warnf(ex, "Failed to force-delete pod for drained instance %s", instanceId)
|
||||||
catch
|
|
||||||
case ex: Exception =>
|
|
||||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
|
||||||
log.errorf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
|
||||||
|
|||||||
+7
-7
@@ -88,7 +88,8 @@ class HealthMonitor:
|
|||||||
if evicted.nonEmpty then
|
if evicted.nonEmpty then
|
||||||
log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||||
evicted.foreach(deleteK8sPod)
|
evicted.foreach(deleteK8sPod)
|
||||||
autoScaler.scaleUp()
|
val unexpectedEvictions = evicted.filterNot(autoScaler.isDrainingForScaleDown)
|
||||||
|
if unexpectedEvictions.nonEmpty then autoScaler.scaleUp()
|
||||||
val instances = instanceRegistry.getAllInstances
|
val instances = instanceRegistry.getAllInstances
|
||||||
val failed = instances.collect { inst =>
|
val failed = instances.collect { inst =>
|
||||||
val isHealthy = checkHealth(inst.instanceId)
|
val isHealthy = checkHealth(inst.instanceId)
|
||||||
@@ -99,7 +100,8 @@ class HealthMonitor:
|
|||||||
Some(inst.instanceId)
|
Some(inst.instanceId)
|
||||||
else None
|
else None
|
||||||
}.flatten
|
}.flatten
|
||||||
if failed.nonEmpty then autoScaler.scaleUp()
|
val unexpectedFailures = failed.filterNot(autoScaler.isDrainingForScaleDown)
|
||||||
|
if unexpectedFailures.nonEmpty then autoScaler.scaleUp()
|
||||||
|
|
||||||
private def checkHealth(instanceId: String): Boolean =
|
private def checkHealth(instanceId: String): Boolean =
|
||||||
val redisHealthy = checkRedisHeartbeat(instanceId)
|
val redisHealthy = checkRedisHeartbeat(instanceId)
|
||||||
@@ -227,12 +229,10 @@ class HealthMonitor:
|
|||||||
}
|
}
|
||||||
|
|
||||||
private def handlePodGone(pod: Pod): Unit =
|
private def handlePodGone(pod: Pod): Unit =
|
||||||
|
val podName = pod.getMetadata.getName
|
||||||
|
autoScaler.clearDrainingByPodName(podName)
|
||||||
findRegisteredInstance(pod).foreach { inst =>
|
findRegisteredInstance(pod).foreach { inst =>
|
||||||
log.warnf(
|
log.warnf("Pod %s deleted — triggering failover for %s", podName, inst.instanceId)
|
||||||
"Pod %s deleted — triggering failover for %s",
|
|
||||||
pod.getMetadata.getName,
|
|
||||||
inst.instanceId,
|
|
||||||
)
|
|
||||||
failoverService
|
failoverService
|
||||||
.onInstanceStreamDropped(inst.instanceId)
|
.onInstanceStreamDropped(inst.instanceId)
|
||||||
.subscribe()
|
.subscribe()
|
||||||
|
|||||||
@@ -1,13 +1,12 @@
|
|||||||
package de.nowchess.chess.redis
|
package de.nowchess.chess.redis
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import de.nowchess.api.dto.{GameStateEventDto, GameWritebackEventDto}
|
import de.nowchess.api.dto.{GameStateDto, GameStateEventDto, GameWritebackEventDto}
|
||||||
import de.nowchess.api.game.{CorrespondenceClockState, LiveClockState}
|
import de.nowchess.api.game.{CorrespondenceClockState, DrawReason, GameResult, LiveClockState, TimeControl, WinReason}
|
||||||
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
|
||||||
import de.nowchess.api.game.{DrawReason, GameResult, WinReason}
|
|
||||||
import de.nowchess.api.board.Color
|
import de.nowchess.api.board.Color
|
||||||
|
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||||
import de.nowchess.chess.observer.{GameEvent, Observer}
|
import de.nowchess.chess.observer.{GameEvent, Observer}
|
||||||
import de.nowchess.chess.registry.GameRegistry
|
import de.nowchess.chess.registry.{GameEntry, GameRegistry}
|
||||||
import de.nowchess.chess.resource.GameDtoMapper
|
import de.nowchess.chess.resource.GameDtoMapper
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
@@ -26,61 +25,69 @@ class GameRedisPublisher(
|
|||||||
onGameOver: String => Unit,
|
onGameOver: String => Unit,
|
||||||
) extends Observer:
|
) extends Observer:
|
||||||
|
|
||||||
|
def emitInitialWriteback(): Unit =
|
||||||
|
try
|
||||||
|
registry.get(gameId).foreach { entry =>
|
||||||
|
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
||||||
|
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
|
||||||
|
}
|
||||||
|
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to emit initial writeback for game %s", gameId)
|
||||||
|
|
||||||
def onGameEvent(event: GameEvent): Unit =
|
def onGameEvent(event: GameEvent): Unit =
|
||||||
try
|
try
|
||||||
GameRedisPublisher.log.debugf("Publishing game event for game %s", gameId)
|
GameRedisPublisher.log.debugf("Publishing game event for game %s", gameId)
|
||||||
registry.get(gameId).foreach { entry =>
|
registry.get(gameId).foreach { entry =>
|
||||||
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
|
||||||
val json = objectMapper.writeValueAsString(GameStateEventDto(dto))
|
redis.pubsub(classOf[String]).publish(s2cTopicName, objectMapper.writeValueAsString(GameStateEventDto(dto)))
|
||||||
redis.pubsub(classOf[String]).publish(s2cTopicName, json)
|
writebackEmit(objectMapper.writeValueAsString(buildWriteback(entry, dto)))
|
||||||
|
|
||||||
val clock = entry.engine.currentClockState
|
|
||||||
val wb = GameWritebackEventDto(
|
|
||||||
gameId = gameId,
|
|
||||||
fen = dto.fen,
|
|
||||||
pgn = dto.pgn,
|
|
||||||
moveCount = entry.engine.context.moves.size,
|
|
||||||
whiteId = entry.white.id.value,
|
|
||||||
whiteName = entry.white.displayName,
|
|
||||||
blackId = entry.black.id.value,
|
|
||||||
blackName = entry.black.displayName,
|
|
||||||
mode = entry.mode.toString,
|
|
||||||
resigned = entry.resigned,
|
|
||||||
limitSeconds = entry.engine.timeControl match {
|
|
||||||
case de.nowchess.api.game.TimeControl.Clock(l, _) => Some(l); case _ => None
|
|
||||||
},
|
|
||||||
incrementSeconds = entry.engine.timeControl match {
|
|
||||||
case de.nowchess.api.game.TimeControl.Clock(_, i) => Some(i); case _ => None
|
|
||||||
},
|
|
||||||
daysPerMove = entry.engine.timeControl match {
|
|
||||||
case de.nowchess.api.game.TimeControl.Correspondence(d) => Some(d); case _ => None
|
|
||||||
},
|
|
||||||
whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs },
|
|
||||||
blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs },
|
|
||||||
incrementMs = clock.collect { case c: LiveClockState => c.incrementMs },
|
|
||||||
clockLastTickAt = clock.collect { case c: LiveClockState => c.lastTickAt.toEpochMilli },
|
|
||||||
clockMoveDeadline = clock.collect { case c: CorrespondenceClockState => c.moveDeadline.toEpochMilli },
|
|
||||||
clockActiveColor = clock.map(_.activeColor.label.toLowerCase),
|
|
||||||
pendingDrawOffer = entry.engine.pendingDrawOfferBy.map(_.label.toLowerCase),
|
|
||||||
result = entry.engine.context.result.map {
|
|
||||||
case GameResult.Win(Color.White, _) => "white"
|
|
||||||
case GameResult.Win(Color.Black, _) => "black"
|
|
||||||
case GameResult.Draw(_) => "draw"
|
|
||||||
},
|
|
||||||
terminationReason = entry.engine.context.result.map {
|
|
||||||
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
|
|
||||||
case GameResult.Win(_, WinReason.Resignation) => "resignation"
|
|
||||||
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
|
|
||||||
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
|
|
||||||
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
|
|
||||||
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
|
|
||||||
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
|
|
||||||
case GameResult.Draw(DrawReason.Agreement) => "agreement"
|
|
||||||
},
|
|
||||||
redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci),
|
|
||||||
pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase),
|
|
||||||
)
|
|
||||||
writebackEmit(objectMapper.writeValueAsString(wb))
|
|
||||||
if entry.engine.context.result.isDefined then onGameOver(gameId)
|
if entry.engine.context.result.isDefined then onGameOver(gameId)
|
||||||
}
|
}
|
||||||
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
|
catch case ex: Exception => GameRedisPublisher.log.warnf(ex, "Failed to publish game event for game %s", gameId)
|
||||||
|
|
||||||
|
private def buildWriteback(entry: GameEntry, dto: GameStateDto): GameWritebackEventDto =
|
||||||
|
val clock = entry.engine.currentClockState
|
||||||
|
GameWritebackEventDto(
|
||||||
|
gameId = gameId,
|
||||||
|
fen = dto.fen,
|
||||||
|
pgn = dto.pgn,
|
||||||
|
moveCount = entry.engine.context.moves.size,
|
||||||
|
whiteId = entry.white.id.value,
|
||||||
|
whiteName = entry.white.displayName,
|
||||||
|
blackId = entry.black.id.value,
|
||||||
|
blackName = entry.black.displayName,
|
||||||
|
mode = entry.mode.toString,
|
||||||
|
resigned = entry.resigned,
|
||||||
|
limitSeconds = entry.engine.timeControl match {
|
||||||
|
case TimeControl.Clock(l, _) => Some(l); case _ => None
|
||||||
|
},
|
||||||
|
incrementSeconds = entry.engine.timeControl match {
|
||||||
|
case TimeControl.Clock(_, i) => Some(i); case _ => None
|
||||||
|
},
|
||||||
|
daysPerMove = entry.engine.timeControl match {
|
||||||
|
case TimeControl.Correspondence(d) => Some(d); case _ => None
|
||||||
|
},
|
||||||
|
whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs },
|
||||||
|
blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs },
|
||||||
|
incrementMs = clock.collect { case c: LiveClockState => c.incrementMs },
|
||||||
|
clockLastTickAt = clock.collect { case c: LiveClockState => c.lastTickAt.toEpochMilli },
|
||||||
|
clockMoveDeadline = clock.collect { case c: CorrespondenceClockState => c.moveDeadline.toEpochMilli },
|
||||||
|
clockActiveColor = clock.map(_.activeColor.label.toLowerCase),
|
||||||
|
pendingDrawOffer = entry.engine.pendingDrawOfferBy.map(_.label.toLowerCase),
|
||||||
|
result = entry.engine.context.result.map {
|
||||||
|
case GameResult.Win(Color.White, _) => "white"
|
||||||
|
case GameResult.Win(Color.Black, _) => "black"
|
||||||
|
case GameResult.Draw(_) => "draw"
|
||||||
|
},
|
||||||
|
terminationReason = entry.engine.context.result.map {
|
||||||
|
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
|
||||||
|
case GameResult.Win(_, WinReason.Resignation) => "resignation"
|
||||||
|
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
|
||||||
|
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
|
||||||
|
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
|
||||||
|
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
|
||||||
|
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
|
||||||
|
case GameResult.Draw(DrawReason.Agreement) => "agreement"
|
||||||
|
},
|
||||||
|
redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci),
|
||||||
|
pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase),
|
||||||
|
)
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import de.nowchess.chess.service.InstanceHeartbeatService
|
|||||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
|
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
|
||||||
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.annotation.PreDestroy
|
import jakarta.annotation.PreDestroy
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.enterprise.inject.Instance
|
import jakarta.enterprise.inject.Instance
|
||||||
@@ -45,6 +46,30 @@ class GameRedisSubscriberManager:
|
|||||||
private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]()
|
private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]()
|
||||||
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
|
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
|
||||||
|
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
|
private var clockExpireSubscriber: Option[ReactivePubSubCommands.ReactiveRedisSubscriber] = None
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire"
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
def subscribeClockExpiry(): Unit =
|
||||||
|
val handler: Consumer[String] = gameId => handleClockExpiry(gameId)
|
||||||
|
try
|
||||||
|
val subscriber = reactiveRedis
|
||||||
|
.pubsub(classOf[String])
|
||||||
|
.subscribe(clockExpireChannel, handler)
|
||||||
|
.await()
|
||||||
|
.atMost(java.time.Duration.ofSeconds(5))
|
||||||
|
clockExpireSubscriber = Some(subscriber)
|
||||||
|
log.infof("Subscribed to clock expiry channel %s", clockExpireChannel)
|
||||||
|
catch case ex: Exception => log.warnf(ex, "Failed to subscribe to clock expiry channel")
|
||||||
|
|
||||||
|
private def handleClockExpiry(gameId: String): Unit =
|
||||||
|
if !s2cObservers.containsKey(gameId) then
|
||||||
|
log.infof("Clock expired for game %s — loading engine to enforce timeout", gameId)
|
||||||
|
subscribeGame(gameId)
|
||||||
|
|
||||||
private def c2sTopic(gameId: String): String =
|
private def c2sTopic(gameId: String): String =
|
||||||
s"${redisConfig.prefix}:game:$gameId:c2s"
|
s"${redisConfig.prefix}:game:$gameId:c2s"
|
||||||
|
|
||||||
@@ -65,6 +90,7 @@ class GameRedisSubscriberManager:
|
|||||||
)
|
)
|
||||||
s2cObservers.put(gameId, obs)
|
s2cObservers.put(gameId, obs)
|
||||||
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
||||||
|
obs.emitInitialWriteback()
|
||||||
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
|
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
|
||||||
|
|
||||||
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
|
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
|
||||||
@@ -156,5 +182,6 @@ class GameRedisSubscriberManager:
|
|||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
def cleanup(): Unit =
|
def cleanup(): Unit =
|
||||||
|
clockExpireSubscriber.foreach(_.unsubscribe(clockExpireChannel).await().indefinitely())
|
||||||
c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely())
|
c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely())
|
||||||
s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs)))
|
s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs)))
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ dependencies {
|
|||||||
implementation("io.quarkus:quarkus-opentelemetry")
|
implementation("io.quarkus:quarkus-opentelemetry")
|
||||||
implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}")
|
implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}")
|
||||||
implementation("io.quarkus:quarkus-redis-client")
|
implementation("io.quarkus:quarkus-redis-client")
|
||||||
|
implementation("io.quarkus:quarkus-scheduler")
|
||||||
|
|
||||||
testImplementation(platform("org.junit:junit-bom:5.13.4"))
|
testImplementation(platform("org.junit:junit-bom:5.13.4"))
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter")
|
testImplementation("org.junit.jupiter:junit-jupiter")
|
||||||
|
|||||||
@@ -34,6 +34,19 @@ class GameRecordRepository:
|
|||||||
.asScala
|
.asScala
|
||||||
.toList
|
.toList
|
||||||
|
|
||||||
|
def findExpiredLiveClockGames(nowMs: Long): List[GameRecord] =
|
||||||
|
em.createQuery(
|
||||||
|
"SELECT g FROM GameRecord g WHERE g.result IS NULL AND g.clockLastTickAt IS NOT NULL AND g.whiteRemainingMs IS NOT NULL",
|
||||||
|
classOf[GameRecord],
|
||||||
|
).getResultList
|
||||||
|
.asScala
|
||||||
|
.toList
|
||||||
|
.filter { g =>
|
||||||
|
val remaining =
|
||||||
|
if g.clockActiveColor == "white" then g.whiteRemainingMs.longValue else g.blackRemainingMs.longValue
|
||||||
|
g.clockLastTickAt.longValue + remaining < nowMs
|
||||||
|
}
|
||||||
|
|
||||||
def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] =
|
def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] =
|
||||||
em.createQuery(
|
em.createQuery(
|
||||||
"SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC",
|
"SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC",
|
||||||
|
|||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package de.nowchess.store.service
|
||||||
|
|
||||||
|
import de.nowchess.store.config.RedisConfig
|
||||||
|
import de.nowchess.store.repository.GameRecordRepository
|
||||||
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.scheduler.Scheduled
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
|
import jakarta.inject.Inject
|
||||||
|
import org.jboss.logging.Logger
|
||||||
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
|
@ApplicationScoped
|
||||||
|
class ClockExpiryScanner:
|
||||||
|
@Inject
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
|
var repository: GameRecordRepository = uninitialized
|
||||||
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
|
// scalafix:on
|
||||||
|
|
||||||
|
private val log = Logger.getLogger(classOf[ClockExpiryScanner])
|
||||||
|
|
||||||
|
private def clockExpireChannel: String = s"${redisConfig.prefix}:game:clock:expire"
|
||||||
|
|
||||||
|
@Scheduled(every = "30s")
|
||||||
|
def scan(): Unit =
|
||||||
|
try
|
||||||
|
val nowMs = System.currentTimeMillis()
|
||||||
|
val expired = repository.findExpiredLiveClockGames(nowMs)
|
||||||
|
if expired.nonEmpty then
|
||||||
|
log.infof("Found %d games with expired clocks", expired.size)
|
||||||
|
expired.foreach { record =>
|
||||||
|
log.infof("Publishing clock expiry for game %s", record.gameId)
|
||||||
|
redis.pubsub(classOf[String]).publish(clockExpireChannel, record.gameId)
|
||||||
|
}
|
||||||
|
catch case ex: Exception => log.warnf(ex, "Clock expiry scan failed")
|
||||||
Reference in New Issue
Block a user