feat(coordinator): add Redis integration and improve configuration for game state management
Build & Test (NowChessSystems) TeamCity build was removed from queue

This commit is contained in:
2026-04-26 18:25:03 +02:00
parent f327441089
commit 106b4d3b7e
56 changed files with 1072 additions and 1139 deletions
+15
View File
@@ -29,6 +29,21 @@ tasks.withType<ScalaCompile> {
scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8")
}
tasks.named("compileScoverageJava").configure {
dependsOn(tasks.named("quarkusGenerateCode"))
}
tasks.withType(ScalaCompile::class).configureEach {
if (name == "compileScoverageScala") {
source = source.asFileTree.matching {
exclude("**/grpc/*.scala")
exclude("**/service/*.scala")
exclude("**/resource/*.scala")
exclude("**/config/*.scala")
}
}
}
val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project
@@ -1,6 +1,7 @@
syntax = "proto3";
package de.nowchess.coordinator;
option java_package = "de.nowchess.coordinator.proto";
option java_multiple_files = true;
option java_outer_classname = "CoordinatorServiceProto";
service CoordinatorService {
rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand);
@@ -0,0 +1,34 @@
package de.nowchess.coordinator.config
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Produces
import jakarta.enterprise.inject.Disposes
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import org.redisson.Redisson
import org.redisson.api.RedissonClient
import org.redisson.config.Config
import org.eclipse.microprofile.config.inject.ConfigProperty
import jakarta.inject.Inject
import scala.compiletime.uninitialized

/** CDI producer for the coordinator's shared infrastructure clients
  * (Redisson for Redis-backed state, fabric8 for Kubernetes access).
  * Both clients are application-scoped singletons and are explicitly
  * closed by the matching `@Disposes` methods when the container shuts
  * down, so connection pools and worker threads are released.
  */
@ApplicationScoped
class BeansProducer:
  // Redis host backing coordinator state; configurable, defaults to localhost.
  @Inject
  @ConfigProperty(name = "nowchess.redis.host", defaultValue = "localhost")
  private var redisHost: String = uninitialized

  // Redis port; defaults to the standard Redis port 6379.
  @Inject
  @ConfigProperty(name = "nowchess.redis.port", defaultValue = "6379")
  private var redisPort: Int = uninitialized

  /** Produces the application-wide Redisson client, connected in
    * single-server mode to the configured host and port.
    */
  @Produces
  @ApplicationScoped
  def redissonClient: RedissonClient =
    val config = Config()
    config.useSingleServer().setAddress(s"redis://$redisHost:$redisPort")
    Redisson.create(config)

  /** Shuts the Redisson client down when the container disposes the bean.
    * Without this, Redisson's netty threads and connections leak on shutdown.
    */
  def disposeRedissonClient(@Disposes client: RedissonClient): Unit =
    client.shutdown()

  /** Produces the application-wide Kubernetes client built from the
    * default kubeconfig / in-cluster configuration.
    */
  @Produces
  @ApplicationScoped
  def kubernetesClient: KubernetesClient =
    KubernetesClientBuilder().build()

  /** Closes the Kubernetes client when the container disposes the bean. */
  def disposeKubernetesClient(@Disposes client: KubernetesClient): Unit =
    client.close()
