feat(coordinator): scaffold microservice for <300ms failover and load balancing

- Add coordinator module with gRPC stream-based instance health detection
- Implement InstanceHeartbeatService in core: bidirectional stream to coordinator every 200ms
- Track game subscriptions per core via Redis Sets (SADD/SREM)
- Add gRPC handlers for batch resubscribe/unsubscribe/evict/drain operations
- Implement coordinator services: InstanceRegistry, FailoverService, LoadBalancer, AutoScaler, CacheEvictionManager
- Add REST API for metrics and manual failover/rebalance/scaling
- Proto definition: coordinator_service.proto with HeartbeatStream + batch game operations
- Failover timeline: gRPC stream drop (50-200ms) → game migration (<300ms target)
- Support for Argo Rollouts auto-scaling (k8s CRD patching via Fabric8 client)

Note: proto compilation issues are documented in COORDINATOR_IMPLEMENTATION.md. Follow-up work required:
- Add task dependency: tasks.compileScala dependsOn tasks.compileJava
- Fix deprecated field initializer syntax: replace `@Inject var x: T = _` with `@Inject var x: T = uninitialized`
- Implement remaining service methods (gRPC clients, FailoverService distribution)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-04-26 08:34:53 +02:00
parent 83f84371be
commit f327441089
23 changed files with 1526 additions and 1 deletions
@@ -0,0 +1,57 @@
syntax = "proto3";
package de.nowchess.coordinator;
// RPC surface shared by the coordinator and the core instances.
service CoordinatorService {
// Bidirectional stream: the caller sends HeartbeatFrame messages and receives
// CoordinatorCommand messages on the same call.
rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand);
// Unary: subscribe the listed games in one call.
rpc BatchResubscribeGames(BatchResubscribeRequest) returns (BatchResubscribeResponse);
// Unary: unsubscribe the listed games.
rpc UnsubscribeGames(UnsubscribeGamesRequest) returns (UnsubscribeGamesResponse);
// Unary: evict the listed games.
rpc EvictGames(EvictGamesRequest) returns (EvictGamesResponse);
// Unary: drain the receiving instance; the request carries no fields.
rpc DrainInstance(DrainInstanceRequest) returns (DrainInstanceResponse);
}
// One heartbeat sample sent on HeartbeatStream.
message HeartbeatFrame {
// Identifier of the sending instance.
string instanceId = 1;
// Host name reported by the sender.
string hostname = 2;
// Ports the sender advertises for HTTP and gRPC traffic.
int32 httpPort = 3;
int32 grpcPort = 4;
// Sender-reported counters.
int32 subscriptionCount = 5;
int32 localCacheSize = 6;
// Sender clock at frame creation, in milliseconds (presumably Unix epoch; confirm against sender).
int64 timestampMillis = 7;
}
// Message streamed back to the heartbeat sender on HeartbeatStream.
// NOTE(review): the set of valid `type` values and the `payload` encoding are not
// defined in this file; document them here once they are fixed.
message CoordinatorCommand {
string type = 1;
string payload = 2;
}
// Request for BatchResubscribeGames: the games to subscribe.
message BatchResubscribeRequest {
repeated string gameIds = 1;
}
// Response for BatchResubscribeGames.
message BatchResubscribeResponse {
// Number of games reported as subscribed.
int32 subscribedCount = 1;
// Ids of games that could not be subscribed.
repeated string failedGameIds = 2;
}
// Request for UnsubscribeGames: the games to unsubscribe.
message UnsubscribeGamesRequest {
repeated string gameIds = 1;
}
// Response for UnsubscribeGames.
message UnsubscribeGamesResponse {
// Number of games reported as unsubscribed.
int32 unsubscribedCount = 1;
}
// Request for EvictGames: the games to evict.
message EvictGamesRequest {
repeated string gameIds = 1;
}
// Response for EvictGames.
message EvictGamesResponse {
// Number of games reported as evicted.
int32 evictedCount = 1;
}
// Request for DrainInstance; intentionally empty.
message DrainInstanceRequest {}
// Response for DrainInstance.
message DrainInstanceResponse {
// Number of games reported as migrated by the drain.
int32 gamesMigrated = 1;
}
@@ -11,6 +11,9 @@ quarkus:
io-grpc:
host: localhost
port: 8081
# gRPC client named "coordinator-grpc"; this is the name the core's heartbeat
# service passes to @GrpcClient.
coordinator-grpc:
host: localhost
port: 9086
server:
use-separate-server: false
@@ -20,6 +23,13 @@ nowchess:
port: 6379
prefix: nowchess
# Coordinator connection and heartbeat settings.
# NOTE(review): the heartbeat service added in this commit hard-codes 200 ms / 2 s
# and generates its own instance id; confirm which component reads these keys.
coordinator:
host: localhost
grpc-port: 9086
stream-heartbeat-interval: 200ms
redis-heartbeat-interval: 2s
instance-id: ${HOSTNAME:local}-${quarkus.uuid}
"%dev":
mp:
jwt:
@@ -72,6 +82,9 @@ nowchess:
io-grpc:
host: ${IO_SERVICE_HOST}
port: ${IO_SERVICE_GRPC_PORT:9081}
# gRPC client named "coordinator-grpc"; host and port come from the environment,
# falling back to localhost:9086 when the variables are unset.
coordinator-grpc:
host: ${COORDINATOR_SERVICE_HOST:localhost}
port: ${COORDINATOR_SERVICE_GRPC_PORT:9086}
rest-client:
io-service:
url: ${IO_SERVICE_URL}
@@ -84,3 +97,10 @@ nowchess:
host: ${REDIS_HOST}
port: ${REDIS_PORT:6379}
prefix: ${REDIS_PREFIX:nowchess}
# Coordinator connection and heartbeat settings; host and port come from the
# environment, falling back to localhost:9086 when the variables are unset.
# NOTE(review): the heartbeat service added in this commit hard-codes 200 ms / 2 s
# and generates its own instance id; confirm which component reads these keys.
coordinator:
host: ${COORDINATOR_SERVICE_HOST:localhost}
grpc-port: ${COORDINATOR_SERVICE_GRPC_PORT:9086}
stream-heartbeat-interval: 200ms
redis-heartbeat-interval: 2s
instance-id: ${HOSTNAME:local}-${quarkus.uuid}
@@ -0,0 +1,67 @@
package de.nowchess.chess.grpc
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import de.nowchess.coordinator.CoordinatorServiceGrpc
import de.nowchess.coordinator.{
BatchResubscribeRequest,
BatchResubscribeResponse,
UnsubscribeGamesRequest,
UnsubscribeGamesResponse,
EvictGamesRequest,
EvictGamesResponse,
DrainInstanceRequest,
DrainInstanceResponse
}
import de.nowchess.chess.redis.GameRedisSubscriberManager
import io.grpc.stub.StreamObserver
import scala.jdk.CollectionConverters.*
/** gRPC endpoint exposing the batch game operations of `CoordinatorService`.
  *
  * Every RPC delegates to [[GameRedisSubscriberManager]] and answers with a single
  * response message. If the delegate throws, the call is terminated through
  * `onError` with `Status.INTERNAL` instead of letting the exception escape the
  * service method.
  *
  * `HeartbeatStream` is not overridden here, so it keeps the generated base
  * implementation.
  */
