feat: implement clock expiry scanning and handling for game records (#54)
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
Reviewed-on: #54
This commit was merged in pull request #54.
This commit is contained in:
@@ -47,6 +47,8 @@ nowchess:
|
||||
k8s-rollout-label-selector: "app=nowchess-core"
|
||||
startup-validation-timeout: 15s
|
||||
failover-wait-timeout: 30s
|
||||
scale-cpu-threshold-percent: 0.8
|
||||
scale-memory-threshold-percent: 0.8
|
||||
|
||||
---
|
||||
# dev profile
|
||||
|
||||
+6
@@ -62,3 +62,9 @@ trait CoordinatorConfig:
|
||||
|
||||
@WithName("failover-wait-timeout")
|
||||
def failoverWaitTimeout: Duration
|
||||
|
||||
@WithName("scale-cpu-threshold-percent")
|
||||
def scaleCpuThresholdPercent: Double
|
||||
|
||||
@WithName("scale-memory-threshold-percent")
|
||||
def scaleMemoryThresholdPercent: Double
|
||||
|
||||
+75
-23
@@ -84,6 +84,34 @@ class AutoScaler:
|
||||
}
|
||||
// scalafix:on DisableSyntax.asInstanceOf
|
||||
|
||||
private def parseMillicores(s: String): Long =
|
||||
if s.endsWith("n") then s.dropRight(1).toLongOption.map(_ / 1000000).getOrElse(0L)
|
||||
else if s.endsWith("m") then s.dropRight(1).toLongOption.getOrElse(0L)
|
||||
else s.toLongOption.map(_ * 1000).getOrElse(0L)
|
||||
|
||||
private def parseBytes(s: String): Long =
|
||||
if s.endsWith("Ki") then s.dropRight(2).toLongOption.map(_ * 1024L).getOrElse(0L)
|
||||
else if s.endsWith("Mi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L).getOrElse(0L)
|
||||
else if s.endsWith("Gi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L * 1024L).getOrElse(0L)
|
||||
else if s.endsWith("K") then s.dropRight(1).toLongOption.map(_ * 1000L).getOrElse(0L)
|
||||
else if s.endsWith("M") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L).getOrElse(0L)
|
||||
else if s.endsWith("G") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L * 1000L).getOrElse(0L)
|
||||
else s.toLongOption.getOrElse(0L)
|
||||
|
||||
private def exceedsRatio(used: Long, request: Long, threshold: Double, resource: String, instanceId: String): Boolean =
|
||||
if request <= 0 then false
|
||||
else
|
||||
val ratio = used.toDouble / request.toDouble
|
||||
log.debugf(
|
||||
"Instance %s %s: %d used / %d requested = %.0f%%",
|
||||
instanceId,
|
||||
resource,
|
||||
used,
|
||||
request,
|
||||
ratio * 100,
|
||||
)
|
||||
ratio > threshold
|
||||
|
||||
// scalafix:off DisableSyntax.asInstanceOf
|
||||
private def isResourceConstrained(instanceId: String): Boolean =
|
||||
kubeClientOpt.fold(false) { kube =>
|
||||
@@ -92,25 +120,37 @@ class AutoScaler:
|
||||
kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
|
||||
pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
|
||||
try
|
||||
val metricsRes = kube
|
||||
.genericKubernetesResources(metricsApiVersion, "PodMetrics")
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(pod.getMetadata.getName)
|
||||
.get()
|
||||
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
|
||||
Option(metricsMap.get("metrics"))
|
||||
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
|
||||
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
|
||||
.filter(!_.isEmpty)
|
||||
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
|
||||
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
|
||||
.flatMap(u => Option(u.get("cpu")))
|
||||
.map(_.toString)
|
||||
.exists { cpuStr =>
|
||||
val cpuMillis =
|
||||
if cpuStr.endsWith("m") then cpuStr.dropRight(1).toLongOption.getOrElse(0L)
|
||||
else cpuStr.toLongOption.map(_ * 1000).getOrElse(0L)
|
||||
cpuMillis > 800
|
||||
val requests = Option(pod.getSpec)
|
||||
.flatMap(s => Option(s.getContainers))
|
||||
.flatMap(cs => if cs.isEmpty then None else Option(cs.get(0)))
|
||||
.flatMap(c => Option(c.getResources))
|
||||
.flatMap(r => Option(r.getRequests))
|
||||
|
||||
val cpuRequestMillis = requests.flatMap(m => Option(m.get("cpu"))).map(q => parseMillicores(q.toString)).getOrElse(0L)
|
||||
val memRequestBytes = requests.flatMap(m => Option(m.get("memory"))).map(q => parseBytes(q.toString)).getOrElse(0L)
|
||||
|
||||
if cpuRequestMillis <= 0 && memRequestBytes <= 0 then
|
||||
log.debugf("No resource requests found for instance %s, skipping resource check", instanceId)
|
||||
false
|
||||
else
|
||||
val metricsRes = kube
|
||||
.genericKubernetesResources(metricsApiVersion, "PodMetrics")
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(pod.getMetadata.getName)
|
||||
.get()
|
||||
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
|
||||
val usageOpt = Option(metricsMap.get("metrics"))
|
||||
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
|
||||
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
|
||||
.filter(!_.isEmpty)
|
||||
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
|
||||
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
|
||||
|
||||
usageOpt.exists { usage =>
|
||||
val cpuUsed = Option(usage.get("cpu")).map(v => parseMillicores(v.toString)).getOrElse(0L)
|
||||
val memUsed = Option(usage.get("memory")).map(v => parseBytes(v.toString)).getOrElse(0L)
|
||||
exceedsRatio(cpuUsed, cpuRequestMillis, config.scaleCpuThresholdPercent, "CPU", instanceId) ||
|
||||
exceedsRatio(memUsed, memRequestBytes, config.scaleMemoryThresholdPercent, "memory", instanceId)
|
||||
}
|
||||
catch case _: Exception => false
|
||||
}
|
||||
@@ -128,13 +168,25 @@ class AutoScaler:
|
||||
if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
|
||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||
if instances.nonEmpty then
|
||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||
val scaleUpLoad = config.scaleUpThreshold * config.maxGamesPerCore
|
||||
val scaleDownLoad = config.scaleDownThreshold * config.maxGamesPerCore
|
||||
avgLoadRef.set(avgLoad)
|
||||
|
||||
val hasHighCpuOrMemory = instances.exists(inst => isResourceConstrained(inst.instanceId))
|
||||
val constrainedInstance = instances.find(inst => isResourceConstrained(inst.instanceId))
|
||||
val hasHighCpuOrMemory = constrainedInstance.isDefined
|
||||
|
||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore || hasHighCpuOrMemory then scaleUp()
|
||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
|
||||
log.infof(
|
||||
"Scale check: instances=%d avgLoad=%.1f scaleUpAt=%.1f scaleDownAt=%.1f resourceConstrained=%s",
|
||||
instances.size,
|
||||
avgLoad,
|
||||
scaleUpLoad,
|
||||
scaleDownLoad,
|
||||
constrainedInstance.map(_.instanceId).getOrElse("none"),
|
||||
)
|
||||
|
||||
if avgLoad > scaleUpLoad || hasHighCpuOrMemory then scaleUp()
|
||||
else if avgLoad < scaleDownLoad && instances.size > config.scaleMinReplicas
|
||||
then scaleDown()
|
||||
|
||||
private def patchRolloutReplicas(
|
||||
|
||||
Reference in New Issue
Block a user