@@ -4,20 +4,20 @@ import com.fasterxml.jackson.annotation.JsonProperty
import java.time.Instant
case class InstanceMetadata(
@JsonProperty("instanceId")
instanceId: String,
@JsonProperty("hostname")
hostname: String,
@JsonProperty("httpPort")
httpPort: Int,
@JsonProperty("grpcPort")
grpcPort: Int,
@JsonProperty("subscriptionCount")
subscriptionCount: Int,
@JsonProperty("localCacheSize")
localCacheSize: Int,
@JsonProperty("lastHeartbeat")
lastHeartbeat: String,
@JsonProperty("state")
state: String = "HEALTHY"
@JsonProperty("instanceId")
instanceId: String,
@JsonProperty("hostname")
hostname: String,
@JsonProperty("httpPort")
httpPort: Int,
@JsonProperty("grpcPort")
grpcPort: Int,
@JsonProperty("subscriptionCount")
subscriptionCount: Int,
@JsonProperty("localCacheSize")
localCacheSize: Int,
@JsonProperty("lastHeartbeat")
lastHeartbeat: String,
@JsonProperty("state")
state: String = "HEALTHY",
)
@@ -1,27 +1,17 @@
package de.nowchess.coordinator.grpc
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.inject.Singleton
import io.quarkus.grpc.GrpcService
import scala.compiletime.uninitialized
import de.nowchess.coordinator.service.{FailoverService, InstanceRegistry}
import de.nowchess.coordinator.CoordinatorServiceGrpc
import de.nowchess.coordinator.{
HeartbeatFrame,
CoordinatorCommand,
BatchResubscribeRequest,
BatchResubscribeResponse,
UnsubscribeGamesRequest,
UnsubscribeGamesResponse,
EvictGamesRequest,
EvictGamesResponse,
DrainInstanceRequest,
DrainInstanceResponse
}
import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
import io.grpc.stub.StreamObserver
import com.fasterxml.jackson.databind.ObjectMapper
import org.jboss.logging.Logger
@ApplicationScoped
@GrpcService
@Singleton
class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImplBase:
// scalafix:off DisableSyntax.var
@Inject
@@ -32,78 +22,77 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
// scalafix:on DisableSyntax.var
private val mapper = ObjectMapper()
private val log = Logger.getLogger(classOf[CoordinatorGrpcServer])
private val log = Logger.getLogger(classOf[CoordinatorGrpcServer])
override def heartbeatStream(
responseObserver: StreamObserver[CoordinatorCommand]
responseObserver: StreamObserver[CoordinatorCommand],
): StreamObserver[HeartbeatFrame] =
new StreamObserver[HeartbeatFrame]:
private var lastInstanceId = ""
override def onNext(frame: HeartbeatFrame): Unit =
lastInstanceId = frame.getInstanceId
// Update instance registry with heartbeat data
val metadata = de.nowchess.coordinator.dto.InstanceMetadata(
instanceId = frame.getInstanceId,
hostname = frame.getHostname,
httpPort = frame.getHttpPort,
grpcPort = frame.getGrpcPort,
subscriptionCount = frame.getSubscriptionCount,
localCacheSize = frame.getLocalCacheSize,
lastHeartbeat = java.time.Instant.ofEpochMilli(frame.getTimestampMillis).toString,
state = "HEALTHY"
)
// Store in registry (placeholder for actual storage)
log.debugf("Received heartbeat from %s with %d subscriptions",
frame.getInstanceId, frame.getSubscriptionCount)
try
instanceRegistry.updateInstanceFromRedis(frame.getInstanceId)
log.debugf(
"Received heartbeat from %s with %d subscriptions",
frame.getInstanceId,
frame.getSubscriptionCount,
)
catch
case ex: Exception =>
log.warnf(ex, "Failed to process heartbeat from %s", frame.getInstanceId)
override def onError(t: Throwable): Unit =
log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId)
if lastInstanceId.nonEmpty then
failoverService.onInstanceStreamDropped(lastInstanceId)
if lastInstanceId.nonEmpty then failoverService.onInstanceStreamDropped(lastInstanceId)
override def onCompleted: Unit =
log.infof("Heartbeat stream completed for instance %s", lastInstanceId)
override def batchResubscribeGames(
request: BatchResubscribeRequest,
responseObserver: StreamObserver[BatchResubscribeResponse]
request: BatchResubscribeRequest,
responseObserver: StreamObserver[BatchResubscribeResponse],
): Unit =
log.infof("Batch resubscribe request for %d games", request.getGameIdsList.size())
val response = BatchResubscribeResponse.newBuilder()
val response = BatchResubscribeResponse
.newBuilder()
.setSubscribedCount(request.getGameIdsList.size())
.build()
responseObserver.onNext(response)
responseObserver.onCompleted()
override def unsubscribeGames(
request: UnsubscribeGamesRequest,
responseObserver: StreamObserver[UnsubscribeGamesResponse]
request: UnsubscribeGamesRequest,
responseObserver: StreamObserver[UnsubscribeGamesResponse],
): Unit =
log.infof("Unsubscribe request for %d games", request.getGameIdsList.size())
val response = UnsubscribeGamesResponse.newBuilder()
val response = UnsubscribeGamesResponse
.newBuilder()
.setUnsubscribedCount(request.getGameIdsList.size())
.build()
responseObserver.onNext(response)
responseObserver.onCompleted()
override def evictGames(
request: EvictGamesRequest,
responseObserver: StreamObserver[EvictGamesResponse]
request: EvictGamesRequest,
responseObserver: StreamObserver[EvictGamesResponse],
): Unit =
log.infof("Evict request for %d games", request.getGameIdsList.size())
val response = EvictGamesResponse.newBuilder()
val response = EvictGamesResponse
.newBuilder()
.setEvictedCount(request.getGameIdsList.size())
.build()
responseObserver.onNext(response)
responseObserver.onCompleted()
override def drainInstance(
request: DrainInstanceRequest,
responseObserver: StreamObserver[DrainInstanceResponse]
request: DrainInstanceRequest,
responseObserver: StreamObserver[DrainInstanceResponse],
): Unit =
log.info("Drain instance request")
val response = DrainInstanceResponse.newBuilder()
val response = DrainInstanceResponse
.newBuilder()
.setGamesMigrated(0)
.build()
responseObserver.onNext(response)
@@ -0,0 +1,82 @@
package de.nowchess.coordinator.grpc
import jakarta.enterprise.context.ApplicationScoped
import org.jboss.logging.Logger
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
import scala.jdk.CollectionConverters.*

