diff --git a/.codesight/CODESIGHT.md b/.codesight/CODESIGHT.md index b74c2a9..616268e 100644 --- a/.codesight/CODESIGHT.md +++ b/.codesight/CODESIGHT.md @@ -235,10 +235,10 @@ - function rebalanceInterval - function rebalanceMinInterval - function heartbeatTtl - - _...12 more_ + - _...14 more_ - `modules/coordinator/src/main/scala/de/nowchess/coordinator/config/JacksonConfig.scala` — class JacksonConfig, function customize - `modules/coordinator/src/main/scala/de/nowchess/coordinator/config/NativeReflectionConfig.scala` — class NativeReflectionConfig -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala` — class CoordinatorGrpcServer +- `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala` — class CoordinatorGrpcServer, function hasActiveStream - `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala` - class CoreGrpcClient - function shutdown @@ -272,16 +272,17 @@ - class HealthMonitor - function setRedisPrefix - function initializeMetrics + - function onStartup + - function periodicHealthCheck - function checkInstanceHealth - - function watchK8sPods - `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala` - class InstanceRegistry - function initMetrics - function setRedisPrefix + - function loadAllFromRedis - function getInstance - function getAllInstances - - function updateInstanceFromRedis - - _...3 more_ + - _...4 more_ - `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala` - class LoadBalancer - function setRedisPrefix diff --git a/.codesight/libs.md b/.codesight/libs.md index 15b418e..e65d86d 100644 --- a/.codesight/libs.md +++ b/.codesight/libs.md @@ -179,10 +179,10 @@ - function rebalanceInterval - function rebalanceMinInterval - function heartbeatTtl - - _...12 more_ + - _...14 more_ - `modules/coordinator/src/main/scala/de/nowchess/coordinator/config/JacksonConfig.scala` — class JacksonConfig, function customize - `modules/coordinator/src/main/scala/de/nowchess/coordinator/config/NativeReflectionConfig.scala` — class NativeReflectionConfig -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala` — class CoordinatorGrpcServer +- `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala` — class CoordinatorGrpcServer, function hasActiveStream - `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala` - class CoreGrpcClient - function shutdown @@ -216,16 +216,17 @@ - class HealthMonitor - function setRedisPrefix - function initializeMetrics + - function onStartup + - function periodicHealthCheck - function checkInstanceHealth - - function watchK8sPods - `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala` - class InstanceRegistry - function initMetrics - function setRedisPrefix + - function loadAllFromRedis - function getInstance - function getAllInstances - - function updateInstanceFromRedis - - _...3 more_ + - _...4 more_ - `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala` - class LoadBalancer - function setRedisPrefix diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index 30011ab..05e8f91 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -68,17 +68,15 @@ class GameRedisSubscriberManager: heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) - reactiveRedis - .pubsub(classOf[String]) - .subscribe(c2sTopic(gameId), handler) - .subscribe() - .`with`( - subscriber => { - c2sListeners.put(gameId, subscriber) - log.debugf("Subscribed to game %s", gameId) - }, - failure => log.warnf(failure, "Redis subscription failed for game %s", gameId), - ) + try + val subscriber = reactiveRedis + .pubsub(classOf[String]) + .subscribe(c2sTopic(gameId), handler) + .await() + .atMost(java.time.Duration.ofSeconds(5)) + c2sListeners.put(gameId, subscriber) + log.debugf("Subscribed to game %s", gameId) + catch case ex: Exception => log.warnf(ex, "Redis subscription failed for game %s", gameId) def unsubscribeGame(gameId: String): Unit = Option(c2sListeners.remove(gameId)).foreach { subscriber =>