fix: update grpcServer variable to use Instance wrapper and add optional access method
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
This commit is contained in:
+16
-10
@@ -39,7 +39,7 @@ class HealthMonitor:
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
@Inject
|
||||
private var grpcServer: CoordinatorGrpcServer = uninitialized
|
||||
private var grpcServerInstance: Instance[CoordinatorGrpcServer] = uninitialized
|
||||
|
||||
@Inject
|
||||
private var failoverService: FailoverService = uninitialized
|
||||
@@ -52,6 +52,10 @@ class HealthMonitor:
|
||||
if kubeClientInstance.isUnsatisfied then None
|
||||
else Some(kubeClientInstance.get())
|
||||
|
||||
private def grpcServerOpt: Option[CoordinatorGrpcServer] =
|
||||
if grpcServerInstance.isUnsatisfied then None
|
||||
else Some(grpcServerInstance.get())
|
||||
|
||||
def setRedisPrefix(prefix: String): Unit =
|
||||
redisPrefix = prefix
|
||||
|
||||
@@ -179,15 +183,17 @@ class HealthMonitor:
|
||||
|
||||
private def validateStartupInstances(timeoutMs: Long): Unit =
|
||||
Thread.sleep(timeoutMs)
|
||||
instanceRegistry.getAllInstances.foreach { inst =>
|
||||
if !grpcServer.hasActiveStream(inst.instanceId) then
|
||||
log.warnf(
|
||||
"Startup: instance %s did not reconnect within %dms — evicting",
|
||||
inst.instanceId,
|
||||
timeoutMs,
|
||||
)
|
||||
instanceRegistry.removeInstance(inst.instanceId)
|
||||
deleteK8sPod(inst.instanceId)
|
||||
grpcServerOpt.foreach { grpcServer =>
|
||||
instanceRegistry.getAllInstances.foreach { inst =>
|
||||
if !grpcServer.hasActiveStream(inst.instanceId) then
|
||||
log.warnf(
|
||||
"Startup: instance %s did not reconnect within %dms — evicting",
|
||||
inst.instanceId,
|
||||
timeoutMs,
|
||||
)
|
||||
instanceRegistry.removeInstance(inst.instanceId)
|
||||
deleteK8sPod(inst.instanceId)
|
||||
}
|
||||
}
|
||||
|
||||
private def handlePodTerminating(pod: Pod): Unit =
|
||||
|
||||
Reference in New Issue
Block a user