feat(redis): migrate from Redisson to Quarkus Redis client and update configuration

This commit is contained in:
2026-04-28 22:44:10 +02:00
parent 0652dd2d2f
commit 5d97c3c8b5
58 changed files with 209 additions and 504 deletions
@@ -3,6 +3,8 @@ quarkus:
port: 8080
application:
name: nowchess-core
redis:
hosts: redis://${REDIS_HOST:localhost}:${REDIS_PORT:6379}
grpc:
clients:
rule-grpc:
@@ -7,12 +7,6 @@ import scala.compiletime.uninitialized
@ApplicationScoped
class RedisConfig:
// scalafix:off DisableSyntax.var
@ConfigProperty(name = "nowchess.redis.host", defaultValue = "localhost")
var host: String = uninitialized
@ConfigProperty(name = "nowchess.redis.port", defaultValue = "6379")
var port: Int = uninitialized
@ConfigProperty(name = "nowchess.redis.prefix", defaultValue = "nowchess")
var prefix: String = uninitialized
// scalafix:on DisableSyntax.var
@@ -1,35 +0,0 @@
package de.nowchess.chess.config
import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Produces
import jakarta.inject.Inject
import org.redisson.Redisson
import org.redisson.api.RedissonClient
import org.redisson.config.Config
import scala.compiletime.uninitialized
@ApplicationScoped
class RedissonProducer:
// scalafix:off DisableSyntax.var
@Inject
var redisConfig: RedisConfig = uninitialized
private var clientOpt: Option[RedissonClient] = None
// scalafix:on DisableSyntax.var
@Produces
@ApplicationScoped
def produceRedissonClient(): RedissonClient =
val config = new Config()
config.useSingleServer().setAddress(s"redis://${redisConfig.host}:${redisConfig.port}")
config.useSingleServer().setConnectionMinimumIdleSize(1)
config.useSingleServer().setConnectTimeout(500)
val client = Redisson.create(config)
clientOpt = Some(client)
client
@PreDestroy
def shutdown(): Unit =
clientOpt.foreach(_.shutdown())
@@ -9,12 +9,12 @@ import de.nowchess.api.board.Color
import de.nowchess.chess.observer.{GameEvent, Observer}
import de.nowchess.chess.registry.GameRegistry
import de.nowchess.chess.resource.GameDtoMapper
import org.redisson.api.RTopic
import io.quarkus.redis.datasource.RedisDataSource
class GameRedisPublisher(
gameId: String,
registry: GameRegistry,
redisson: org.redisson.api.RedissonClient,
redis: RedisDataSource,
objectMapper: ObjectMapper,
s2cTopicName: String,
writebackEmit: String => Unit,
@@ -26,7 +26,7 @@ class GameRedisPublisher(
registry.get(gameId).foreach { entry =>
val dto = GameDtoMapper.toGameStateDto(entry, ioClient)
val json = objectMapper.writeValueAsString(GameStateEventDto(dto))
redisson.getTopic(s2cTopicName).publish(json)
redis.pubsub(classOf[String]).publish(s2cTopicName, json)
val clock = entry.engine.currentClockState
val wb = GameWritebackEventDto(
@@ -10,20 +10,21 @@ import de.nowchess.chess.observer.Observer
import de.nowchess.chess.registry.GameRegistry
import de.nowchess.chess.resource.GameDtoMapper
import de.nowchess.chess.service.InstanceHeartbeatService
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.redisson.api.listener.MessageListener
import org.redisson.api.RedissonClient
import scala.compiletime.uninitialized
import scala.util.Try
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
@ApplicationScoped
class GameRedisSubscriberManager:
// scalafix:off DisableSyntax.var
@Inject var redisson: RedissonClient = uninitialized
@Inject var redis: RedisDataSource = uninitialized
@Inject var registry: GameRegistry = uninitialized
@Inject var objectMapper: ObjectMapper = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@@ -31,7 +32,7 @@ class GameRedisSubscriberManager:
@Inject var heartbeatService: InstanceHeartbeatService = null
// scalafix:on DisableSyntax.var
private val c2sListeners = new ConcurrentHashMap[String, Int]()
private val c2sListeners = new ConcurrentHashMap[String, PubSubCommands.RedisSubscriber]()
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
private def c2sTopic(gameId: String): String =
@@ -42,21 +43,15 @@ class GameRedisSubscriberManager:
def subscribeGame(gameId: String): Unit =
try
val topic = redisson.getTopic(c2sTopic(gameId))
val listenerId = topic.addListener(
classOf[String],
new MessageListener[String]:
def onMessage(channel: CharSequence, msg: String): Unit =
handleC2sMessage(gameId, msg),
)
c2sListeners.put(gameId, listenerId)
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler)
c2sListeners.put(gameId, subscriber)
val writebackTopic = redisson.getTopic("game-writeback")
val writebackFn: String => Unit = json => writebackTopic.publish(json)
val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json)
val obs = new GameRedisPublisher(
gameId,
registry,
redisson,
redis,
objectMapper,
s2cTopicName(gameId),
writebackFn,
@@ -73,8 +68,8 @@ class GameRedisSubscriberManager:
()
def unsubscribeGame(gameId: String): Unit =
Option(c2sListeners.remove(gameId)).foreach { listenerId =>
redisson.getTopic(c2sTopic(gameId)).removeListener(listenerId)
Option(c2sListeners.remove(gameId)).foreach { subscriber =>
subscriber.unsubscribe(c2sTopic(gameId))
}
Option(s2cObservers.remove(gameId)).foreach { obs =>
registry.get(gameId).foreach(_.engine.unsubscribe(obs))
@@ -84,16 +79,16 @@ class GameRedisSubscriberManager:
private def handleC2sMessage(gameId: String, msg: String): Unit =
parseC2sMessage(msg) match
case Some(C2sMessage.Connected) => handleConnected(gameId)
case Some(C2sMessage.Connected) => handleConnected(gameId)
case Some(C2sMessage.Move(uci, playerId)) => handleMove(gameId, uci, playerId)
case Some(C2sMessage.Ping) => ()
case None => ()
case Some(C2sMessage.Ping) => ()
case None => ()
private def handleConnected(gameId: String): Unit =
registry.get(gameId).foreach { entry =>
val dto = GameDtoMapper.toGameFullDto(entry, ioClient)
val json = objectMapper.writeValueAsString(GameFullEventDto(dto))
redisson.getTopic(s2cTopicName(gameId)).publish(json)
redis.pubsub(classOf[String]).publish(s2cTopicName(gameId), json)
}
private def handleMove(gameId: String, uci: String, playerId: Option[String]): Unit =
@@ -120,8 +115,8 @@ class GameRedisSubscriberManager:
val pid = Option(node.get("playerId")).map(_.asText()).filter(_.nonEmpty)
C2sMessage.Move(u.asText(), pid)
}
case "PING" => Some(C2sMessage.Ping)
case _ => None
case "PING" => Some(C2sMessage.Ping)
case _ => None
}
}
@@ -157,5 +152,5 @@ class GameRedisSubscriberManager:
@PreDestroy
def cleanup(): Unit =
c2sListeners.forEach((gameId, listenerId) => redisson.getTopic(c2sTopic(gameId)).removeListener(listenerId))
c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)))
s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs)))
@@ -12,23 +12,22 @@ import de.nowchess.chess.grpc.RuleSetGrpcAdapter
import de.nowchess.chess.config.RedisConfig
import de.nowchess.chess.grpc.IoGrpcClientWrapper
import de.nowchess.chess.resource.GameDtoMapper
import io.quarkus.redis.datasource.RedisDataSource
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.eclipse.microprofile.rest.client.inject.RestClient
import org.redisson.api.RedissonClient
import scala.annotation.nowarn
import scala.compiletime.uninitialized
import scala.util.Try
import java.nio.charset.StandardCharsets
import java.security.{MessageDigest, SecureRandom}
import java.time.Instant
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.ConcurrentHashMap
@ApplicationScoped
class RedisGameRegistry extends GameRegistry:
@Inject
// scalafix:off DisableSyntax.var
var redisson: RedissonClient = uninitialized
var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var objectMapper: ObjectMapper = uninitialized
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
@@ -40,7 +39,6 @@ class RedisGameRegistry extends GameRegistry:
private val rng = new SecureRandom()
private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId"
private def bucket(gameId: String) = redisson.getBucket[String](cacheKey(gameId))
def generateId(): String =
val chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
@@ -49,9 +47,7 @@ class RedisGameRegistry extends GameRegistry:
def store(entry: GameEntry): Unit =
localEngines.put(entry.gameId, entry)
val combined = ioClient.exportCombined(entry.engine.context)
val b = bucket(entry.gameId)
b.set(toJson(entry, combined.fen, combined.pgn))
(b.expire(30, TimeUnit.MINUTES): @nowarn)
redis.value(classOf[String]).setex(cacheKey(entry.gameId), 1800L, toJson(entry, combined.fen, combined.pgn))
def get(gameId: String): Option[GameEntry] =
Option(localEngines.get(gameId)) match
@@ -66,12 +62,10 @@ class RedisGameRegistry extends GameRegistry:
def update(entry: GameEntry): Unit =
localEngines.put(entry.gameId, entry)
val combined = ioClient.exportCombined(entry.engine.context)
val b = bucket(entry.gameId)
b.set(toJson(entry, combined.fen, combined.pgn))
(b.expire(30, TimeUnit.MINUTES): @nowarn)
redis.value(classOf[String]).setex(cacheKey(entry.gameId), 1800L, toJson(entry, combined.fen, combined.pgn))
private def readRedisDto(gameId: String): Option[GameCacheDto] =
Try(Option(bucket(gameId).get())).toOption.flatten.flatMap { json =>
Try(Option(redis.value(classOf[String]).get(cacheKey(gameId)))).toOption.flatten.flatMap { json =>
Try(objectMapper.readValue(json, classOf[GameCacheDto])).toOption
}
@@ -111,9 +105,7 @@ class RedisGameRegistry extends GameRegistry:
}.toOption
.map { case (dto, entry) =>
localEngines.put(gameId, entry)
val b = bucket(gameId)
b.set(objectMapper.writeValueAsString(dto))
(b.expire(30, TimeUnit.MINUTES): @nowarn)
redis.value(classOf[String]).setex(cacheKey(gameId), 1800L, objectMapper.writeValueAsString(dto))
entry
}
@@ -7,8 +7,7 @@ import io.quarkus.runtime.StartupEvent
import io.quarkus.runtime.ShutdownEvent
import io.quarkus.grpc.GrpcClient
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.redisson.api.RedissonClient
import scala.annotation.nowarn
import io.quarkus.redis.datasource.RedisDataSource
import scala.compiletime.uninitialized
import java.util.concurrent.{Executors, TimeUnit}
import java.net.InetAddress
@@ -22,7 +21,7 @@ import io.grpc.Channel
@ApplicationScoped
class InstanceHeartbeatService:
@Inject
private var redissonClient: RedissonClient = uninitialized
private var redis: RedisDataSource = uninitialized
@GrpcClient("coordinator-grpc")
private var channel: Channel = uninitialized
@@ -95,17 +94,15 @@ class InstanceHeartbeatService:
def addGameSubscription(gameId: String): Unit =
if !coordinatorEnabled then return
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.add(gameId)
val setKey = s"$redisPrefix:instance:$instanceId:games"
redis.set(classOf[String]).sadd(setKey, gameId)
subscriptionCount += 1
def removeGameSubscription(gameId: String): Unit =
if !coordinatorEnabled then return
val setKey = s"$redisPrefix:instance:$instanceId:games"
val gameSet = redissonClient.getSet[String](setKey)
gameSet.remove(gameId)
val setKey = s"$redisPrefix:instance:$instanceId:games"
redis.set(classOf[String]).srem(setKey, gameId)
subscriptionCount = Math.max(0, subscriptionCount - 1)
private def generateInstanceId(): Unit =
@@ -177,8 +174,7 @@ class InstanceHeartbeatService:
private def refreshRedisHeartbeat(): Unit =
try
val key = s"$redisPrefix:instances:$instanceId"
val bucket = redissonClient.getBucket[String](key)
val key = s"$redisPrefix:instances:$instanceId"
val metadata = Map(
"instanceId" -> instanceId,
@@ -192,8 +188,7 @@ class InstanceHeartbeatService:
)
val json = mapper.writeValueAsString(metadata)
bucket.set(json)
(bucket.expire(5, TimeUnit.SECONDS): @nowarn)
redis.value(classOf[String]).setex(key, 5L, json)
catch
case ex: Exception =>
log.warnf(ex, "Failed to refresh Redis heartbeat")
@@ -208,10 +203,10 @@ class InstanceHeartbeatService:
if instanceId.nonEmpty then
val key = s"$redisPrefix:instances:$instanceId"
redissonClient.getBucket[String](key).delete()
redis.key(classOf[String]).del(key)
val setKey = s"$redisPrefix:instance:$instanceId:games"
redissonClient.getSet[String](setKey).delete()
redis.key(classOf[String]).del(setKey)
heartbeatExecutor.shutdown()
redisHeartbeatExecutor.shutdown()
@@ -12,6 +12,10 @@ quarkus:
url: http://localhost:8085
nowchess:
internal:
secret: test-secret
auth:
enabled: false
coordinator:
enabled: false
redis:
@@ -1,17 +1,17 @@
package de.nowchess.chess.config
import io.quarkus.redis.datasource.RedisDataSource
import jakarta.annotation.Priority
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Alternative
import jakarta.enterprise.inject.Produces
import org.mockito.Mockito
import org.redisson.api.RedissonClient
@Alternative
@Priority(1)
@ApplicationScoped
class MockRedissonProducer:
class MockRedisDataSourceProducer:
@Produces
@ApplicationScoped
def produceRedissonClient(): RedissonClient =
Mockito.mock(classOf[RedissonClient], Mockito.RETURNS_DEEP_STUBS)
def produceRedisDataSource(): RedisDataSource =
Mockito.mock(classOf[RedisDataSource], Mockito.RETURNS_DEEP_STUBS)
@@ -40,7 +40,7 @@ class GameResourceIntegrationTest:
@BeforeEach
def setupMocks(): Unit =
when(jwt.getClaim[AnyRef]("type")).thenReturn("bot")
when(jwt.getClaim[AnyRef]("type")).thenReturn("user")
when(ioWrapper.importFen(any[String]())).thenReturn(GameContext.initial)
when(ioWrapper.importPgn(any[String]())).thenAnswer((inv: InvocationOnMock) =>