From 0eb752d4935377f75aab710b7f4eda4b29098e6a Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 6 May 2026 08:41:30 +0200 Subject: [PATCH] fix(redis): enhance GameRedisSubscriberManager to use ReactiveRedisDataSource and improve subscription handling --- .../redis/GameRedisSubscriberManager.scala | 58 ++++++++++--------- .../InternalClientHeadersFactory.scala | 5 +- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index f41fa26..30011ab 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -10,8 +10,9 @@ import de.nowchess.chess.observer.Observer import de.nowchess.chess.registry.GameRegistry import de.nowchess.chess.resource.GameDtoMapper import de.nowchess.chess.service.InstanceHeartbeatService +import io.quarkus.redis.datasource.ReactiveRedisDataSource import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands import jakarta.annotation.PreDestroy import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.inject.Instance @@ -29,6 +30,7 @@ class GameRedisSubscriberManager: // scalafix:off DisableSyntax.var @Inject var redis: RedisDataSource = uninitialized + @Inject var reactiveRedis: ReactiveRedisDataSource = uninitialized @Inject var registry: GameRegistry = uninitialized @Inject var objectMapper: ObjectMapper = uninitialized @Inject var redisConfig: RedisConfig = uninitialized @@ -40,7 +42,7 @@ class GameRedisSubscriberManager: if heartbeatServiceInstance.isUnsatisfied then None else Some(heartbeatServiceInstance.get()) - private val c2sListeners = new ConcurrentHashMap[String, PubSubCommands.RedisSubscriber]() + private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]() private val s2cObservers = new ConcurrentHashMap[String, Observer]() private def c2sTopic(gameId: String): String = @@ -50,35 +52,37 @@ class GameRedisSubscriberManager: s"${redisConfig.prefix}:game:$gameId:s2c" def subscribeGame(gameId: String): Unit = - try - val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) - val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler) - c2sListeners.put(gameId, subscriber) + val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json) + val obs = new GameRedisPublisher( + gameId, + registry, + redis, + objectMapper, + s2cTopicName(gameId), + writebackFn, + ioClient, + unsubscribeGame, + ) + s2cObservers.put(gameId, obs) + registry.get(gameId).foreach(_.engine.subscribe(obs)) + heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) - val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json) - val obs = new GameRedisPublisher( - gameId, - registry, - redis, - objectMapper, - s2cTopicName(gameId), - writebackFn, - ioClient, - unsubscribeGame, + val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) + reactiveRedis + .pubsub(classOf[String]) + .subscribe(c2sTopic(gameId), handler) + .subscribe() + .`with`( + subscriber => { + c2sListeners.put(gameId, subscriber) + log.debugf("Subscribed to game %s", gameId) + }, + failure => log.warnf(failure, "Redis subscription failed for game %s", gameId), ) - s2cObservers.put(gameId, obs) - registry.get(gameId).foreach(_.engine.subscribe(obs)) - log.debugf("Subscribed to game %s", gameId) - - heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) - catch - case e: Exception => - log.warnf(e, "Redis subscription failed for game %s", gameId) - () def unsubscribeGame(gameId: String): Unit = Option(c2sListeners.remove(gameId)).foreach { subscriber => - subscriber.unsubscribe(c2sTopic(gameId)) + subscriber.unsubscribe(c2sTopic(gameId)).subscribe().`with`(_ => (), _ => ()) } Option(s2cObservers.remove(gameId)).foreach { obs => registry.get(gameId).foreach(_.engine.unsubscribe(obs)) @@ -154,5 +158,5 @@ class GameRedisSubscriberManager: @PreDestroy def cleanup(): Unit = - c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId))) + c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely()) s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs))) diff --git a/modules/security/src/main/scala/de/nowchess/security/InternalClientHeadersFactory.scala b/modules/security/src/main/scala/de/nowchess/security/InternalClientHeadersFactory.scala index 531c081..d81c433 100644 --- a/modules/security/src/main/scala/de/nowchess/security/InternalClientHeadersFactory.scala +++ b/modules/security/src/main/scala/de/nowchess/security/InternalClientHeadersFactory.scala @@ -18,7 +18,10 @@ class InternalClientHeadersFactory extends DefaultClientHeadersFactoryImpl { var authEnabled: Boolean = uninitialized // scalafix:on DisableSyntax.var - override def update(incomingHeaders: MultivaluedMap[String, String], clientOutgoingHeaders: MultivaluedMap[String, String]): MultivaluedMap[String, String] = { + override def update( + incomingHeaders: MultivaluedMap[String, String], + clientOutgoingHeaders: MultivaluedMap[String, String], + ): MultivaluedMap[String, String] = { val default = super.update(incomingHeaders, clientOutgoingHeaders) default.putSingle("X-Internal-Secret", secret) default