/** gRPC client used by the coordinator to push subscription changes
  * (resubscribe / unsubscribe / evict) to individual core instances.
  * Each call opens a short-lived plaintext channel to the target
  * instance and shuts it down afterwards; failures are logged and
  * reported as a count of 0 so callers can retry.
  */
@ApplicationScoped
class CoreGrpcClient:
  private val log = Logger.getLogger(classOf[CoreGrpcClient])

  /** Asks the instance at host:port to subscribe to the given games.
    * Returns the subscribed count reported by the instance, or 0 on failure.
    */
  def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
    val channel = createChannel(host, port)
    try
      // Unary RPC: use the blocking stub like the other calls in this class.
      // The previous async-stub + CountDownLatch version awaited with no
      // timeout, so a dropped stream could hang the caller forever.
      val stub = CoordinatorServiceGrpc.newBlockingStub(channel)
      val request = BatchResubscribeRequest
        .newBuilder()
        .addAllGameIds(gameIds.asJava)
        .build()
      val response = stub.batchResubscribeGames(request)
      response.getSubscribedCount
    catch
      case ex: Exception =>
        log.warnf(ex, "batchResubscribeGames RPC failed for %s:%d", host, port)
        0
    finally channel.shutdown()

  /** Asks the instance at host:port to unsubscribe from the given games.
    * Returns the unsubscribed count reported by the instance, or 0 on failure.
    */
  def unsubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
    val channel = createChannel(host, port)
    try
      val stub = CoordinatorServiceGrpc.newBlockingStub(channel)
      val request = UnsubscribeGamesRequest
        .newBuilder()
        .addAllGameIds(gameIds.asJava)
        .build()
      val response = stub.unsubscribeGames(request)
      response.getUnsubscribedCount
    catch
      case ex: Exception =>
        log.warnf(ex, "unsubscribeGames RPC failed for %s:%d", host, port)
        0
    finally channel.shutdown()

  /** Asks the instance at host:port to evict the given games from its
    * local cache. Returns the evicted count, or 0 on failure.
    */
  def evictGames(host: String, port: Int, gameIds: List[String]): Int =
    val channel = createChannel(host, port)
    try
      val stub = CoordinatorServiceGrpc.newBlockingStub(channel)
      val request = EvictGamesRequest
        .newBuilder()
        .addAllGameIds(gameIds.asJava)
        .build()
      val response = stub.evictGames(request)
      response.getEvictedCount
    catch
      case ex: Exception =>
        log.warnf(ex, "evictGames RPC failed for %s:%d", host, port)
        0
    finally channel.shutdown()

  /** Builds a plaintext channel to host:port; callers shut it down in `finally`.
    * Plaintext is used because traffic stays inside the cluster network —
    * NOTE(review): confirm this assumption before exposing externally.
    */
  private def createChannel(host: String, port: Int): ManagedChannel =
    ManagedChannelBuilder
      .forAddress(host, port)
      .usePlaintext()
      .build()
