feat(ws): migrate challenge notifications to Redis Streams
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
Replace pub/sub publish in EventPublisher with XADD to user event stream. UserWebSocketResource subscribes via XREADGROUP consumer group (per-connection group, '$' offset). DLQ after maxRetries=3 on delivery failure. Poll loop uses connection identity to prevent thread leak on reconnect. Closes NCS-104 https://knockoutwhist.youtrack.cloud/issue/NCS-104
This commit is contained in:
@@ -43,19 +43,26 @@ class EventPublisher:
|
||||
val payload = objectMapper.createObjectNode()
|
||||
payload.put("challengeId", challengeId)
|
||||
payload.put("challengerName", challengerName)
|
||||
publish(s"${redisConfig.prefix}:user:$destUserId:events", EventType.ChallengeCreated, payload)
|
||||
publishToUserStream(destUserId, EventType.ChallengeCreated, payload)
|
||||
|
||||
def publishChallengeAccepted(challengerId: String, challengeId: String, gameId: String): Unit =
|
||||
val payload = objectMapper.createObjectNode()
|
||||
payload.put("challengeId", challengeId)
|
||||
payload.put("gameId", gameId)
|
||||
publish(s"${redisConfig.prefix}:user:$challengerId:events", EventType.ChallengeAccepted, payload)
|
||||
publishToUserStream(challengerId, EventType.ChallengeAccepted, payload)
|
||||
|
||||
private def publish(
|
||||
channel: String,
|
||||
private def publishToUserStream(
|
||||
userId: String,
|
||||
eventType: EventType,
|
||||
payload: com.fasterxml.jackson.databind.node.ObjectNode,
|
||||
): Unit =
|
||||
val envelope = EventEnvelope.of(eventType, payload)
|
||||
redis.pubsub(classOf[String]).publish(channel, objectMapper.writeValueAsString(envelope))
|
||||
val json = objectMapper.writeValueAsString(envelope)
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xadd(
|
||||
s"${redisConfig.prefix}:user:$userId:events:stream",
|
||||
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||
Map("data" -> json).asJava,
|
||||
)
|
||||
()
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package de.nowchess.account.service
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
|
||||
import de.nowchess.account.config.RedisConfig
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.stream.{StreamCommands, XAddArgs}
|
||||
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||
import org.mockito.ArgumentMatchers.*
|
||||
import org.mockito.Mockito.*
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
class EventPublisherTest:
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
private var redis: RedisDataSource = uninitialized
|
||||
private var streamCmds: StreamCommands[String, String, Nothing] = uninitialized
|
||||
private var redisConfig: RedisConfig = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
|
||||
|
||||
@BeforeEach
|
||||
def setup(): Unit =
|
||||
redis = mock(classOf[RedisDataSource])
|
||||
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
|
||||
redisConfig = mock(classOf[RedisConfig])
|
||||
when(redis.stream(classOf[String])).thenReturn(streamCmds)
|
||||
when(redisConfig.prefix).thenReturn("nowchess")
|
||||
|
||||
private def publisher: EventPublisher =
|
||||
val p = new EventPublisher
|
||||
p.redis = redis
|
||||
p.redisConfig = redisConfig
|
||||
p.objectMapper = objectMapper
|
||||
p
|
||||
|
||||
@Test
|
||||
def publishChallengeCreatedWritesToUserStream(): Unit =
|
||||
publisher.publishChallengeCreated("user1", "ch1", "Alice")
|
||||
verify(streamCmds).xadd(
|
||||
org.mockito.ArgumentMatchers.eq("nowchess:user:user1:events:stream"),
|
||||
any(classOf[XAddArgs]),
|
||||
any(),
|
||||
)
|
||||
verify(redis, never()).pubsub(any(classOf[Class[?]]))
|
||||
|
||||
@Test
|
||||
def publishChallengeAcceptedWritesToUserStream(): Unit =
|
||||
publisher.publishChallengeAccepted("user2", "ch1", "game42")
|
||||
verify(streamCmds).xadd(
|
||||
org.mockito.ArgumentMatchers.eq("nowchess:user:user2:events:stream"),
|
||||
any(classOf[XAddArgs]),
|
||||
any(),
|
||||
)
|
||||
verify(redis, never()).pubsub(any(classOf[Class[?]]))
|
||||
@@ -2,15 +2,18 @@ package de.nowchess.ws.resource
|
||||
|
||||
import de.nowchess.ws.config.RedisConfig
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||
import io.quarkus.redis.datasource.stream.{XAddArgs, XGroupCreateArgs, XReadGroupArgs}
|
||||
import io.quarkus.websockets.next.*
|
||||
import io.smallrye.jwt.auth.principal.JWTParser
|
||||
import jakarta.inject.Inject
|
||||
import org.eclipse.microprofile.context.ManagedExecutor
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.util.Try
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import java.time.Duration
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.function.Consumer
|
||||
|
||||
@WebSocket(path = "/api/user/ws")
|
||||
class UserWebSocketResource:
|
||||
@@ -18,20 +21,22 @@ class UserWebSocketResource:
|
||||
private val log = Logger.getLogger(classOf[UserWebSocketResource])
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject
|
||||
var redis: RedisDataSource = uninitialized
|
||||
|
||||
@Inject
|
||||
var redisConfig: RedisConfig = uninitialized
|
||||
|
||||
@Inject
|
||||
var jwtParser: JWTParser = uninitialized
|
||||
@Inject var redis: RedisDataSource = uninitialized
|
||||
@Inject var redisConfig: RedisConfig = uninitialized
|
||||
@Inject var jwtParser: JWTParser = uninitialized
|
||||
@Inject var executor: ManagedExecutor = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val connections = new ConcurrentHashMap[String, (String, PubSubCommands.RedisSubscriber)]()
|
||||
private val consumerId = UUID.randomUUID().toString
|
||||
private val maxRetries = 3
|
||||
private val maxStreamLen = 1000L
|
||||
|
||||
private def userTopic(userId: String): String =
|
||||
s"${redisConfig.prefix}:user:$userId:events"
|
||||
private val connections = new ConcurrentHashMap[String, (String, WebSocketConnection)]()
|
||||
|
||||
private def userStreamKey(userId: String): String =
|
||||
s"${redisConfig.prefix}:user:$userId:events:stream"
|
||||
|
||||
private def dlqKey: String = s"${redisConfig.prefix}:dlq"
|
||||
|
||||
@OnOpen
|
||||
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
||||
@@ -45,16 +50,76 @@ class UserWebSocketResource:
|
||||
log.warn("WebSocket opened with no valid JWT — closing connection")
|
||||
connection.close().subscribe().`with`(_ => (), _ => ())
|
||||
case Some(userId) =>
|
||||
log.infof("User WebSocket opened — userId=%s", userId)
|
||||
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
||||
val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler)
|
||||
connections.put(connection.id(), (userId, subscriber))
|
||||
log.infof("User WebSocket opened — userId=%s connId=%s", userId, connection.id())
|
||||
createGroupIfAbsent(userId, connection.id())
|
||||
connections.put(connection.id(), (userId, connection))
|
||||
executor.submit(
|
||||
new Runnable:
|
||||
def run(): Unit = pollLoop(connection.id(), userId, connection),
|
||||
)
|
||||
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
|
||||
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
|
||||
|
||||
@OnClose
|
||||
def onClose(connection: WebSocketConnection): Unit =
|
||||
log.infof("User WebSocket closed — connectionId=%s", connection.id())
|
||||
Option(connections.remove(connection.id())).foreach { (userId, subscriber) =>
|
||||
subscriber.unsubscribe(userTopic(userId))
|
||||
}
|
||||
connections.remove(connection.id())
|
||||
|
||||
private def createGroupIfAbsent(userId: String, groupName: String): Unit =
|
||||
Try(
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xgroupCreate(userStreamKey(userId), groupName, "$", new XGroupCreateArgs().mkstream()),
|
||||
) match
|
||||
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for userId=%s", userId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def pollLoop(connectionId: String, userId: String, myConnection: WebSocketConnection): Unit =
|
||||
while Option(connections.get(connectionId)).exists(_._2 eq myConnection) do
|
||||
Try {
|
||||
val messages = redis
|
||||
.stream(classOf[String])
|
||||
.xreadgroup(
|
||||
connectionId,
|
||||
consumerId,
|
||||
userStreamKey(userId),
|
||||
">",
|
||||
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
|
||||
)
|
||||
Option(messages).foreach(_.forEach { msg =>
|
||||
if Option(connections.get(connectionId)).exists(_._2 eq myConnection) then
|
||||
val json = msg.payload().get("data")
|
||||
val attempt = Option(msg.payload().get("attempt")).flatMap(_.toIntOption).getOrElse(0)
|
||||
Try(myConnection.sendText(json).await().atMost(Duration.ofSeconds(5))) match
|
||||
case Success(_) =>
|
||||
ack(connectionId, userId, msg.id())
|
||||
case Failure(_) if attempt + 1 < maxRetries =>
|
||||
xadd(userStreamKey(userId), json, attempt + 1)
|
||||
ack(connectionId, userId, msg.id())
|
||||
case Failure(ex) =>
|
||||
log.warnf(ex, "Delivery failed for userId=%s after %d attempts, sending to DLQ", userId, maxRetries)
|
||||
xadd(dlqKey, json, attempt)
|
||||
ack(connectionId, userId, msg.id())
|
||||
})
|
||||
} match
|
||||
case Failure(ex) => log.warnf(ex, "Error in poll loop for userId=%s", userId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def ack(groupName: String, userId: String, id: String): Unit =
|
||||
Try(redis.stream(classOf[String]).xack(userStreamKey(userId), groupName, id)) match
|
||||
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for userId=%s", id, userId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def xadd(key: String, json: String, attempt: Int): Unit =
|
||||
Try(
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xadd(
|
||||
key,
|
||||
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||
Map("data" -> json, "attempt" -> attempt.toString).asJava,
|
||||
),
|
||||
) match
|
||||
case Failure(ex) => log.warnf(ex, "Failed to publish to stream %s", key)
|
||||
case Success(_) => ()
|
||||
|
||||
Reference in New Issue
Block a user