From 2e4ba435978ef415b4ee2d7d2fc4af3b4e834b3d Mon Sep 17 00:00:00 2001
From: Janis
Date: Sat, 16 May 2026 15:09:04 +0200
Subject: [PATCH] feat: implement clock expiry scanning and handling for game records (#54)

Reviewed-on: https://git.janis-eccarius.de/NowChess/NowChessSystems/pulls/54
---
 .../src/main/resources/application.yml         |  2 +
 .../config/CoordinatorConfig.scala             |  6 ++
 .../coordinator/service/AutoScaler.scala       | 98 ++++++++++++++-----
 3 files changed, 83 insertions(+), 23 deletions(-)

diff --git a/modules/coordinator/src/main/resources/application.yml b/modules/coordinator/src/main/resources/application.yml
index 419bdd6..0d51f6c 100644
--- a/modules/coordinator/src/main/resources/application.yml
+++ b/modules/coordinator/src/main/resources/application.yml
@@ -47,6 +47,8 @@ nowchess:
   k8s-rollout-label-selector: "app=nowchess-core"
   startup-validation-timeout: 15s
   failover-wait-timeout: 30s
+  scale-cpu-threshold-percent: 0.8
+  scale-memory-threshold-percent: 0.8
 
 ---
 # dev profile
diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/CoordinatorConfig.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/CoordinatorConfig.scala
index c9a033e..7396ab4 100644
--- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/CoordinatorConfig.scala
+++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/CoordinatorConfig.scala
@@ -62,3 +62,9 @@ trait CoordinatorConfig:
 
   @WithName("failover-wait-timeout")
   def failoverWaitTimeout: Duration
+
+  @WithName("scale-cpu-threshold-percent")
+  def scaleCpuThresholdPercent: Double
+
+  @WithName("scale-memory-threshold-percent")
+  def scaleMemoryThresholdPercent: Double
diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala
index 90e8464..df440f1 100644
--- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala
+++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala
@@ -84,6 +84,34 @@ class AutoScaler:
     }
   // scalafix:on DisableSyntax.asInstanceOf
 
+  private def parseMillicores(s: String): Long =
+    if s.endsWith("n") then s.dropRight(1).toLongOption.map(_ / 1000000).getOrElse(0L)
+    else if s.endsWith("m") then s.dropRight(1).toLongOption.getOrElse(0L)
+    else s.toLongOption.map(_ * 1000).getOrElse(0L)
+
+  private def parseBytes(s: String): Long =
+    if s.endsWith("Ki") then s.dropRight(2).toLongOption.map(_ * 1024L).getOrElse(0L)
+    else if s.endsWith("Mi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L).getOrElse(0L)
+    else if s.endsWith("Gi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L * 1024L).getOrElse(0L)
+    else if s.endsWith("K") then s.dropRight(1).toLongOption.map(_ * 1000L).getOrElse(0L)
+    else if s.endsWith("M") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L).getOrElse(0L)
+    else if s.endsWith("G") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L * 1000L).getOrElse(0L)
+    else s.toLongOption.getOrElse(0L)
+
+  private def exceedsRatio(used: Long, request: Long, threshold: Double, resource: String, instanceId: String): Boolean =
+    if request <= 0 then false
+    else
+      val ratio = used.toDouble / request.toDouble
+      log.debugf(
+        "Instance %s %s: %d used / %d requested = %.0f%%",
+        instanceId,
+        resource,
+        used,
+        request,
+        ratio * 100,
+      )
+      ratio > threshold
+
   // scalafix:off DisableSyntax.asInstanceOf
   private def isResourceConstrained(instanceId: String): Boolean =
     kubeClientOpt.fold(false) { kube =>
@@ -92,25 +120,37 @@ class AutoScaler:
         kube.pods().inNamespace(config.k8sNamespace).withLabel(config.k8sRolloutLabelSelector).list().getItems.asScala
       pods.find(_.getMetadata.getName.contains(instanceId)).exists { pod =>
         try
-          val metricsRes = kube
-            .genericKubernetesResources(metricsApiVersion, "PodMetrics")
-            .inNamespace(config.k8sNamespace)
-            .withName(pod.getMetadata.getName)
-            .get()
-          val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
-          Option(metricsMap.get("metrics"))
-            .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
-            .flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
-            .filter(!_.isEmpty)
-            .map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
-            .flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
-            .flatMap(u => Option(u.get("cpu")))
-            .map(_.toString)
-            .exists { cpuStr =>
-              val cpuMillis =
-                if cpuStr.endsWith("m") then cpuStr.dropRight(1).toLongOption.getOrElse(0L)
-                else cpuStr.toLongOption.map(_ * 1000).getOrElse(0L)
-              cpuMillis > 800
+          val requests = Option(pod.getSpec)
+            .flatMap(s => Option(s.getContainers))
+            .flatMap(cs => if cs.isEmpty then None else Option(cs.get(0)))
+            .flatMap(c => Option(c.getResources))
+            .flatMap(r => Option(r.getRequests))
+
+          val cpuRequestMillis = requests.flatMap(m => Option(m.get("cpu"))).map(q => parseMillicores(q.toString)).getOrElse(0L)
+          val memRequestBytes = requests.flatMap(m => Option(m.get("memory"))).map(q => parseBytes(q.toString)).getOrElse(0L)
+
+          if cpuRequestMillis <= 0 && memRequestBytes <= 0 then
+            log.debugf("No resource requests found for instance %s, skipping resource check", instanceId)
+            false
+          else
+            val metricsRes = kube
+              .genericKubernetesResources(metricsApiVersion, "PodMetrics")
+              .inNamespace(config.k8sNamespace)
+              .withName(pod.getMetadata.getName)
+              .get()
+            val metricsMap = metricsRes.asInstanceOf[java.util.Map[String, AnyRef]]
+            val usageOpt = Option(metricsMap.get("metrics"))
+              .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
+              .flatMap(m => Option(m.get("containers")).map(_.asInstanceOf[java.util.List[AnyRef]]))
+              .filter(!_.isEmpty)
+              .map(_.get(0).asInstanceOf[java.util.Map[String, AnyRef]])
+              .flatMap(c => Option(c.get("usage")).map(_.asInstanceOf[java.util.Map[String, AnyRef]]))
+
+            usageOpt.exists { usage =>
+              val cpuUsed = Option(usage.get("cpu")).map(v => parseMillicores(v.toString)).getOrElse(0L)
+              val memUsed = Option(usage.get("memory")).map(v => parseBytes(v.toString)).getOrElse(0L)
+              exceedsRatio(cpuUsed, cpuRequestMillis, config.scaleCpuThresholdPercent, "CPU", instanceId) ||
+              exceedsRatio(memUsed, memRequestBytes, config.scaleMemoryThresholdPercent, "memory", instanceId)
             }
         catch case _: Exception => false
       }
@@ -128,13 +168,25 @@ class AutoScaler:
     if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
       val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
       if instances.nonEmpty then
-        val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
+        val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
+        val scaleUpLoad = config.scaleUpThreshold * config.maxGamesPerCore
+        val scaleDownLoad = config.scaleDownThreshold * config.maxGamesPerCore
         avgLoadRef.set(avgLoad)
 
-        val hasHighCpuOrMemory = instances.exists(inst => isResourceConstrained(inst.instanceId))
+        val constrainedInstance = instances.find(inst => isResourceConstrained(inst.instanceId))
+        val hasHighCpuOrMemory = constrainedInstance.isDefined
 
-        if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore || hasHighCpuOrMemory then scaleUp()
-        else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
+        log.infof(
+          "Scale check: instances=%d avgLoad=%.1f scaleUpAt=%.1f scaleDownAt=%.1f resourceConstrained=%s",
+          instances.size,
+          avgLoad,
+          scaleUpLoad,
+          scaleDownLoad,
+          constrainedInstance.map(_.instanceId).getOrElse("none"),
+        )
+
+        if avgLoad > scaleUpLoad || hasHighCpuOrMemory then scaleUp()
+        else if avgLoad < scaleDownLoad && instances.size > config.scaleMinReplicas
         then scaleDown()
 
   private def patchRolloutReplicas(
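
A quick way to sanity-check the new quantity parsing and threshold logic outside the cluster is a small standalone Scala 3 script. The sketch below copies parseMillicores, parseBytes, and the ratio comparison from this patch verbatim; the object and method names (QuantityCheck, exceeds) and the sample request/usage figures (500m CPU, 512Mi memory) are illustrative assumptions, not values taken from the NowChess deployment.

object QuantityCheck:
  // Copied verbatim from AutoScaler in this patch: CPU quantities to millicores.
  def parseMillicores(s: String): Long =
    if s.endsWith("n") then s.dropRight(1).toLongOption.map(_ / 1000000).getOrElse(0L)
    else if s.endsWith("m") then s.dropRight(1).toLongOption.getOrElse(0L)
    else s.toLongOption.map(_ * 1000).getOrElse(0L)

  // Copied verbatim from AutoScaler in this patch: memory quantities to bytes.
  def parseBytes(s: String): Long =
    if s.endsWith("Ki") then s.dropRight(2).toLongOption.map(_ * 1024L).getOrElse(0L)
    else if s.endsWith("Mi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L).getOrElse(0L)
    else if s.endsWith("Gi") then s.dropRight(2).toLongOption.map(_ * 1024L * 1024L * 1024L).getOrElse(0L)
    else if s.endsWith("K") then s.dropRight(1).toLongOption.map(_ * 1000L).getOrElse(0L)
    else if s.endsWith("M") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L).getOrElse(0L)
    else if s.endsWith("G") then s.dropRight(1).toLongOption.map(_ * 1000L * 1000L * 1000L).getOrElse(0L)
    else s.toLongOption.getOrElse(0L)

  // Same used/request ratio test as exceedsRatio, minus the logging.
  def exceeds(used: Long, request: Long, threshold: Double): Boolean =
    request > 0 && used.toDouble / request.toDouble > threshold

@main def quantityCheck(): Unit =
  import QuantityCheck.*
  // Kubernetes CPU quantities: nanocores, millicores, whole cores.
  assert(parseMillicores("1500000n") == 1L)
  assert(parseMillicores("250m") == 250L)
  assert(parseMillicores("2") == 2000L)
  // Memory quantities: binary (Ki/Mi/Gi) and decimal (K/M/G) suffixes.
  assert(parseBytes("512Mi") == 512L * 1024 * 1024)
  assert(parseBytes("1G") == 1000L * 1000 * 1000)
  // With a 500m CPU request and the 0.8 threshold from application.yml,
  // 450m usage (90%) counts as constrained while 350m (70%) does not.
  assert(exceeds(parseMillicores("450m"), parseMillicores("500m"), 0.8))
  assert(!exceeds(parseMillicores("350m"), parseMillicores("500m"), 0.8))
  println("quantity parser and threshold checks passed")

The 0.8 figures mirror the scale-cpu-threshold-percent and scale-memory-threshold-percent defaults added in application.yml; the script can be run with any Scala 3 runner (for example scala-cli) without touching a cluster.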