Plan: Add Coordinator Microservice

Context

NowChess scales core horizontally via shared Redis but lacks:

  • Instance visibility: no way to list running cores or their load
  • Load balancing: games land randomly on cores; no rebalancing
  • Failover: dead cores orphan subscriptions; bullet chess requires <1s recovery
  • Auto-scaling: manual ops to add/remove cores
  • Cache management: no eviction of stale games from core memory

Bullet chess games run on move timings of <3s, so a 30s failover means the game is lost on the clock. Target: <300ms failover.


Architecture: Sub-1s Failover

Why Not Polling/TTL

  • TTL expiry: minimum 10-30s detection
  • HTTP polling 3x failure: 30s minimum
  • gRPC streaming TCP drop: 50-200ms; fast enough to serve as the primary signal

Primary: gRPC Bidirectional Streaming

  • Core opens a persistent bidirectional stream (HeartbeatStream) to coordinator on startup
  • Core sends heartbeat frames every 200ms
  • Core crash = TCP RST/FIN → coordinator stream error in ~50-200ms
  • Stream also carries metadata updates (subscription count changes) in real-time
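
A minimal core-side sketch of this stream, assuming the Mutiny stub that Quarkus gRPC generates from coordinator_service.proto (section 8); HeartbeatStreamer, handleCommand, and scheduleReconnect are illustrative names, not existing code:

import io.smallrye.mutiny.Multi
import java.time.Duration

final class HeartbeatStreamer(
    stub: MutinyCoordinatorServiceGrpc.MutinyCoordinatorServiceStub, // generated by Quarkus
    instanceId: String,
    subscriptionCount: () => Int):

  def start(): Unit =
    // One frame every 200ms; a dropped stream is the coordinator's death signal.
    val frames: Multi[HeartbeatFrame] =
      Multi.createFrom().ticks().every(Duration.ofMillis(200)).map { _ =>
        HeartbeatFrame.newBuilder()
          .setInstanceId(instanceId)
          .setSubscriptionCount(subscriptionCount())
          .setTimestampMillis(System.currentTimeMillis())
          .build()
      }
    stub.heartbeatStream(frames).subscribe().`with`(
      (cmd: CoordinatorCommand) => handleCommand(cmd), // commands pushed back by coordinator
      (err: Throwable) => scheduleReconnect(err)       // stream dropped: back off, reopen
    )

  private def handleCommand(cmd: CoordinatorCommand): Unit = () // e.g. "start draining"
  private def scheduleReconnect(err: Throwable): Unit = ()      // illustrative retry hook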

Fallback: Redis Heartbeat + K8s Watch

  • Redis heartbeat key {prefix}:instances:{instanceId} with 5s TTL, refreshed every 2s
  • K8s pod watch via the Fabric8 Kubernetes client (event-driven; handles pod eviction/OOMKill)
  • Fallback covers: network partition (TCP stays up but core is zombie), coordinator restart gap
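
A sketch of the K8s watch leg with the Fabric8 client (matching the dependencies section); onCoreDead is an illustrative hook into the health monitor:

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientBuilder, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

def onCoreDead(podName: String): Unit = () // illustrative: feeds the dead decision

def watchCorePods(): Unit =
  val client = new KubernetesClientBuilder().build()
  client.pods()
    .inNamespace("default")
    .withLabel("app", "nowchess-core")
    .watch(new Watcher[Pod] {
      override def eventReceived(action: Action, pod: Pod): Unit =
        // DELETED fires immediately on eviction/OOMKill; no polling delay
        if action == Action.DELETED then onCoreDead(pod.getMetadata.getName)
      override def onClose(cause: WatcherException): Unit =
        () // watch closed (e.g. API server hiccup): re-establish
    })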

Design

1. Module: modules/coordinator

Language: Scala 3.5.1, Quarkus REST + gRPC
Ports: HTTP 8086, gRPC 9086
Dependencies: Redisson, Fabric8 Kubernetes client, Quarkus gRPC
Persistence: None (all state in Redis)


2. Instance Registry

Redis schema:

{prefix}:instances:{instanceId}
  - TTL: 5s (refreshed by core every 2s via background task)
  - Value: JSON
    {
      "instanceId": "core-abc123",
      "hostname": "core-pod-3",
      "httpPort": 8080,
      "grpcPort": 9080,
      "subscriptionCount": 147,
      "localCacheSize": 147,
      "lastHeartbeat": "2026-04-26T10:15:30.123Z"
    }

{prefix}:instance:{instanceId}:games
  - Type: Redis Set (no TTL — managed explicitly)
  - Members: all gameIds currently subscribed on this instance

Core changes (new InstanceHeartbeatService bean in modules/core):

  • @PostConstruct: generate stable instanceId (hostname + random suffix); open gRPC stream to coordinator; publish Redis heartbeat; register in {prefix}:instances:{instanceId}
  • Every 200ms: send heartbeat frame on gRPC stream (carries subscriptionCount)
  • Every 2s: refresh Redis heartbeat bucket TTL
  • subscribeGame(gameId): SADD {prefix}:instance:{instanceId}:games gameId
  • unsubscribeGame(gameId) / evictGame(gameId): SREM {prefix}:instance:{instanceId}:games gameId
  • @PreDestroy: delete Redis key + games set; close gRPC stream (clean shutdown)
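
A sketch of the Redis side of InstanceHeartbeatService with Redisson (already in core); key names follow the schema above, the class name is illustrative:

import org.redisson.api.RedissonClient
import java.util.concurrent.TimeUnit

final class InstanceHeartbeatRedis(redisson: RedissonClient, prefix: String, instanceId: String):
  private val instanceKey = s"$prefix:instances:$instanceId"
  private val gamesKey    = s"$prefix:instance:$instanceId:games"

  // Every 2s: rewrite the heartbeat JSON with a fresh 5s TTL.
  def refreshHeartbeat(json: String): Unit =
    redisson.getBucket[String](instanceKey).set(json, 5, TimeUnit.SECONDS)

  def subscribeGame(gameId: String): Unit =   // SADD on subscribe
    redisson.getSet[String](gamesKey).add(gameId)

  def unsubscribeGame(gameId: String): Unit = // SREM on unsubscribe/evict
    redisson.getSet[String](gamesKey).remove(gameId)

  // @PreDestroy path: clean shutdown deletes both keys.
  def shutdown(): Unit =
    redisson.getBucket[String](instanceKey).delete()
    redisson.getSet[String](gamesKey).delete()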

3. Health Monitoring (3 signals, primary fast)

Signal                   Mechanism                                   Detection time          Role
gRPC stream drop         TCP RST/FIN on bidirectional stream         50-200ms                Primary
Redis heartbeat expiry   {prefix}:instances:{instanceId} TTL=5s      5-7s                    Fallback
K8s pod watch            Fabric8 pod watch stream (event-driven)     ~instant (pod events)   Fallback

Dead decision:

  • gRPC stream drops → immediate failover (no confirmation needed; games must recover fast)
  • Redis heartbeat expires (gRPC still up) → verify with single HTTP /q/health call → if fail: failover
  • K8s pod NotReady (gRPC still up) → failover
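
The same decision tree as a compact sketch; failover and httpHealthOk are illustrative hooks into the section 4 protocol and the core's /q/health endpoint:

enum HealthSignal:
  case GrpcStreamDropped, RedisTtlExpired, PodNotReady

def failover(instanceId: String): Unit = ()          // section 4 protocol (illustrative hook)
def httpHealthOk(instanceId: String): Boolean = true // single GET /q/health probe

def onSignal(instanceId: String, signal: HealthSignal): Unit = signal match
  case HealthSignal.GrpcStreamDropped =>
    failover(instanceId)                  // no confirmation; games must recover fast
  case HealthSignal.RedisTtlExpired =>
    if !httpHealthOk(instanceId) then failover(instanceId) // verify once, then fail over
  case HealthSignal.PodNotReady =>
    failover(instanceId)                  // pod-level evidence is enough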

4. Failover Protocol (<300ms target)

T+0ms     Core JVM crashes / network drops
T+50ms    Coordinator: gRPC stream error received
T+52ms    SMEMBERS {prefix}:instance:{instanceId}:games  → list of orphaned gameIds
T+55ms    Distribute gameIds across healthy cores (least-loaded first)
T+60ms    BatchResubscribeGames gRPC call(s) fire to healthy core(s)
T+150ms   Healthy cores resubscribed; Redis s2c topics live again
T+200ms   WebSocket clients reconnect; receive GameFullEventDto on CONNECTED

Failover steps (coordinator FailoverService):

  1. On stream drop for instanceId:
     a. Mark instance DEAD in local map
     b. SMEMBERS {prefix}:instance:{instanceId}:games
     c. Group gameIds into batches per target core (round-robin by load)
     d. For each target core: call BatchResubscribeGames(gameIds)
     e. Each target core calls subscribeGame(gameId) for each (loads from Redis if not in local cache)
     f. DEL {prefix}:instance:{instanceId}:games (cleanup)
  2. Log failover event with count of games migrated + latency
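
A minimal FailoverService sketch of steps a-f, assuming Redisson and the generated Mutiny stubs; CoreHandle, InstanceRegistry, and the round-robin grouping are illustrative:

import org.redisson.api.RedissonClient
import scala.jdk.CollectionConverters.*

// Illustrative shapes; the real registry lives in service/InstanceRegistry.scala.
trait CoreHandle:
  def stub: MutinyCoordinatorServiceGrpc.MutinyCoordinatorServiceStub
trait InstanceRegistry:
  def markDead(instanceId: String): Unit
  def healthyInstancesByLoad(): Vector[CoreHandle] // least-loaded first

final class FailoverService(redisson: RedissonClient, registry: InstanceRegistry, prefix: String):

  def onInstanceDead(instanceId: String): Unit =
    val started = System.nanoTime()
    registry.markDead(instanceId)                                               // (a)
    val gamesKey = s"$prefix:instance:$instanceId:games"
    val orphaned = redisson.getSet[String](gamesKey).readAll().asScala.toVector // (b) SMEMBERS
    val targets  = registry.healthyInstancesByLoad()
    // (c) round-robin orphaned games across healthy cores
    val batches = orphaned.zipWithIndex.groupMap((_, i) => targets(i % targets.size))((g, _) => g)
    batches.foreach { (core, gameIds) =>                                        // (d)
      val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
      core.stub.batchResubscribeGames(request).await().indefinitely()           // (e) core-side subscribe
    }
    redisson.getSet[String](gamesKey).delete()                                  // (f) cleanup
    println(s"failover: ${orphaned.size} games in ${(System.nanoTime() - started) / 1_000_000} ms")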

5. Load Rebalancing

Thresholds (evaluate both; either one triggers a rebalance; see the sketch after this list):

  1. Absolute: any core > 500 games → rebalance
  2. Relative: max load > mean × 1.2 AND max - min > 50 games → rebalance
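
A concrete version of that check, with numbers from the configuration section (max-games-per-core: 500, max-deviation-percent: 20):

def needsRebalance(loads: Map[String, Int], maxGamesPerCore: Int = 500): Boolean =
  val counts = loads.values
  val (max, min) = (counts.max, counts.min)
  val mean = counts.sum.toDouble / counts.size
  max > maxGamesPerCore ||                  // 1. absolute: any core above the cap
  (max > mean * 1.2 && max - min > 50)      // 2. relative: skewed AND gap > 50 games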

Algorithm (runs every 30s, min 60s between actual rebalances):

  1. Read all {prefix}:instances:* keys → load map
  2. Identify overloaded cores (exceed either threshold)
  3. For each overloaded core: pick excess = load - targetLoad games
  4. Assign excess games to underloaded cores
  5. Call UnsubscribeGames(gameIds) on overloaded core
  6. Call BatchResubscribeGames(gameIds) on target core
  7. Overloaded core: SREM each game from its set
  8. Target core: SADD each game to its set on subscribe

6. Auto-Scaling

Metric: avg subscriptionCount across all cores

Actions:

  • avg > scale-up-threshold (80% of max): patch nowchess-core Argo Rollout spec.replicas += 1
  • avg < scale-down-threshold (30% of max) AND replicas > min-replicas: drain one core then scale down
  • Backoff: min 2-minute interval between scale events

Argo Rollouts API:

  • CRD: argoproj.io/v1alpha1, Kind: Rollout, resource: rollouts
  • Scale via Fabric8 GenericKubernetesResource patch on spec.replicas
  • No StatefulSet — Argo Rollout owns pod lifecycle (canary/blue-green strategies respected)
  • Pod watch filter: label selector app=nowchess-core (Rollout sets this; rollouts-pod-template-hash is Argo's equivalent of pod-template-hash)
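
A sketch of the spec.replicas patch through Fabric8's untyped resource API, using the CRD coordinates above; namespace and Rollout name come from the configuration section:

import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext

val rolloutContext = new ResourceDefinitionContext.Builder()
  .withGroup("argoproj.io")
  .withVersion("v1alpha1")
  .withKind("Rollout")
  .withPlural("rollouts")
  .withNamespaced(true)
  .build()

def scaleRollout(delta: Int): Unit =
  val client = new KubernetesClientBuilder().build()
  client.genericKubernetesResources(rolloutContext)
    .inNamespace("default")
    .withName("nowchess-core")
    .edit { rollout =>
      // spec sits in the untyped additionalProperties map of the generic resource
      val spec = rollout.getAdditionalProperties.get("spec")
        .asInstanceOf[java.util.Map[String, Object]]
      val replicas = spec.get("replicas").asInstanceOf[Number].intValue()
      spec.put("replicas", Integer.valueOf(replicas + delta))
      rollout
    }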

Drain before scale-down:

  1. Pick least-loaded core
  2. Migrate all its games to other cores via BatchResubscribeGames
  3. Call DrainInstance on that core (it then rejects new subscriptions)
  4. After drain confirmed: patch Rollout spec.replicas -= 1

7. Cache Eviction

Trigger: coordinator scans {prefix}:game:entry:* every 10 minutes
Policy: if now - lastUpdated > 45min AND gameId in any instance's games set → call EvictGames
Effect: core removes the game from localEngines, calls unsubscribeGame, and SREMs it from its instance games set
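
A sketch of the 10-minute scan with Redisson key iteration; lastUpdatedOf, holderOf, and sendEvictGames stand in for the entry-JSON parse, the per-instance SISMEMBER lookup, and the EvictGames gRPC call:

import org.redisson.api.RedissonClient
import scala.jdk.CollectionConverters.*
import java.time.{Duration, Instant}

def lastUpdatedOf(entryKey: String): Instant = Instant.EPOCH      // parse lastUpdated from entry JSON
def holderOf(gameId: String): Option[String] = None               // which instance's games set has it
def sendEvictGames(instanceId: String, gameId: String): Unit = () // EvictGames gRPC call

def evictIdleGames(redisson: RedissonClient, prefix: String): Unit =
  val idleThreshold = Duration.ofMinutes(45)
  redisson.getKeys.getKeysByPattern(s"$prefix:game:entry:*").asScala.foreach { key =>
    val gameId = key.substring(key.lastIndexOf(':') + 1)
    val idleFor = Duration.between(lastUpdatedOf(key), Instant.now())
    if idleFor.compareTo(idleThreshold) > 0 then
      holderOf(gameId).foreach(instance => sendEvictGames(instance, gameId))
  }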


8. Proto: coordinator_service.proto

syntax = "proto3";
package de.nowchess.coordinator;

service CoordinatorService {
  // Core → Coordinator: bidirectional stream for liveness
  rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand);

  // Coordinator → Core: batch resubscribe after failover or rebalance
  rpc BatchResubscribeGames(BatchResubscribeRequest) returns (BatchResubscribeResponse);

  // Coordinator → Core: unsubscribe games (rebalance source)
  rpc UnsubscribeGames(UnsubscribeGamesRequest) returns (UnsubscribeGamesResponse);

  // Coordinator → Core: evict idle games from local cache
  rpc EvictGames(EvictGamesRequest) returns (EvictGamesResponse);

  // Coordinator → Core: drain instance before scale-down
  rpc DrainInstance(DrainInstanceRequest) returns (DrainInstanceResponse);
}

message HeartbeatFrame {
  string instanceId = 1;
  string hostname = 2;
  int32 httpPort = 3;
  int32 grpcPort = 4;
  int32 subscriptionCount = 5;
  int32 localCacheSize = 6;
  int64 timestampMillis = 7;
}

message CoordinatorCommand {
  // Future: coordinator can push commands back (e.g., "start draining")
  string type = 1;
  string payload = 2;
}

message BatchResubscribeRequest {
  repeated string gameIds = 1;
}

message BatchResubscribeResponse {
  int32 subscribedCount = 1;
  repeated string failedGameIds = 2;
}

message UnsubscribeGamesRequest {
  repeated string gameIds = 1;
}

message UnsubscribeGamesResponse {
  int32 unsubscribedCount = 1;
}

message EvictGamesRequest {
  repeated string gameIds = 1;
}

message EvictGamesResponse {
  int32 evictedCount = 1;
}

message DrainInstanceRequest {}

message DrainInstanceResponse {
  int32 gamesMigrated = 1;
}

9. Coordinator REST API (internal)

  • GET /api/coordinator/instances — all cores with load, health state
  • GET /api/coordinator/metrics — load distribution, rebalance history
  • POST /api/coordinator/rebalance — manual rebalance trigger
  • POST /api/coordinator/failover/{instanceId} — manual failover
  • POST /api/coordinator/scale-up / scale-down — manual scaling
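
A sketch of resource/CoordinatorResource.scala with Quarkus REST (jakarta.ws.rs annotations); the injected services and the loadSnapshot helper are illustrative:

import jakarta.ws.rs.{GET, POST, Path, PathParam, Produces}
import jakarta.ws.rs.core.MediaType

@Path("/api/coordinator")
@Produces(Array(MediaType.APPLICATION_JSON))
class CoordinatorResource(registry: InstanceRegistry, failover: FailoverService):

  @GET
  @Path("/instances")
  def instances(): java.util.Map[String, Integer] =
    registry.loadSnapshot() // instanceId -> subscriptionCount (illustrative DTO)

  @POST
  @Path("/failover/{instanceId}")
  def manualFailover(@PathParam("instanceId") instanceId: String): Unit =
    failover.onInstanceDead(instanceId)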

10. Configuration

modules/coordinator/src/main/resources/application.yml:

quarkus.application.name: nowchess-coordinator
quarkus.http.port: 8086
quarkus.grpc.server.port: 9086

nowchess.coordinator.max-games-per-core: 500
nowchess.coordinator.max-deviation-percent: 20
nowchess.coordinator.rebalance-interval: 30s
nowchess.coordinator.rebalance-min-interval: 60s
nowchess.coordinator.heartbeat-ttl: 5s
nowchess.coordinator.stream-heartbeat-interval: 200ms
nowchess.coordinator.cache-eviction-interval: 10m
nowchess.coordinator.game-idle-threshold: 45m
nowchess.coordinator.auto-scale-enabled: false
nowchess.coordinator.scale-up-threshold: 0.8
nowchess.coordinator.scale-down-threshold: 0.3
nowchess.coordinator.scale-min-replicas: 2
nowchess.coordinator.scale-max-replicas: 10
nowchess.coordinator.k8s-namespace: default
nowchess.coordinator.k8s-rollout-name: nowchess-core
nowchess.coordinator.k8s-rollout-label-selector: app=nowchess-core

quarkus.kubernetes-client.trust-certs: true

Core application.yml additions:

nowchess.coordinator.host: localhost
nowchess.coordinator.grpc-port: 9086
nowchess.coordinator.stream-heartbeat-interval: 200ms
nowchess.coordinator.redis-heartbeat-interval: 2s
nowchess.coordinator.instance-id: ${HOSTNAME:local}-${quarkus.uuid}

11. Files to Create / Modify

New — modules/coordinator/:

build.gradle.kts
src/main/proto/coordinator_service.proto
src/main/resources/application.yml
src/main/scala/de/nowchess/coordinator/
  resource/CoordinatorResource.scala       # REST endpoints
  service/InstanceRegistry.scala           # Redis instance list + in-memory map
  service/HealthMonitor.scala              # gRPC stream watcher + Redis TTL + k8s watch
  service/FailoverService.scala            # dead core → BatchResubscribe
  service/LoadBalancer.scala               # rebalance logic
  service/AutoScaler.scala                 # Argo Rollout replica scaling
  service/CacheEvictionManager.scala       # idle game eviction
  grpc/CoordinatorGrpcServer.scala         # CoordinatorService gRPC impl (for HeartbeatStream)

Modify — modules/core/:

  • build.gradle.kts — add coordinator_service.proto stub, keep grpc dep
  • src/main/proto/coordinator_service.proto — copy (or symlink) proto for stub generation
  • src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala — SADD/SREM on subscribe/unsubscribe + implement BatchResubscribeGames, UnsubscribeGames, EvictGames, DrainInstance gRPC handlers
  • src/main/scala/de/nowchess/chess/ — new InstanceHeartbeatService.scala (startup, gRPC stream, Redis TTL refresh)
  • src/main/resources/application.yml — coordinator connection config

Modify — root:

  • settings.gradle.kts — add include("modules/coordinator")

Verification

  1. ./compile — coordinator and core compile cleanly
  2. Stream detection: start core + coordinator; kill core JVM (kill -9); coordinator logs failover within 300ms
  3. Game continuity: active game on killed core; WebSocket client reconnects and receives game state
  4. Rebalance: create 600 games on core-1 (2-core setup); coordinator rebalances ~100 to core-2
  5. Fallback: disconnect gRPC stream manually but keep core alive; Redis TTL fallback triggers within 7s
  6. Cache eviction: create idle game; coordinator calls EvictGames after 45min idle
  7. REST metrics: curl localhost:8086/api/coordinator/metrics returns per-core load + health
  8. Restart recovery: restart coordinator; gRPC streams re-establish from cores; state rebuilt from Redis

Dependencies (new)

  • io.fabric8:kubernetes-client:6.13.0 (Fabric8 k8s client — handles Argo Rollout CRD via GenericKubernetesResource; no Argo Java SDK needed)
  • Redisson — already in core, reuse via shared config
  • Quarkus gRPC — already in core, reuse