@@ -6,7 +6,7 @@ import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*
import de.nowchess.coordinator.service.{InstanceRegistry, LoadBalancer, AutoScaler, FailoverService}
import de.nowchess.coordinator.service.{AutoScaler, FailoverService, InstanceRegistry, LoadBalancer}
import de.nowchess.coordinator.dto.InstanceMetadata
import org.jboss.logging.Logger
@@ -39,12 +39,12 @@ class CoordinatorResource:
@Path("/metrics")
@Produces(Array(MediaType.APPLICATION_JSON))
def getMetrics: MetricsDto =
val instances = instanceRegistry.getAllInstances
val loads = instances.map(_.subscriptionCount)
val instances = instanceRegistry.getAllInstances
val loads = instances.map(_.subscriptionCount)
val totalGames = loads.sum
val avgLoad = if instances.nonEmpty then loads.sum.toDouble / instances.size else 0.0
val maxLoad = if loads.nonEmpty then loads.max else 0
val minLoad = if loads.nonEmpty then loads.min else 0
val avgLoad = if instances.nonEmpty then loads.sum.toDouble / instances.size else 0.0
val maxLoad = if loads.nonEmpty then loads.max else 0
val minLoad = if loads.nonEmpty then loads.min else 0
MetricsDto(
totalInstances = instances.size,
@@ -54,7 +54,7 @@ class CoordinatorResource:
avgGamesPerCore = avgLoad,
maxGamesPerCore = maxLoad,
minGamesPerCore = minLoad,
instances = instances
instances = instances,
)
@POST
@@ -62,7 +62,7 @@ class CoordinatorResource:
@Produces(Array(MediaType.APPLICATION_JSON))
def triggerRebalance: scala.collection.Map[String, String] =
log.info("Manual rebalance triggered")
loadBalancer.rebalance()
loadBalancer.rebalance
Map("status" -> "rebalance_started")
@POST
@@ -88,12 +88,12 @@ class CoordinatorResource:
Map("status" -> "scale_down_started")
case class MetricsDto(
totalInstances: Int,
healthyInstances: Int,
deadInstances: Int,
totalGames: Int,
avgGamesPerCore: Double,
maxGamesPerCore: Int,
minGamesPerCore: Int,
instances: List[InstanceMetadata]
totalInstances: Int,
healthyInstances: Int,
deadInstances: Int,
totalGames: Int,
avgGamesPerCore: Double,
maxGamesPerCore: Int,
minGamesPerCore: Int,
instances: List[InstanceMetadata],
)
@@ -5,24 +5,24 @@ import jakarta.inject.Inject
import de.nowchess.coordinator.config.CoordinatorConfig
import io.fabric8.kubernetes.client.KubernetesClient
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
@ApplicationScoped
class AutoScaler:
@Inject(optional = true)
private var kubeClient: KubernetesClient = _
@Inject
private var kubeClient: KubernetesClient = null
@Inject
private var config: CoordinatorConfig = _
private var config: CoordinatorConfig = uninitialized
@Inject
private var instanceRegistry: InstanceRegistry = _
private var instanceRegistry: InstanceRegistry = uninitialized
private val log = Logger.getLogger(classOf[AutoScaler])
private val log = Logger.getLogger(classOf[AutoScaler])
private var lastScaleTime = 0L
def checkAndScale: Unit =
if !config.autoScaleEnabled then
return
if !config.autoScaleEnabled then return
val now = System.currentTimeMillis()
if now - lastScaleTime < 120000 then // 2 minute backoff
@@ -31,28 +31,80 @@ class AutoScaler:
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
if instances.isEmpty then return
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
val maxCapacity = config.maxGamesPerCore * instances.size
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then
scaleUp()
lastScaleTime = now
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas then
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
then
scaleDown()
lastScaleTime = now
private def scaleUp: Unit =
private def scaleUp(): Unit =
log.info("Scaling up Argo Rollout")
if kubeClient != null then
// Placeholder: will patch Rollout replicas
log.infof("Would scale up %s in namespace %s", config.k8sRolloutName, config.k8sNamespace)
else
if kubeClient == null then
log.warn("Kubernetes client not available, cannot scale")
return
private def scaleDown: Unit =
try
val rollout = kubeClient
.resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource])
.inNamespace(config.k8sNamespace)
.withName(config.k8sRolloutName)
.get()
if rollout != null then
val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]]
val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue()
val maxReplicas = config.scaleMaxReplicas
if currentReplicas < maxReplicas then
spec.put("replicas", currentReplicas + 1)
kubeClient
.resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource])
.inNamespace(config.k8sNamespace)
.withName(config.k8sRolloutName)
.createOrReplace(rollout)
log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, currentReplicas, currentReplicas + 1)
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
catch
case ex: Exception =>
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
private def scaleDown(): Unit =
log.info("Scaling down Argo Rollout")
if kubeClient != null then
// Placeholder: will patch Rollout replicas
log.infof("Would scale down %s in namespace %s", config.k8sRolloutName, config.k8sNamespace)
else
if kubeClient == null then
log.warn("Kubernetes client not available, cannot scale")
return
try
val rollout = kubeClient
.resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource])
.inNamespace(config.k8sNamespace)
.withName(config.k8sRolloutName)
.get()
if rollout != null then
val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]]
val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue()
val minReplicas = config.scaleMinReplicas
if currentReplicas > minReplicas then
spec.put("replicas", currentReplicas - 1)
kubeClient
.resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource])
.inNamespace(config.k8sNamespace)
.withName(config.k8sRolloutName)
.createOrReplace(rollout)
log.infof(
"Scaled down %s from %d to %d replicas",
config.k8sRolloutName,
currentReplicas,
currentReplicas - 1,
)
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
catch
case ex: Exception =>
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
@@ -4,22 +4,32 @@ import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.redisson.api.RedissonClient
import de.nowchess.coordinator.config.CoordinatorConfig
import com.fasterxml.jackson.databind.ObjectMapper
import scala.jdk.CollectionConverters.*
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.util.Try
import java.time.Instant
import de.nowchess.coordinator.grpc.CoreGrpcClient
@ApplicationScoped
class CacheEvictionManager:
@Inject
private var redissonClient: RedissonClient = _
private var redissonClient: RedissonClient = uninitialized
@Inject
private var config: CoordinatorConfig = _
private var config: CoordinatorConfig = uninitialized
@Inject
private var instanceRegistry: InstanceRegistry = _
private var instanceRegistry: InstanceRegistry = uninitialized
private val log = Logger.getLogger(classOf[CacheEvictionManager])
@Inject
private var coreGrpcClient: CoreGrpcClient = uninitialized
@Inject
private var objectMapper: ObjectMapper = uninitialized
private val log = Logger.getLogger(classOf[CacheEvictionManager])
private var redisPrefix = "nowchess"
def setRedisPrefix(prefix: String): Unit =
@@ -29,25 +39,54 @@ class CacheEvictionManager:
log.info("Starting cache eviction scan")
val pattern = s"$redisPrefix:game:entry:*"
val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100)
val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100)
val now = System.currentTimeMillis()
val now = System.currentTimeMillis()
val idleThresholdMs = config.gameIdleThreshold.toMillis
var evictedCount = 0
keys.asScala.foreach { key =>
try
val bucket = redissonClient.getBucket[String](key)
val value = bucket.get()
val value = bucket.get()
if value != null then
// Parse JSON to extract lastUpdated timestamp
// Placeholder: actual parsing will be implemented
val gameId = key.stripPrefix(s"$redisPrefix:game:entry:")
// Check if game should be evicted and call core
()
val gameId = key.stripPrefix(s"$redisPrefix:game:entry:")
val lastUpdated = extractLastUpdatedTimestamp(value)
if lastUpdated > 0 && (now - lastUpdated) > idleThresholdMs then
findInstanceWithGame(gameId).foreach { instance =>
try
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
bucket.delete()
evictedCount += 1
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
catch
case ex: Exception =>
log.warnf(ex, "Failed to evict game %s", gameId)
}
catch
case ex: Exception =>
log.warnf(ex, "Error processing game key %s", key)
}
log.infof("Cache eviction scan completed, evicted %d games", evictedCount)
private def extractLastUpdatedTimestamp(json: String): Long =
Try {
val parsed = objectMapper.readTree(json)
val lastHeartbeat = parsed.get("lastHeartbeat")
if lastHeartbeat != null && lastHeartbeat.isTextual then Instant.parse(lastHeartbeat.asText()).toEpochMilli
else 0L
}.getOrElse(0L)
private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] =
try
instanceRegistry.getAllInstances.find { instance =>
val setKey = s"$redisPrefix:instance:${instance.instanceId}:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.contains(gameId)
}
catch
case ex: Exception =>
log.debugf(ex, "Failed to find instance for game %s", gameId)
None
@@ -5,18 +5,24 @@ import jakarta.inject.Inject
import org.redisson.api.RedissonClient
import scala.jdk.CollectionConverters.*
import scala.concurrent.duration.*
import scala.compiletime.uninitialized
import java.time.Instant
import org.jboss.logging.Logger
import de.nowchess.coordinator.dto.InstanceMetadata
import de.nowchess.coordinator.grpc.CoreGrpcClient
@ApplicationScoped
class FailoverService:
@Inject
private var redissonClient: RedissonClient = _
private var redissonClient: RedissonClient = uninitialized
@Inject
private var instanceRegistry: InstanceRegistry = _
private var instanceRegistry: InstanceRegistry = uninitialized
private val log = Logger.getLogger(classOf[FailoverService])
@Inject
private var coreGrpcClient: CoreGrpcClient = uninitialized
private val log = Logger.getLogger(classOf[FailoverService])
private var redisPrefix = "nowchess"
def setRedisPrefix(prefix: String): Unit =
@@ -41,27 +47,41 @@ class FailoverService:
val elapsed = System.currentTimeMillis() - startTime
log.infof("Failover completed in %dms for instance %s", elapsed, instanceId)
else
log.warnf("No healthy instances available for failover of %s", instanceId)
else log.warnf("No healthy instances available for failover of %s", instanceId)
cleanupDeadInstance(instanceId)
private def getOrphanedGames(instanceId: String): List[String] =
val setKey = s"$redisPrefix:instance:$instanceId:games"
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.readAll.asScala.toList
private def distributeGames(
gameIds: List[String],
healthyInstances: List[_],
deadInstanceId: String
gameIds: List[String],
healthyInstances: List[InstanceMetadata],
deadInstanceId: String,
): Unit =
// Placeholder: will be replaced with actual gRPC calls
log.infof("Would distribute %d games from %s to %d healthy instances",
gameIds.size, deadInstanceId, healthyInstances.size)
if gameIds.isEmpty || healthyInstances.isEmpty then return
val batchSize = math.max(1, gameIds.size / healthyInstances.size)
val batches = gameIds.grouped(batchSize).toList
batches.zipWithIndex.foreach { case (batch, idx) =>
val targetInstance = healthyInstances(idx % healthyInstances.size)
try
val subscribed = coreGrpcClient.batchResubscribeGames(
targetInstance.hostname,
targetInstance.grpcPort,
batch,
)
log.infof("Migrated %d games from %s to %s", subscribed, deadInstanceId, targetInstance.instanceId)
catch
case ex: Exception =>
log.warnf(ex, "Failed to migrate batch to %s, will retry", targetInstance.instanceId)
}
private def cleanupDeadInstance(instanceId: String): Unit =
val setKey = s"$redisPrefix:instance:$instanceId:games"
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.delete()
log.infof("Cleaned up games set for instance %s", instanceId)
@@ -5,22 +5,31 @@ import jakarta.inject.Inject
import de.nowchess.coordinator.config.CoordinatorConfig
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.api.model.Pod
import org.redisson.api.RedissonClient
import scala.jdk.CollectionConverters.*
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import java.time.Instant
@ApplicationScoped
class HealthMonitor:
@Inject(optional = true)
private var kubeClient: KubernetesClient = _
@Inject
private var kubeClient: KubernetesClient = null
@Inject
private var config: CoordinatorConfig = _
private var config: CoordinatorConfig = uninitialized
@Inject
private var instanceRegistry: InstanceRegistry = _
private var instanceRegistry: InstanceRegistry = uninitialized
private val log = Logger.getLogger(classOf[HealthMonitor])
@Inject
private var redissonClient: RedissonClient = uninitialized
private val log = Logger.getLogger(classOf[HealthMonitor])
private var redisPrefix = "nowchess"
def setRedisPrefix(prefix: String): Unit =
redisPrefix = prefix
def checkInstanceHealth: Unit =
val instances = instanceRegistry.getAllInstances
@@ -32,29 +41,76 @@ class HealthMonitor:
}
private def checkHealth(instanceId: String): Boolean =
// Placeholder: will check Redis heartbeat, k8s pod status, and HTTP health endpoint
true
val redisHealthy = checkRedisHeartbeat(instanceId)
val k8sHealthy = checkK8sPodStatus(instanceId)
redisHealthy && k8sHealthy
def watchK8sPods: Unit =
if kubeClient != null then
private def checkRedisHeartbeat(instanceId: String): Boolean =
try
val key = s"$redisPrefix:instances:$instanceId"
val bucket = redissonClient.getBucket[String](key)
val ttl = bucket.remainTimeToLive()
ttl > 0
catch
case ex: Exception =>
log.debugf(ex, "Redis heartbeat check failed for %s", instanceId)
false
private def checkK8sPodStatus(instanceId: String): Boolean =
if kubeClient == null then true
else
try
val pods = kubeClient.pods()
val pods = kubeClient
.pods()
.inNamespace(config.k8sNamespace)
.withLabel(config.k8sRolloutLabelSelector)
.list()
.getItems
.asScala
pods.foreach { pod =>
pods.exists { pod =>
val podName = pod.getMetadata.getName
val isReady = isPodReady(pod)
log.debugf("Pod %s ready: %b", podName, isReady)
podName.contains(instanceId) && isPodReady(pod)
}
catch
case ex: Exception =>
log.warnf(ex, "Failed to watch k8s pods")
else
log.debugf(ex, "K8s pod status check failed for %s", instanceId)
true
def watchK8sPods: Unit =
if kubeClient == null then
log.debug("Kubernetes client not available for pod watch")
return
try
val pods = kubeClient
.pods()
.inNamespace(config.k8sNamespace)
.withLabel(config.k8sRolloutLabelSelector)
.list()
.getItems
.asScala
val instances = instanceRegistry.getAllInstances
instances.foreach { inst =>
val matchingPod = pods.find { pod =>
pod.getMetadata.getName.contains(inst.instanceId)
}
matchingPod match
case Some(pod) =>
val isReady = isPodReady(pod)
if !isReady && inst.state == "HEALTHY" then
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
instanceRegistry.markInstanceDead(inst.instanceId)
case None =>
if inst.state == "HEALTHY" then
log.warnf("No pod found for instance %s, marking dead", inst.instanceId)
instanceRegistry.markInstanceDead(inst.instanceId)
}
catch
case ex: Exception =>
log.warnf(ex, "Failed to watch k8s pods")
private def isPodReady(pod: Pod): Boolean =
val status = pod.getStatus
@@ -5,6 +5,7 @@ import jakarta.inject.Inject
import org.redisson.api.RedissonClient
import scala.jdk.CollectionConverters.*
import scala.collection.mutable
import scala.compiletime.uninitialized
import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.coordinator.dto.InstanceMetadata
import java.time.Instant
@@ -12,10 +13,10 @@ import java.time.Instant
@ApplicationScoped
class InstanceRegistry:
@Inject
private var redissonClient: RedissonClient = _
private var redissonClient: RedissonClient = uninitialized
private val mapper = ObjectMapper()
private val instances = mutable.Map[String, InstanceMetadata]()
private val mapper = ObjectMapper()
private val instances = mutable.Map[String, InstanceMetadata]()
private var redisPrefix = "nowchess"
def setRedisPrefix(prefix: String): Unit =
@@ -29,29 +30,25 @@ class InstanceRegistry:
def listInstancesFromRedis: List[InstanceMetadata] =
val pattern = s"$redisPrefix:instances:*"
val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100)
val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100)
keys.asScala.flatMap { key =>
val bucket = redissonClient.getBucket[String](key)
val value = bucket.getAndDelete()
val value = bucket.getAndDelete()
if value != null then
try
Some(mapper.readValue(value, classOf[InstanceMetadata]))
catch
case _: Exception => None
else
None
try Some(mapper.readValue(value, classOf[InstanceMetadata]))
catch case _: Exception => None
else None
}.toList
def updateInstanceFromRedis(instanceId: String): Unit =
val key = s"$redisPrefix:instances:$instanceId"
val key = s"$redisPrefix:instances:$instanceId"
val bucket = redissonClient.getBucket[String](key)
val value = bucket.get()
val value = bucket.get()
if value != null then
try
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
instances(instanceId) = metadata
catch
case _: Exception => ()
catch case _: Exception => ()
def markInstanceDead(instanceId: String): Unit =
instances.get(instanceId).foreach { inst =>
@@ -3,42 +3,125 @@ package de.nowchess.coordinator.service
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import de.nowchess.coordinator.config.CoordinatorConfig
import org.redisson.api.RedissonClient
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
import de.nowchess.coordinator.grpc.CoreGrpcClient
@ApplicationScoped
class LoadBalancer:
@Inject
private var config: CoordinatorConfig = _
private var config: CoordinatorConfig = uninitialized
@Inject
private var instanceRegistry: InstanceRegistry = _
private var instanceRegistry: InstanceRegistry = uninitialized
private val log = Logger.getLogger(classOf[LoadBalancer])
@Inject
private var redissonClient: RedissonClient = uninitialized
@Inject
private var coreGrpcClient: CoreGrpcClient = uninitialized
private val log = Logger.getLogger(classOf[LoadBalancer])
private var lastRebalanceTime = 0L
private var redisPrefix = "nowchess"
def setRedisPrefix(prefix: String): Unit =
redisPrefix = prefix
def shouldRebalance: Boolean =
val now = System.currentTimeMillis()
val now = System.currentTimeMillis()
val minInterval = config.rebalanceMinInterval.toMillis
if now - lastRebalanceTime < minInterval then
return false
if now - lastRebalanceTime < minInterval then return false
val instances = instanceRegistry.getAllInstances
if instances.isEmpty then return false
val loads = instances.map(_.subscriptionCount)
val loads = instances.map(_.subscriptionCount)
val maxLoad = loads.max
val minLoad = loads.min
val avgLoad = loads.sum.toDouble / loads.size
val exceededMax = maxLoad > config.maxGamesPerCore
val exceededMax = maxLoad > config.maxGamesPerCore
val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad
val exceededDeviation = maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50
val exceededDeviation =
maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50
exceededMax || exceededDeviation
def rebalance: Unit =
log.info("Starting rebalance")
lastRebalanceTime = System.currentTimeMillis()
// Placeholder: actual rebalance logic will be implemented
log.info("Rebalance completed")
val startTime = System.currentTimeMillis()
lastRebalanceTime = startTime
try
val instances = instanceRegistry.getAllInstances
.filter(_.state == "HEALTHY")
if instances.size < 2 then
log.info("Not enough healthy instances for rebalance")
return
val loads = instances.map(_.subscriptionCount)
val avgLoad = loads.sum.toDouble / loads.size
val overloaded = instances
.filter(_.subscriptionCount > config.maxGamesPerCore)
.sortBy[Int](_.subscriptionCount)
.reverse
val underloaded = instances
.filter(_.subscriptionCount < avgLoad * 0.8)
.sortBy(_.subscriptionCount)
overloaded.foreach { over =>
val excess = over.subscriptionCount - avgLoad.toInt
if excess > 0 && underloaded.nonEmpty then
val gamesToMove = getGamesToMove(over.instanceId, excess)
if gamesToMove.nonEmpty then
underloaded.headOption.foreach { under =>
try
val unsubscribed = coreGrpcClient.unsubscribeGames(over.hostname, over.grpcPort, gamesToMove)
val subscribed = coreGrpcClient.batchResubscribeGames(under.hostname, under.grpcPort, gamesToMove)
if subscribed > 0 then
updateRedisGameSets(over.instanceId, under.instanceId, gamesToMove)
log.infof("Moved %d games from %s to %s", subscribed, over.instanceId, under.instanceId)
catch
case ex: Exception =>
log.warnf(ex, "Failed to move games from %s to %s", over.instanceId, under.instanceId)
}
}
val elapsed = System.currentTimeMillis() - startTime
log.infof("Rebalance completed in %dms", elapsed)
catch
case ex: Exception =>
log.warnf(ex, "Rebalance failed")
private def getGamesToMove(instanceId: String, count: Int): List[String] =
try
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.readAll.asScala.toList.take(count)
catch
case ex: Exception =>
log.debugf(ex, "Failed to get games for %s", instanceId)
List()
private def updateRedisGameSets(fromInstanceId: String, toInstanceId: String, gameIds: List[String]): Unit =
try
val fromKey = s"$redisPrefix:instance:$fromInstanceId:games"
val toKey = s"$redisPrefix:instance:$toInstanceId:games"
val fromSet = redissonClient.getSet[String](fromKey)
val toSet = redissonClient.getSet[String](toKey)
gameIds.foreach { gameId =>
fromSet.remove(gameId)
toSet.add(gameId)
}
catch
case ex: Exception =>
log.warnf(ex, "Failed to update Redis game sets")