fix(redis): enhance GameRedisSubscriberManager to use ReactiveRedisDataSource and improve subscription handling
Build & Test (NowChessSystems) TeamCity build finished

This commit is contained in:
2026-05-06 08:41:30 +02:00
parent e279c39246
commit 0eb752d493
2 changed files with 35 additions and 28 deletions
@@ -10,8 +10,9 @@ import de.nowchess.chess.observer.Observer
import de.nowchess.chess.registry.GameRegistry import de.nowchess.chess.registry.GameRegistry
import de.nowchess.chess.resource.GameDtoMapper import de.nowchess.chess.resource.GameDtoMapper
import de.nowchess.chess.service.InstanceHeartbeatService import de.nowchess.chess.service.InstanceHeartbeatService
import io.quarkus.redis.datasource.ReactiveRedisDataSource
import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
import jakarta.annotation.PreDestroy import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Instance import jakarta.enterprise.inject.Instance
@@ -29,6 +30,7 @@ class GameRedisSubscriberManager:
// scalafix:off DisableSyntax.var // scalafix:off DisableSyntax.var
@Inject var redis: RedisDataSource = uninitialized @Inject var redis: RedisDataSource = uninitialized
@Inject var reactiveRedis: ReactiveRedisDataSource = uninitialized
@Inject var registry: GameRegistry = uninitialized @Inject var registry: GameRegistry = uninitialized
@Inject var objectMapper: ObjectMapper = uninitialized @Inject var objectMapper: ObjectMapper = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized @Inject var redisConfig: RedisConfig = uninitialized
@@ -40,7 +42,7 @@ class GameRedisSubscriberManager:
if heartbeatServiceInstance.isUnsatisfied then None if heartbeatServiceInstance.isUnsatisfied then None
else Some(heartbeatServiceInstance.get()) else Some(heartbeatServiceInstance.get())
private val c2sListeners = new ConcurrentHashMap[String, PubSubCommands.RedisSubscriber]() private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]()
private val s2cObservers = new ConcurrentHashMap[String, Observer]() private val s2cObservers = new ConcurrentHashMap[String, Observer]()
private def c2sTopic(gameId: String): String = private def c2sTopic(gameId: String): String =
@@ -50,35 +52,37 @@ class GameRedisSubscriberManager:
s"${redisConfig.prefix}:game:$gameId:s2c" s"${redisConfig.prefix}:game:$gameId:s2c"
def subscribeGame(gameId: String): Unit = def subscribeGame(gameId: String): Unit =
try val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json)
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) val obs = new GameRedisPublisher(
val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler) gameId,
c2sListeners.put(gameId, subscriber) registry,
redis,
objectMapper,
s2cTopicName(gameId),
writebackFn,
ioClient,
unsubscribeGame,
)
s2cObservers.put(gameId, obs)
registry.get(gameId).foreach(_.engine.subscribe(obs))
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json) val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
val obs = new GameRedisPublisher( reactiveRedis
gameId, .pubsub(classOf[String])
registry, .subscribe(c2sTopic(gameId), handler)
redis, .subscribe()
objectMapper, .`with`(
s2cTopicName(gameId), subscriber => {
writebackFn, c2sListeners.put(gameId, subscriber)
ioClient, log.debugf("Subscribed to game %s", gameId)
unsubscribeGame, },
failure => log.warnf(failure, "Redis subscription failed for game %s", gameId),
) )
s2cObservers.put(gameId, obs)
registry.get(gameId).foreach(_.engine.subscribe(obs))
log.debugf("Subscribed to game %s", gameId)
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
catch
case e: Exception =>
log.warnf(e, "Redis subscription failed for game %s", gameId)
()
def unsubscribeGame(gameId: String): Unit = def unsubscribeGame(gameId: String): Unit =
Option(c2sListeners.remove(gameId)).foreach { subscriber => Option(c2sListeners.remove(gameId)).foreach { subscriber =>
subscriber.unsubscribe(c2sTopic(gameId)) subscriber.unsubscribe(c2sTopic(gameId)).subscribe().`with`(_ => (), _ => ())
} }
Option(s2cObservers.remove(gameId)).foreach { obs => Option(s2cObservers.remove(gameId)).foreach { obs =>
registry.get(gameId).foreach(_.engine.unsubscribe(obs)) registry.get(gameId).foreach(_.engine.unsubscribe(obs))
@@ -154,5 +158,5 @@ class GameRedisSubscriberManager:
@PreDestroy @PreDestroy
def cleanup(): Unit = def cleanup(): Unit =
c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId))) c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely())
s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs))) s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs)))
@@ -18,7 +18,10 @@ class InternalClientHeadersFactory extends DefaultClientHeadersFactoryImpl {
var authEnabled: Boolean = uninitialized var authEnabled: Boolean = uninitialized
// scalafix:on DisableSyntax.var // scalafix:on DisableSyntax.var
override def update(incomingHeaders: MultivaluedMap[String, String], clientOutgoingHeaders: MultivaluedMap[String, String]): MultivaluedMap[String, String] = { override def update(
incomingHeaders: MultivaluedMap[String, String],
clientOutgoingHeaders: MultivaluedMap[String, String],
): MultivaluedMap[String, String] = {
val default = super.update(incomingHeaders, clientOutgoingHeaders) val default = super.update(incomingHeaders, clientOutgoingHeaders)
default.putSingle("X-Internal-Secret", secret) default.putSingle("X-Internal-Secret", secret)
default default