feat: force delete pod immediately on heartbeat loss
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
When instance stream drops, immediately force delete the K8s pod (grace period 0). No waiting for health check or pod watch events. Reduces failover latency and ensures stale pods don't linger. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
+35
@@ -1,6 +1,7 @@
|
|||||||
package de.nowchess.coordinator.service
|
package de.nowchess.coordinator.service
|
||||||
|
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
|
import jakarta.enterprise.inject.Instance
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
@@ -9,6 +10,7 @@ import org.jboss.logging.Logger
|
|||||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||||
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
|
import io.fabric8.kubernetes.client.KubernetesClient
|
||||||
import io.smallrye.mutiny.Uni
|
import io.smallrye.mutiny.Uni
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
@@ -27,10 +29,17 @@ class FailoverService:
|
|||||||
@Inject
|
@Inject
|
||||||
private var config: CoordinatorConfig = uninitialized
|
private var config: CoordinatorConfig = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var kubeClientInstance: Instance[KubernetesClient] = uninitialized
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[FailoverService])
|
private val log = Logger.getLogger(classOf[FailoverService])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private def kubeClientOpt: Option[KubernetesClient] =
|
||||||
|
if kubeClientInstance.isUnsatisfied then None
|
||||||
|
else Some(kubeClientInstance.get())
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
@@ -39,6 +48,7 @@ class FailoverService:
|
|||||||
|
|
||||||
val startTime = System.currentTimeMillis()
|
val startTime = System.currentTimeMillis()
|
||||||
instanceRegistry.markInstanceDead(instanceId)
|
instanceRegistry.markInstanceDead(instanceId)
|
||||||
|
deleteK8sPod(instanceId)
|
||||||
|
|
||||||
val gameIds = getOrphanedGames(instanceId)
|
val gameIds = getOrphanedGames(instanceId)
|
||||||
log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId)
|
log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId)
|
||||||
@@ -129,6 +139,31 @@ class FailoverService:
|
|||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.errorf(ex, "Failed to update game instance mappings")
|
log.errorf(ex, "Failed to update game instance mappings")
|
||||||
|
|
||||||
|
private def deleteK8sPod(instanceId: String): Unit =
|
||||||
|
kubeClientOpt match
|
||||||
|
case None =>
|
||||||
|
log.debugf("Kubernetes client not available, skipping pod deletion for %s", instanceId)
|
||||||
|
case Some(kube) =>
|
||||||
|
try
|
||||||
|
val pods = kube
|
||||||
|
.pods()
|
||||||
|
.inNamespace(config.k8sNamespace)
|
||||||
|
.withLabel(config.k8sRolloutLabelSelector)
|
||||||
|
.list()
|
||||||
|
.getItems
|
||||||
|
.asScala
|
||||||
|
|
||||||
|
pods.find(pod => instanceId.contains(pod.getMetadata.getName)) match
|
||||||
|
case Some(pod) =>
|
||||||
|
val podName = pod.getMetadata.getName
|
||||||
|
kube.pods().inNamespace(config.k8sNamespace).withName(podName).withGracePeriod(0L).delete()
|
||||||
|
log.infof("Force-deleted pod %s for dead instance %s", podName, instanceId)
|
||||||
|
case None =>
|
||||||
|
log.debugf("No pod found for instance %s, skipping deletion", instanceId)
|
||||||
|
catch
|
||||||
|
case ex: Exception =>
|
||||||
|
log.errorf(ex, "Failed to delete pod for instance %s", instanceId)
|
||||||
|
|
||||||
private def cleanupDeadInstance(instanceId: String): Unit =
|
private def cleanupDeadInstance(instanceId: String): Unit =
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
redis.key(classOf[String]).del(setKey)
|
redis.key(classOf[String]).del(setKey)
|
||||||
|
|||||||
Reference in New Issue
Block a user