# Plan: Add Coordinator Microservice

## Context
NowChess scales its core service horizontally via shared Redis, but lacks:
- Instance visibility: no way to list running cores or their load
- Load balancing: games land randomly on cores; no rebalancing
- Failover: dead cores orphan subscriptions; bullet chess requires <1s recovery
- Auto-scaling: manual ops to add/remove cores
- Cache management: no eviction of stale games from core memory
Bullet chess games run on move timings of <3s; a 30s failover means the game is lost on the clock. Target: <300ms failover.
## Architecture: Sub-1s Failover

### Why Not Polling/TTL

- TTL expiry: 10-30s minimum detection
- HTTP polling with a 3x failure threshold: 30s minimum
- gRPC streaming TCP drop: 50-200ms detection; use this as the primary signal
### Primary: gRPC Bidirectional Streaming

- Core opens a persistent bidirectional stream (`HeartbeatStream`) to the coordinator on startup
- Core sends heartbeat frames every 200ms
- Core crash = TCP RST/FIN → coordinator stream error in ~50-200ms
- The stream also carries metadata updates (subscription count changes) in real time
### Fallback: Redis Heartbeat + K8s Watch

- Redis heartbeat key `{prefix}:instances:{instanceId}` with 5s TTL, refreshed every 2s
- K8s pod watch via the Kubernetes Java client (event-driven; handles pod eviction/OOMKill)
- The fallback covers: network partition (TCP stays up but the core is a zombie) and the coordinator-restart gap
## Design

### 1. Module: modules/coordinator

- Language: Scala 3.5.1, Quarkus REST + gRPC
- Ports: HTTP 8086, gRPC 9086
- Dependencies: Redisson, Kubernetes Java client, Quarkus gRPC
- Persistence: none (all state in Redis)
### 2. Instance Registry

Redis schema:

`{prefix}:instances:{instanceId}`

- TTL: 5s (refreshed by the core every 2s via a background task)
- Value: JSON

```json
{
  "instanceId": "core-abc123",
  "hostname": "core-pod-3",
  "httpPort": 8080,
  "grpcPort": 9080,
  "subscriptionCount": 147,
  "localCacheSize": 147,
  "lastHeartbeat": "2026-04-26T10:15:30.123Z"
}
```

`{prefix}:instance:{instanceId}:games`

- Type: Redis Set (no TTL — managed explicitly)
- Members: all gameIds currently subscribed on this instance
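The registry value maps naturally onto a small data class. A minimal sketch in plain Java (the project code is Scala; this standalone illustration uses the field names from the JSON above, and the `isStale` helper against the 5s TTL is an assumption, not existing code):

```java
import java.time.Duration;
import java.time.Instant;

// Mirrors the JSON stored at {prefix}:instances:{instanceId}.
record InstanceInfo(String instanceId, String hostname, int httpPort, int grpcPort,
                    int subscriptionCount, int localCacheSize, Instant lastHeartbeat) {

    // A heartbeat older than the TTL means the Redis key has expired (or is
    // about to); the coordinator should treat the instance as suspect.
    boolean isStale(Instant now, Duration ttl) {
        return Duration.between(lastHeartbeat, now).compareTo(ttl) > 0;
    }
}
```

The same staleness check is what the fallback path in section 3 effectively performs when the Redis key's TTL lapses.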
Core changes (new `InstanceHeartbeatService` bean in modules/core):

- `@PostConstruct`: generate a stable `instanceId` (hostname + random suffix); open the gRPC stream to the coordinator; publish the Redis heartbeat; register in `{prefix}:instances:{instanceId}`
- Every 200ms: send a heartbeat frame on the gRPC stream (carries `subscriptionCount`)
- Every 2s: refresh the Redis heartbeat key's TTL
- `subscribeGame(gameId)`: `SADD {prefix}:instance:{instanceId}:games gameId`
- `unsubscribeGame(gameId)` / `evictGame(gameId)`: `SREM {prefix}:instance:{instanceId}:games gameId`
- `@PreDestroy`: delete the Redis key and games set; close the gRPC stream (clean shutdown)
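Both heartbeat cadences can be driven by a single scheduler. A sketch in plain Java (the real service is a Scala/Quarkus bean; the `sendFrame` and `refreshTtl` callbacks here are hypothetical stand-ins for the actual gRPC stream write and Redis `EXPIRE` call):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Drives the two heartbeat cadences from this section: a 200ms gRPC frame
// and a 2s Redis TTL refresh, on one background thread.
final class HeartbeatLoop implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(Runnable sendFrame, Runnable refreshTtl) {
        scheduler.scheduleAtFixedRate(sendFrame, 0, 200, TimeUnit.MILLISECONDS);
        scheduler.scheduleAtFixedRate(refreshTtl, 0, 2, TimeUnit.SECONDS);
    }

    @Override
    public void close() { scheduler.shutdownNow(); } // the @PreDestroy equivalent
}
```

A single-threaded scheduler keeps frame ordering deterministic; missed frames simply delay the next tick rather than piling up.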
### 3. Health Monitoring (3 signals, primary fast)

| Signal | Mechanism | Detection time | Role |
|---|---|---|---|
| gRPC stream drop | TCP RST/FIN on bidirectional stream | 50–200ms | Primary |
| Redis heartbeat expiry | `{prefix}:instances:{instanceId}` TTL=5s | 5–7s | Fallback |
| K8s pod watch | `CoreV1Api.listNamespacedPod` watch stream | ~instant (pod events) | Fallback |
Dead decision:

- gRPC stream drops → immediate failover (no confirmation needed; games must recover fast)
- Redis heartbeat expires (gRPC still up) → verify with a single HTTP `/q/health` call; if it fails, fail over
- K8s pod NotReady (gRPC still up) → failover
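The dead decision above is a pure function of the signal and, for the Redis case only, one health-check result. A sketch in plain Java (illustrative only; the real monitor lives in the Scala `HealthMonitor`):

```java
// Encodes the dead-decision rules: a gRPC stream drop or a NotReady pod fails
// over immediately; a Redis TTL expiry alone is verified with one HTTP
// /q/health call first.
enum Signal { GRPC_STREAM_DROP, REDIS_TTL_EXPIRED, K8S_POD_NOT_READY }

final class DeadDecision {
    // healthCheckPassed is only consulted for the Redis-TTL signal.
    static boolean shouldFailover(Signal signal, boolean healthCheckPassed) {
        return switch (signal) {
            case GRPC_STREAM_DROP, K8S_POD_NOT_READY -> true;
            case REDIS_TTL_EXPIRED -> !healthCheckPassed;
        };
    }
}
```

Keeping the rule table this small makes the asymmetry explicit: only the slow fallback signal pays for a confirmation round-trip, never the fast primary.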
### 4. Failover Protocol (<300ms target)

```
T+0ms    Core JVM crashes / network drops
T+50ms   Coordinator: gRPC stream error received
T+52ms   SMEMBERS {prefix}:instance:{instanceId}:games → list of orphaned gameIds
T+55ms   Distribute gameIds across healthy cores (least-loaded first)
T+60ms   BatchResubscribeGames gRPC call(s) fire to healthy core(s)
T+150ms  Healthy cores resubscribed; Redis s2c topics live again
T+200ms  WebSocket clients reconnect; receive GameFullEventDto on CONNECTED
```
Failover steps (coordinator `FailoverService`), on stream drop for `instanceId`:

a. Mark the instance DEAD in the local map
b. `SMEMBERS {prefix}:instance:{instanceId}:games`
c. Group the gameIds into batches per target core (round-robin by load)
d. For each target core: call `BatchResubscribeGames(gameIds)`
e. Each target core calls `subscribeGame(gameId)` for each game (loading from Redis if not in its local cache)
f. `DEL {prefix}:instance:{instanceId}:games` (cleanup)

Finally, log the failover event with the count of games migrated and the observed latency.
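Step (c) can be implemented as a least-loaded-first greedy assignment: each orphaned game goes to whichever healthy core currently holds the fewest games, counting games assigned so far. A self-contained sketch in plain Java (illustrative; the real logic belongs in the Scala `FailoverService`):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Spreads orphaned gameIds across healthy cores, always assigning the next
// game to the currently least-loaded core (ties broken by core id).
final class FailoverPlanner {
    static Map<String, List<String>> distribute(List<String> orphanedGames,
                                                Map<String, Integer> coreLoads) {
        record Core(String id, int load) {}
        PriorityQueue<Core> byLoad = new PriorityQueue<>(
                Comparator.comparingInt(Core::load).thenComparing(Core::id));
        coreLoads.forEach((id, load) -> byLoad.add(new Core(id, load)));

        Map<String, List<String>> batches = new HashMap<>();
        for (String gameId : orphanedGames) {
            Core target = byLoad.poll();                 // least-loaded core right now
            batches.computeIfAbsent(target.id(), k -> new ArrayList<>()).add(gameId);
            byLoad.add(new Core(target.id(), target.load() + 1)); // account for the new game
        }
        return batches;  // one BatchResubscribeGames call per map key
    }
}
```

The priority queue re-sorts after each assignment, so a burst of orphans from a dead core lands evenly rather than all on the single quietest core.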
### 5. Load Rebalancing

Thresholds (both are evaluated):

- Absolute: any core > 500 games → rebalance
- Relative: max load > mean × 1.2 AND max - min > 50 games → rebalance

Algorithm (runs every 30s, with a minimum of 60s between actual rebalances):

- Read all `{prefix}:instances:*` keys → load map
- Identify overloaded cores (those that exceed either threshold)
- For each overloaded core: pick `excess = load - targetLoad` games
- Assign the excess games to underloaded cores
- Call `UnsubscribeGames(gameIds)` on the overloaded core
- Call `BatchResubscribeGames(gameIds)` on the target core
- Overloaded core: `SREM` each game from its set
- Target core: `SADD` each game to its set on subscribe
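The two triggers and the excess computation reduce to a few lines. A sketch in plain Java (the constants mirror the configuration in section 10; the real implementation is the Scala `LoadBalancer`):

```java
import java.util.Collections;
import java.util.Map;

// The two rebalance triggers: an absolute per-core cap, and a relative skew
// check (max > mean * 1.2 AND max - min > 50).
final class RebalanceCheck {
    static final int MAX_GAMES_PER_CORE = 500;   // max-games-per-core
    static final double MAX_DEVIATION = 1.2;     // max-deviation-percent = 20
    static final int MIN_SPREAD = 50;

    static boolean needsRebalance(Map<String, Integer> loads) {
        if (loads.isEmpty()) return false;
        int max = Collections.max(loads.values());
        int min = Collections.min(loads.values());
        double mean = loads.values().stream().mapToInt(Integer::intValue).average().orElse(0);
        boolean absolute = max > MAX_GAMES_PER_CORE;
        boolean relative = max > mean * MAX_DEVIATION && (max - min) > MIN_SPREAD;
        return absolute || relative;
    }

    // Number of games to move off an overloaded core: excess = load - targetLoad.
    static int excess(int load, int targetLoad) {
        return Math.max(0, load - targetLoad);
    }
}
```

The `MIN_SPREAD` guard stops the relative check from thrashing when all cores are nearly idle, where a 20% deviation can be just a handful of games.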
### 6. Auto-Scaling

Metric: average `subscriptionCount` across all cores

Actions:

- avg > `scale-up-threshold` (80% of max): patch the `nowchess-core` Argo Rollout `spec.replicas += 1`
- avg < `scale-down-threshold` (30% of max) AND `replicas > min-replicas`: drain one core, then scale down
- Backoff: minimum 2-minute interval between scale events
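The action rules above, including the backoff, form one pure decision function. A sketch in plain Java (thresholds match the section 10 config; illustrative only, the real logic is the Scala `AutoScaler`):

```java
import java.time.Duration;
import java.time.Instant;

// Scale up past 80% of max-games-per-core, scale down (after draining) below
// 30%, never below min replicas, and back off 2 minutes between scale events.
final class AutoScalePolicy {
    enum Action { SCALE_UP, DRAIN_AND_SCALE_DOWN, NONE }

    static final double SCALE_UP_THRESHOLD = 0.8;
    static final double SCALE_DOWN_THRESHOLD = 0.3;
    static final Duration BACKOFF = Duration.ofMinutes(2);

    static Action decide(double avgLoad, int maxGamesPerCore, int replicas,
                         int minReplicas, Instant lastScaleEvent, Instant now) {
        if (Duration.between(lastScaleEvent, now).compareTo(BACKOFF) < 0) {
            return Action.NONE;  // still inside the backoff window
        }
        double ratio = avgLoad / maxGamesPerCore;
        if (ratio > SCALE_UP_THRESHOLD) return Action.SCALE_UP;
        if (ratio < SCALE_DOWN_THRESHOLD && replicas > minReplicas) {
            return Action.DRAIN_AND_SCALE_DOWN;
        }
        return Action.NONE;
    }
}
```

Checking the backoff before either threshold means a load spike during the window is simply picked up on the next evaluation, rather than queued.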
Argo Rollouts API:

- CRD: `argoproj.io/v1alpha1`, Kind: `Rollout`, resource: `rollouts`
- Scale via a Fabric8 `GenericKubernetesResource` patch on `spec.replicas`
- No StatefulSet — the Argo Rollout owns the pod lifecycle (canary/blue-green strategies are respected)
- Pod watch filter: label selector `app=nowchess-core` (the Rollout sets this; `rollouts-pod-template-hash` is Argo's equivalent of `pod-template-hash`)
Drain before scale-down:

- Pick the least-loaded core
- Migrate all of its games to other cores via `BatchResubscribeGames`
- Call `DrainInstance(instanceId)` on that core (it then rejects new subscriptions)
- After the drain is confirmed: patch the Rollout `spec.replicas -= 1`
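Two small checks keep the drain safe: the target selection, and a guard that blocks the replica decrement until every game has moved. A sketch in plain Java (illustrative; the helper names are hypothetical):

```java
import java.util.Map;

// Drain orchestration guards: choose the cheapest core to drain, and only
// allow spec.replicas -= 1 once its entire load has been migrated.
final class DrainPlanner {
    static String pickDrainTarget(Map<String, Integer> loads) {
        return loads.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow();  // caller ensures at least one core exists
    }

    // DrainInstanceResponse.gamesMigrated must equal the load the core had
    // when the drain started; anything less means games would be orphaned.
    static boolean safeToScaleDown(int initialLoad, int gamesMigrated) {
        return gamesMigrated == initialLoad;
    }
}
```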
### 7. Cache Eviction

- Trigger: the coordinator scans `{prefix}:game:entry:*` every 10 minutes
- Policy: if `now - lastUpdated > 45min` AND the gameId is in any instance's games set → call `EvictGames`
- Effect: the core removes the game from `localEngines`, calls `unsubscribeGame`, and `SREM`s it from the instance set
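The policy is a two-condition predicate: idle past the threshold, and still held in some core's memory. A sketch in plain Java (illustrative; the real check runs in the Scala `CacheEvictionManager`):

```java
import java.time.Duration;
import java.time.Instant;

// A game is evicted only when it has been idle past the 45min threshold AND
// some instance still holds it in its local cache; evicting a game nobody
// holds would be a no-op RPC.
final class EvictionPolicy {
    static final Duration IDLE_THRESHOLD = Duration.ofMinutes(45);

    static boolean shouldEvict(Instant lastUpdated, Instant now, boolean heldByAnyInstance) {
        return heldByAnyInstance
                && Duration.between(lastUpdated, now).compareTo(IDLE_THRESHOLD) > 0;
    }
}
```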
### 8. Proto: coordinator_service.proto

```protobuf
syntax = "proto3";

package de.nowchess.coordinator;

service CoordinatorService {
  // Core → Coordinator: bidirectional stream for liveness
  rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand);
  // Coordinator → Core: batch resubscribe after failover or rebalance
  rpc BatchResubscribeGames(BatchResubscribeRequest) returns (BatchResubscribeResponse);
  // Coordinator → Core: unsubscribe games (rebalance source)
  rpc UnsubscribeGames(UnsubscribeGamesRequest) returns (UnsubscribeGamesResponse);
  // Coordinator → Core: evict idle games from local cache
  rpc EvictGames(EvictGamesRequest) returns (EvictGamesResponse);
  // Coordinator → Core: drain instance before scale-down
  rpc DrainInstance(DrainInstanceRequest) returns (DrainInstanceResponse);
}

message HeartbeatFrame {
  string instanceId = 1;
  string hostname = 2;
  int32 httpPort = 3;
  int32 grpcPort = 4;
  int32 subscriptionCount = 5;
  int32 localCacheSize = 6;
  int64 timestampMillis = 7;
}

message CoordinatorCommand {
  // Future: coordinator can push commands back (e.g., "start draining")
  string type = 1;
  string payload = 2;
}

message BatchResubscribeRequest {
  repeated string gameIds = 1;
}

message BatchResubscribeResponse {
  int32 subscribedCount = 1;
  repeated string failedGameIds = 2;
}

message UnsubscribeGamesRequest {
  repeated string gameIds = 1;
}

message UnsubscribeGamesResponse {
  int32 unsubscribedCount = 1;
}

message EvictGamesRequest {
  repeated string gameIds = 1;
}

message EvictGamesResponse {
  int32 evictedCount = 1;
}

message DrainInstanceRequest {}

message DrainInstanceResponse {
  int32 gamesMigrated = 1;
}
```
### 9. Coordinator REST API (internal)

- `GET /api/coordinator/instances` — all cores with load, health state
- `GET /api/coordinator/metrics` — load distribution, rebalance history
- `POST /api/coordinator/rebalance` — manual rebalance trigger
- `POST /api/coordinator/failover/{instanceId}` — manual failover
- `POST /api/coordinator/scale-up` / `POST /api/coordinator/scale-down` — manual scaling
### 10. Configuration

`modules/coordinator/src/main/resources/application.yml`:

```yaml
quarkus.application.name: nowchess-coordinator
quarkus.http.port: 8086
quarkus.grpc.server.port: 9086
nowchess.coordinator.max-games-per-core: 500
nowchess.coordinator.max-deviation-percent: 20
nowchess.coordinator.rebalance-interval: 30s
nowchess.coordinator.rebalance-min-interval: 60s
nowchess.coordinator.heartbeat-ttl: 5s
nowchess.coordinator.stream-heartbeat-interval: 200ms
nowchess.coordinator.cache-eviction-interval: 10m
nowchess.coordinator.game-idle-threshold: 45m
nowchess.coordinator.auto-scale-enabled: false
nowchess.coordinator.scale-up-threshold: 0.8
nowchess.coordinator.scale-down-threshold: 0.3
nowchess.coordinator.scale-min-replicas: 2
nowchess.coordinator.scale-max-replicas: 10
nowchess.coordinator.k8s-namespace: default
nowchess.coordinator.k8s-rollout-name: nowchess-core
nowchess.coordinator.k8s-rollout-label-selector: app=nowchess-core
quarkus.kubernetes-client.trust-certs: true
```
Core `application.yml` additions:

```yaml
nowchess.coordinator.host: localhost
nowchess.coordinator.grpc-port: 9086
nowchess.coordinator.stream-heartbeat-interval: 200ms
nowchess.coordinator.redis-heartbeat-interval: 2s
nowchess.coordinator.instance-id: ${HOSTNAME:local}-${quarkus.uuid}
```
### 11. Files to Create / Modify

New — `modules/coordinator/`:

```
build.gradle.kts
src/main/proto/coordinator_service.proto
src/main/resources/application.yml
src/main/scala/de/nowchess/coordinator/
  resource/CoordinatorResource.scala      # REST endpoints
  service/InstanceRegistry.scala          # Redis instance list + in-memory map
  service/HealthMonitor.scala             # gRPC stream watcher + Redis TTL + k8s watch
  service/FailoverService.scala           # dead core → BatchResubscribe
  service/LoadBalancer.scala              # rebalance logic
  service/AutoScaler.scala                # Argo Rollout scaling
  service/CacheEvictionManager.scala      # idle game eviction
  grpc/CoordinatorGrpcServer.scala        # CoordinatorService gRPC impl (for HeartbeatStream)
```
Modify — `modules/core/`:

- `build.gradle.kts` — add the `coordinator_service.proto` stub, keep the gRPC dep
- `src/main/proto/coordinator_service.proto` — copy (or symlink) the proto for stub generation
- `src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala` — `SADD`/`SREM` on subscribe/unsubscribe + implement the `BatchResubscribeGames`, `UnsubscribeGames`, `EvictGames`, `DrainInstance` gRPC handlers
- `src/main/scala/de/nowchess/chess/` — new `InstanceHeartbeatService.scala` (startup, gRPC stream, Redis TTL refresh)
- `src/main/resources/application.yml` — coordinator connection config

Modify — root:

- `settings.gradle.kts` — add `include("modules/coordinator")`
## Verification

- `./compile` — coordinator and core compile cleanly
- Stream detection: start core + coordinator; kill the core JVM (`kill -9`); the coordinator logs failover within 300ms
- Game continuity: with an active game on the killed core, the WebSocket client reconnects and receives the game state
- Rebalance: create 600 games on core-1 (2-core setup); the coordinator rebalances ~100 to core-2
- Fallback: disconnect the gRPC stream manually but keep the core alive; the Redis TTL fallback triggers within 7s
- Cache eviction: create an idle game; the coordinator calls `EvictGames` after 45min idle
- REST metrics: `curl localhost:8086/api/coordinator/metrics` returns per-core load + health
- Restart recovery: restart the coordinator; gRPC streams re-establish from the cores; state is rebuilt from Redis
## Dependencies (new)

- `io.fabric8:kubernetes-client:6.13.0` — Fabric8 k8s client; handles the Argo `Rollout` CRD via `GenericKubernetesResource`; no Argo Java SDK needed
- Redisson — already in core, reuse via shared config
- Quarkus gRPC — already in core, reuse