fix(store): cap game-writeback stream with MAXLEN trimming (#58)
Add approximate MAXLEN ~1000 to all xadd calls in GameWritebackStreamListener. Without trimming, ACKed messages accumulate in the stream indefinitely, wasting Redis memory. Closes NCS-88 https://knockoutwhist.youtrack.cloud/issue/NCS-88 --------- Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com> Reviewed-on: #58
This commit is contained in:
+15
-6
@@ -5,7 +5,7 @@ import de.nowchess.api.dto.GameWritebackEventDto
|
|||||||
import de.nowchess.store.config.RedisConfig
|
import de.nowchess.store.config.RedisConfig
|
||||||
import de.nowchess.store.service.GameWritebackService
|
import de.nowchess.store.service.GameWritebackService
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs}
|
import io.quarkus.redis.datasource.stream.{StreamMessage, XAddArgs, XGroupCreateArgs, XReadGroupArgs}
|
||||||
import io.quarkus.runtime.Startup
|
import io.quarkus.runtime.Startup
|
||||||
import jakarta.annotation.PostConstruct
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
@@ -32,10 +32,11 @@ class GameWritebackStreamListener:
|
|||||||
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
||||||
private val groupName = "store-writeback"
|
private val groupName = "store-writeback"
|
||||||
|
|
||||||
private def streamKey = s"${redisConfig.prefix}:game-writeback"
|
private def streamKey = s"${redisConfig.prefix}:game-writeback"
|
||||||
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
|
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
|
||||||
private val maxRetries = 3
|
private val maxRetries = 3
|
||||||
private val consumerId = UUID.randomUUID().toString
|
private val consumerId = UUID.randomUUID().toString
|
||||||
|
private val maxStreamLen = 1000L
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
def startListening(): Unit =
|
def startListening(): Unit =
|
||||||
@@ -98,6 +99,14 @@ class GameWritebackStreamListener:
|
|||||||
case Success(_) => ()
|
case Success(_) => ()
|
||||||
|
|
||||||
private def xadd(key: String, json: String, attempt: Int): Unit =
|
private def xadd(key: String, json: String, attempt: Int): Unit =
|
||||||
Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match
|
Try(
|
||||||
|
redis
|
||||||
|
.stream(classOf[String])
|
||||||
|
.xadd(
|
||||||
|
key,
|
||||||
|
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
|
||||||
|
Map("data" -> json, "attempt" -> attempt.toString).asJava,
|
||||||
|
),
|
||||||
|
) match
|
||||||
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
|
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
|
||||||
case Success(_) => ()
|
case Success(_) => ()
|
||||||
|
|||||||
Reference in New Issue
Block a user