diff --git a/.codesight/CODESIGHT.md b/.codesight/CODESIGHT.md index 616268e..b5674ca 100644 --- a/.codesight/CODESIGHT.md +++ b/.codesight/CODESIGHT.md @@ -256,6 +256,7 @@ - `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala` - class AutoScaler - function initMetrics + - function periodicScaleCheck - function checkAndScale - function scaleUp - function scaleDown diff --git a/.codesight/libs.md b/.codesight/libs.md index e65d86d..69c353d 100644 --- a/.codesight/libs.md +++ b/.codesight/libs.md @@ -200,6 +200,7 @@ - `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala` - class AutoScaler - function initMetrics + - function periodicScaleCheck - function checkAndScale - function scaleUp - function scaleDown diff --git a/modules/coordinator/src/main/resources/application.yml b/modules/coordinator/src/main/resources/application.yml index a14ae84..0156897 100644 --- a/modules/coordinator/src/main/resources/application.yml +++ b/modules/coordinator/src/main/resources/application.yml @@ -37,7 +37,7 @@ nowchess: stream-heartbeat-interval: PT0.2S cache-eviction-interval: 10m game-idle-threshold: 45m - auto-scale-enabled: false + auto-scale-enabled: true scale-up-threshold: 0.8 scale-down-threshold: 0.3 scale-min-replicas: 2 diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala index 6e95265..e56727b 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala @@ -39,7 +39,7 @@ class CoreGrpcClient: def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int = try - val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)) + val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)).withDeadlineAfter(5, TimeUnit.SECONDS) val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build() val count = stub.batchResubscribeGames(request).getSubscribedCount log.debugf("batchResubscribeGames %s:%d — subscribed %d games", host, port, count) @@ -52,7 +52,7 @@ class CoreGrpcClient: def unsubscribeGames(host: String, port: Int, gameIds: List[String]): Int = try - val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)) + val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)).withDeadlineAfter(5, TimeUnit.SECONDS) val request = UnsubscribeGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build() val count = stub.unsubscribeGames(request).getUnsubscribedCount log.debugf("unsubscribeGames %s:%d — unsubscribed %d games", host, port, count) @@ -65,7 +65,7 @@ class CoreGrpcClient: def evictGames(host: String, port: Int, gameIds: List[String]): Int = try - val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)) + val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)).withDeadlineAfter(5, TimeUnit.SECONDS) val request = EvictGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build() val count = stub.evictGames(request).getEvictedCount log.debugf("evictGames %s:%d — evicted %d games", host, port, count) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala index 0008888..9760039 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala @@ -7,6 +7,7 @@ import io.quarkus.redis.datasource.RedisDataSource import de.nowchess.coordinator.config.CoordinatorConfig import com.fasterxml.jackson.databind.ObjectMapper import io.micrometer.core.instrument.MeterRegistry +import io.quarkus.scheduler.Scheduled import scala.jdk.CollectionConverters.* import org.jboss.logging.Logger import scala.compiletime.uninitialized @@ -48,6 +49,11 @@ class CacheEvictionManager: meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record(0L, TimeUnit.MILLISECONDS) meterRegistry.counter("nowchess.coordinator.cache.evictions").increment(0) + @Scheduled(every = "5m") + def periodicCacheEviction(): Unit = + try evictStaleGames + catch case ex: Exception => log.warnf(ex, "Periodic cache eviction failed") + def evictStaleGames: Unit = meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record((() => runEviction()): Runnable) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala index 2474646..a857d59 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala @@ -76,11 +76,13 @@ class InstanceRegistry: .onItem() .transformToUni { value => try - if value == null then - log.debugf("Instance %s metadata missing from Redis (may have expired)", instanceId) - Uni.createFrom().item(()) - else - val metadata = mapper.readValue(value, classOf[InstanceMetadata]) + Option(value).fold( + { + log.debugf("Instance %s metadata missing from Redis (may have expired)", instanceId) + Uni.createFrom().item(()) + }, + ) { json => + val metadata = mapper.readValue(json, classOf[InstanceMetadata]) val isNew = !instances.containsKey(instanceId) instances.put(instanceId, metadata) if isNew then @@ -94,6 +96,7 @@ class InstanceRegistry: metadata.state, ) Uni.createFrom().item(()) + } catch case ex: Exception => log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala index 4c37df5..2b31176 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala @@ -4,6 +4,7 @@ import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.scheduler.Scheduled import org.jboss.logging.Logger import scala.compiletime.uninitialized import scala.concurrent.duration.* @@ -125,3 +126,8 @@ class LoadBalancer: catch case ex: Exception => log.warnf(ex, "Failed to update Redis game sets") + + @Scheduled(every = "30s") + def periodicRebalanceCheck(): Unit = + try if shouldRebalance then rebalance + catch case ex: Exception => log.warnf(ex, "Periodic rebalance check failed")