From a9f4606b40fcaf9ec59fc2503a40054264bcbccf Mon Sep 17 00:00:00 2001 From: Janis Date: Mon, 18 May 2026 20:25:13 +0200 Subject: [PATCH] feat: force delete pod immediately on heartbeat loss When instance stream drops, immediately force delete the K8s pod (grace period 0). No waiting for health check or pod watch events. Reduces failover latency and ensures stale pods don't linger. Co-Authored-By: Claude Haiku 4.5 --- .../coordinator/service/FailoverService.scala | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala index fedc7ff..2bab61f 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala @@ -1,6 +1,7 @@ package de.nowchess.coordinator.service import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.inject.Instance import jakarta.inject.Inject import io.quarkus.redis.datasource.RedisDataSource import scala.jdk.CollectionConverters.* @@ -9,6 +10,7 @@ import org.jboss.logging.Logger import de.nowchess.coordinator.dto.InstanceMetadata import de.nowchess.coordinator.grpc.CoreGrpcClient import de.nowchess.coordinator.config.CoordinatorConfig +import io.fabric8.kubernetes.client.KubernetesClient import io.smallrye.mutiny.Uni import java.time.Duration @@ -27,10 +29,17 @@ class FailoverService: @Inject private var config: CoordinatorConfig = uninitialized + @Inject + private var kubeClientInstance: Instance[KubernetesClient] = uninitialized + private val log = Logger.getLogger(classOf[FailoverService]) private var redisPrefix = "nowchess" // scalafix:on DisableSyntax.var + private def kubeClientOpt: Option[KubernetesClient] = + if kubeClientInstance.isUnsatisfied then None + else Some(kubeClientInstance.get()) + def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -39,6 +48,7 @@ class FailoverService: val startTime = System.currentTimeMillis() instanceRegistry.markInstanceDead(instanceId) + deleteK8sPod(instanceId) val gameIds = getOrphanedGames(instanceId) log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId) @@ -129,6 +139,31 @@ class FailoverService: case ex: Exception => log.errorf(ex, "Failed to update game instance mappings") + private def deleteK8sPod(instanceId: String): Unit = + kubeClientOpt match + case None => + log.debugf("Kubernetes client not available, skipping pod deletion for %s", instanceId) + case Some(kube) => + try + val pods = kube + .pods() + .inNamespace(config.k8sNamespace) + .withLabel(config.k8sRolloutLabelSelector) + .list() + .getItems + .asScala + + pods.find(pod => instanceId.contains(pod.getMetadata.getName)) match + case Some(pod) => + val podName = pod.getMetadata.getName + kube.pods().inNamespace(config.k8sNamespace).withName(podName).withGracePeriod(0L).delete() + log.infof("Force-deleted pod %s for dead instance %s", podName, instanceId) + case None => + log.debugf("No pod found for instance %s, skipping deletion", instanceId) + catch + case ex: Exception => + log.errorf(ex, "Failed to delete pod for instance %s", instanceId) + private def cleanupDeadInstance(instanceId: String): Unit = val setKey = s"$redisPrefix:instance:$instanceId:games" redis.key(classOf[String]).del(setKey)