diff --git a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala index 3fe09f7..bc5c6cd 100644 --- a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala +++ b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala @@ -5,7 +5,7 @@ import de.nowchess.api.dto.GameWritebackEventDto import de.nowchess.store.config.RedisConfig import de.nowchess.store.service.GameWritebackService import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs} +import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs} import io.quarkus.runtime.Startup import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped @@ -32,10 +32,11 @@ class GameWritebackStreamListener: private val log = Logger.getLogger(classOf[GameWritebackStreamListener]) private val groupName = "store-writeback" - private def streamKey = s"${redisConfig.prefix}:game-writeback" - private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq" - private val maxRetries = 3 - private val consumerId = UUID.randomUUID().toString + private def streamKey = s"${redisConfig.prefix}:game-writeback" + private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq" + private val maxRetries = 3 + private val consumerId = UUID.randomUUID().toString + private val maxStreamLen = 1000L @PostConstruct def startListening(): Unit = @@ -98,6 +99,14 @@ class GameWritebackStreamListener: case Success(_) => () private def xadd(key: String, json: String, attempt: Int): Unit = - Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match + Try( + redis + .stream(classOf[String]) + .xadd( + key, + new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), + Map("data" -> json, "attempt" -> attempt.toString).asJava, + ), + ) match case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key) case Success(_) => ()