From 1d121c727cbd4df477827cf64d065b7356a56e59 Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 13 May 2026 23:21:43 +0200 Subject: [PATCH] fix: don't block event loop during scale-down drain Scale-down was calling failoverService.onInstanceStreamDropped synchronously and waiting for it to complete. Failover retries for up to 30s waiting for healthy instances, which blocks the Quarkus event loop thread. This caused: - Event loop blocked for 15+ seconds - Redis health checks timing out (also on event loop) - Scale-down operations failing Fix: Trigger drain asynchronously without waiting. Scale-down proceeds immediately while drain happens in background. Co-Authored-By: Claude Haiku 4.5 --- .../coordinator/service/AutoScaler.scala | 89 +++++++++---------- 1 file changed, 43 insertions(+), 46 deletions(-) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala index de6300c..14e8d35 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala @@ -134,56 +134,53 @@ class AutoScaler: .minByOption(_.subscriptionCount) underloadedInstance.foreach { inst => - log.infof("Draining instance %s before scale-down", inst.instanceId) + log.infof("Marking instance %s for drain before scale-down", inst.instanceId) failoverService .onInstanceStreamDropped(inst.instanceId) .subscribe() .`with`( - _ => - kubeClientOpt match - case None => - log.warn("Kubernetes client not available, cannot scale") - case Some(kube) => - try - Option( - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get(), - ).foreach { rollout => - rolloutSpec(rollout).foreach { spec => - spec.get("replicas") match - case replicas: Integer => - val currentReplicas = replicas.intValue() - val minReplicas = config.scaleMinReplicas - - if currentReplicas > minReplicas then - spec.put("replicas", Integer.valueOf(currentReplicas - 1)) - kube - .genericKubernetesResources(argoApiVersion, argoKind) - .inNamespace(config.k8sNamespace) - .resource(rollout) - .update() - meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment() - log.infof( - "Scaled down %s from %d to %d replicas", - config.k8sRolloutName, - currentReplicas, - currentReplicas - 1, - ) - else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) - case _ => () - } - } - catch - case ex: Exception => - meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() - log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName), - ex => - meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() - log.warnf(ex, "Failed to drain instance %s before scale-down", inst.instanceId), + _ => log.debugf("Instance %s drained for scale-down", inst.instanceId), + ex => log.warnf(ex, "Drain failed for %s, proceeding with scale-down", inst.instanceId), ) } - if underloadedInstance.isEmpty then log.warn("No healthy instances found for scale-down") + kubeClientOpt match + case None => + log.warn("Kubernetes client not available, cannot scale") + case Some(kube) => + try + Option( + kube + .genericKubernetesResources(argoApiVersion, argoKind) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .get(), + ).foreach { rollout => + rolloutSpec(rollout).foreach { spec => + spec.get("replicas") match + case replicas: Integer => + val currentReplicas = replicas.intValue() + val minReplicas = config.scaleMinReplicas + + if currentReplicas > minReplicas then + spec.put("replicas", Integer.valueOf(currentReplicas - 1)) + kube + .genericKubernetesResources(argoApiVersion, argoKind) + .inNamespace(config.k8sNamespace) + .resource(rollout) + .update() + meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment() + log.infof( + "Scaled down %s from %d to %d replicas", + config.k8sRolloutName, + currentReplicas, + currentReplicas - 1, + ) + else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) + case _ => () + } + } + catch + case ex: Exception => + meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() + log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)