Compare commits
4 Commits
store-0.21.0
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| ae3ef766e8 | |||
| 487711628f | |||
| 008d72d826 | |||
| 576e3fea9b |
@@ -109,3 +109,21 @@
|
|||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
|
||||||
* IO microservice ([#38](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/38)) ([a386f57](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a386f57c21d34ead6cc6f92836c52b714597e289))
|
* IO microservice ([#38](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/38)) ([a386f57](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a386f57c21d34ead6cc6f92836c52b714597e289))
|
||||||
|
## (2026-05-22)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* **dto:** update GameWritebackEventDto for JSON deserialization and remove unused mixin ([576e3fe](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/576e3fea9bf1082549ea53efd3288474c42be93d))
|
||||||
|
* NCS-13 Implement Threefold Repetition ([#31](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/31)) ([767d305](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/767d3051a76c266050b6335774d66e2db2273c16))
|
||||||
|
* NCS-14 implemented insufficient moves rule ([#30](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/30)) ([b0399a4](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b0399a4e489950083066c9538df9a84dcc7a4613))
|
||||||
|
* NCS-21 Write Scripts to automate certain tasks ([#15](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/15)) ([8051871](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/80518719d536a087d339fe02530825dc07f8b388))
|
||||||
|
* NCS-25 Add linters to keep quality up ([#27](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/27)) ([fd4e67d](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/fd4e67d4f782a7e955822d90cb909d0a81676fb2))
|
||||||
|
* NCS-37 Quarkus integration ([#35](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/35)) ([f088c4e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/f088c4e9ffcc498d3d1b6f01e8f50042d5830d55))
|
||||||
|
* NCS-41 Bot Platform ([#33](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/33)) ([8744bee](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/8744bee2dd20966dae90a09c21a43d5b06f59e00))
|
||||||
|
* **rule:** Rules as a microservice ([#39](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/39)) ([093134d](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/093134d36c6844ba02a36a28d5d044f09291cd1d))
|
||||||
|
* true-microservices ([#40](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/40)) ([5909242](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/590924254e8a2754de661a57a03e43f89ceb6299))
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
* **dependencies:** correct Jackson databind dependency group ID ([008d72d](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/008d72d826707c04205bac7de25170fae5fed861))
|
||||||
|
* IO microservice ([#38](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/38)) ([a386f57](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a386f57c21d34ead6cc6f92836c52b714597e289))
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ dependencies {
|
|||||||
strictly(versions["SCALA_LIBRARY"]!!)
|
strictly(versions["SCALA_LIBRARY"]!!)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
implementation("com.fasterxml.jackson.core:jackson-databind:${versions["JACKSON"]!!}")
|
||||||
|
|
||||||
testImplementation(platform("org.junit:junit-bom:5.13.4"))
|
testImplementation(platform("org.junit:junit-bom:5.13.4"))
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter")
|
testImplementation("org.junit.jupiter:junit-jupiter")
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
package de.nowchess.api.dto
|
package de.nowchess.api.dto
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
|
||||||
|
|
||||||
case class GameWritebackEventDto(
|
case class GameWritebackEventDto(
|
||||||
gameId: String,
|
gameId: String,
|
||||||
fen: String,
|
fen: String,
|
||||||
@@ -14,11 +16,11 @@ case class GameWritebackEventDto(
|
|||||||
limitSeconds: Option[Int],
|
limitSeconds: Option[Int],
|
||||||
incrementSeconds: Option[Int],
|
incrementSeconds: Option[Int],
|
||||||
daysPerMove: Option[Int],
|
daysPerMove: Option[Int],
|
||||||
whiteRemainingMs: Option[Long],
|
@JsonDeserialize(contentAs = classOf[java.lang.Long]) whiteRemainingMs: Option[Long],
|
||||||
blackRemainingMs: Option[Long],
|
@JsonDeserialize(contentAs = classOf[java.lang.Long]) blackRemainingMs: Option[Long],
|
||||||
incrementMs: Option[Long],
|
@JsonDeserialize(contentAs = classOf[java.lang.Long]) incrementMs: Option[Long],
|
||||||
clockLastTickAt: Option[Long],
|
@JsonDeserialize(contentAs = classOf[java.lang.Long]) clockLastTickAt: Option[Long],
|
||||||
clockMoveDeadline: Option[Long],
|
@JsonDeserialize(contentAs = classOf[java.lang.Long]) clockMoveDeadline: Option[Long],
|
||||||
clockActiveColor: Option[String],
|
clockActiveColor: Option[String],
|
||||||
pendingDrawOffer: Option[String],
|
pendingDrawOffer: Option[String],
|
||||||
result: Option[String] = None,
|
result: Option[String] = None,
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
MAJOR=0
|
MAJOR=0
|
||||||
MINOR=12
|
MINOR=13
|
||||||
PATCH=0
|
PATCH=0
|
||||||
|
|||||||
+3
-1
@@ -20,6 +20,7 @@ import jakarta.enterprise.inject.Instance
|
|||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.function.Consumer
|
import java.util.function.Consumer
|
||||||
@@ -77,7 +78,8 @@ class GameRedisSubscriberManager:
|
|||||||
s"${redisConfig.prefix}:game:$gameId:s2c"
|
s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||||
|
|
||||||
def subscribeGame(gameId: String): Unit =
|
def subscribeGame(gameId: String): Unit =
|
||||||
val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json)
|
val writebackFn: String => Unit = json =>
|
||||||
|
redis.stream(classOf[String]).xadd(s"${redisConfig.prefix}:game-writeback", Map("data" -> json).asJava)
|
||||||
val obs = new GameRedisPublisher(
|
val obs = new GameRedisPublisher(
|
||||||
gameId,
|
gameId,
|
||||||
registry,
|
registry,
|
||||||
|
|||||||
@@ -391,3 +391,33 @@
|
|||||||
* **redis:** add log message for starting Writeback listener ([b610678](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b610678005de645115f48348e66aa9e6f5deb3d5))
|
* **redis:** add log message for starting Writeback listener ([b610678](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b610678005de645115f48348e66aa9e6f5deb3d5))
|
||||||
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
|
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
|
||||||
* remove unused HTTP root-path configurations from application.yml ([3ed3e59](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3ed3e59ee456d54cd3d65ece4f36623e256b9736))
|
* remove unused HTTP root-path configurations from application.yml ([3ed3e59](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3ed3e59ee456d54cd3d65ece4f36623e256b9736))
|
||||||
|
## (2026-05-22)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* add initialization metrics for various services ([d438e97](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d438e97f32bdde0bfc63c1b4a8cc810cdd093166))
|
||||||
|
* add OpenTelemetry trace configuration with parentbased sampler ([3904d5a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3904d5ad8ad4930ddee65287a7bfab785a6148f5))
|
||||||
|
* **config:** add GameWritebackEventDtoMixin for JSON deserialization ([381161f](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/381161f00345612a1789e08243746083dff884c5))
|
||||||
|
* **config:** update application.yml for PostgreSQL and remove staging/production configurations ([2404e61](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2404e6164c3b50ffccbea5238d636060d6abe4d6))
|
||||||
|
* **config:** update application.yml for staging and production environments ([6113432](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/6113432a14c476a3a0dfc0d449e17d023697f2ba))
|
||||||
|
* **config:** update application.yml to nest HTTP port configuration ([3efebd5](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3efebd5ed0493159c51f7246d18d59bac58cf875))
|
||||||
|
* configure logging and add OpenTelemetry support ([#49](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/49)) ([d57c488](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d57c4886612d1d92da0e1b79209fc83e6ef537a1))
|
||||||
|
* **docker:** add .dockerignore and .gitignore files for build exclusions ([c987d8e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c987d8e258c0e6c4cfbdaa8381c64c410d7a2b83))
|
||||||
|
* **docker:** add Dockerfiles for building Quarkus application in native and JVM modes ([3f2d2bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3f2d2bb4c97fa8cddba66e1da4427c54236dfeed))
|
||||||
|
* **docker:** add Dockerfiles for Quarkus application in JVM and native modes ([34b9933](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/34b993304670cf2aa62cd2f6460cee7b9864b08e))
|
||||||
|
* **dto:** update GameWritebackEventDto for JSON deserialization and remove unused mixin ([576e3fe](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/576e3fea9bf1082549ea53efd3288474c42be93d))
|
||||||
|
* implement clock expiry scanning and handling for game records ([#53](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/53)) ([8f9eb12](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/8f9eb12f663efabe4dc72b94394438652ad0ef02))
|
||||||
|
* NCS-78 Add Traceability to the Applications ([#46](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/46)) ([649566e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/649566eb3fcf38f91c8896a739f74ea318af312d))
|
||||||
|
* NCS-78 Add Traceability to the Applications ([#47](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/47)) ([87dfc6c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/87dfc6c2bcce7f7d58fc641bd8d468a2e584c108))
|
||||||
|
* **redis:** add @Startup annotation to GameWritebackStreamListener ([d61fe97](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d61fe97b4c8e2db5e34b4a14d995297cc09f9435))
|
||||||
|
* **redis:** use ManagedExecutor for asynchronous writeback processing ([af6b0ed](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/af6b0ed8b73724fcc8f20dfccbe6fe8f84fd792d))
|
||||||
|
* true-microservices ([#40](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/40)) ([5909242](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/590924254e8a2754de661a57a03e43f89ceb6299))
|
||||||
|
* update application.yml with new API root paths and add Micrometer and OpenTelemetry dependencies ([72ce262](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/72ce262bc491f94297700e6002fb5d0812e2cc2a))
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
* ensure full hierarchy registration for reflection in NativeReflectionConfig ([ebba729](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/ebba729af3265df1619dfbe46fd1945b2a7e30b7))
|
||||||
|
* NCS-85 Database Writeback fails without Logs ([#52](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/52)) ([7323908](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/73239088d985f01aa6b1067ed9097a845e471d4f))
|
||||||
|
* **redis:** add log message for starting Writeback listener ([b610678](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b610678005de645115f48348e66aa9e6f5deb3d5))
|
||||||
|
* **redis:** update Redis configuration with max pool size and waiting parameters ([5baf6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/5baf6a7cdbea484fc49c02e2b5a1c3919b7fa2c4))
|
||||||
|
* remove unused HTTP root-path configurations from application.yml ([3ed3e59](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3ed3e59ee456d54cd3d65ece4f36623e256b9736))
|
||||||
|
|||||||
-10
@@ -1,10 +0,0 @@
|
|||||||
package de.nowchess.store.config
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
|
|
||||||
|
|
||||||
abstract class GameWritebackEventDtoMixin:
|
|
||||||
@JsonDeserialize(contentAs = classOf[java.lang.Long]) val whiteRemainingMs: Option[Long]
|
|
||||||
@JsonDeserialize(contentAs = classOf[java.lang.Long]) val blackRemainingMs: Option[Long]
|
|
||||||
@JsonDeserialize(contentAs = classOf[java.lang.Long]) val incrementMs: Option[Long]
|
|
||||||
@JsonDeserialize(contentAs = classOf[java.lang.Long]) val clockLastTickAt: Option[Long]
|
|
||||||
@JsonDeserialize(contentAs = classOf[java.lang.Long]) val clockMoveDeadline: Option[Long]
|
|
||||||
@@ -3,7 +3,6 @@ package de.nowchess.store.config
|
|||||||
import com.fasterxml.jackson.core.Version
|
import com.fasterxml.jackson.core.Version
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import com.fasterxml.jackson.module.scala.DefaultScalaModule
|
import com.fasterxml.jackson.module.scala.DefaultScalaModule
|
||||||
import de.nowchess.api.dto.GameWritebackEventDto
|
|
||||||
import io.quarkus.jackson.ObjectMapperCustomizer
|
import io.quarkus.jackson.ObjectMapperCustomizer
|
||||||
import jakarta.inject.Singleton
|
import jakarta.inject.Singleton
|
||||||
|
|
||||||
@@ -16,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer:
|
|||||||
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
||||||
// scalafix:on DisableSyntax.null
|
// scalafix:on DisableSyntax.null
|
||||||
})
|
})
|
||||||
mapper.addMixIn(classOf[GameWritebackEventDto], classOf[GameWritebackEventDtoMixin])
|
|
||||||
|
|||||||
+65
-12
@@ -2,8 +2,10 @@ package de.nowchess.store.redis
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import de.nowchess.api.dto.GameWritebackEventDto
|
import de.nowchess.api.dto.GameWritebackEventDto
|
||||||
|
import de.nowchess.store.config.RedisConfig
|
||||||
import de.nowchess.store.service.GameWritebackService
|
import de.nowchess.store.service.GameWritebackService
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import io.quarkus.redis.datasource.stream.{StreamMessage, XGroupCreateArgs, XReadGroupArgs}
|
||||||
import io.quarkus.runtime.Startup
|
import io.quarkus.runtime.Startup
|
||||||
import jakarta.annotation.PostConstruct
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
@@ -11,8 +13,9 @@ import jakarta.inject.Inject
|
|||||||
import org.eclipse.microprofile.context.ManagedExecutor
|
import org.eclipse.microprofile.context.ManagedExecutor
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Failure, Success, Try}
|
||||||
import java.util.function.Consumer
|
import java.util.UUID
|
||||||
|
|
||||||
@Startup
|
@Startup
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
@@ -23,25 +26,75 @@ class GameWritebackStreamListener:
|
|||||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||||
@Inject var writebackService: GameWritebackService = uninitialized
|
@Inject var writebackService: GameWritebackService = uninitialized
|
||||||
@Inject var executor: ManagedExecutor = uninitialized
|
@Inject var executor: ManagedExecutor = uninitialized
|
||||||
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
// scalafix:on
|
// scalafix:on
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
private val log = Logger.getLogger(classOf[GameWritebackStreamListener])
|
||||||
|
private val groupName = "store-writeback"
|
||||||
|
|
||||||
|
private def streamKey = s"${redisConfig.prefix}:game-writeback"
|
||||||
|
private def dlqKey = s"${redisConfig.prefix}:game-writeback-dlq"
|
||||||
|
private val maxRetries = 3
|
||||||
|
private val consumerId = UUID.randomUUID().toString
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
def startListening(): Unit =
|
def startListening(): Unit =
|
||||||
val handler: Consumer[String] = json =>
|
createGroupIfAbsent()
|
||||||
|
executor.submit(new Runnable:
|
||||||
|
def run(): Unit = pollLoop()
|
||||||
|
)
|
||||||
|
log.infof("Started listening to game-writeback stream (consumer=%s)", consumerId)
|
||||||
|
|
||||||
|
private def createGroupIfAbsent(): Unit =
|
||||||
|
Try(redis.stream(classOf[String]).xgroupCreate(streamKey, groupName, "0", new XGroupCreateArgs().mkstream())) match
|
||||||
|
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to create consumer group")
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def pollLoop(): Unit =
|
||||||
|
while true do
|
||||||
|
Try {
|
||||||
|
val messages = redis.stream(classOf[String]).xreadgroup(
|
||||||
|
groupName,
|
||||||
|
consumerId,
|
||||||
|
streamKey,
|
||||||
|
">",
|
||||||
|
new XReadGroupArgs().count(10).block(java.time.Duration.ofSeconds(2)),
|
||||||
|
)
|
||||||
|
if messages != null then messages.forEach(msg => handleMessage(msg))
|
||||||
|
} match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Error in writeback poll loop")
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def handleMessage(msg: StreamMessage[String, String, String]): Unit =
|
||||||
|
val payload = msg.payload()
|
||||||
|
val json = payload.get("data")
|
||||||
|
val attempt = Option(payload.get("attempt")).flatMap(_.toIntOption).getOrElse(0)
|
||||||
|
|
||||||
Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match
|
Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) match
|
||||||
case Failure(ex) =>
|
case Failure(ex) =>
|
||||||
log.errorf(ex, "Failed to parse game-writeback event: %s", json)
|
log.errorf(ex, "Unparseable writeback event, sending to DLQ: %s", json)
|
||||||
|
xadd(dlqKey, json, attempt)
|
||||||
|
ack(msg.id())
|
||||||
case Success(event) =>
|
case Success(event) =>
|
||||||
executor.submit(
|
|
||||||
new Runnable:
|
|
||||||
def run(): Unit =
|
|
||||||
Try(writebackService.writeBack(event)) match
|
Try(writebackService.writeBack(event)) match
|
||||||
|
case Success(_) =>
|
||||||
|
ack(msg.id())
|
||||||
|
case Failure(ex) if attempt + 1 < maxRetries =>
|
||||||
|
log.warnf(ex, "Writeback failed for gameId=%s attempt=%d, retrying", event.gameId, attempt)
|
||||||
|
xadd(streamKey, json, attempt + 1)
|
||||||
|
ack(msg.id())
|
||||||
case Failure(ex) =>
|
case Failure(ex) =>
|
||||||
log.errorf(ex, "Failed to write back game event for gameId=%s", event.gameId)
|
log.errorf(ex, "Writeback failed for gameId=%s after %d attempts, sending to DLQ", event.gameId, maxRetries)
|
||||||
case Success(_) => (),
|
xadd(dlqKey, json, attempt)
|
||||||
)
|
ack(msg.id())
|
||||||
redis.pubsub(classOf[String]).subscribe("game-writeback", handler)
|
|
||||||
log.infof("Started listening to Writebacks")
|
private def ack(id: String): Unit =
|
||||||
()
|
Try(redis.stream(classOf[String]).xack(streamKey, groupName, id)) match
|
||||||
|
case Failure(ex) => log.warnf(ex, "Failed to ack message %s", id)
|
||||||
|
case Success(_) => ()
|
||||||
|
|
||||||
|
private def xadd(key: String, json: String, attempt: Int): Unit =
|
||||||
|
Try(redis.stream(classOf[String]).xadd(key, Map("data" -> json, "attempt" -> attempt.toString).asJava)) match
|
||||||
|
case Failure(ex) => log.errorf(ex, "Failed to publish to stream %s", key)
|
||||||
|
case Success(_) => ()
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
MAJOR=0
|
MAJOR=0
|
||||||
MINOR=21
|
MINOR=22
|
||||||
PATCH=0
|
PATCH=0
|
||||||
|
|||||||
Reference in New Issue
Block a user