feat(ws): migrate challenge notifications to Redis Streams (#66)

Replace pub/sub publish in EventPublisher with XADD to user event stream.
UserWebSocketResource subscribes via XREADGROUP consumer group (per-connection
group, '$' offset). DLQ after maxRetries=3 on delivery failure. Poll loop
uses connection identity to prevent thread leak on reconnect.

Closes NCS-104
https://knockoutwhist.youtrack.cloud/issue/NCS-104

Reviewed-on: #66
This commit was merged in pull request #66.
This commit is contained in:
2026-06-09 21:49:21 +02:00
parent d66b6fa471
commit 55f102cbaa
3 changed files with 154 additions and 26 deletions
@@ -42,19 +42,26 @@ class EventPublisher:
val payload = objectMapper.createObjectNode()
payload.put("challengeId", challengeId)
payload.put("challengerName", challengerName)
publish(s"${redisConfig.prefix}:user:$destUserId:events", EventType.ChallengeCreated, payload)
publishToUserStream(destUserId, EventType.ChallengeCreated, payload)
def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit =
val payload = objectMapper.createObjectNode()
payload.put("challengeId", challengeId)
payload.put("gameId", gameId)
publish(s"${redisConfig.prefix}:user:$challengerId:events", EventType.ChallengeAccepted, payload)
publishToUserStream(challengerId, EventType.ChallengeAccepted, payload)
private def publish(
channel: String,
private def publishToUserStream(
userId: String,
eventType: EventType,
payload: com.fasterxml.jackson.databind.node.ObjectNode,
): Unit =
val envelope = EventEnvelope.of(eventType, payload)
redis.pubsub(classOf[String]).publish(channel, objectMapper.writeValueAsString(envelope))
val json = objectMapper.writeValueAsString(envelope)
redis
.stream(classOf[String])
.xadd(
s"${redisConfig.prefix}:user:$userId:events:stream",
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json).asJava,
)
()
@@ -0,0 +1,56 @@
package de.nowchess.account.service
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import de.nowchess.account.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.*
import org.mockito.Mockito.*
import scala.compiletime.uninitialized
class EventPublisherTest:
// scalafix:off DisableSyntax.var
private var redis: RedisDataSource = uninitialized
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
private var redisConfig: RedisConfig = uninitialized
// scalafix:on DisableSyntax.var
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
@BeforeEach
def setup(): Unit =
redis = mock(classOf[RedisDataSource])
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
redisConfig = mock(classOf[RedisConfig])
when(redis.stream(classOf[String])).thenReturn(streamCmds)
when(redisConfig.prefix).thenReturn("nowchess")
private def publisher: EventPublisher =
val p = new EventPublisher
p.redis = redis
p.redisConfig = redisConfig
p.objectMapper = objectMapper
p
@Test
def publishChallengeCreatedWritesToUserStream(): Unit =
publisher.publishChallengeCreated("user1", "ch1", "Alice")
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:user:user1:events:stream"),
any(classOf[XAddArgs]),
any(),
)
verify(redis, never()).pubsub(any(classOf[Class[?]]))
@Test
def publishChallengeAcceptedWritesToUserStream(): Unit =
publisher.publishChallengeAccepted("user2", "ch1", "game42")
verify(streamCmds).xadd(
org.mockito.ArgumentMatchers.eq("nowchess:user:user2:events:stream"),
any(classOf[XAddArgs]),
any(),
)
verify(redis, never()).pubsub(any(classOf[Class[?]]))
@@ -2,15 +2,18 @@ package de.nowchess.ws.resource
import de.nowchess.ws.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import io.quarkus.redis.datasource.stream.{XAddArgs, XGroupCreateArgs, XReadGroupArgs}
import io.quarkus.websockets.next.*
import io.smallrye.jwt.auth.principal.JWTParser
import jakarta.inject.Inject
import org.eclipse.microprofile.context.ManagedExecutor
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.util.Try
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success, Try}
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
@WebSocket(path = "/api/user/ws")
class UserWebSocketResource:
@@ -18,20 +21,22 @@ class UserWebSocketResource:
private val log = Logger.getLogger(classOf[UserWebSocketResource])
// scalafix:off DisableSyntax.var
@Inject
var redis: RedisDataSource = uninitialized
@Inject
var redisConfig: RedisConfig = uninitialized
@Inject
var jwtParser: JWTParser = uninitialized
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var jwtParser: JWTParser = uninitialized
@Inject var executor: ManagedExecutor = uninitialized
// scalafix:on DisableSyntax.var
private val connections = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
private val consumerId = UUID.randomUUID().toString
private val maxRetries = 3
private val maxStreamLen = 1000L
private def userTopic(userId: String): String =
s"${redisConfig.prefix}:user:$userId:events"
private val connections = new ConcurrentHashMap[String, (String, WebSocketConnection)]()
private def userStreamKey(userId: String): String =
s"${redisConfig.prefix}:user:$userId:events:stream"
private def dlqKey: String = s"${redisConfig.prefix}:dlq"
@OnOpen
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
@@ -45,16 +50,76 @@ class UserWebSocketResource:
log.warn("WebSocket opened with no valid JWT — closing connection")
connection.close().subscribe().`with`(_ => (), _ => ())
case Some(userId) =>
log.infof("User WebSocket opened — userId=%s", userId)
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler)
connections.put(connection.id(), (userId, subscriber))
log.infof("User WebSocket opened — userId=%s connId=%s", userId, connection.id())
createGroupIfAbsent(userId, connection.id())
connections.put(connection.id(), (userId, connection))
executor.submit(
new Runnable:
def run(): Unit = pollLoop(connection.id(), userId, connection),
)
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
@OnClose
def onClose(connection: WebSocketConnection): Unit =
log.infof("User WebSocket closed — connectionId=%s", connection.id())
Option(connections.remove(connection.id())).foreach { (userId, subscriber) =>
subscriber.unsubscribe(userTopic(userId))
}
connections.remove(connection.id())
private def createGroupIfAbsent(userId: String, groupName: String): Unit =
Try(
redis
.stream(classOf[String])
.xgroupCreate(userStreamKey(userId), groupName, "$", new XGroupCreateArgs().mkstream()),
) match
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for userId=%s", userId)
case Success(_) => ()
private def pollLoop(connectionId: String, userId: String, myConnection: WebSocketConnection): Unit =
while Option(connections.get(connectionId)).exists(_._2 eq myConnection) do
Try {
val messages = redis
.stream(classOf[String])
.xreadgroup(
connectionId,
consumerId,
userStreamKey(userId),
">",
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
)
Option(messages).foreach(_.forEach { msg =>
if Option(connections.get(connectionId)).exists(_._2 eq myConnection) then
val json = msg.payload().get("data")
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
Try(myConnection.sendText(json).await().atMost(Duration.ofSeconds(5))) match
case Success(_) =>
ack(connectionId, userId, msg.id())
case Failure(_) if attempt + 1 < maxRetries =>
xadd(userStreamKey(userId), json, attempt + 1)
ack(connectionId, userId, msg.id())
case Failure(ex) =>
log.warnf(ex, "Delivery failed for userId=%s after %d attempts, sending to DLQ", userId, maxRetries)
xadd(dlqKey, json, attempt)
ack(connectionId, userId, msg.id())
})
} match
case Failure(ex) => log.warnf(ex, "Error in poll loop for userId=%s", userId)
case Success(_) => ()
private def ack(groupName: String, userId: String, id: String): Unit =
Try(redis.stream(classOf[String]).xack(userStreamKey(userId), groupName, id)) match
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for userId=%s", id, userId)
case Success(_) => ()
private def xadd(key: String, json: String, attempt: Int): Unit =
Try(
redis
.stream(classOf[String])
.xadd(
key,
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json, "attempt" -> attempt.toString).asJava,
),
) match
case Failure(ex) => log.warnf(ex, "Failed to publish to stream %s", key)
case Success(_) => ()