feat(redis): implement game writeback stream processing with error handling and retries
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
This commit is contained in:
+3
-1
@@ -20,6 +20,7 @@ import jakarta.enterprise.inject.Instance
|
||||
import jakarta.inject.Inject
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.util.Try
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.function.Consumer
|
||||
@@ -77,7 +78,8 @@ class GameRedisSubscriberManager:
|
||||
s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||
|
||||
def subscribeGame(gameId: String): Unit =
|
||||
val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json)
|
||||
val writebackFn: String => Unit = json =>
|
||||
redis.stream(classOf[String]).xadd(s"${redisConfig.prefix}:game-writeback", Map("data" -> json).asJava)
|
||||
val obs = new GameRedisPublisher(
|
||||
gameId,
|
||||
registry,
|
||||
|
||||
+71
-18
@@ -2,8 +2,10 @@ package de.nowchess.store.redis
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.api.dto.GameWritebackEventDto
|
||||
import de.nowchess.store.config.RedisConfig
|
||||
import de.nowchess.store.service.GameWritebackService
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs}
|
||||
import io.quarkus.runtime.Startup
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
@@ -11,8 +13,9 @@ import jakarta.inject.Inject
|
||||
import org.eclipse.microprofile.context.ManagedExecutor
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import java.util.function.Consumer
|
||||
import java.util.UUID
|
||||
|
||||
@Startup
|
||||
@ApplicationScoped
|
||||
@@ -23,25 +26,75 @@ class GameWritebackStreamListener:
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
@Inject var writebackService: GameWritebackService = uninitialized
|
||||
@Inject var executor: ManagedExecutor = uninitialized
|
||||
@Inject var redisConfig: RedisConfig = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
||||
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
||||
private val groupName = "store-writeback"
|
||||
|
||||
private def streamKey = s"${redisConfig.prefix}:game-writeback"
|
||||
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
|
||||
private val maxRetries = 3
|
||||
private val consumerId = UUID.randomUUID().toString
|
||||
|
||||
@PostConstruct
|
||||
def startListening(): Unit =
|
||||
val handler: Consumer[String] = json =>
|
||||
Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match
|
||||
case Failure(ex) =>
|
||||
log.errorf(ex, "Failed to parse game-writeback event: %s", json)
|
||||
case Success(event) =>
|
||||
executor.submit(
|
||||
new Runnable:
|
||||
def run(): Unit =
|
||||
Try(writebackService.writeBack(event)) match
|
||||
case Failure(ex) =>
|
||||
log.errorf(ex, "Failed to write back game event for gameId=%s", event.gameId)
|
||||
case Success(_) => (),
|
||||
)
|
||||
redis.pubsub(classOf[String]).subscribe("game-writeback", handler)
|
||||
log.infof("Started listening to Writebacks")
|
||||
()
|
||||
createGroupIfAbsent()
|
||||
executor.submit(new Runnable:
|
||||
def run(): Unit = pollLoop()
|
||||
)
|
||||
log.infof("Started listening to game-writeback stream (consumer=%s)", consumerId)
|
||||
|
||||
private def createGroupIfAbsent(): Unit =
|
||||
Try(redis.stream(classOf[String]).xgroupCreate(streamKey, groupName, "0", new XGroupCreateArgs().mkstream())) match
|
||||
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||
case Failure(ex) => log.warnf(ex, "Failed to create consumer group")
|
||||
case Success(_) => ()
|
||||
|
||||
private def pollLoop(): Unit =
|
||||
while true do
|
||||
Try {
|
||||
val messages = redis.stream(classOf[String]).xreadgroup(
|
||||
groupName,
|
||||
consumerId,
|
||||
streamKey,
|
||||
">",
|
||||
new XReadGroupArgs().count(10).block(java.time.Duration.ofSeconds(2)),
|
||||
)
|
||||
if messages != null then messages.forEach(msg => handleMessage(msg))
|
||||
} match
|
||||
case Failure(ex) => log.warnf(ex, "Error in writeback poll loop")
|
||||
case Success(_) => ()
|
||||
|
||||
private def handleMessage(msg: StreamMessage[String, String, String]): Unit =
|
||||
val payload = msg.payload()
|
||||
val json = payload.get("data")
|
||||
val attempt = Option(payload.get("attempt")).flatMap(_.toIntOption).getOrElse(0)
|
||||
|
||||
Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match
|
||||
case Failure(ex) =>
|
||||
log.errorf(ex, "Unparseable writeback event, sending to DLQ: %s", json)
|
||||
xadd(dlqKey, json, attempt)
|
||||
ack(msg.id())
|
||||
case Success(event) =>
|
||||
Try(writebackService.writeBack(event)) match
|
||||
case Success(_) =>
|
||||
ack(msg.id())
|
||||
case Failure(ex) if attempt + 1 < maxRetries =>
|
||||
log.warnf(ex, "Writeback failed for gameId=%s attempt=%d, retrying", event.gameId, attempt)
|
||||
xadd(streamKey, json, attempt + 1)
|
||||
ack(msg.id())
|
||||
case Failure(ex) =>
|
||||
log.errorf(ex, "Writeback failed for gameId=%s after %d attempts, sending to DLQ", event.gameId, maxRetries)
|
||||
xadd(dlqKey, json, attempt)
|
||||
ack(msg.id())
|
||||
|
||||
private def ack(id: String): Unit =
|
||||
Try(redis.stream(classOf[String]).xack(streamKey, groupName, id)) match
|
||||
case Failure(ex) => log.warnf(ex, "Failed to ack message %s", id)
|
||||
case Success(_) => ()
|
||||
|
||||
private def xadd(key: String, json: String, attempt: Int): Unit =
|
||||
Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match
|
||||
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
|
||||
case Success(_) => ()
|
||||
|
||||
Reference in New Issue
Block a user