Compare commits

...

5 Commits

Author SHA1 Message Date
TeamCity 0060229ee9 ci: bump version with Build-81 2026-05-13 12:59:28 +00:00
Janis d5c8da20f8 fix: update grpcServer variable to use Instance wrapper and add optional access method
Build & Test (NowChessSystems) TeamCity build finished
2026-05-13 14:42:12 +02:00
Janis ad9495afa3 fix: clean up code formatting and improve error handling in gRPC server and failover service
Build & Test (NowChessSystems) TeamCity build failed
2026-05-13 13:16:22 +02:00
Janis 2b04d7fa71 fix: replace null checks with Option in coordinator
Build & Test (NowChessSystems) TeamCity build failed
Use Option instead of null checks in HealthMonitor and InstanceRegistry
per Scalafix DisableSyntax rule.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-13 12:44:34 +02:00
Janis 81b045d01b feat: add coordinator startup validation and K8s pod watch
Build & Test (NowChessSystems) TeamCity build failed
On startup, load all known instances from Redis and wait 15s for them to
reconnect via gRPC. Evict instances that don't reconnect within the timeout
and delete their K8s pods.

Replace one-shot pod status check with real fabric8 Watch. On pod Terminating
event, mark instance dead. On pod Deleted event, trigger failover. Failover
now waits reactively for at least one healthy instance before distributing
orphaned games, up to 30s timeout.