@ApplicationScoped
class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServiceImplBase:
  import io.grpc.Status
  import scala.compiletime.uninitialized
  import scala.util.{Failure, Success, Try}

  // `= _` is deprecated in Scala 3; `uninitialized` is the replacement and is what
  // GameRedisSubscriberManager already uses for its injected fields.
  @Inject
  private var gameSubscriberManager: GameRedisSubscriberManager = uninitialized

  /** Subscribes all requested games and reports how many ids were processed.
    *
    * `failedGameIds` is left empty because the manager only returns a count.
    */
  override def batchResubscribeGames(
      request: BatchResubscribeRequest,
      responseObserver: StreamObserver[BatchResubscribeResponse]
  ): Unit =
    reply(responseObserver) {
      val count = gameSubscriberManager.batchResubscribeGames(request.getGameIdsList)
      BatchResubscribeResponse.newBuilder()
        .setSubscribedCount(count)
        .build()
    }

  /** Unsubscribes all requested games and reports how many ids were processed. */
  override def unsubscribeGames(
      request: UnsubscribeGamesRequest,
      responseObserver: StreamObserver[UnsubscribeGamesResponse]
  ): Unit =
    reply(responseObserver) {
      val count = gameSubscriberManager.unsubscribeGames(request.getGameIdsList)
      UnsubscribeGamesResponse.newBuilder()
        .setUnsubscribedCount(count)
        .build()
    }

  /** Evicts all requested games and reports how many ids were processed. */
  override def evictGames(
      request: EvictGamesRequest,
      responseObserver: StreamObserver[EvictGamesResponse]
  ): Unit =
    reply(responseObserver) {
      val count = gameSubscriberManager.evictGames(request.getGameIdsList)
      EvictGamesResponse.newBuilder()
        .setEvictedCount(count)
        .build()
    }

  /** Drains this instance through the manager.
    *
    * `gamesMigrated` is always 0: the manager's `drainInstance()` returns no count.
    */
  override def drainInstance(
      request: DrainInstanceRequest,
      responseObserver: StreamObserver[DrainInstanceResponse]
  ): Unit =
    reply(responseObserver) {
      gameSubscriberManager.drainInstance()
      DrainInstanceResponse.newBuilder()
        .setGamesMigrated(0)
        .build()
    }

  /** Completes a unary call: sends the value produced by `build`, or fails the call
    * with `Status.INTERNAL` when `build` throws a non-fatal exception.
    */
  private def reply[T](observer: StreamObserver[T])(build: => T): Unit =
    Try(build) match
      case Success(response) =>
        observer.onNext(response)
        observer.onCompleted()
      case Failure(cause) =>
        observer.onError(
          Status.INTERNAL
            .withDescription(cause.getMessage)
            .withCause(cause)
            .asRuntimeException()
        )
