From e4011e73377e34ee8326ebe02634026fa4893a35 Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 3 Jun 2026 07:44:13 +0200 Subject: [PATCH] fix(store): cap game-writeback stream with MAXLEN trimming (#58) Add approximate MAXLEN ~1000 to all xadd calls in GameWritebackStreamListener. Without trimming, ACKed messages accumulate in the stream indefinitely, wasting Redis memory. Closes NCS-88 https://knockoutwhist.youtrack.cloud/issue/NCS-88 --------- Co-authored-by: Janis Eccarius Reviewed-on: https://git.janis-eccarius.de/NowChess/NowChessSystems/pulls/58 --- .../redis/GameWritebackStreamListener.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala index 3fe09f7..bc5c6cd 100644 --- a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala +++ b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala @@ -5,7 +5,7 @@ import de.nowchess.api.dto.GameWritebackEventDto import de.nowchess.store.config.RedisConfig import de.nowchess.store.service.GameWritebackService import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs} +import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs} import io.quarkus.runtime.Startup import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped @@ -32,10 +32,11 @@ class GameWritebackStreamListener: private val log = Logger.getLogger(classOf[GameWritebackStreamListener]) private val groupName = "store-writeback" - private def streamKey = s"${redisConfig.prefix}:game-writeback" - private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq" - private val maxRetries = 3 - private val consumerId = UUID.randomUUID().toString + private def streamKey = s"${redisConfig.prefix}:game-writeback" + private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq" + private val maxRetries = 3 + private val consumerId = UUID.randomUUID().toString + private val maxStreamLen = 1000L @PostConstruct def startListening(): Unit = @@ -98,6 +99,14 @@ class GameWritebackStreamListener: case Success(_) => () private def xadd(key: String, json: String, attempt: Int): Unit = - Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match + Try( + redis + .stream(classOf[String]) + .xadd( + key, + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json, "attempt" -> attempt.toString).asJava, + ), + ) match case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key) case Success(_) => ()