fix(store): cap game-writeback stream with MAXLEN trimming
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
Add approximate MAXLEN ~1000 to all xadd calls in GameWritebackStreamListener. Without trimming, ACKed messages accumulate in the stream indefinitely, wasting Redis memory. Closes NCS-88 https://knockoutwhist.youtrack.cloud/issue/NCS-88
This commit is contained in:
+15
-6
@@ -5,7 +5,7 @@ import de.nowchess.api.dto.GameWritebackEventDto
|
||||
import de.nowchess.store.config.RedisConfig
|
||||
import de.nowchess.store.service.GameWritebackService
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs}
|
||||
import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs}
|
||||
import io.quarkus.runtime.Startup
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
@@ -32,10 +32,11 @@ class GameWritebackStreamListener:
|
||||
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
||||
private val groupName = "store-writeback"
|
||||
|
||||
private def streamKey = s"${redisConfig.prefix}:game-writeback"
|
||||
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
|
||||
private val maxRetries = 3
|
||||
private val consumerId = UUID.randomUUID().toString
|
||||
private def streamKey = s"${redisConfig.prefix}:game-writeback"
|
||||
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
|
||||
private val maxRetries = 3
|
||||
private val consumerId = UUID.randomUUID().toString
|
||||
private val maxStreamLen = 1000L
|
||||
|
||||
@PostConstruct
|
||||
def startListening(): Unit =
|
||||
@@ -98,6 +99,14 @@ class GameWritebackStreamListener:
|
||||
case Success(_) => ()
|
||||
|
||||
private def xadd(key: String, json: String, attempt: Int): Unit =
|
||||
Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match
|
||||
Try(
|
||||
redis
|
||||
.stream(classOf[String])
|
||||
.xadd(
|
||||
key,
|
||||
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||
Map("data" -> json, "attempt" -> attempt.toString).asJava,
|
||||
),
|
||||
) match
|
||||
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
|
||||
case Success(_) => ()
|
||||
|
||||
Reference in New Issue
Block a user