Compare commits
5 Commits
core-0.37.0
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 0060229ee9 | |||
| d5c8da20f8 | |||
| ad9495afa3 | |||
| 2b04d7fa71 | |||
| 81b045d01b |
@@ -334,3 +334,35 @@
|
|||||||
* **middleware:** update paths for bot generation and stockfish configuration ([2dd0501](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2dd0501687db08dcd242359f6837125baf8a2fdc))
|
* **middleware:** update paths for bot generation and stockfish configuration ([2dd0501](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2dd0501687db08dcd242359f6837125baf8a2fdc))
|
||||||
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
|
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
|
||||||
* update HealthMonitor to evict instances without associated pods ([0f41f13](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0f41f13ce68b76846684bab67241a122250dfaf9))
|
* update HealthMonitor to evict instances without associated pods ([0f41f13](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0f41f13ce68b76846684bab67241a122250dfaf9))
|
||||||
|
## (2026-05-13)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* add coordinator startup validation and K8s pod watch ([81b045d](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/81b045d01bb054a4bc9dc9e02fc30f814e756205))
|
||||||
|
* add initialization metrics for various services ([d438e97](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d438e97f32bdde0bfc63c1b4a8cc810cdd093166))
|
||||||
|
* add OpenTelemetry trace configuration with parentbased sampler ([3904d5a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3904d5ad8ad4930ddee65287a7bfab785a6148f5))
|
||||||
|
* **config:** update application.yml for PostgreSQL and remove staging/production configurations ([2404e61](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2404e6164c3b50ffccbea5238d636060d6abe4d6))
|
||||||
|
* **config:** update application.yml for staging and production environments ([6113432](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/6113432a14c476a3a0dfc0d449e17d023697f2ba))
|
||||||
|
* configure logging and add OpenTelemetry support ([#49](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/49)) ([d57c488](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d57c4886612d1d92da0e1b79209fc83e6ef537a1))
|
||||||
|
* **docker:** add .dockerignore and .gitignore files for build exclusions ([c987d8e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c987d8e258c0e6c4cfbdaa8381c64c410d7a2b83))
|
||||||
|
* **docker:** add Dockerfiles for building Quarkus application in native and JVM modes ([3f2d2bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3f2d2bb4c97fa8cddba66e1da4427c54236dfeed))
|
||||||
|
* **docker:** add Dockerfiles for Quarkus application in JVM and native modes ([34b9933](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/34b993304670cf2aa62cd2f6460cee7b9864b08e))
|
||||||
|
* **logging:** add DEBUG/INFO/WARN logging across services (NCS-72) ([#41](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/41)) ([804a4bf](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/804a4bf179e3dfb19e2be4390e7e543caf5237c6))
|
||||||
|
* NCS-78 Add Traceability to the Applications ([#46](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/46)) ([649566e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/649566eb3fcf38f91c8896a739f74ea318af312d))
|
||||||
|
* NCS-78 Add Traceability to the Applications ([#47](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/47)) ([87dfc6c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/87dfc6c2bcce7f7d58fc641bd8d468a2e584c108))
|
||||||
|
* true-microservices ([#40](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/40)) ([5909242](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/590924254e8a2754de661a57a03e43f89ceb6299))
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
* add instance-dead-timeout configuration and update HealthMonitor to use it for stale instance eviction ([be0b710](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/be0b710543b542da5c301efef7d2d587d0ba758a))
|
||||||
|
* clean up code formatting and improve error handling in gRPC server and failover service ([ad9495a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/ad9495afa3e93593b57154a187346c9b01393911))
|
||||||
|
* **coordinator:** refine type casting in rolloutSpec method ([#45](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/45)) ([d522f7f](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d522f7f6edf9c985f03dd16816439d4184f1a589))
|
||||||
|
* **coordinator:** use genericKubernetesResources API for Argo Rollout scaling ([#43](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/43)) ([fa3c6b2](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/fa3c6b2886dc59c14c5dad834acc9b41e42023bb))
|
||||||
|
* **coordinator:** use genericKubernetesResources API for Argo Rollout scaling ([#44](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/44)) ([82d0b75](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/82d0b754be1075084944b466858672d944f9f7d8))
|
||||||
|
* **dependencies:** replace Fabric8 Kubernetes client with Quarkus Kubernetes client ([5f44570](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5f44570b357277d09f33b7296860c421e2e70ce0))
|
||||||
|
* enhance AutoScaler and InstanceRegistry for replica management and stale instance eviction ([b4920d3](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b4920d3817e58bda94d7764e608b856ce9a909f7))
|
||||||
|
* **middleware:** update paths for bot generation and stockfish configuration ([2dd0501](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2dd0501687db08dcd242359f6837125baf8a2fdc))
|
||||||
|
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
|
||||||
|
* replace null checks with Option in coordinator ([2b04d7f](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2b04d7fa713e06662bff5afe3fb3f9d04541ce51))
|
||||||
|
* update grpcServer variable to use Instance wrapper and add optional access method ([d5c8da2](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d5c8da20f8805199e920ea5afbd9cdb39a078e40))
|
||||||
|
* update HealthMonitor to evict instances without associated pods ([0f41f13](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0f41f13ce68b76846684bab67241a122250dfaf9))
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ nowchess:
|
|||||||
k8s-namespace: default
|
k8s-namespace: default
|
||||||
k8s-rollout-name: nowchess-core
|
k8s-rollout-name: nowchess-core
|
||||||
k8s-rollout-label-selector: "app=nowchess-core"
|
k8s-rollout-label-selector: "app=nowchess-core"
|
||||||
|
startup-validation-timeout: 15s
|
||||||
|
failover-wait-timeout: 30s
|
||||||
|
|
||||||
---
|
---
|
||||||
# dev profile
|
# dev profile
|
||||||
|
|||||||
+6
@@ -56,3 +56,9 @@ trait CoordinatorConfig:
|
|||||||
|
|
||||||
@WithName("k8s-rollout-label-selector")
|
@WithName("k8s-rollout-label-selector")
|
||||||
def k8sRolloutLabelSelector: String
|
def k8sRolloutLabelSelector: String
|
||||||
|
|
||||||
|
@WithName("startup-validation-timeout")
|
||||||
|
def startupValidationTimeout: Duration
|
||||||
|
|
||||||
|
@WithName("failover-wait-timeout")
|
||||||
|
def failoverWaitTimeout: Duration
|
||||||
|
|||||||
+31
-7
@@ -9,6 +9,7 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
|
|||||||
import io.grpc.stub.StreamObserver
|
import io.grpc.stub.StreamObserver
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
@GrpcService
|
@GrpcService
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -21,8 +22,9 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
|||||||
private var failoverService: FailoverService = uninitialized
|
private var failoverService: FailoverService = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val mapper = ObjectMapper()
|
private val mapper = ObjectMapper()
|
||||||
private val log = Logger.getLogger(classOf[CoordinatorGrpcServer])
|
private val log = Logger.getLogger(classOf[CoordinatorGrpcServer])
|
||||||
|
private val activeStreams = ConcurrentHashMap.newKeySet[String]()
|
||||||
|
|
||||||
override def heartbeatStream(
|
override def heartbeatStream(
|
||||||
responseObserver: StreamObserver[CoordinatorCommand],
|
responseObserver: StreamObserver[CoordinatorCommand],
|
||||||
@@ -38,6 +40,7 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
|||||||
lastInstanceId = frame.getInstanceId
|
lastInstanceId = frame.getInstanceId
|
||||||
if !firstFrameSeen then
|
if !firstFrameSeen then
|
||||||
firstFrameSeen = true
|
firstFrameSeen = true
|
||||||
|
activeStreams.add(frame.getInstanceId)
|
||||||
log.infof(
|
log.infof(
|
||||||
"First heartbeat from instance %s (host=%s http=%d grpc=%d)",
|
"First heartbeat from instance %s (host=%s http=%d grpc=%d)",
|
||||||
frame.getInstanceId,
|
frame.getInstanceId,
|
||||||
@@ -60,10 +63,19 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
|||||||
|
|
||||||
override def onError(t: Throwable): Unit =
|
override def onError(t: Throwable): Unit =
|
||||||
log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId)
|
log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId)
|
||||||
if lastInstanceId.nonEmpty then failoverService.onInstanceStreamDropped(lastInstanceId)
|
if lastInstanceId.nonEmpty then
|
||||||
|
activeStreams.remove(lastInstanceId)
|
||||||
|
failoverService
|
||||||
|
.onInstanceStreamDropped(lastInstanceId)
|
||||||
|
.subscribe()
|
||||||
|
.`with`(
|
||||||
|
_ => (),
|
||||||
|
ex => log.warnf(ex, "Failover for %s failed", lastInstanceId),
|
||||||
|
)
|
||||||
|
|
||||||
override def onCompleted: Unit =
|
override def onCompleted: Unit =
|
||||||
log.infof("Heartbeat stream completed for instance %s", lastInstanceId)
|
log.infof("Heartbeat stream completed for instance %s", lastInstanceId)
|
||||||
|
activeStreams.remove(lastInstanceId)
|
||||||
|
|
||||||
override def batchResubscribeGames(
|
override def batchResubscribeGames(
|
||||||
request: BatchResubscribeRequest,
|
request: BatchResubscribeRequest,
|
||||||
@@ -108,7 +120,19 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
|||||||
val instanceId = request.getInstanceId
|
val instanceId = request.getInstanceId
|
||||||
log.infof("Drain request for instance %s", instanceId)
|
log.infof("Drain request for instance %s", instanceId)
|
||||||
val gamesBefore = instanceRegistry.getInstance(instanceId).map(_.subscriptionCount).getOrElse(0)
|
val gamesBefore = instanceRegistry.getInstance(instanceId).map(_.subscriptionCount).getOrElse(0)
|
||||||
failoverService.onInstanceStreamDropped(instanceId)
|
failoverService
|
||||||
val response = DrainInstanceResponse.newBuilder().setGamesMigrated(gamesBefore).build()
|
.onInstanceStreamDropped(instanceId)
|
||||||
responseObserver.onNext(response)
|
.subscribe()
|
||||||
responseObserver.onCompleted()
|
.`with`(
|
||||||
|
_ =>
|
||||||
|
val response = DrainInstanceResponse.newBuilder().setGamesMigrated(gamesBefore).build()
|
||||||
|
responseObserver.onNext(response)
|
||||||
|
responseObserver.onCompleted()
|
||||||
|
,
|
||||||
|
ex =>
|
||||||
|
log.warnf(ex, "Drain failed for %s", instanceId)
|
||||||
|
responseObserver.onError(ex),
|
||||||
|
)
|
||||||
|
|
||||||
|
def hasActiveStream(instanceId: String): Boolean =
|
||||||
|
activeStreams.contains(instanceId)
|
||||||
|
|||||||
+7
-1
@@ -70,7 +70,13 @@ class CoordinatorResource:
|
|||||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
@Produces(Array(MediaType.APPLICATION_JSON))
|
||||||
def triggerFailover(@PathParam("instanceId") instanceId: String): scala.collection.Map[String, String] =
|
def triggerFailover(@PathParam("instanceId") instanceId: String): scala.collection.Map[String, String] =
|
||||||
log.infof("Manual failover triggered for instance %s", instanceId)
|
log.infof("Manual failover triggered for instance %s", instanceId)
|
||||||
failoverService.onInstanceStreamDropped(instanceId)
|
failoverService
|
||||||
|
.onInstanceStreamDropped(instanceId)
|
||||||
|
.subscribe()
|
||||||
|
.`with`(
|
||||||
|
_ => (),
|
||||||
|
ex => log.warnf(ex, "Manual failover for %s failed", instanceId),
|
||||||
|
)
|
||||||
Map("status" -> "failover_started", "instanceId" -> instanceId)
|
Map("status" -> "failover_started", "instanceId" -> instanceId)
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
|
|||||||
+48
-13
@@ -8,6 +8,9 @@ import scala.compiletime.uninitialized
|
|||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||||
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
||||||
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
|
import io.smallrye.mutiny.Uni
|
||||||
|
import java.time.Duration
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class FailoverService:
|
class FailoverService:
|
||||||
@@ -21,6 +24,9 @@ class FailoverService:
|
|||||||
@Inject
|
@Inject
|
||||||
private var coreGrpcClient: CoreGrpcClient = uninitialized
|
private var coreGrpcClient: CoreGrpcClient = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var config: CoordinatorConfig = uninitialized
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[FailoverService])
|
private val log = Logger.getLogger(classOf[FailoverService])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
@@ -28,7 +34,7 @@ class FailoverService:
|
|||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
def onInstanceStreamDropped(instanceId: String): Unit =
|
def onInstanceStreamDropped(instanceId: String): Uni[Unit] =
|
||||||
log.infof("Instance %s stream dropped, triggering failover", instanceId)
|
log.infof("Instance %s stream dropped, triggering failover", instanceId)
|
||||||
|
|
||||||
val startTime = System.currentTimeMillis()
|
val startTime = System.currentTimeMillis()
|
||||||
@@ -37,19 +43,32 @@ class FailoverService:
|
|||||||
val gameIds = getOrphanedGames(instanceId)
|
val gameIds = getOrphanedGames(instanceId)
|
||||||
log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId)
|
log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId)
|
||||||
|
|
||||||
if gameIds.nonEmpty then
|
if gameIds.isEmpty then
|
||||||
val healthyInstances = instanceRegistry.getAllInstances
|
cleanupDeadInstance(instanceId)
|
||||||
.filter(_.state == "HEALTHY")
|
Uni.createFrom().item(())
|
||||||
.sortBy(_.subscriptionCount)
|
else
|
||||||
|
waitForHealthyInstanceAsync()
|
||||||
|
.onItem()
|
||||||
|
.transform { _ =>
|
||||||
|
val healthyInstances = instanceRegistry.getAllInstances
|
||||||
|
.filter(_.state == "HEALTHY")
|
||||||
|
.sortBy(_.subscriptionCount)
|
||||||
|
distributeGames(gameIds, healthyInstances, instanceId)
|
||||||
|
|
||||||
if healthyInstances.nonEmpty then
|
val elapsed = System.currentTimeMillis() - startTime
|
||||||
distributeGames(gameIds, healthyInstances, instanceId)
|
log.infof("Failover completed in %dms for instance %s", elapsed, instanceId)
|
||||||
|
cleanupDeadInstance(instanceId)
|
||||||
val elapsed = System.currentTimeMillis() - startTime
|
()
|
||||||
log.infof("Failover completed in %dms for instance %s", elapsed, instanceId)
|
}
|
||||||
else log.warnf("No healthy instances available for failover of %s", instanceId)
|
.onFailure()
|
||||||
|
.recoverWithItem { _ =>
|
||||||
cleanupDeadInstance(instanceId)
|
log.errorf(
|
||||||
|
"No healthy instance appeared within %s — games orphaned for %s",
|
||||||
|
config.failoverWaitTimeout,
|
||||||
|
instanceId,
|
||||||
|
)
|
||||||
|
()
|
||||||
|
}
|
||||||
|
|
||||||
private def getOrphanedGames(instanceId: String): List[String] =
|
private def getOrphanedGames(instanceId: String): List[String] =
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
@@ -101,3 +120,19 @@ class FailoverService:
|
|||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
redis.key(classOf[String]).del(setKey)
|
redis.key(classOf[String]).del(setKey)
|
||||||
log.infof("Cleaned up games set for instance %s", instanceId)
|
log.infof("Cleaned up games set for instance %s", instanceId)
|
||||||
|
|
||||||
|
private def waitForHealthyInstanceAsync(): Uni[InstanceMetadata] =
|
||||||
|
Uni
|
||||||
|
.createFrom()
|
||||||
|
.deferred(() =>
|
||||||
|
instanceRegistry.getAllInstances
|
||||||
|
.filter(_.state == "HEALTHY")
|
||||||
|
.sortBy(_.subscriptionCount)
|
||||||
|
.headOption match
|
||||||
|
case Some(inst) => Uni.createFrom().item(inst)
|
||||||
|
case None => Uni.createFrom().failure(new RuntimeException("no healthy instance")),
|
||||||
|
)
|
||||||
|
.onFailure()
|
||||||
|
.retry()
|
||||||
|
.withBackOff(Duration.ofMillis(500))
|
||||||
|
.expireIn(config.failoverWaitTimeout.toMillis)
|
||||||
|
|||||||
+91
-28
@@ -2,17 +2,23 @@ package de.nowchess.coordinator.service
|
|||||||
|
|
||||||
import jakarta.annotation.PostConstruct
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
|
import jakarta.enterprise.event.Observes
|
||||||
import jakarta.enterprise.inject.Instance
|
import jakarta.enterprise.inject.Instance
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import io.fabric8.kubernetes.client.KubernetesClient
|
import io.fabric8.kubernetes.client.KubernetesClient
|
||||||
import io.fabric8.kubernetes.api.model.Pod
|
import io.fabric8.kubernetes.api.model.Pod
|
||||||
|
import io.fabric8.kubernetes.client.Watcher
|
||||||
|
import io.fabric8.kubernetes.client.WatcherException
|
||||||
import io.micrometer.core.instrument.MeterRegistry
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.runtime.StartupEvent
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
import de.nowchess.coordinator.grpc.CoordinatorGrpcServer
|
||||||
|
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class HealthMonitor:
|
class HealthMonitor:
|
||||||
@@ -32,6 +38,12 @@ class HealthMonitor:
|
|||||||
@Inject
|
@Inject
|
||||||
private var meterRegistry: MeterRegistry = uninitialized
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var grpcServerInstance: Instance[CoordinatorGrpcServer] = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var failoverService: FailoverService = uninitialized
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[HealthMonitor])
|
private val log = Logger.getLogger(classOf[HealthMonitor])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
@@ -40,6 +52,10 @@ class HealthMonitor:
|
|||||||
if kubeClientInstance.isUnsatisfied then None
|
if kubeClientInstance.isUnsatisfied then None
|
||||||
else Some(kubeClientInstance.get())
|
else Some(kubeClientInstance.get())
|
||||||
|
|
||||||
|
private def grpcServerOpt: Option[CoordinatorGrpcServer] =
|
||||||
|
if grpcServerInstance.isUnsatisfied then None
|
||||||
|
else Some(grpcServerInstance.get())
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
@@ -48,6 +64,15 @@ class HealthMonitor:
|
|||||||
meterRegistry.counter("nowchess.coordinator.health.checks").increment(0)
|
meterRegistry.counter("nowchess.coordinator.health.checks").increment(0)
|
||||||
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment(0)
|
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment(0)
|
||||||
|
|
||||||
|
def onStartup(@Observes ev: StartupEvent): Unit =
|
||||||
|
instanceRegistry.loadAllFromRedis()
|
||||||
|
val loaded = instanceRegistry.getAllInstances
|
||||||
|
log.infof("Startup: loaded %d instances from Redis", loaded.size)
|
||||||
|
if loaded.nonEmpty then
|
||||||
|
val timeoutMs = config.startupValidationTimeout.toMillis
|
||||||
|
Thread.ofVirtual().start(() => validateStartupInstances(timeoutMs))
|
||||||
|
startPodWatch()
|
||||||
|
|
||||||
def checkInstanceHealth: Unit =
|
def checkInstanceHealth: Unit =
|
||||||
meterRegistry.counter("nowchess.coordinator.health.checks").increment()
|
meterRegistry.counter("nowchess.coordinator.health.checks").increment()
|
||||||
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
||||||
@@ -98,41 +123,32 @@ class HealthMonitor:
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
def watchK8sPods: Unit =
|
private def startPodWatch(): Unit =
|
||||||
kubeClientOpt match
|
kubeClientOpt match
|
||||||
case None =>
|
case None => log.debug("K8s client unavailable, skipping pod watch")
|
||||||
log.debug("Kubernetes client not available for pod watch")
|
|
||||||
case Some(kube) =>
|
case Some(kube) =>
|
||||||
try
|
try
|
||||||
val pods = kube
|
kube
|
||||||
.pods()
|
.pods()
|
||||||
.inNamespace(config.k8sNamespace)
|
.inNamespace(config.k8sNamespace)
|
||||||
.withLabel(config.k8sRolloutLabelSelector)
|
.withLabel(config.k8sRolloutLabelSelector)
|
||||||
.list()
|
.watch(new Watcher[Pod]:
|
||||||
.getItems
|
override def eventReceived(action: Watcher.Action, pod: Pod): Unit =
|
||||||
.asScala
|
action match
|
||||||
|
case Watcher.Action.DELETED =>
|
||||||
|
handlePodGone(pod)
|
||||||
|
case Watcher.Action.MODIFIED if Option(pod.getMetadata.getDeletionTimestamp).isDefined =>
|
||||||
|
handlePodTerminating(pod)
|
||||||
|
case _ => ()
|
||||||
|
|
||||||
val instances = instanceRegistry.getAllInstances
|
override def onClose(cause: WatcherException): Unit =
|
||||||
instances.foreach { inst =>
|
Option(cause).foreach { ex =>
|
||||||
val matchingPod = pods.find { pod =>
|
log.warnf(ex, "Pod watch closed, restarting")
|
||||||
pod.getMetadata.getName.contains(inst.instanceId)
|
startPodWatch()
|
||||||
}
|
},
|
||||||
|
)
|
||||||
matchingPod match
|
log.info("Pod watch started")
|
||||||
case Some(pod) =>
|
catch case ex: Exception => log.warnf(ex, "Failed to start pod watch")
|
||||||
val isReady = isPodReady(pod)
|
|
||||||
if !isReady && inst.state == "HEALTHY" then
|
|
||||||
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
|
|
||||||
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
|
||||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
|
||||||
deleteK8sPod(inst.instanceId)
|
|
||||||
case None =>
|
|
||||||
log.warnf("No pod found for instance %s, evicting from registry", inst.instanceId)
|
|
||||||
instanceRegistry.removeInstance(inst.instanceId)
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
case ex: Exception =>
|
|
||||||
log.warnf(ex, "Failed to watch k8s pods")
|
|
||||||
|
|
||||||
private def isPodReady(pod: Pod): Boolean =
|
private def isPodReady(pod: Pod): Boolean =
|
||||||
Option(pod.getStatus)
|
Option(pod.getStatus)
|
||||||
@@ -164,3 +180,50 @@ class HealthMonitor:
|
|||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to delete pod for instance %s", instanceId)
|
log.warnf(ex, "Failed to delete pod for instance %s", instanceId)
|
||||||
|
|
||||||
|
private def validateStartupInstances(timeoutMs: Long): Unit =
|
||||||
|
Thread.sleep(timeoutMs)
|
||||||
|
grpcServerOpt.foreach { grpcServer =>
|
||||||
|
instanceRegistry.getAllInstances.foreach { inst =>
|
||||||
|
if !grpcServer.hasActiveStream(inst.instanceId) then
|
||||||
|
log.warnf(
|
||||||
|
"Startup: instance %s did not reconnect within %dms — evicting",
|
||||||
|
inst.instanceId,
|
||||||
|
timeoutMs,
|
||||||
|
)
|
||||||
|
instanceRegistry.removeInstance(inst.instanceId)
|
||||||
|
deleteK8sPod(inst.instanceId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def handlePodTerminating(pod: Pod): Unit =
|
||||||
|
findRegisteredInstance(pod).foreach { inst =>
|
||||||
|
if inst.state == "HEALTHY" then
|
||||||
|
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
|
||||||
|
log.warnf(
|
||||||
|
"Pod %s terminating — marking instance %s dead",
|
||||||
|
pod.getMetadata.getName,
|
||||||
|
inst.instanceId,
|
||||||
|
)
|
||||||
|
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def handlePodGone(pod: Pod): Unit =
|
||||||
|
findRegisteredInstance(pod).foreach { inst =>
|
||||||
|
log.warnf(
|
||||||
|
"Pod %s deleted — triggering failover for %s",
|
||||||
|
pod.getMetadata.getName,
|
||||||
|
inst.instanceId,
|
||||||
|
)
|
||||||
|
failoverService
|
||||||
|
.onInstanceStreamDropped(inst.instanceId)
|
||||||
|
.subscribe()
|
||||||
|
.`with`(
|
||||||
|
_ => (),
|
||||||
|
ex => log.warnf(ex, "Failover for %s failed", inst.instanceId),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def findRegisteredInstance(pod: Pod): Option[InstanceMetadata] =
|
||||||
|
val podName = pod.getMetadata.getName
|
||||||
|
instanceRegistry.getAllInstances.find(inst => podName.contains(inst.instanceId))
|
||||||
|
|||||||
+21
-1
@@ -4,6 +4,7 @@ import jakarta.annotation.PostConstruct
|
|||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
@@ -19,7 +20,10 @@ class InstanceRegistry:
|
|||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var redis: ReactiveRedisDataSource = uninitialized
|
private var redis: ReactiveRedisDataSource = uninitialized
|
||||||
private var redisPrefix = "nowchess"
|
|
||||||
|
@Inject
|
||||||
|
private var syncRedis: RedisDataSource = uninitialized
|
||||||
|
private var redisPrefix = "nowchess"
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private var meterRegistry: MeterRegistry = uninitialized
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
@@ -42,6 +46,22 @@ class InstanceRegistry:
|
|||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
|
def loadAllFromRedis(): Unit =
|
||||||
|
val keys = syncRedis.key(classOf[String]).keys(s"$redisPrefix:instances:*")
|
||||||
|
keys.asScala.foreach { key =>
|
||||||
|
val instanceId = key.stripPrefix(s"$redisPrefix:instances:")
|
||||||
|
val json = syncRedis.value(classOf[String]).get(key)
|
||||||
|
Option(json).foreach { jsonStr =>
|
||||||
|
try
|
||||||
|
val metadata = mapper.readValue(jsonStr, classOf[InstanceMetadata])
|
||||||
|
instances.put(instanceId, metadata)
|
||||||
|
log.infof("Startup: loaded instance %s from Redis", instanceId)
|
||||||
|
catch
|
||||||
|
case ex: Exception =>
|
||||||
|
log.warnf(ex, "Startup: failed to parse instance %s", instanceId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def getInstance(instanceId: String): Option[InstanceMetadata] =
|
def getInstance(instanceId: String): Option[InstanceMetadata] =
|
||||||
Option(instances.get(instanceId))
|
Option(instances.get(instanceId))
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
MAJOR=0
|
MAJOR=0
|
||||||
MINOR=18
|
MINOR=19
|
||||||
PATCH=0
|
PATCH=0
|
||||||
|
|||||||
Reference in New Issue
Block a user