fix(store): cap game-writeback stream with MAXLEN trimming (#58)
Build & Test (NowChessSystems) TeamCity build finished

Add approximate MAXLEN ~1000 to all xadd calls in
GameWritebackStreamListener. Without trimming, ACKed messages
accumulate in the stream indefinitely, wasting Redis memory.

Closes NCS-88
https://knockoutwhist.youtrack.cloud/issue/NCS-88

---------

Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com>
Reviewed-on: #58
This commit was merged in pull request #58.
This commit is contained in:
2026-06-03 07:44:13 +02:00
parent 98b0bba398
commit 32c388760b
@@ -5,7 +5,7 @@ import de.nowchess.api.dto.GameWritebackEventDto
import de.nowchess.store.config.RedisConfig
import de.nowchess.store.service.GameWritebackService
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs}
import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs}
import io.quarkus.runtime.Startup
import jakarta.annotation.PostConstruct
import jakarta.enterprise.context.ApplicationScoped
@@ -32,10 +32,11 @@ class GameWritebackStreamListener:
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
private val groupName = "store-writeback"
private def streamKey = s"${redisConfig.prefix}:game-writeback"
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
private val maxRetries = 3
private val consumerId = UUID.randomUUID().toString
private def streamKey = s"${redisConfig.prefix}:game-writeback"
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
private val maxRetries = 3
private val consumerId = UUID.randomUUID().toString
private val maxStreamLen = 1000L
@PostConstruct
def startListening(): Unit =
@@ -98,6 +99,14 @@ class GameWritebackStreamListener:
case Success(_) => ()
private def xadd(key: String, json: String, attempt: Int): Unit =
Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match
Try(
redis
.stream(classOf[String])
.xadd(
key,
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json, "attempt" -> attempt.toString).asJava,
),
) match
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
case Success(_) => ()