- Add startupValidationTimeout and failoverWaitTimeout config (15s, 30s)
- CoordinatorGrpcServer tracks active gRPC streams
- InstanceRegistry.loadAllFromRedis() scans and loads instances on startup
- HealthMonitor startup observer validates instances and starts K8s watch
- FailoverService.onInstanceStreamDropped returns Uni[Unit] for reactive wait
- All failover service callers updated to subscribe to Uni result

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-13 09:55:38 +02:00
9 changed files with 239 additions and 51 deletions
+32
View File
@@ -334,3 +334,35 @@
* **middleware:** update paths for bot generation and stockfish configuration ([2dd0501](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2dd0501687db08dcd242359f6837125baf8a2fdc)) * **middleware:** update paths for bot generation and stockfish configuration ([2dd0501](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2dd0501687db08dcd242359f6837125baf8a2fdc))
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4)) * **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
* update HealthMonitor to evict instances without associated pods ([0f41f13](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0f41f13ce68b76846684bab67241a122250dfaf9)) * update HealthMonitor to evict instances without associated pods ([0f41f13](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0f41f13ce68b76846684bab67241a122250dfaf9))
## (2026-05-13)
### Features
* add coordinator startup validation and K8s pod watch ([81b045d](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/81b045d01bb054a4bc9dc9e02fc30f814e756205))
* add initialization metrics for various services ([d438e97](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d438e97f32bdde0bfc63c1b4a8cc810cdd093166))
* add OpenTelemetry trace configuration with parentbased sampler ([3904d5a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3904d5ad8ad4930ddee65287a7bfab785a6148f5))
* **config:** update application.yml for PostgreSQL and remove staging/production configurations ([2404e61](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2404e6164c3b50ffccbea5238d636060d6abe4d6))
* **config:** update application.yml for staging and production environments ([6113432](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/6113432a14c476a3a0dfc0d449e17d023697f2ba))
* configure logging and add OpenTelemetry support ([#49](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/49)) ([d57c488](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d57c4886612d1d92da0e1b79209fc83e6ef537a1))
* **docker:** add .dockerignore and .gitignore files for build exclusions ([c987d8e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c987d8e258c0e6c4cfbdaa8381c64c410d7a2b83))
* **docker:** add Dockerfiles for building Quarkus application in native and JVM modes ([3f2d2bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3f2d2bb4c97fa8cddba66e1da4427c54236dfeed))
* **docker:** add Dockerfiles for Quarkus application in JVM and native modes ([34b9933](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/34b993304670cf2aa62cd2f6460cee7b9864b08e))
* **logging:** add DEBUG/INFO/WARN logging across services (NCS-72) ([#41](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/41)) ([804a4bf](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/804a4bf179e3dfb19e2be4390e7e543caf5237c6))
* NCS-78 Add Traceability to the Applications ([#46](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/46)) ([649566e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/649566eb3fcf38f91c8896a739f74ea318af312d))
* NCS-78 Add Traceability to the Applications ([#47](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/47)) ([87dfc6c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/87dfc6c2bcce7f7d58fc641bd8d468a2e584c108))
* true-microservices ([#40](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/40)) ([5909242](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/590924254e8a2754de661a57a03e43f89ceb6299))
### Bug Fixes
* add instance-dead-timeout configuration and update HealthMonitor to use it for stale instance eviction ([be0b710](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/be0b710543b542da5c301efef7d2d587d0ba758a))
* clean up code formatting and improve error handling in gRPC server and failover service ([ad9495a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/ad9495afa3e93593b57154a187346c9b01393911))
* **coordinator:** refine type casting in rolloutSpec method ([#45](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/45)) ([d522f7f](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d522f7f6edf9c985f03dd16816439d4184f1a589))
* **coordinator:** use genericKubernetesResources API for Argo Rollout scaling ([#43](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/43)) ([fa3c6b2](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/fa3c6b2886dc59c14c5dad834acc9b41e42023bb))
* **coordinator:** use genericKubernetesResources API for Argo Rollout scaling ([#44](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/44)) ([82d0b75](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/82d0b754be1075084944b466858672d944f9f7d8))
* **dependencies:** replace Fabric8 Kubernetes client with Quarkus Kubernetes client ([5f44570](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5f44570b357277d09f33b7296860c421e2e70ce0))
* enhance AutoScaler and InstanceRegistry for replica management and stale instance eviction ([b4920d3](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b4920d3817e58bda94d7764e608b856ce9a909f7))
* **middleware:** update paths for bot generation and stockfish configuration ([2dd0501](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2dd0501687db08dcd242359f6837125baf8a2fdc))
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
* replace null checks with Option in coordinator ([2b04d7f](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2b04d7fa713e06662bff5afe3fb3f9d04541ce51))
* update grpcServer variable to use Instance wrapper and add optional access method ([d5c8da2](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d5c8da20f8805199e920ea5afbd9cdb39a078e40))
* update HealthMonitor to evict instances without associated pods ([0f41f13](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0f41f13ce68b76846684bab67241a122250dfaf9))
@@ -45,6 +45,8 @@ nowchess:
k8s-namespace: default k8s-namespace: default
k8s-rollout-name: nowchess-core k8s-rollout-name: nowchess-core
k8s-rollout-label-selector: "app=nowchess-core" k8s-rollout-label-selector: "app=nowchess-core"
startup-validation-timeout: 15s
failover-wait-timeout: 30s
--- ---
# dev profile # dev profile
@@ -56,3 +56,9 @@ trait CoordinatorConfig:
@WithName("k8s-rollout-label-selector") @WithName("k8s-rollout-label-selector")
def k8sRolloutLabelSelector: String def k8sRolloutLabelSelector: String
@WithName("startup-validation-timeout")
def startupValidationTimeout: Duration
@WithName("failover-wait-timeout")
def failoverWaitTimeout: Duration
@@ -9,6 +9,7 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
import io.grpc.stub.StreamObserver import io.grpc.stub.StreamObserver
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import org.jboss.logging.Logger import org.jboss.logging.Logger
import java.util.concurrent.ConcurrentHashMap
@GrpcService @GrpcService
@Singleton @Singleton
@@ -21,8 +22,9 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
private var failoverService: FailoverService = uninitialized private var failoverService: FailoverService = uninitialized
// scalafix:on DisableSyntax.var // scalafix:on DisableSyntax.var
private val mapper = ObjectMapper() private val mapper = ObjectMapper()
private val log = Logger.getLogger(classOf[CoordinatorGrpcServer]) private val log = Logger.getLogger(classOf[CoordinatorGrpcServer])
private val activeStreams = ConcurrentHashMap.newKeySet[String]()
override def heartbeatStream( override def heartbeatStream(
responseObserver: StreamObserver[CoordinatorCommand], responseObserver: StreamObserver[CoordinatorCommand],
@@ -38,6 +40,7 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
lastInstanceId = frame.getInstanceId lastInstanceId = frame.getInstanceId
if !firstFrameSeen then if !firstFrameSeen then
firstFrameSeen = true firstFrameSeen = true
activeStreams.add(frame.getInstanceId)
log.infof( log.infof(
"First heartbeat from instance %s (host=%s http=%d grpc=%d)", "First heartbeat from instance %s (host=%s http=%d grpc=%d)",
frame.getInstanceId, frame.getInstanceId,
@@ -60,10 +63,19 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
override def onError(t: Throwable): Unit = override def onError(t: Throwable): Unit =
log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId) log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId)
if lastInstanceId.nonEmpty then failoverService.onInstanceStreamDropped(lastInstanceId) if lastInstanceId.nonEmpty then
activeStreams.remove(lastInstanceId)
failoverService
.onInstanceStreamDropped(lastInstanceId)
.subscribe()
.`with`(
_ => (),
ex => log.warnf(ex, "Failover for %s failed", lastInstanceId),
)
override def onCompleted: Unit = override def onCompleted: Unit =
log.infof("Heartbeat stream completed for instance %s", lastInstanceId) log.infof("Heartbeat stream completed for instance %s", lastInstanceId)
activeStreams.remove(lastInstanceId)
override def batchResubscribeGames( override def batchResubscribeGames(
request: BatchResubscribeRequest, request: BatchResubscribeRequest,
@@ -108,7 +120,19 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
val instanceId = request.getInstanceId val instanceId = request.getInstanceId
log.infof("Drain request for instance %s", instanceId) log.infof("Drain request for instance %s", instanceId)
val gamesBefore = instanceRegistry.getInstance(instanceId).map(_.subscriptionCount).getOrElse(0) val gamesBefore = instanceRegistry.getInstance(instanceId).map(_.subscriptionCount).getOrElse(0)
failoverService.onInstanceStreamDropped(instanceId) failoverService
val response = DrainInstanceResponse.newBuilder().setGamesMigrated(gamesBefore).build() .onInstanceStreamDropped(instanceId)
responseObserver.onNext(response) .subscribe()
responseObserver.onCompleted() .`with`(
_ =>
val response = DrainInstanceResponse.newBuilder().setGamesMigrated(gamesBefore).build()
responseObserver.onNext(response)
responseObserver.onCompleted()
,
ex =>
log.warnf(ex, "Drain failed for %s", instanceId)
responseObserver.onError(ex),
)
def hasActiveStream(instanceId: String): Boolean =
activeStreams.contains(instanceId)
@@ -70,7 +70,13 @@ class CoordinatorResource:
@Produces(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_JSON))
def triggerFailover(@PathParam("instanceId") instanceId: String): scala.collection.Map[String, String] = def triggerFailover(@PathParam("instanceId") instanceId: String): scala.collection.Map[String, String] =
log.infof("Manual failover triggered for instance %s", instanceId) log.infof("Manual failover triggered for instance %s", instanceId)
failoverService.onInstanceStreamDropped(instanceId) failoverService
.onInstanceStreamDropped(instanceId)
.subscribe()
.`with`(
_ => (),
ex => log.warnf(ex, "Manual failover for %s failed", instanceId),
)
Map("status" -> "failover_started", "instanceId" -> instanceId) Map("status" -> "failover_started", "instanceId" -> instanceId)
@POST @POST
@@ -8,6 +8,9 @@ import scala.compiletime.uninitialized
import org.jboss.logging.Logger import org.jboss.logging.Logger
import de.nowchess.coordinator.dto.InstanceMetadata import de.nowchess.coordinator.dto.InstanceMetadata
import de.nowchess.coordinator.grpc.CoreGrpcClient import de.nowchess.coordinator.grpc.CoreGrpcClient
import de.nowchess.coordinator.config.CoordinatorConfig
import io.smallrye.mutiny.Uni
import java.time.Duration
@ApplicationScoped @ApplicationScoped
class FailoverService: class FailoverService:
@@ -21,6 +24,9 @@ class FailoverService:
@Inject @Inject
private var coreGrpcClient: CoreGrpcClient = uninitialized private var coreGrpcClient: CoreGrpcClient = uninitialized
@Inject
private var config: CoordinatorConfig = uninitialized
private val log = Logger.getLogger(classOf[FailoverService]) private val log = Logger.getLogger(classOf[FailoverService])
private var redisPrefix = "nowchess" private var redisPrefix = "nowchess"
// scalafix:on DisableSyntax.var // scalafix:on DisableSyntax.var
@@ -28,7 +34,7 @@ class FailoverService:
def setRedisPrefix(prefix: String): Unit = def setRedisPrefix(prefix: String): Unit =
redisPrefix = prefix redisPrefix = prefix
def onInstanceStreamDropped(instanceId: String): Unit = def onInstanceStreamDropped(instanceId: String): Uni[Unit] =
log.infof("Instance %s stream dropped, triggering failover", instanceId) log.infof("Instance %s stream dropped, triggering failover", instanceId)
val startTime = System.currentTimeMillis() val startTime = System.currentTimeMillis()
@@ -37,19 +43,32 @@ class FailoverService:
val gameIds = getOrphanedGames(instanceId) val gameIds = getOrphanedGames(instanceId)
log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId) log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId)
if gameIds.nonEmpty then if gameIds.isEmpty then
val healthyInstances = instanceRegistry.getAllInstances cleanupDeadInstance(instanceId)
.filter(_.state == "HEALTHY") Uni.createFrom().item(())
.sortBy(_.subscriptionCount) else
waitForHealthyInstanceAsync()
.onItem()
.transform { _ =>
val healthyInstances = instanceRegistry.getAllInstances
.filter(_.state == "HEALTHY")
.sortBy(_.subscriptionCount)
distributeGames(gameIds, healthyInstances, instanceId)
if healthyInstances.nonEmpty then val elapsed = System.currentTimeMillis() - startTime
distributeGames(gameIds, healthyInstances, instanceId) log.infof("Failover completed in %dms for instance %s", elapsed, instanceId)
cleanupDeadInstance(instanceId)
val elapsed = System.currentTimeMillis() - startTime ()
log.infof("Failover completed in %dms for instance %s", elapsed, instanceId) }
else log.warnf("No healthy instances available for failover of %s", instanceId) .onFailure()
.recoverWithItem { _ =>
cleanupDeadInstance(instanceId) log.errorf(
"No healthy instance appeared within %s — games orphaned for %s",
config.failoverWaitTimeout,
instanceId,
)
()
}
private def getOrphanedGames(instanceId: String): List[String] = private def getOrphanedGames(instanceId: String): List[String] =
val setKey = s"$redisPrefix:instance:$instanceId:games" val setKey = s"$redisPrefix:instance:$instanceId:games"
@@ -101,3 +120,19 @@ class FailoverService:
val setKey = s"$redisPrefix:instance:$instanceId:games" val setKey = s"$redisPrefix:instance:$instanceId:games"
redis.key(classOf[String]).del(setKey) redis.key(classOf[String]).del(setKey)
log.infof("Cleaned up games set for instance %s", instanceId) log.infof("Cleaned up games set for instance %s", instanceId)
private def waitForHealthyInstanceAsync(): Uni[InstanceMetadata] =
Uni
.createFrom()
.deferred(() =>
instanceRegistry.getAllInstances
.filter(_.state == "HEALTHY")
.sortBy(_.subscriptionCount)
.headOption match
case Some(inst) => Uni.createFrom().item(inst)
case None => Uni.createFrom().failure(new RuntimeException("no healthy instance")),
)
.onFailure()
.retry()
.withBackOff(Duration.ofMillis(500))
.expireIn(config.failoverWaitTimeout.toMillis)
@@ -2,17 +2,23 @@ package de.nowchess.coordinator.service
import jakarta.annotation.PostConstruct import jakarta.annotation.PostConstruct
import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.event.Observes
import jakarta.enterprise.inject.Instance import jakarta.enterprise.inject.Instance
import jakarta.inject.Inject import jakarta.inject.Inject
import de.nowchess.coordinator.config.CoordinatorConfig import de.nowchess.coordinator.config.CoordinatorConfig
import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.Watcher
import io.fabric8.kubernetes.client.WatcherException
import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.MeterRegistry
import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.runtime.StartupEvent
import scala.jdk.CollectionConverters.* import scala.jdk.CollectionConverters.*
import org.jboss.logging.Logger import org.jboss.logging.Logger
import scala.compiletime.uninitialized import scala.compiletime.uninitialized
import java.time.Instant import java.time.Instant
import de.nowchess.coordinator.grpc.CoordinatorGrpcServer
import de.nowchess.coordinator.dto.InstanceMetadata
@ApplicationScoped @ApplicationScoped
class HealthMonitor: class HealthMonitor:
@@ -32,6 +38,12 @@ class HealthMonitor:
@Inject @Inject
private var meterRegistry: MeterRegistry = uninitialized private var meterRegistry: MeterRegistry = uninitialized
@Inject
private var grpcServerInstance: Instance[CoordinatorGrpcServer] = uninitialized
@Inject
private var failoverService: FailoverService = uninitialized
private val log = Logger.getLogger(classOf[HealthMonitor]) private val log = Logger.getLogger(classOf[HealthMonitor])
private var redisPrefix = "nowchess" private var redisPrefix = "nowchess"
// scalafix:on DisableSyntax.var // scalafix:on DisableSyntax.var
@@ -40,6 +52,10 @@ class HealthMonitor:
if kubeClientInstance.isUnsatisfied then None if kubeClientInstance.isUnsatisfied then None
else Some(kubeClientInstance.get()) else Some(kubeClientInstance.get())
private def grpcServerOpt: Option[CoordinatorGrpcServer] =
if grpcServerInstance.isUnsatisfied then None
else Some(grpcServerInstance.get())
def setRedisPrefix(prefix: String): Unit = def setRedisPrefix(prefix: String): Unit =
redisPrefix = prefix redisPrefix = prefix
@@ -48,6 +64,15 @@ class HealthMonitor:
meterRegistry.counter("nowchess.coordinator.health.checks").increment(0) meterRegistry.counter("nowchess.coordinator.health.checks").increment(0)
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment(0) meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment(0)
def onStartup(@Observes ev: StartupEvent): Unit =
instanceRegistry.loadAllFromRedis()
val loaded = instanceRegistry.getAllInstances
log.infof("Startup: loaded %d instances from Redis", loaded.size)
if loaded.nonEmpty then
val timeoutMs = config.startupValidationTimeout.toMillis
Thread.ofVirtual().start(() => validateStartupInstances(timeoutMs))
startPodWatch()
def checkInstanceHealth: Unit = def checkInstanceHealth: Unit =
meterRegistry.counter("nowchess.coordinator.health.checks").increment() meterRegistry.counter("nowchess.coordinator.health.checks").increment()
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout) val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
@@ -98,41 +123,32 @@ class HealthMonitor:
true true
} }
def watchK8sPods: Unit = private def startPodWatch(): Unit =
kubeClientOpt match kubeClientOpt match
case None => case None => log.debug("K8s client unavailable, skipping pod watch")
log.debug("Kubernetes client not available for pod watch")
case Some(kube) => case Some(kube) =>
try try
val pods = kube kube
.pods() .pods()
.inNamespace(config.k8sNamespace) .inNamespace(config.k8sNamespace)
.withLabel(config.k8sRolloutLabelSelector) .withLabel(config.k8sRolloutLabelSelector)
.list() .watch(new Watcher[Pod]:
.getItems override def eventReceived(action: Watcher.Action, pod: Pod): Unit =
.asScala action match
case Watcher.Action.DELETED =>
handlePodGone(pod)
case Watcher.Action.MODIFIED if Option(pod.getMetadata.getDeletionTimestamp).isDefined =>
handlePodTerminating(pod)
case _ => ()
val instances = instanceRegistry.getAllInstances override def onClose(cause: WatcherException): Unit =
instances.foreach { inst => Option(cause).foreach { ex =>
val matchingPod = pods.find { pod => log.warnf(ex, "Pod watch closed, restarting")
pod.getMetadata.getName.contains(inst.instanceId) startPodWatch()
} },
)
matchingPod match log.info("Pod watch started")
case Some(pod) => catch case ex: Exception => log.warnf(ex, "Failed to start pod watch")
val isReady = isPodReady(pod)
if !isReady && inst.state == "HEALTHY" then
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
instanceRegistry.markInstanceDead(inst.instanceId)
deleteK8sPod(inst.instanceId)
case None =>
log.warnf("No pod found for instance %s, evicting from registry", inst.instanceId)
instanceRegistry.removeInstance(inst.instanceId)
}
catch
case ex: Exception =>
log.warnf(ex, "Failed to watch k8s pods")
private def isPodReady(pod: Pod): Boolean = private def isPodReady(pod: Pod): Boolean =
Option(pod.getStatus) Option(pod.getStatus)
@@ -164,3 +180,50 @@ class HealthMonitor:
catch catch
case ex: Exception => case ex: Exception =>
log.warnf(ex, "Failed to delete pod for instance %s", instanceId) log.warnf(ex, "Failed to delete pod for instance %s", instanceId)
private def validateStartupInstances(timeoutMs: Long): Unit =
Thread.sleep(timeoutMs)
grpcServerOpt.foreach { grpcServer =>
instanceRegistry.getAllInstances.foreach { inst =>
if !grpcServer.hasActiveStream(inst.instanceId) then
log.warnf(
"Startup: instance %s did not reconnect within %dms — evicting",
inst.instanceId,
timeoutMs,
)
instanceRegistry.removeInstance(inst.instanceId)
deleteK8sPod(inst.instanceId)
}
}
private def handlePodTerminating(pod: Pod): Unit =
findRegisteredInstance(pod).foreach { inst =>
if inst.state == "HEALTHY" then
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
log.warnf(
"Pod %s terminating — marking instance %s dead",
pod.getMetadata.getName,
inst.instanceId,
)
instanceRegistry.markInstanceDead(inst.instanceId)
}
private def handlePodGone(pod: Pod): Unit =
findRegisteredInstance(pod).foreach { inst =>
log.warnf(
"Pod %s deleted — triggering failover for %s",
pod.getMetadata.getName,
inst.instanceId,
)
failoverService
.onInstanceStreamDropped(inst.instanceId)
.subscribe()
.`with`(
_ => (),
ex => log.warnf(ex, "Failover for %s failed", inst.instanceId),
)
}
private def findRegisteredInstance(pod: Pod): Option[InstanceMetadata] =
val podName = pod.getMetadata.getName
instanceRegistry.getAllInstances.find(inst => podName.contains(inst.instanceId))
@@ -4,6 +4,7 @@ import jakarta.annotation.PostConstruct
import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject import jakarta.inject.Inject
import io.quarkus.redis.datasource.ReactiveRedisDataSource import io.quarkus.redis.datasource.ReactiveRedisDataSource
import io.quarkus.redis.datasource.RedisDataSource
import scala.jdk.CollectionConverters.* import scala.jdk.CollectionConverters.*
import scala.compiletime.uninitialized import scala.compiletime.uninitialized
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
@@ -19,7 +20,10 @@ class InstanceRegistry:
// scalafix:off DisableSyntax.var // scalafix:off DisableSyntax.var
@Inject @Inject
private var redis: ReactiveRedisDataSource = uninitialized private var redis: ReactiveRedisDataSource = uninitialized
private var redisPrefix = "nowchess"
@Inject
private var syncRedis: RedisDataSource = uninitialized
private var redisPrefix = "nowchess"
@Inject @Inject
private var meterRegistry: MeterRegistry = uninitialized private var meterRegistry: MeterRegistry = uninitialized
@@ -42,6 +46,22 @@ class InstanceRegistry:
def setRedisPrefix(prefix: String): Unit = def setRedisPrefix(prefix: String): Unit =
redisPrefix = prefix redisPrefix = prefix
def loadAllFromRedis(): Unit =
val keys = syncRedis.key(classOf[String]).keys(s"$redisPrefix:instances:*")
keys.asScala.foreach { key =>
val instanceId = key.stripPrefix(s"$redisPrefix:instances:")
val json = syncRedis.value(classOf[String]).get(key)
Option(json).foreach { jsonStr =>
try
val metadata = mapper.readValue(jsonStr, classOf[InstanceMetadata])
instances.put(instanceId, metadata)
log.infof("Startup: loaded instance %s from Redis", instanceId)
catch
case ex: Exception =>
log.warnf(ex, "Startup: failed to parse instance %s", instanceId)
}
}
def getInstance(instanceId: String): Option[InstanceMetadata] = def getInstance(instanceId: String): Option[InstanceMetadata] =
Option(instances.get(instanceId)) Option(instances.get(instanceId))
+1 -1
View File
@@ -1,3 +1,3 @@
MAJOR=0 MAJOR=0
MINOR=18 MINOR=19
PATCH=0 PATCH=0