@@ -7,6 +7,7 @@ import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.observer.Observer
import de.nowchess.chess.registry.GameRegistry
import de.nowchess.chess.resource.GameDtoMapper
import de.nowchess.chess.service.InstanceHeartbeatService
import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
@@ -25,6 +26,7 @@ class GameRedisSubscriberManager:
@Inject var objectMapper: ObjectMapper = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
// jakarta.inject.Inject declares no elements, so `@Inject(optional = true)` does not
// compile. InstanceHeartbeatService is an @ApplicationScoped bean, so a plain
// injection point is sufficient; the existing null checks at the call sites stay valid.
@Inject var heartbeatService: InstanceHeartbeatService = uninitialized
// scalafix:on DisableSyntax.var
private val c2sListeners = new ConcurrentHashMap[String, Int]()
@@ -50,6 +52,9 @@ class GameRedisSubscriberManager:
val obs = new GameRedisPublisher(gameId, registry, redisson, objectMapper, s2cTopicName(gameId), writebackFn, ioClient, unsubscribeGame)
s2cObservers.put(gameId, obs)
registry.get(gameId).foreach(_.engine.subscribe(obs))
if heartbeatService != null then
heartbeatService.addGameSubscription(gameId)
catch
case e: Exception =>
System.err.println(s"Warning: Redis subscription failed for game $gameId: ${e.getMessage}")
@@ -63,6 +68,9 @@ class GameRedisSubscriberManager:
registry.get(gameId).foreach(_.engine.unsubscribe(obs))
}
if heartbeatService != null then
heartbeatService.removeGameSubscription(gameId)
private def handleC2sMessage(gameId: String, msg: String): Unit =
parseC2sMessage(msg) match
case Some(C2sMessage.Connected) => handleConnected(gameId)
@@ -92,6 +100,35 @@ class GameRedisSubscriberManager:
}
}
/** Calls `subscribeGame` for every id in `gameIds`, in list order.
  *
  * @return the number of ids processed (one per list element)
  */
def batchResubscribeGames(gameIds: java.util.List[String]): Int =
  gameIds.forEach(id => subscribeGame(id))
  gameIds.size()
