Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c84be9edbb | |||
| 80dc577080 | |||
| 28894850ee |
@@ -47,6 +47,8 @@ nowchess:
|
|||||||
k8s-rollout-label-selector: "app=nowchess-core"
|
k8s-rollout-label-selector: "app=nowchess-core"
|
||||||
startup-validation-timeout: 15s
|
startup-validation-timeout: 15s
|
||||||
failover-wait-timeout: 30s
|
failover-wait-timeout: 30s
|
||||||
|
scale-cpu-threshold-percent: 0.8
|
||||||
|
scale-memory-threshold-percent: 0.8
|
||||||
|
|
||||||
---
|
---
|
||||||
# dev profile
|
# dev profile
|
||||||
|
|||||||
+6
@@ -62,3 +62,9 @@ trait CoordinatorConfig:
|
|||||||
|
|
||||||
@WithName("failover-wait-timeout")
|
@WithName("failover-wait-timeout")
|
||||||
def failoverWaitTimeout: Duration
|
def failoverWaitTimeout: Duration
|
||||||
|
|
||||||
|
@WithName("scale-cpu-threshold-percent")
|
||||||
|
def scaleCpuThresholdPercent: Double
|
||||||
|
|
||||||
|
@WithName("scale-memory-threshold-percent")
|
||||||
|
def scaleMemoryThresholdPercent: Double
|
||||||
|
|||||||
+75
-23
@@ -84,6 +84,34 @@ class AutoScaler:
|
|||||||
}
|
}
|
||||||
// scalafix:on DisableSyntax.asInstanceOf
|
// scalafix:on DisableSyntax.asInstanceOf
|
||||||
|
|
||||||
|
private def parseMillicores(s: String): Long =
|
||||||
|
if s.endsWith("n") then s.dropRight(1).toLongOption.map(_ / 1000000).getOrElse(0L)
|
||||||
|
else if s.endsWith("m") then s.dropRight(1).toLongOption.getOrElse(0L)
|
||||||
|
else s.toLongOption.map(_ * 1000).getOrElse(0L)
|
||||||
|
|
||||||
|
private def parseBytes(s: String): Long =
|
||||||
|
if s.endsWith("Ki") then s.dropRight(2).toLongOption.map(_ * 1024L).getOrElse(0L)
|
||||||
|
else if s.endsWith("Mi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L).getOrElse(0L)
|
||||||
|
else if s.endsWith("Gi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L * 1024L).getOrElse(0L)
|
||||||
|
else if s.endsWith("K") then s.dropRight(1).toLongOption.map(_ * 1000L).getOrElse(0L)
|
||||||
|
else if s.endsWith("M") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L).getOrElse(0L)
|
||||||
|
else if s.endsWith("G") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L * 1000L).getOrElse(0L)
|
||||||
|
else s.toLongOption.getOrElse(0L)
|
||||||
|
|
||||||
|
private def exceedsRatio(used: Long, request: Long, threshold: Double, resource: String, instanceId: String): Boolean =
|
||||||
|
if request <= 0 then false
|
||||||
|
else
|
||||||
|
val ratio = used.toDouble / request.toDouble
|
||||||
|
log.debugf(
|
||||||
|
"Instance %s %s: %d used / %d requested = %.0f%%",
|
||||||
|
instanceId,
|
||||||
|
resource,
|
||||||
|
used,
|
||||||
|
request,
|
||||||
|
ratio * 100,
|
||||||
|
)
|
||||||
|
ratio > threshold
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.asInstanceOf
|
// scalafix:off DisableSyntax.asInstanceOf
|
||||||
private def isResourceConstrained(instanceId: String): Boolean =
|
private def isResourceConstrained(instanceId: String): Boolean =
|
||||||
kubeClientOpt.fold(false) { kube =>
|
kubeClientOpt.fold(false) { kube =>
|
||||||
@@ -92,25 +120,37 @@ class AutoScaler:
|
|||||||
kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
|
kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
|
||||||
pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
|
pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
|
||||||
try
|
try
|
||||||
val metricsRes = kube
|
val requests = Option(pod.getSpec)
|
||||||
.genericKubernetesResources(metricsApiVersion, "PodMetrics")
|
.flatMap(s => Option(s.getContainers))
|
||||||
.inNamespace(config.k8sNamespace)
|
.flatMap(cs => if cs.isEmpty then None else Option(cs.get(0)))
|
||||||
.withName(pod.getMetadata.getName)
|
.flatMap(c => Option(c.getResources))
|
||||||
.get()
|
.flatMap(r => Option(r.getRequests))
|
||||||
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
|
|
||||||
Option(metricsMap.get("metrics"))
|
val cpuRequestMillis = requests.flatMap(m => Option(m.get("cpu"))).map(q => parseMillicores(q.toString)).getOrElse(0L)
|
||||||
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
|
val memRequestBytes = requests.flatMap(m => Option(m.get("memory"))).map(q => parseBytes(q.toString)).getOrElse(0L)
|
||||||
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
|
|
||||||
.filter(!_.isEmpty)
|
if cpuRequestMillis <= 0 && memRequestBytes <= 0 then
|
||||||
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
|
log.debugf("No resource requests found for instance %s, skipping resource check", instanceId)
|
||||||
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
|
false
|
||||||
.flatMap(u => Option(u.get("cpu")))
|
else
|
||||||
.map(_.toString)
|
val metricsRes = kube
|
||||||
.exists { cpuStr =>
|
.genericKubernetesResources(metricsApiVersion, "PodMetrics")
|
||||||
val cpuMillis =
|
.inNamespace(config.k8sNamespace)
|
||||||
if cpuStr.endsWith("m") then cpuStr.dropRight(1).toLongOption.getOrElse(0L)
|
.withName(pod.getMetadata.getName)
|
||||||
else cpuStr.toLongOption.map(_ * 1000).getOrElse(0L)
|
.get()
|
||||||
cpuMillis > 800
|
val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
|
||||||
|
val usageOpt = Option(metricsMap.get("metrics"))
|
||||||
|
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
|
||||||
|
.flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
|
||||||
|
.filter(!_.isEmpty)
|
||||||
|
.map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
|
||||||
|
.flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
|
||||||
|
|
||||||
|
usageOpt.exists { usage =>
|
||||||
|
val cpuUsed = Option(usage.get("cpu")).map(v => parseMillicores(v.toString)).getOrElse(0L)
|
||||||
|
val memUsed = Option(usage.get("memory")).map(v => parseBytes(v.toString)).getOrElse(0L)
|
||||||
|
exceedsRatio(cpuUsed, cpuRequestMillis, config.scaleCpuThresholdPercent, "CPU", instanceId) ||
|
||||||
|
exceedsRatio(memUsed, memRequestBytes, config.scaleMemoryThresholdPercent, "memory", instanceId)
|
||||||
}
|
}
|
||||||
catch case _: Exception => false
|
catch case _: Exception => false
|
||||||
}
|
}
|
||||||
@@ -128,13 +168,25 @@ class AutoScaler:
|
|||||||
if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
|
if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
|
||||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||||
if instances.nonEmpty then
|
if instances.nonEmpty then
|
||||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||||
|
val scaleUpLoad = config.scaleUpThreshold * config.maxGamesPerCore
|
||||||
|
val scaleDownLoad = config.scaleDownThreshold * config.maxGamesPerCore
|
||||||
avgLoadRef.set(avgLoad)
|
avgLoadRef.set(avgLoad)
|
||||||
|
|
||||||
val hasHighCpuOrMemory = instances.exists(inst => isResourceConstrained(inst.instanceId))
|
val constrainedInstance = instances.find(inst => isResourceConstrained(inst.instanceId))
|
||||||
|
val hasHighCpuOrMemory = constrainedInstance.isDefined
|
||||||
|
|
||||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore || hasHighCpuOrMemory then scaleUp()
|
log.infof(
|
||||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
|
"Scale check: instances=%d avgLoad=%.1f scaleUpAt=%.1f scaleDownAt=%.1f resourceConstrained=%s",
|
||||||
|
instances.size,
|
||||||
|
avgLoad,
|
||||||
|
scaleUpLoad,
|
||||||
|
scaleDownLoad,
|
||||||
|
constrainedInstance.map(_.instanceId).getOrElse("none"),
|
||||||
|
)
|
||||||
|
|
||||||
|
if avgLoad > scaleUpLoad || hasHighCpuOrMemory then scaleUp()
|
||||||
|
else if avgLoad < scaleDownLoad && instances.size > config.scaleMinReplicas
|
||||||
then scaleDown()
|
then scaleDown()
|
||||||
|
|
||||||
private def patchRolloutReplicas(
|
private def patchRolloutReplicas(
|
||||||
|
|||||||
Reference in New Issue
Block a user