diff --git a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala index 0f7cbaa..9b5faaf 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala @@ -36,7 +36,6 @@ class EventPublisher: new XAddArgs().maxlen(maxStreamLen).nearlyExactTrimming(), Map("data" -> json).asJava, ) - redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", json) () def publishChallengeCreated(destUserId: String, challengeId: String, challengerName: String): Unit = diff --git a/modules/bot-platform/build.gradle.kts b/modules/bot-platform/build.gradle.kts index 77885be..1b5ecec 100644 --- a/modules/bot-platform/build.gradle.kts +++ b/modules/bot-platform/build.gradle.kts @@ -73,6 +73,7 @@ dependencies { testImplementation(platform("org.junit:junit-bom:5.13.4")) testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("io.quarkus:quarkus-junit") + testImplementation("io.quarkus:quarkus-junit5-mockito") testImplementation("io.rest-assured:rest-assured") testImplementation("io.quarkus:quarkus-test-security") diff --git a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala index 195364c..13ae429 100644 --- a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala +++ b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala @@ -2,14 +2,18 @@ package de.nowchess.botplatform.registry import de.nowchess.botplatform.config.RedisConfig import io.quarkus.redis.datasource.RedisDataSource -import io.quarkus.redis.datasource.pubsub.PubSubCommands +import io.quarkus.redis.datasource.stream.{XGroupCreateArgs, XReadGroupArgs} import io.smallrye.mutiny.subscription.MultiEmitter import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject +import org.eclipse.microprofile.context.ManagedExecutor import org.jboss.logging.Logger import scala.compiletime.uninitialized +import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} +import java.time.Duration +import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import java.util.function.Consumer @ApplicationScoped class BotRegistry: @@ -17,31 +21,68 @@ class BotRegistry: private val log = Logger.getLogger(classOf[BotRegistry]) // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var executor: ManagedExecutor = uninitialized // scalafix:on DisableSyntax.var - private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]() + private val groupName = "bot-platform-consumer" + private val consumerId = UUID.randomUUID().toString + + private val emitters = ConcurrentHashMap[String, MultiEmitter[? >: String]]() def register(botId: String, emitter: MultiEmitter[? >: String]): Unit = - val channel = s"${redisConfig.prefix}:bot:$botId:events" - val handler: Consumer[String] = msg => emitter.emit(msg) - val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler) - connections.put(botId, (emitter, subscriber)) - log.infof("Bot %s registered", botId) + createGroupIfAbsent(botId) + emitters.put(botId, emitter) + executor.submit( + new Runnable: + def run(): Unit = pollLoop(botId, emitter), + ) + log.infof("Bot %s registered on stream consumer group", botId) () def unregister(botId: String): Unit = - Option(connections.remove(botId)).foreach { (_, subscriber) => - subscriber.unsubscribe(s"${redisConfig.prefix}:bot:$botId:events") - } + emitters.remove(botId) log.infof("Bot %s unregistered", botId) - def dispatch(botId: String, event: String): Unit = - log.debugf("Dispatching event to bot %s", botId) - redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event) - () - def registeredBots: List[String] = - import scala.jdk.CollectionConverters.* - connections.keys().asScala.toList + emitters.keys().asScala.toList + + private def streamKey(botId: String): String = + s"${redisConfig.prefix}:bot:$botId:events:stream" + + private def createGroupIfAbsent(botId: String): Unit = + Try( + redis + .stream(classOf[String]) + .xgroupCreate(streamKey(botId), groupName, "$", new XGroupCreateArgs().mkstream()), + ) match + case Failure(ex) if Option(ex.getMessage).exists(_.contains("BUSYGROUP")) => () + case Failure(ex) => log.warnf(ex, "Failed to create consumer group for bot %s", botId) + case Success(_) => () + + private def pollLoop(botId: String, myEmitter: MultiEmitter[? >: String]): Unit = + while emitters.get(botId) eq myEmitter do + Try { + val messages = redis + .stream(classOf[String]) + .xreadgroup( + groupName, + consumerId, + streamKey(botId), + ">", + new XReadGroupArgs().count(10).block(Duration.ofSeconds(2)), + ) + Option(messages).foreach(_.forEach { msg => + if emitters.get(botId) eq myEmitter then + myEmitter.emit(msg.payload().get("data")) + ack(botId, msg.id()) + }) + } match + case Failure(ex) => log.warnf(ex, "Error in poll loop for bot %s", botId) + case Success(_) => () + + private def ack(botId: String, id: String): Unit = + Try(redis.stream(classOf[String]).xack(streamKey(botId), groupName, id)) match + case Failure(ex) => log.warnf(ex, "Failed to ack message %s for bot %s", id, botId) + case Success(_) => () diff --git a/modules/bot-platform/src/test/scala/de/nowchess/botplatform/registry/BotRegistryTest.scala b/modules/bot-platform/src/test/scala/de/nowchess/botplatform/registry/BotRegistryTest.scala new file mode 100644 index 0000000..95f58db --- /dev/null +++ b/modules/bot-platform/src/test/scala/de/nowchess/botplatform/registry/BotRegistryTest.scala @@ -0,0 +1,83 @@ +package de.nowchess.botplatform.registry + +import de.nowchess.botplatform.config.RedisConfig +import io.quarkus.redis.datasource.RedisDataSource +import io.quarkus.redis.datasource.stream.{StreamCommands, XGroupCreateArgs} +import io.smallrye.mutiny.subscription.MultiEmitter +import org.eclipse.microprofile.context.ManagedExecutor +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.* +import org.mockito.Mockito.* + +class BotRegistryTest: + + // scalafix:off DisableSyntax.var + private var registry: BotRegistry = scala.compiletime.uninitialized + private var redis: RedisDataSource = scala.compiletime.uninitialized + private var streamCmds: StreamCommands[String, String, Nothing] = + scala.compiletime.uninitialized + private var redisConfig: RedisConfig = scala.compiletime.uninitialized + private var executor: ManagedExecutor = scala.compiletime.uninitialized + // scalafix:on DisableSyntax.var + + @BeforeEach + def setup(): Unit = + redis = mock(classOf[RedisDataSource]) + streamCmds = mock(classOf[StreamCommands[String, String, Nothing]]) + redisConfig = mock(classOf[RedisConfig]) + executor = mock(classOf[ManagedExecutor]) + + when(redis.stream(classOf[String])).thenReturn(streamCmds) + when(redisConfig.prefix).thenReturn("nowchess") + + registry = new BotRegistry + registry.redis = redis + registry.redisConfig = redisConfig + registry.executor = executor + + @Test + def registerStartsPollThread(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("bot1", emitter) + verify(executor).submit(any(classOf[Runnable])) + + @Test + def registerCreatesConsumerGroupWithMkstream(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("bot1", emitter) + verify(streamCmds) + .xgroupCreate( + org.mockito.ArgumentMatchers.eq("nowchess:bot:bot1:events:stream"), + org.mockito.ArgumentMatchers.eq("bot-platform-consumer"), + org.mockito.ArgumentMatchers.eq("$"), + any(classOf[XGroupCreateArgs]), + ) + + @Test + def registerTracksBot(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("bot42", emitter) + assertTrue(registry.registeredBots.contains("bot42")) + + @Test + def unregisterRemovesBot(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("botX", emitter) + registry.unregister("botX") + assertFalse(registry.registeredBots.contains("botX")) + + @Test + def busyGroupExceptionIsIgnoredOnRegister(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + when(streamCmds.xgroupCreate(any(), any(), any(), any())) + .thenThrow(new RuntimeException("BUSYGROUP Consumer Group name already exists")) + val exec: Executable = () => registry.register("botBusy", emitter) + assertDoesNotThrow(exec) + + @Test + def registerDoesNotInteractWithPubSub(): Unit = + val emitter = mock(classOf[MultiEmitter[String]]) + registry.register("botNoPubSub", emitter) + verify(redis, never()).pubsub(any(classOf[Class[?]]))