feat(bot-platform): migrate BotRegistry to Redis Streams consumer group (#63)

Replace pub/sub subscribe with XREADGROUP on bot game-start stream.
Remove dual-write from EventPublisher.publishGameStart.
Consumer group: bot-platform-consumer, XACK after forwarding.
Poll loop uses emitter identity to prevent thread leak on re-registration.
Group created with '$' offset — no historical replay on first connect.

Closes NCS-101
https://knockoutwhist.youtrack.cloud/issue/NCS-101

Reviewed-on: #63
This commit was merged in pull request #63.
This commit is contained in:
2026-06-09 21:48:21 +02:00
parent 225c2285b7
commit 0ad2e10999
4 changed files with 145 additions and 21 deletions
@@ -36,7 +36,6 @@ class EventPublisher:
new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(),
Map("data" -> json).asJava,
)
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json)
()
def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit =
+1
View File
@@ -73,6 +73,7 @@ dependencies {
testImplementation(platform("org.junit:junit-bom:5.13.4"))
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("io.quarkus:quarkus-junit")
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.rest-assured:rest-assured")
testImplementation("io.quarkus:quarkus-test-security")
@@ -2,14 +2,18 @@ package de.nowchess.botplatform.registry
import de.nowchess.botplatform.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.pubsub.PubSubCommands
import io.quarkus.redis.datasource.stream.{XGroupCreateArgs, XReadGroupArgs}
import io.smallrye.mutiny.subscription.MultiEmitter
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.eclipse.microprofile.context.ManagedExecutor
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success, Try}
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
@ApplicationScoped
class BotRegistry:
@@ -17,31 +21,68 @@ class BotRegistry:
private val log = Logger.getLogger(classOf[BotRegistry])
// scalafix:off DisableSyntax.var
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var redis: RedisDataSource = uninitialized
@Inject var redisConfig: RedisConfig = uninitialized
@Inject var executor: ManagedExecutor = uninitialized
// scalafix:on DisableSyntax.var
private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]()
private val groupName = "bot-platform-consumer"
private val consumerId = UUID.randomUUID().toString
private val emitters = ConcurrentHashMap[String, MultiEmitter[? >: String]]()
def register(botId: String, emitter: MultiEmitter[? >: String]): Unit =
val channel = s"${redisConfig.prefix}:bot:$botId:events"
val handler: Consumer[String] = msg => emitter.emit(msg)
val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler)
connections.put(botId, (emitter, subscriber))
log.infof("Bot %s registered", botId)
createGroupIfAbsent(botId)
emitters.put(botId, emitter)
executor.submit(
new Runnable:
def run(): Unit = pollLoop(botId, emitter),
)
log.infof("Bot %s registered on stream consumer group", botId)
()
def unregister(botId: String): Unit =
Option(connections.remove(botId)).foreach { (_, subscriber) =>
subscriber.unsubscribe(s"${redisConfig.prefix}:bot:$botId:events")
}
emitters.remove(botId)
log.infof("Bot %s unregistered", botId)
def dispatch(botId: String, event: String): Unit =
log.debugf("Dispatching event to bot %s", botId)
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event)
()
def registeredBots: List[String] =
import scala.jdk.CollectionConverters.*
connections.keys().asScala.toList
emitters.keys().asScala.toList
private def streamKey(botId: String): String =
s"${redisConfig.prefix}:bot:$botId:events:stream"
private def createGroupIfAbsent(botId: String): Unit =
Try(
redis
.stream(classOf[String])
.xgroupCreate(streamKey(botId), groupName, "$", new XGroupCreateArgs().mkstream()),
) match
case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => ()
case Failure(ex) => log.warnf(ex, "Failed to create consumer group for bot %s", botId)
case Success(_) => ()
private def pollLoop(botId: String, myEmitter: MultiEmitter[? >: String]): Unit =
while emitters.get(botId) eq myEmitter do
Try {
val messages = redis
.stream(classOf[String])
.xreadgroup(
groupName,
consumerId,
streamKey(botId),
">",
new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)),
)
Option(messages).foreach(_.forEach { msg =>
if emitters.get(botId) eq myEmitter then
myEmitter.emit(msg.payload().get("data"))
ack(botId, msg.id())
})
} match
case Failure(ex) => log.warnf(ex, "Error in poll loop for bot %s", botId)
case Success(_) => ()
private def ack(botId: String, id: String): Unit =
Try(redis.stream(classOf[String]).xack(streamKey(botId), groupName, id)) match
case Failure(ex) => log.warnf(ex, "Failed to ack message %s for bot %s", id, botId)
case Success(_) => ()
@@ -0,0 +1,83 @@
package de.nowchess.botplatform.registry
import de.nowchess.botplatform.config.RedisConfig
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.stream.{StreamCommands, XGroupCreateArgs}
import io.smallrye.mutiny.subscription.MultiEmitter
import org.eclipse.microprofile.context.ManagedExecutor
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.*
import org.mockito.Mockito.*
class BotRegistryTest:
// scalafix:off DisableSyntax.var
private var registry: BotRegistry = scala.compiletime.uninitialized
private var redis: RedisDataSource = scala.compiletime.uninitialized
private var streamCmds: StreamCommands[String, String, Nothing] =
scala.compiletime.uninitialized
private var redisConfig: RedisConfig = scala.compiletime.uninitialized
private var executor: ManagedExecutor = scala.compiletime.uninitialized
// scalafix:on DisableSyntax.var
@BeforeEach
def setup(): Unit =
redis = mock(classOf[RedisDataSource])
streamCmds = mock(classOf[StreamCommands[String, String, Nothing]])
redisConfig = mock(classOf[RedisConfig])
executor = mock(classOf[ManagedExecutor])
when(redis.stream(classOf[String])).thenReturn(streamCmds)
when(redisConfig.prefix).thenReturn("nowchess")
registry = new BotRegistry
registry.redis = redis
registry.redisConfig = redisConfig
registry.executor = executor
@Test
def registerStartsPollThread(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("bot1", emitter)
verify(executor).submit(any(classOf[Runnable]))
@Test
def registerCreatesConsumerGroupWithMkstream(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("bot1", emitter)
verify(streamCmds)
.xgroupCreate(
org.mockito.ArgumentMatchers.eq("nowchess:bot:bot1:events:stream"),
org.mockito.ArgumentMatchers.eq("bot-platform-consumer"),
org.mockito.ArgumentMatchers.eq("$"),
any(classOf[XGroupCreateArgs]),
)
@Test
def registerTracksBot(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("bot42", emitter)
assertTrue(registry.registeredBots.contains("bot42"))
@Test
def unregisterRemovesBot(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("botX", emitter)
registry.unregister("botX")
assertFalse(registry.registeredBots.contains("botX"))
@Test
def busyGroupExceptionIsIgnoredOnRegister(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
when(streamCmds.xgroupCreate(any(), any(), any(), any()))
.thenThrow(new RuntimeException("BUSYGROUP Consumer Group name already exists"))
val exec: Executable = () => registry.register("botBusy", emitter)
assertDoesNotThrow(exec)
@Test
def registerDoesNotInteractWithPubSub(): Unit =
val emitter = mock(classOf[MultiEmitter[String]])
registry.register("botNoPubSub", emitter)
verify(redis, never()).pubsub(any(classOf[Class[?]]))