From 9de49d31d9bee502462b4eba760f3bf7a3d979ba Mon Sep 17 00:00:00 2001 From: Janis Date: Tue, 9 Jun 2026 19:29:48 +0200 Subject: [PATCH] feat(ws): migrate challenge notifications to Redis Streams Replace pub/sub publish in EventPublisher with XADD to user event stream. UserWebSocketResource subscribes via XREADGROUP consumer group (per-connection group, '$' offset). DLQ after maxRetries=3 on delivery failure. Poll loop uses connection identity to prevent thread leak on reconnect. Closes NCS-104 https://knockoutwhist.youtrack.cloud/issue/NCS-104 --- .../account/service/EventPublisher.scala | 17 ++- .../account/service/EventPublisherTest.scala | 56 +++++++++ .../ws/resource/UserWebSocketResource.scala | 107 ++++++++++++++---- 3 files changed, 154 insertions(+), 26 deletions(-) create mode 100644 modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala diff --git a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala index 0f7cbaa..23dd87a 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala @@ -43,19 +43,26 @@ class EventPublisher: val payload = objectMapper.createObjectNode() payload.put("challengeId", challengeId) payload.put("challengerName", challengerName) - publish(s"${redisConfig.prefix}:user:$destUserId:events", EventType.ChallengeCreated, payload) + publishToUserStream(destUserId, EventType.ChallengeCreated, payload) def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit = val payload = objectMapper.createObjectNode() payload.put("challengeId", challengeId) payload.put("gameId", gameId) - publish(s"${redisConfig.prefix}:user:$challengerId:events", EventType.ChallengeAccepted, payload) + publishToUserStream(challengerId, EventType.ChallengeAccepted, payload) - private def publish( - channel: String, + private def publishToUserStream( + userId: String, eventType: EventType, payload: com.fasterxml.jackson.databind.node.ObjectNode, ): Unit = val envelope = EventEnvelope.of(eventType, payload) - redis.pubsub(classOf[String]).publish(channel, objectMapper.writeValueAsString(envelope)) + val json = objectMapper.writeValueAsString(envelope) + redis + .stream(classOf[String]) + .xadd( + s"${redisConfig.prefix}:user:$userId:events:stream", + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json).asJava, + ) () diff --git a/modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala b/modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala new file mode 100644 index 0000000..1e94893 --- /dev/null +++ b/modules/account/src/test/scala/de/nowchess/account/service/EventPublisherTest.scala @@ -0,0 +1,56 @@ +package de.nowchess.account.service + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import de.nowchess.account.config.RedisConfig +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs} +import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.* +import org.mockito.Mockito.* +import scala.compiletime.uninitialized + +class EventPublisherTest: + + // scalafix:off DisableSyntax.var + private var redis: RedisDataSource = uninitialized + private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized + private var redisConfig: RedisConfig = uninitialized + // scalafix:on DisableSyntax.var + + private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) + + @BeforeEach + def setup(): Unit = + redis = mock(classOf[RedisDataSource]) + streamCmds = mock(classOf[StreamCommands[String, String, Nothing]]) + redisConfig = mock(classOf[RedisConfig]) + when(redis.stream(classOf[String])).thenReturn(streamCmds) + when(redisConfig.prefix).thenReturn("nowchess") + + private def publisher: EventPublisher = + val p = new EventPublisher + p.redis = redis + p.redisConfig = redisConfig + p.objectMapper = objectMapper + p + + @Test + def publishChallengeCreatedWritesToUserStream(): Unit = + publisher.publishChallengeCreated("user1", "ch1", "Alice") + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:user:user1:events:stream"), + any(classOf[XAddArgs]), + any(), + ) + verify(redis, never()).pubsub(any(classOf[Class[?]])) + + @Test + def publishChallengeAcceptedWritesToUserStream(): Unit = + publisher.publishChallengeAccepted("user2", "ch1", "game42") + verify(streamCmds).xadd( + org.mockito.ArgumentMatchers.eq("nowchess:user:user2:events:stream"), + any(classOf[XAddArgs]), + any(), + ) + verify(redis, never()).pubsub(any(classOf[Class[?]])) diff --git a/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala b/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala index 89434f5..13c30ec 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala @@ -2,15 +2,18 @@ package de.nowchess.ws.resource import de.nowchess.ws.config.RedisConfig import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.stream.{XAddArgs, XGroupCreateArgs, XReadGroupArgs} import io.quarkus.websockets.next.* import io.smallrye.jwt.auth.principal.JWTParser import jakarta.inject.Inject +import org.eclipse.microprofile.context.ManagedExecutor import org.jboss.logging.Logger import scala.compiletime.uninitialized -import scala.util.Try +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import java.util.function.Consumer @WebSocket(path = "/api/user/ws") class UserWebSocketResource: @@ -18,20 +21,22 @@ class UserWebSocketResource: private val log = Logger.getLogger(classOf[UserWebSocketResource]) // scalafix:off DisableSyntax.var - @Inject - var redis: RedisDataSource = uninitialized - - @Inject - var redisConfig: RedisConfig = uninitialized - - @Inject - var jwtParser: JWTParser = uninitialized + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var jwtParser: JWTParser = uninitialized + @Inject var executor: ManagedExecutor = uninitialized // scalafix:on DisableSyntax.var - private val connections = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]() + private val consumerId = UUID.randomUUID().toString + private val maxRetries = 3 + private val maxStreamLen = 1000L - private def userTopic(userId: String): String = - s"${redisConfig.prefix}:user:$userId:events" + private val connections = new ConcurrentHashMap[String, (String, WebSocketConnection)]() + + private def userStreamKey(userId: String): String = + s"${redisConfig.prefix}:user:$userId:events:stream" + + private def dlqKey: String = s"${redisConfig.prefix}:dlq" @OnOpen def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit = @@ -45,16 +50,76 @@ class UserWebSocketResource: log.warn("WebSocket opened with no valid JWT — closing connection") connection.close().subscribe().`with`(_ => (), _ => ()) case Some(userId) => - log.infof("User WebSocket opened — userId=%s", userId) - val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ()) - val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler) - connections.put(connection.id(), (userId, subscriber)) + log.infof("User WebSocket opened — userId=%s connId=%s", userId, connection.id()) + createGroupIfAbsent(userId, connection.id()) + connections.put(connection.id(), (userId, connection)) + executor.submit( + new Runnable: + def run(): Unit = pollLoop(connection.id(), userId, connection), + ) val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}""" connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ()) @OnClose def onClose(connection: WebSocketConnection): Unit = log.infof("User WebSocket closed — connectionId=%s", connection.id()) - Option(connections.remove(connection.id())).foreach { (userId, subscriber) => - subscriber.unsubscribe(userTopic(userId)) - } + connections.remove(connection.id()) + + private def createGroupIfAbsent(userId: String, groupName: String): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(userStreamKey(userId), groupName, "$", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create consumer group for userId=%s", userId) + case Success(_) => () + + private def pollLoop(connectionId: String, userId: String, myConnection: WebSocketConnection): Unit = + while Option(connections.get(connectionId)).exists(_._2 eq myConnection) do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + connectionId, + consumerId, + userStreamKey(userId), + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach { msg => + if Option(connections.get(connectionId)).exists(_._2 eq myConnection) then + val json = msg.payload().get("data") + val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0) + Try(myConnection.sendText(json).await().atMost(Duration.ofSeconds(5))) match + case Success(_) => + ack(connectionId, userId, msg.id()) + case Failure(_) if attempt + 1 < maxRetries => + xadd(userStreamKey(userId), json, attempt + 1) + ack(connectionId, userId, msg.id()) + case Failure(ex) => + log.warnf(ex, "Delivery failed for userId=%s after %d attempts, sending to DLQ", userId, maxRetries) + xadd(dlqKey, json, attempt) + ack(connectionId, userId, msg.id()) + }) + } match + case Failure(ex) => log.warnf(ex, "Error in poll loop for userId=%s", userId) + case Success(_) => () + + private def ack(groupName: String, userId: String, id: String): Unit = + Try(redis.stream(classOf[String]).xack(userStreamKey(userId), groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack message %s for userId=%s", id, userId) + case Success(_) => () + + private def xadd(key: String, json: String, attempt: Int): Unit = + Try( + redis + .stream(classOf[String]) + .xadd( + key, + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json, "attempt" -> attempt.toString).asJava, + ), + ) match + case Failure(ex) => log.warnf(ex, "Failed to publish to stream %s", key) + case Success(_) => ()