fix: coordinator auto-scaling, cache eviction, rebalancing, and grpc timeouts
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
Critical fixes:
- Enable auto-scaling (was disabled in config)
- Add periodic cache eviction (5m interval) — CacheEvictionManager never ran
- Add periodic rebalance check (30s) — proactive load balancing
- Add 5s timeout to all gRPC calls (batchResubscribe, unsubscribe, evict)
- Use Option instead of null checks (scalafix compliance)

These gaps left the coordinator unable to:
1. Scale up when instances were overloaded (scaling was disabled)
2. Clean up idle games from memory (no scheduled eviction)
3. Rebalance load proactively (only on scale-up)
4. Handle hung instances (no RPC timeouts, operations could hang forever)

Combined with prior fixes for instance metadata parsing and heartbeat TTL, the coordinator now handles overload scenarios correctly.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -256,6 +256,7 @@
|
|||||||
- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala`
|
- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala`
|
||||||
- class AutoScaler
|
- class AutoScaler
|
||||||
- function initMetrics
|
- function initMetrics
|
||||||
|
- function periodicScaleCheck
|
||||||
- function checkAndScale
|
- function checkAndScale
|
||||||
- function scaleUp
|
- function scaleUp
|
||||||
- function scaleDown
|
- function scaleDown
|
||||||
|
|||||||
@@ -200,6 +200,7 @@
|
|||||||
- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala`
|
- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala`
|
||||||
- class AutoScaler
|
- class AutoScaler
|
||||||
- function initMetrics
|
- function initMetrics
|
||||||
|
- function periodicScaleCheck
|
||||||
- function checkAndScale
|
- function checkAndScale
|
||||||
- function scaleUp
|
- function scaleUp
|
||||||
- function scaleDown
|
- function scaleDown
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ nowchess:
|
|||||||
stream-heartbeat-interval: PT0.2S
|
stream-heartbeat-interval: PT0.2S
|
||||||
cache-eviction-interval: 10m
|
cache-eviction-interval: 10m
|
||||||
game-idle-threshold: 45m
|
game-idle-threshold: 45m
|
||||||
auto-scale-enabled: false
|
auto-scale-enabled: true
|
||||||
scale-up-threshold: 0.8
|
scale-up-threshold: 0.8
|
||||||
scale-down-threshold: 0.3
|
scale-down-threshold: 0.3
|
||||||
scale-min-replicas: 2
|
scale-min-replicas: 2
|
||||||
|
|||||||
+3
-3
@@ -39,7 +39,7 @@ class CoreGrpcClient:
|
|||||||
|
|
||||||
def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
|
def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
|
||||||
try
|
try
|
||||||
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
|
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)).withDeadlineAfter(5, TimeUnit.SECONDS)
|
||||||
val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
||||||
val count = stub.batchResubscribeGames(request).getSubscribedCount
|
val count = stub.batchResubscribeGames(request).getSubscribedCount
|
||||||
log.debugf("batchResubscribeGames %s:%d — subscribed %d games", host, port, count)
|
log.debugf("batchResubscribeGames %s:%d — subscribed %d games", host, port, count)
|
||||||
@@ -52,7 +52,7 @@ class CoreGrpcClient:
|
|||||||
|
|
||||||
def unsubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
|
def unsubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
|
||||||
try
|
try
|
||||||
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
|
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)).withDeadlineAfter(5, TimeUnit.SECONDS)
|
||||||
val request = UnsubscribeGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
val request = UnsubscribeGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
||||||
val count = stub.unsubscribeGames(request).getUnsubscribedCount
|
val count = stub.unsubscribeGames(request).getUnsubscribedCount
|
||||||
log.debugf("unsubscribeGames %s:%d — unsubscribed %d games", host, port, count)
|
log.debugf("unsubscribeGames %s:%d — unsubscribed %d games", host, port, count)
|
||||||
@@ -65,7 +65,7 @@ class CoreGrpcClient:
|
|||||||
|
|
||||||
def evictGames(host: String, port: Int, gameIds: List[String]): Int =
|
def evictGames(host: String, port: Int, gameIds: List[String]): Int =
|
||||||
try
|
try
|
||||||
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
|
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)).withDeadlineAfter(5, TimeUnit.SECONDS)
|
||||||
val request = EvictGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
val request = EvictGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
||||||
val count = stub.evictGames(request).getEvictedCount
|
val count = stub.evictGames(request).getEvictedCount
|
||||||
log.debugf("evictGames %s:%d — evicted %d games", host, port, count)
|
log.debugf("evictGames %s:%d — evicted %d games", host, port, count)
|
||||||
|
|||||||
+6
@@ -7,6 +7,7 @@ import io.quarkus.redis.datasource.RedisDataSource
|
|||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import io.micrometer.core.instrument.MeterRegistry
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
|
import io.quarkus.scheduler.Scheduled
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
@@ -48,6 +49,11 @@ class CacheEvictionManager:
|
|||||||
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record(0L, TimeUnit.MILLISECONDS)
|
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record(0L, TimeUnit.MILLISECONDS)
|
||||||
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment(0)
|
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment(0)
|
||||||
|
|
||||||
|
@Scheduled(every = "5m")
|
||||||
|
def periodicCacheEviction(): Unit =
|
||||||
|
try evictStaleGames
|
||||||
|
catch case ex: Exception => log.warnf(ex, "Periodic cache eviction failed")
|
||||||
|
|
||||||
def evictStaleGames: Unit =
|
def evictStaleGames: Unit =
|
||||||
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record((() => runEviction()): Runnable)
|
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record((() => runEviction()): Runnable)
|
||||||
|
|
||||||
|
|||||||
+8
-5
@@ -76,11 +76,13 @@ class InstanceRegistry:
|
|||||||
.onItem()
|
.onItem()
|
||||||
.transformToUni { value =>
|
.transformToUni { value =>
|
||||||
try
|
try
|
||||||
if value == null then
|
Option(value).fold(
|
||||||
log.debugf("Instance %s metadata missing from Redis (may have expired)", instanceId)
|
{
|
||||||
Uni.createFrom().item(())
|
log.debugf("Instance %s metadata missing from Redis (may have expired)", instanceId)
|
||||||
else
|
Uni.createFrom().item(())
|
||||||
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
},
|
||||||
|
) { json =>
|
||||||
|
val metadata = mapper.readValue(json, classOf[InstanceMetadata])
|
||||||
val isNew = !instances.containsKey(instanceId)
|
val isNew = !instances.containsKey(instanceId)
|
||||||
instances.put(instanceId, metadata)
|
instances.put(instanceId, metadata)
|
||||||
if isNew then
|
if isNew then
|
||||||
@@ -94,6 +96,7 @@ class InstanceRegistry:
|
|||||||
metadata.state,
|
metadata.state,
|
||||||
)
|
)
|
||||||
Uni.createFrom().item(())
|
Uni.createFrom().item(())
|
||||||
|
}
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId)
|
log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId)
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import jakarta.enterprise.context.ApplicationScoped
|
|||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.scheduler.Scheduled
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
import scala.concurrent.duration.*
|
import scala.concurrent.duration.*
|
||||||
@@ -125,3 +126,8 @@ class LoadBalancer:
|
|||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to update Redis game sets")
|
log.warnf(ex, "Failed to update Redis game sets")
|
||||||
|
|
||||||
|
@Scheduled(every = "30s")
|
||||||
|
def periodicRebalanceCheck(): Unit =
|
||||||
|
try if shouldRebalance then rebalance
|
||||||
|
catch case ex: Exception => log.warnf(ex, "Periodic rebalance check failed")
|
||||||
|
|||||||
Reference in New Issue
Block a user