From ae3ef766e8b7596a09e466cd4fb386119f17ca5c Mon Sep 17 00:00:00 2001 From: Janis Date: Fri, 22 May 2026 12:37:39 +0200 Subject: [PATCH] feat(redis): implement game writeback stream processing with error handling and retries --- .../redis/GameRedisSubscriberManager.scala | 4 +- .../redis/GameWritebackStreamListener.scala | 89 +++++++++++++++---- 2 files changed, 74 insertions(+), 19 deletions(-) diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index c7843b1..04f0477 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -20,6 +20,7 @@ import jakarta.enterprise.inject.Instance import jakarta.inject.Inject import org.jboss.logging.Logger import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* import scala.util.Try import java.util.concurrent.ConcurrentHashMap import java.util.function.Consumer @@ -77,7 +78,8 @@ class GameRedisSubscriberManager: s"${redisConfig.prefix}:game:$gameId:s2c" def subscribeGame(gameId: String): Unit = - val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json) + val writebackFn: String => Unit = json => + redis.stream(classOf[String]).xadd(s"${redisConfig.prefix}:game-writeback", Map("data" -> json).asJava) val obs = new GameRedisPublisher( gameId, registry, diff --git a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala index cf68766..519ae2d 100644 --- a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala +++ b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala @@ -2,8 +2,10 @@ package de.nowchess.store.redis import com.fasterxml.jackson.databind.ObjectMapper import de.nowchess.api.dto.GameWritebackEventDto +import de.nowchess.store.config.RedisConfig import de.nowchess.store.service.GameWritebackService import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs} import io.quarkus.runtime.Startup import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped @@ -11,8 +13,9 @@ import jakarta.inject.Inject import org.eclipse.microprofile.context.ManagedExecutor import org.jboss.logging.Logger import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* import scala.util.{Failure, Success, Try} -import java.util.function.Consumer +import java.util.UUID @Startup @ApplicationScoped @@ -23,25 +26,75 @@ class GameWritebackStreamListener: @Inject var objectMapper: ObjectMapper = uninitialized @Inject var writebackService: GameWritebackService = uninitialized @Inject var executor: ManagedExecutor = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized // scalafix:on - private val log = Logger.getLogger(classOf[GameWritebackStreamListener]) + private val log = Logger.getLogger(classOf[GameWritebackStreamListener]) + private val groupName = "store-writeback" + + private def streamKey = s"${redisConfig.prefix}:game-writeback" + private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq" + private val maxRetries = 3 + private val consumerId = UUID.randomUUID().toString @PostConstruct def startListening(): Unit = - val handler: Consumer[String] = json => - Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match - case Failure(ex) => - log.errorf(ex, "Failed to parse game-writeback event: %s", json) - case Success(event) => - executor.submit( - new Runnable: - def run(): Unit = - Try(writebackService.writeBack(event)) match - case Failure(ex) => - log.errorf(ex, "Failed to write back game event for gameId=%s", event.gameId) - case Success(_) => (), - ) - redis.pubsub(classOf[String]).subscribe("game-writeback", handler) - log.infof("Started listening to Writebacks") - () + createGroupIfAbsent() + executor.submit(new Runnable: + def run(): Unit = pollLoop() + ) + log.infof("Started listening to game-writeback stream (consumer=%s)", consumerId) + + private def createGroupIfAbsent(): Unit = + Try(redis.stream(classOf[String]).xgroupCreate(streamKey, groupName, "0", new XGroupCreateArgs().mkstream())) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create consumer group") + case Success(_) => () + + private def pollLoop(): Unit = + while true do + Try { + val messages = redis.stream(classOf[String]).xreadgroup( + groupName, + consumerId, + streamKey, + ">", + new XReadGroupArgs().count(10).block(java.time.Duration.ofSeconds(2)), + ) + if messages != null then messages.forEach(msg => handleMessage(msg)) + } match + case Failure(ex) => log.warnf(ex, "Error in writeback poll loop") + case Success(_) => () + + private def handleMessage(msg: StreamMessage[String, String, String]): Unit = + val payload = msg.payload() + val json = payload.get("data") + val attempt = Option(payload.get("attempt")).flatMap(_.toIntOption).getOrElse(0) + + Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match + case Failure(ex) => + log.errorf(ex, "Unparseable writeback event, sending to DLQ: %s", json) + xadd(dlqKey, json, attempt) + ack(msg.id()) + case Success(event) => + Try(writebackService.writeBack(event)) match + case Success(_) => + ack(msg.id()) + case Failure(ex) if attempt + 1 < maxRetries => + log.warnf(ex, "Writeback failed for gameId=%s attempt=%d, retrying", event.gameId, attempt) + xadd(streamKey, json, attempt + 1) + ack(msg.id()) + case Failure(ex) => + log.errorf(ex, "Writeback failed for gameId=%s after %d attempts, sending to DLQ", event.gameId, maxRetries) + xadd(dlqKey, json, attempt) + ack(msg.id()) + + private def ack(id: String): Unit = + Try(redis.stream(classOf[String]).xack(streamKey, groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack message %s", id) + case Success(_) => () + + private def xadd(key: String, json: String, attempt: Int): Unit = + Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match + case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key) + case Success(_) => ()