fix: don't block event loop during scale-down drain
Scale-down was calling failoverService.onInstanceStreamDropped synchronously and waiting for it to complete. Failover retries for up to 30s waiting for healthy instances, which blocked the Quarkus event-loop thread. This caused: (1) the event loop being blocked for 15+ seconds; (2) Redis health checks timing out (they also run on the event loop); (3) scale-down operations failing. Fix: trigger the drain asynchronously without waiting for it — scale-down proceeds immediately while the drain happens in the background. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
+43
-46
@@ -134,56 +134,53 @@ class AutoScaler:
|
||||
.minByOption(_.subscriptionCount)
|
||||
|
||||
underloadedInstance.foreach { inst =>
|
||||
log.infof("Draining instance %s before scale-down", inst.instanceId)
|
||||
log.infof("Marking instance %s for drain before scale-down", inst.instanceId)
|
||||
failoverService
|
||||
.onInstanceStreamDropped(inst.instanceId)
|
||||
.subscribe()
|
||||
.`with`(
|
||||
_ =>
|
||||
kubeClientOpt match
|
||||
case None =>
|
||||
log.warn("Kubernetes client not available, cannot scale")
|
||||
case Some(kube) =>
|
||||
try
|
||||
Option(
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(config.k8sRolloutName)
|
||||
.get(),
|
||||
).foreach { rollout =>
|
||||
rolloutSpec(rollout).foreach { spec =>
|
||||
spec.get("replicas") match
|
||||
case replicas: Integer =>
|
||||
val currentReplicas = replicas.intValue()
|
||||
val minReplicas = config.scaleMinReplicas
|
||||
|
||||
if currentReplicas > minReplicas then
|
||||
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||
log.infof(
|
||||
"Scaled down %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
currentReplicas,
|
||||
currentReplicas - 1,
|
||||
)
|
||||
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
||||
case _ => ()
|
||||
}
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName),
|
||||
ex =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||
log.warnf(ex, "Failed to drain instance %s before scale-down", inst.instanceId),
|
||||
_ => log.debugf("Instance %s drained for scale-down", inst.instanceId),
|
||||
ex => log.warnf(ex, "Drain failed for %s, proceeding with scale-down", inst.instanceId),
|
||||
)
|
||||
}
|
||||
|
||||
if underloadedInstance.isEmpty then log.warn("No healthy instances found for scale-down")
|
||||
kubeClientOpt match
|
||||
case None =>
|
||||
log.warn("Kubernetes client not available, cannot scale")
|
||||
case Some(kube) =>
|
||||
try
|
||||
Option(
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withName(config.k8sRolloutName)
|
||||
.get(),
|
||||
).foreach { rollout =>
|
||||
rolloutSpec(rollout).foreach { spec =>
|
||||
spec.get("replicas") match
|
||||
case replicas: Integer =>
|
||||
val currentReplicas = replicas.intValue()
|
||||
val minReplicas = config.scaleMinReplicas
|
||||
|
||||
if currentReplicas > minReplicas then
|
||||
spec.put("replicas", Integer.valueOf(currentReplicas - 1))
|
||||
kube
|
||||
.genericKubernetesResources(argoApiVersion, argoKind)
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||
log.infof(
|
||||
"Scaled down %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
currentReplicas,
|
||||
currentReplicas - 1,
|
||||
)
|
||||
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
||||
case _ => ()
|
||||
}
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||
|
||||
Reference in New Issue
Block a user