diff --git a/COORDINATOR_BUGS.md b/COORDINATOR_BUGS.md new file mode 100644 index 0000000..c4e6949 --- /dev/null +++ b/COORDINATOR_BUGS.md @@ -0,0 +1,125 @@ +# Coordinator Module - Bug Report + +## Critical Bugs + +### 1. Cache Eviction Kills Correspondence Games (HIGH) +**File:** `CacheEvictionManager.scala:96-101` +**Problem:** Uses `lastHeartbeat` timestamp from GameCacheDto to determine if game is idle. But `lastHeartbeat` is set at store/update time, not move time. Correspondence games with days between moves get evicted while active. + +**Current Code:** +```scala +private def extractLastUpdatedTimestamp(json: String): Long = + Try { + val parsed = objectMapper.readTree(json) + Option(parsed.get("lastHeartbeat")) + .filter(_.isTextual) + .fold(0L)(lh => Instant.parse(lh.asText()).toEpochMilli) + }.getOrElse(0L) +``` + +**Impact:** Active correspondence games deleted from cache after idle threshold (config-dependent, typically hours/days) +**Fix:** Track actual move timestamp separately in GameCacheDto or check game state instead of heartbeat + +--- + +### 2. Concurrent Rebalance Race Condition (HIGH) +**File:** `LoadBalancer.scala:108-115` +**Problem:** `getGamesToMove()` reads games from Redis set but doesn't remove them atomically. If multiple rebalance calls run concurrently, same game can be selected in different batches and moved to multiple instances. + +**Current Code:** +```scala +private def getGamesToMove(instanceId: String, count: Int): List[String] = + try + val setKey = s"$redisPrefix:instance:$instanceId:games" + redis.set(classOf[String]).smembers(setKey).asScala.toList.take(count) // Read-only, no removal + catch + case ex: Exception => + log.debugf(ex, "Failed to get games for %s", instanceId) + List() +``` + +**Impact:** Game subscribed to 2+ instances, state corruption, double-processing +**Fix:** Use Redis SPOP (atomic pop) or Lua script for atomic read+remove + +--- + +### 3. Pod Matching is Unreliable (MEDIUM) +**File:** `HealthMonitor.scala:134, 188` +**Problem:** Uses `.contains()` string matching for pod name. Pod "core-1" matches instance "core-11"; loose matching causes wrong pod operations. + +**Current Code:** +```scala +instanceId.contains(podName) // Line 134 +// and +pods.find(pod => instanceId.contains(pod.getMetadata.getName)) // Line 188 +``` + +**Impact:** Wrong pod deleted/evicted when multiple similar names exist +**Fix:** Exact match or structured ID encoding + +--- + +## Medium Priority Bugs + +### 4. Inefficient Game-to-Instance Lookup (MEDIUM) +**File:** `CacheEvictionManager.scala:104-113` +**Problem:** Linear scan through ALL instances to find which one holds a game. Runs per-game during eviction scan every 5 minutes. + +**Current Code:** +```scala +private def findInstanceWithGame(gameId: String): Option[InstanceMetadata] = + try + instanceRegistry.getAllInstances.find { instance => // O(n) instances + val setKey = s"$redisPrefix:instance:${instance.instanceId}:games" + redis.set(classOf[String]).sismember(setKey, gameId) + } +``` + +**Impact:** Eviction scans slow with many instances (100+ instances = 100+ Redis ops per game) +**Fix:** Maintain `nowchess:game:$gameId:instance` → instanceId mapping in Redis + +--- + +### 5. Instance Registry Lookup on Pod Events (MEDIUM) +**File:** `HealthMonitor.scala:245-247` +**Problem:** Linear search through all instances every pod state change. Pod watch fires frequently. + +**Current Code:** +```scala +private def findRegisteredInstance(pod: Pod): Option[InstanceMetadata] = + val podName = pod.getMetadata.getName + instanceRegistry.getAllInstances.find(inst => inst.instanceId.contains(podName)) +``` + +**Impact:** O(n) lookup on hot path (pod watch events) +**Fix:** Maintain pod-name → instanceId index or use proper ID encoding + +--- + +## Low Priority Bugs + +### 6. Non-idiomatic Sorting (LOW) +**File:** `LoadBalancer.scala:72` +**Problem:** Uses `.sortBy[Int](_.subscriptionCount).reverse` instead of `.sortByDescending()` + +**Current Code:** +```scala +val overloaded = instances + .filter(_.subscriptionCount > config.maxGamesPerCore) + .sortBy[Int](_.subscriptionCount) // Type annotation unnecessary + .reverse +``` + +**Impact:** Micro code-quality issue +**Fix:** Use `.sortByDescending(_.subscriptionCount)` + +--- + +## Fix Priority + +1. **Cache eviction (HIGH)** — Data loss risk +2. **Rebalance race (HIGH)** — State corruption risk +3. **Pod matching (MEDIUM)** — Operational blast radius +4. **Game lookup (MEDIUM)** — Performance under scale +5. **Instance lookup (MEDIUM)** — Hot path perf +6. **Sorting (LOW)** — Code style diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala index e974ef9..1466992 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala @@ -75,6 +75,7 @@ class CacheEvictionManager: try coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId)) redis.key(classOf[String]).del(key) + redis.key(classOf[String]).del(s"$redisPrefix:game:$gameId:instance") meterRegistry.counter("nowchess.coordinator.cache.evictions").increment() log.infof("Evicted idle game %s from %s", gameId, instance.instanceId) count + 1 @@ -96,17 +97,18 @@ class CacheEvictionManager: private def extractLastUpdatedTimestamp(json: String): Long = Try { val parsed = objectMapper.readTree(json) - Option(parsed.get("lastHeartbeat")) - .filter(_.isTextual) - .fold(0L)(lh => Instant.parse(lh.asText()).toEpochMilli) + Option(parsed.get("lastUpdatedMs")) + .filter(_.isNumber) + .fold(0L)(_.asLong()) }.getOrElse(0L) private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] = try - instanceRegistry.getAllInstances.find { instance => - val setKey = s"$redisPrefix:instance:${instance.instanceId}:games" - redis.set(classOf[String]).sismember(setKey, gameId) - } + val mapKey = s"$redisPrefix:game:$gameId:instance" + Option(redis.value(classOf[String]).get(mapKey)) + .flatMap { instanceId => + instanceRegistry.getInstance(instanceId) + } catch case ex: Exception => log.debugf(ex, "Failed to find instance for game %s", gameId) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala index be4e441..fedc7ff 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala @@ -107,6 +107,7 @@ class FailoverService: try val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch) if subscribed > 0 then + updateGameInstanceMappings(batch, deadId, target.instanceId) log.infof("Migrated %d games from %s to %s", subscribed, deadId, target.instanceId) true else false @@ -116,6 +117,18 @@ class FailoverService: false if success then true else tryMigrateBatch(batch, batchIdx, instances, deadId, attempt + 1) + private def updateGameInstanceMappings(gameIds: List[String], deadId: String, targetId: String): Unit = + try + val fromKey = s"$redisPrefix:instance:$deadId:games" + val toKey = s"$redisPrefix:instance:$targetId:games" + gameIds.foreach { gameId => + redis.set(classOf[String]).sadd(toKey, gameId) + redis.value(classOf[String]).set(s"$redisPrefix:game:$gameId:instance", targetId) + } + catch + case ex: Exception => + log.errorf(ex, "Failed to update game instance mappings") + private def cleanupDeadInstance(instanceId: String): Unit = val setKey = s"$redisPrefix:instance:$instanceId:games" redis.key(classOf[String]).del(setKey) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala index 9b2de02..0ddc215 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala @@ -131,7 +131,7 @@ class HealthMonitor: pods.exists { pod => val podName = pod.getMetadata.getName - instanceId.contains(podName) && isPodReady(pod) + podName.endsWith(instanceId) && isPodReady(pod) } catch case ex: Exception => @@ -185,7 +185,7 @@ class HealthMonitor: .getItems .asScala - pods.find(pod => instanceId.contains(pod.getMetadata.getName)) match + pods.find(pod => pod.getMetadata.getName.endsWith(instanceId)) match case Some(pod) => val podName = pod.getMetadata.getName kube.pods().inNamespace(config.k8sNamespace).withName(podName).withGracePeriod(0L).delete() @@ -244,4 +244,4 @@ class HealthMonitor: private def findRegisteredInstance(pod: Pod): Option[InstanceMetadata] = val podName = pod.getMetadata.getName - instanceRegistry.getAllInstances.find(inst => inst.instanceId.contains(podName)) + instanceRegistry.getAllInstances.find(inst => podName.endsWith(inst.instanceId)) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala index ac5ca43..f3b94a6 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala @@ -69,7 +69,7 @@ class LoadBalancer: val overloaded = instances .filter(_.subscriptionCount > config.maxGamesPerCore) - .sortBy[Int](_.subscriptionCount) + .sortBy(_.subscriptionCount) .reverse val underloaded = instances .filter(_.subscriptionCount < avgLoad * 0.8) @@ -108,7 +108,10 @@ class LoadBalancer: private def getGamesToMove(instanceId: String, count: Int): List[String] = try val setKey = s"$redisPrefix:instance:$instanceId:games" - redis.set(classOf[String]).smembers(setKey).asScala.toList.take(count) + val result = scala.collection.mutable.ListBuffer[String]() + for _ <- 0 until count do + Option(redis.set(classOf[String]).spop(setKey)).foreach(result += _) + result.toList catch case ex: Exception => log.debugf(ex, "Failed to get games for %s", instanceId) @@ -116,12 +119,10 @@ class LoadBalancer: private def updateRedisGameSets(fromInstanceId: String, toInstanceId: String, gameIds: List[String]): Unit = try - val fromKey = s"$redisPrefix:instance:$fromInstanceId:games" - val toKey = s"$redisPrefix:instance:$toInstanceId:games" - + val toKey = s"$redisPrefix:instance:$toInstanceId:games" gameIds.foreach { gameId => - redis.set(classOf[String]).srem(fromKey, gameId) redis.set(classOf[String]).sadd(toKey, gameId) + redis.value(classOf[String]).set(s"$redisPrefix:game:$gameId:instance", toInstanceId) } catch case ex: Exception => diff --git a/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala b/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala index 05f14d2..e182ce7 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala @@ -22,4 +22,5 @@ case class GameCacheDto( pendingDrawOffer: Option[String], redoStack: List[String] = Nil, pendingTakebackRequest: Option[String] = None, + lastUpdatedMs: Long = System.currentTimeMillis(), ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala b/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala index 9a93c2a..925c459 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala @@ -143,6 +143,7 @@ class RedisGameRegistry extends GameRegistry: clockMoveDeadline = Option(record.clockMoveDeadline).map(_.longValue), clockActiveColor = Option(record.clockActiveColor), pendingDrawOffer = Option(record.pendingDrawOffer), + lastUpdatedMs = System.currentTimeMillis(), ) (dto, reconstruct(dto)) } match