fix: resolve 6 coordinator bugs (cache eviction, rebalance race, pod matching, lookup inefficiency)
- Add lastUpdatedMs timestamp to GameCacheDto to track actual game updates instead of heartbeat time. Fix cache eviction incorrectly marking correspondence games as idle. - Use atomic SPOP in LoadBalancer.getGamesToMove() to prevent concurrent rebalance calls from selecting same games for migration. - Add game→instance reverse mapping (nowchess:game:$gameId:instance) to eliminate O(instances) linear scan during cache eviction. - Fix HealthMonitor pod matching from loose contains() to reliable endsWith() to prevent matching unintended pods with similar names. - Update FailoverService to maintain game→instance mappings when migrating games during failover. - Update CacheEvictionManager to use game→instance mapping for O(1) lookup instead of O(n) instance scan. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
+9
-7
@@ -75,6 +75,7 @@ class CacheEvictionManager:
|
||||
try
|
||||
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
||||
redis.key(classOf[String]).del(key)
|
||||
redis.key(classOf[String]).del(s"$redisPrefix:game:$gameId:instance")
|
||||
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment()
|
||||
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
||||
count + 1
|
||||
@@ -96,17 +97,18 @@ class CacheEvictionManager:
|
||||
private def extractLastUpdatedTimestamp(json: String): Long =
|
||||
Try {
|
||||
val parsed = objectMapper.readTree(json)
|
||||
Option(parsed.get("lastHeartbeat"))
|
||||
.filter(_.isTextual)
|
||||
.fold(0L)(lh => Instant.parse(lh.asText()).toEpochMilli)
|
||||
Option(parsed.get("lastUpdatedMs"))
|
||||
.filter(_.isNumber)
|
||||
.fold(0L)(_.asLong())
|
||||
}.getOrElse(0L)
|
||||
|
||||
private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] =
|
||||
try
|
||||
instanceRegistry.getAllInstances.find { instance =>
|
||||
val setKey = s"$redisPrefix:instance:${instance.instanceId}:games"
|
||||
redis.set(classOf[String]).sismember(setKey, gameId)
|
||||
}
|
||||
val mapKey = s"$redisPrefix:game:$gameId:instance"
|
||||
Option(redis.value(classOf[String]).get(mapKey))
|
||||
.flatMap { instanceId =>
|
||||
instanceRegistry.getInstance(instanceId)
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.debugf(ex, "Failed to find instance for game %s", gameId)
|
||||
|
||||
+13
@@ -107,6 +107,7 @@ class FailoverService:
|
||||
try
|
||||
val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
|
||||
if subscribed > 0 then
|
||||
updateGameInstanceMappings(batch, deadId, target.instanceId)
|
||||
log.infof("Migrated %d games from %s to %s", subscribed, deadId, target.instanceId)
|
||||
true
|
||||
else false
|
||||
@@ -116,6 +117,18 @@ class FailoverService:
|
||||
false
|
||||
if success then true else tryMigrateBatch(batch, batchIdx, instances, deadId, attempt + 1)
|
||||
|
||||
private def updateGameInstanceMappings(gameIds: List[String], deadId: String, targetId: String): Unit =
|
||||
try
|
||||
val fromKey = s"$redisPrefix:instance:$deadId:games"
|
||||
val toKey = s"$redisPrefix:instance:$targetId:games"
|
||||
gameIds.foreach { gameId =>
|
||||
redis.set(classOf[String]).sadd(toKey, gameId)
|
||||
redis.value(classOf[String]).set(s"$redisPrefix:game:$gameId:instance", targetId)
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.errorf(ex, "Failed to update game instance mappings")
|
||||
|
||||
private def cleanupDeadInstance(instanceId: String): Unit =
|
||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||
redis.key(classOf[String]).del(setKey)
|
||||
|
||||
+3
-3
@@ -131,7 +131,7 @@ class HealthMonitor:
|
||||
|
||||
pods.exists { pod =>
|
||||
val podName = pod.getMetadata.getName
|
||||
instanceId.contains(podName) && isPodReady(pod)
|
||||
podName.endsWith(instanceId) && isPodReady(pod)
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
@@ -185,7 +185,7 @@ class HealthMonitor:
|
||||
.getItems
|
||||
.asScala
|
||||
|
||||
pods.find(pod => instanceId.contains(pod.getMetadata.getName)) match
|
||||
pods.find(pod => pod.getMetadata.getName.endsWith(instanceId)) match
|
||||
case Some(pod) =>
|
||||
val podName = pod.getMetadata.getName
|
||||
kube.pods().inNamespace(config.k8sNamespace).withName(podName).withGracePeriod(0L).delete()
|
||||
@@ -244,4 +244,4 @@ class HealthMonitor:
|
||||
|
||||
private def findRegisteredInstance(pod: Pod): Option[InstanceMetadata] =
|
||||
val podName = pod.getMetadata.getName
|
||||
instanceRegistry.getAllInstances.find(inst => inst.instanceId.contains(podName))
|
||||
instanceRegistry.getAllInstances.find(inst => podName.endsWith(inst.instanceId))
|
||||
|
||||
+7
-6
@@ -69,7 +69,7 @@ class LoadBalancer:
|
||||
|
||||
val overloaded = instances
|
||||
.filter(_.subscriptionCount > config.maxGamesPerCore)
|
||||
.sortBy[Int](_.subscriptionCount)
|
||||
.sortBy(_.subscriptionCount)
|
||||
.reverse
|
||||
val underloaded = instances
|
||||
.filter(_.subscriptionCount < avgLoad * 0.8)
|
||||
@@ -108,7 +108,10 @@ class LoadBalancer:
|
||||
private def getGamesToMove(instanceId: String, count: Int): List[String] =
|
||||
try
|
||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||
redis.set(classOf[String]).smembers(setKey).asScala.toList.take(count)
|
||||
val result = scala.collection.mutable.ListBuffer[String]()
|
||||
for _ <- 0 until count do
|
||||
Option(redis.set(classOf[String]).spop(setKey)).foreach(result += _)
|
||||
result.toList
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.debugf(ex, "Failed to get games for %s", instanceId)
|
||||
@@ -116,12 +119,10 @@ class LoadBalancer:
|
||||
|
||||
private def updateRedisGameSets(fromInstanceId: String, toInstanceId: String, gameIds: List[String]): Unit =
|
||||
try
|
||||
val fromKey = s"$redisPrefix:instance:$fromInstanceId:games"
|
||||
val toKey = s"$redisPrefix:instance:$toInstanceId:games"
|
||||
|
||||
val toKey = s"$redisPrefix:instance:$toInstanceId:games"
|
||||
gameIds.foreach { gameId =>
|
||||
redis.set(classOf[String]).srem(fromKey, gameId)
|
||||
redis.set(classOf[String]).sadd(toKey, gameId)
|
||||
redis.value(classOf[String]).set(s"$redisPrefix:game:$gameId:instance", toInstanceId)
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
|
||||
Reference in New Issue
Block a user