/** Calls `unsubscribeGame` for every id in `gameIds`, in list order.
  *
  * @return the number of ids processed (one per list element)
  */
def unsubscribeGames(gameIds: java.util.List[String]): Int =
  gameIds.forEach(id => unsubscribeGame(id))
  gameIds.size()
/** For every id in `gameIds`, unsubscribes the game and then removes it from the registry.
  *
  * @return the number of ids processed (one per list element)
  */
def evictGames(gameIds: java.util.List[String]): Int =
  gameIds.forEach { id =>
    unsubscribeGame(id)
    registry.remove(id)
    ()
  }
  gameIds.size()
/** Unsubscribes every game that currently has an entry in `c2sListeners`. */
def drainInstance(): Unit =
  // Copy the key set first so the iteration runs over a fixed list of ids.
  val subscribedIds = new java.util.ArrayList[String](c2sListeners.keySet())
  subscribedIds.forEach(id => unsubscribeGame(id))
@PreDestroy
def cleanup(): Unit =
c2sListeners.forEach((gameId, listenerId) =>
@@ -0,0 +1,186 @@
package de.nowchess.chess.service
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.event.Observes
import jakarta.inject.Inject
import io.quarkus.runtime.StartupEvent
import io.quarkus.runtime.ShutdownEvent
import io.quarkus.grpc.GrpcClient
import org.redisson.api.RedissonClient
import scala.concurrent.duration.*
import java.util.concurrent.{Executors, TimeUnit}
import java.net.InetAddress
import com.fasterxml.jackson.databind.ObjectMapper
import org.jboss.logging.Logger
import de.nowchess.coordinator.{HeartbeatFrame, CoordinatorServiceGrpc}
import de.nowchess.coordinator.CoordinatorServiceGrpc.CoordinatorServiceStub
import io.grpc.stub.StreamObserver
import scala.jdk.FutureConverters.*
/** Publishes this instance's liveness to the coordinator.
  *
  * Two periodic tasks run on their own single-thread schedulers:
  *  - every 200 ms a [[HeartbeatFrame]] is written to the coordinator's
  *    `HeartbeatStream`; when the stream has terminated or could not be opened,
  *    a reconnect is attempted at most once per second;
  *  - every 2 s a JSON metadata document is written to Redis with a 5 s TTL.
  *
  * The games this instance is subscribed to are mirrored into a Redis set.
  * On shutdown the schedulers are stopped first, then the stream is completed and
  * the Redis keys are deleted, so a late tick cannot re-create a deleted key or
  * write to a completed stream.
  */
@ApplicationScoped
class InstanceHeartbeatService:
  import java.util.concurrent.ExecutorService
  import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
  import scala.compiletime.uninitialized
  import scala.util.control.NonFatal

  // `= _` is deprecated in Scala 3; `uninitialized` is the replacement.
  @Inject
  private var redissonClient: RedissonClient = uninitialized

  @GrpcClient("coordinator-grpc")
  private var coordinatorStub: CoordinatorServiceStub = uninitialized

  /** An open request stream plus a flag its response observer sets once the call ends. */
  private final class LiveStream(
      val requests: StreamObserver[HeartbeatFrame],
      val terminated: AtomicBoolean
  )

  private val StreamHeartbeatIntervalMillis = 200L
  private val RedisHeartbeatIntervalSeconds = 2L
  private val RedisHeartbeatTtlSeconds = 5L
  private val ReconnectBackoffMillis = 1000L
  private val ExecutorStopTimeoutSeconds = 5L
  // Placeholders carried over from the scaffold; should become configurable.
  private val HttpPort = 8080
  private val GrpcPort = 9080

  private val log = Logger.getLogger(classOf[InstanceHeartbeatService])
  private val mapper = ObjectMapper()
  private val heartbeatExecutor = Executors.newSingleThreadScheduledExecutor()
  private val redisHeartbeatExecutor = Executors.newSingleThreadScheduledExecutor()

  // Counters are updated by callers and read by the scheduler threads.
  private val subscriptionCount = new AtomicInteger(0)
  private val localCacheSize = new AtomicInteger(0)

  // Written on the startup/shutdown threads, read on the scheduler threads.
  @volatile private var instanceId = ""
  @volatile private var redisPrefix = "nowchess"
  @volatile private var streamObserver: Option[LiveStream] = None
  @volatile private var nextReconnectAtMillis = 0L
  @volatile private var shuttingDown = false

  // Resolved once: the host name is needed on every 200 ms tick.
  private lazy val localHostname: String =
    try InetAddress.getLocalHost.getHostName
    catch case _: Exception => "unknown"

  /** Generates the instance id, opens the heartbeat stream and starts both schedules. */
  def onStart(@Observes event: StartupEvent): Unit =
    try
      generateInstanceId()
      initializeHeartbeatStream()
      scheduleHeartbeats()
      log.infof("Instance heartbeat service started with ID: %s", instanceId)
    catch
      case ex: Exception =>
        log.errorf(ex, "Failed to start instance heartbeat service")

  /** Stops the schedules and removes this instance's traces from the stream and Redis. */
  def onShutdown(@Observes event: ShutdownEvent): Unit =
    try
      cleanup()
      log.info("Instance heartbeat service stopped")
    catch
      case ex: Exception =>
        log.errorf(ex, "Error during heartbeat service shutdown")

  /** Overrides the key prefix used for all Redis keys written by this service. */
  def setRedisPrefix(prefix: String): Unit =
    redisPrefix = prefix

  /** Replaces the reported subscription count. */
  def setSubscriptionCount(count: Int): Unit =
    subscriptionCount.set(count)

  /** Replaces the reported local cache size. */
  def setLocalCacheSize(count: Int): Unit =
    localCacheSize.set(count)

  /** Adds `gameId` to this instance's Redis game set.
    *
    * The counter is only incremented when the id was not in the set before, so
    * repeated subscriptions of the same game do not inflate it.
    */
  def addGameSubscription(gameId: String): Unit =
    if redissonClient.getSet[String](gamesKey).add(gameId) then
      subscriptionCount.incrementAndGet()
      ()

  /** Removes `gameId` from this instance's Redis game set.
    *
    * The counter is only decremented when the id was actually in the set, and it
    * never drops below zero.
    */
  def removeGameSubscription(gameId: String): Unit =
    if redissonClient.getSet[String](gamesKey).remove(gameId) then
      subscriptionCount.updateAndGet(current => math.max(0, current - 1))
      ()

  private def instanceKey: String = s"$redisPrefix:instances:$instanceId"

  private def gamesKey: String = s"$redisPrefix:instance:$instanceId:games"

  private def generateInstanceId(): Unit =
    val uuid = java.util.UUID.randomUUID().toString.take(8)
    instanceId = s"$localHostname-$uuid"

  /** Opens a new heartbeat stream.
    *
    * The response observer only flips the stream's own `terminated` flag; the
    * heartbeat task notices the flag on its next tick and reconnects. Setting the
    * backoff deadline here also rate-limits retries after a failed attempt.
    */
  private def initializeHeartbeatStream(): Unit =
    nextReconnectAtMillis = System.currentTimeMillis() + ReconnectBackoffMillis
    try
      val terminated = new AtomicBoolean(false)
      val responseObserver = new StreamObserver[de.nowchess.coordinator.CoordinatorCommand]:
        override def onNext(value: de.nowchess.coordinator.CoordinatorCommand): Unit =
          log.debugf("Received coordinator command: %s", value.getType)

        override def onError(t: Throwable): Unit =
          log.warnf(t, "Heartbeat stream error")
          terminated.set(true)

        override def onCompleted(): Unit =
          log.info("Heartbeat stream completed")
          terminated.set(true)

      val requests = coordinatorStub.heartbeatStream(responseObserver)
      streamObserver = Some(new LiveStream(requests, terminated))
      log.info("Connected to coordinator heartbeat stream")
    catch
      case ex: Exception =>
        log.warnf(ex, "Failed to connect to coordinator")
        streamObserver = None

  private def scheduleHeartbeats(): Unit =
    heartbeatExecutor.scheduleAtFixedRate(
      () => sendHeartbeat(),
      0L,
      StreamHeartbeatIntervalMillis,
      TimeUnit.MILLISECONDS
    )
    redisHeartbeatExecutor.scheduleAtFixedRate(
      () => refreshRedisHeartbeat(),
      0L,
      RedisHeartbeatIntervalSeconds,
      TimeUnit.SECONDS
    )

  private def buildFrame(): HeartbeatFrame =
    HeartbeatFrame.newBuilder()
      .setInstanceId(instanceId)
      .setHostname(localHostname)
      .setHttpPort(HttpPort)
      .setGrpcPort(GrpcPort)
      .setSubscriptionCount(subscriptionCount.get())
      .setLocalCacheSize(localCacheSize.get())
      .setTimestampMillis(System.currentTimeMillis())
      .build()

  /** One heartbeat tick: send a frame on a live stream, otherwise try to reconnect.
    *
    * Nothing may escape this method: an exception thrown by a task scheduled with
    * `scheduleAtFixedRate` cancels all further executions.
    */
  private def sendHeartbeat(): Unit =
    try
      if !shuttingDown then
        streamObserver match
          case Some(stream) if !stream.terminated.get() =>
            stream.requests.onNext(buildFrame())
          case _ =>
            if System.currentTimeMillis() >= nextReconnectAtMillis then
              log.debug("Heartbeat stream unavailable, reconnecting to coordinator")
              initializeHeartbeatStream()
    catch
      case NonFatal(ex) =>
        log.warnf(ex, "Failed to send heartbeat frame")

  /** Writes the instance metadata document to Redis with a short TTL. */
  private def refreshRedisHeartbeat(): Unit =
    try
      if !shuttingDown then
        // A java.util.Map is used on purpose: `mapper` is a plain ObjectMapper with no
        // Scala module registered, so a Scala Map would not be written as a JSON object.
        val metadata = new java.util.LinkedHashMap[String, Any]()
        metadata.put("instanceId", instanceId)
        metadata.put("hostname", localHostname)
        metadata.put("httpPort", HttpPort)
        metadata.put("grpcPort", GrpcPort)
        metadata.put("subscriptionCount", subscriptionCount.get())
        metadata.put("localCacheSize", localCacheSize.get())
        metadata.put("lastHeartbeat", java.time.Instant.now().toString)
        metadata.put("state", "HEALTHY")
        val json = mapper.writeValueAsString(metadata)
        redissonClient.getBucket[String](instanceKey).set(json, RedisHeartbeatTtlSeconds, TimeUnit.SECONDS)
    catch
      case ex: Exception =>
        log.warnf(ex, "Failed to refresh Redis heartbeat")

  /** Shuts `executor` down, forcing it if it does not stop within the timeout. */
  private def stopExecutor(executor: ExecutorService): Unit =
    executor.shutdown()
    try
      if !executor.awaitTermination(ExecutorStopTimeoutSeconds, TimeUnit.SECONDS) then
        executor.shutdownNow()
        ()
    catch
      case _: InterruptedException =>
        executor.shutdownNow()
        Thread.currentThread().interrupt()

  /** Runs one cleanup step; a failure is logged so the remaining steps still run. */
  private def bestEffort(what: String)(action: => Any): Unit =
    try
      val _ = action
    catch
      case NonFatal(ex) =>
        log.warnf(ex, "Heartbeat cleanup step failed: %s", what)

  private def cleanup(): Unit =
    // Stop the periodic tasks before touching the stream or Redis.
    shuttingDown = true
    stopExecutor(heartbeatExecutor)
    stopExecutor(redisHeartbeatExecutor)

    streamObserver
      .filterNot(_.terminated.get())
      .foreach(stream => bestEffort("complete heartbeat stream")(stream.requests.onCompleted()))
    streamObserver = None

    bestEffort("delete instance key")(redissonClient.getBucket[String](instanceKey).delete())
    bestEffort("delete games set")(redissonClient.getSet[String](gamesKey).delete())