From db7a22bd93e634b44cbd7fe4b394a0acb634b9f6 Mon Sep 17 00:00:00 2001 From: Janis Date: Sun, 10 May 2026 17:41:08 +0200 Subject: [PATCH] feat: add metrics for user registrations, logins, challenges, and game writebacks --- .../account/service/AccountService.scala | 31 +++++ .../account/service/ChallengeService.scala | 28 ++++ .../coordinator/service/AutoScaler.scala | 19 +++ .../service/CacheEvictionManager.scala | 11 +- .../coordinator/service/HealthMonitor.scala | 6 + .../service/InstanceRegistry.scala | 15 +++ .../de/nowchess/chess/engine/GameEngine.scala | 91 ++++++++++--- .../chess/registry/RedisGameRegistry.scala | 17 ++- .../service/InstanceHeartbeatService.scala | 19 ++- .../nowchess/bot/logic/AlphaBetaSearch.scala | 17 ++- .../bot/service/OfficialBotService.scala | 15 ++- .../store/service/GameWritebackService.scala | 127 ++++++++++-------- .../ws/resource/GameWebSocketResource.scala | 46 +++++-- 13 files changed, 344 insertions(+), 98 deletions(-) diff --git a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala index 83404dd..79f1245 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala @@ -4,6 +4,7 @@ import de.nowchess.account.domain.{BotAccount, OfficialBotAccount, UserAccount} import de.nowchess.account.dto.{LoginRequest, RegisterRequest} import de.nowchess.account.error.AccountError import de.nowchess.account.repository.{BotAccountRepository, OfficialBotAccountRepository, UserAccountRepository} +import io.micrometer.core.instrument.MeterRegistry import io.quarkus.elytron.security.common.BcryptUtil import io.smallrye.jwt.build.Jwt import jakarta.enterprise.context.ApplicationScoped @@ -29,11 +30,21 @@ class AccountService: @Inject var officialBotAccountRepository: OfficialBotAccountRepository = uninitialized + + @Inject + var meterRegistry: MeterRegistry = uninitialized // scalafix:on @Transactional def register(req: RegisterRequest): Either[AccountError, UserAccount] = log.infof("Registering user %s", req.username) + val result = registerAccount(req) + result match + case Right(_) => meterRegistry.counter("nowchess.users.registrations", "result", "success").increment() + case Left(_) => meterRegistry.counter("nowchess.users.registrations", "result", "failure").increment() + result + + private def registerAccount(req: RegisterRequest): Either[AccountError, UserAccount] = if userAccountRepository.findByUsername(req.username).isDefined then Left(AccountError.UsernameTaken(req.username)) else if userAccountRepository.findByEmail(req.email).isDefined then Left(AccountError.EmailAlreadyRegistered(req.email)) @@ -48,6 +59,15 @@ class AccountService: Right(account) def login(req: LoginRequest): Either[AccountError, String] = + val result = authenticateUser(req) + result match + case Right(_) => meterRegistry.counter("nowchess.auth.logins", "result", "success").increment() + case Left(error) => + meterRegistry.counter("nowchess.auth.logins", "result", "failure").increment() + meterRegistry.counter("nowchess.auth.login.failures", "reason", loginFailureReason(error)).increment() + result + + private def authenticateUser(req: LoginRequest): Either[AccountError, String] = userAccountRepository.findByUsername(req.username) match case None => log.warnf("Login failed for unknown user %s", req.username) @@ -69,6 +89,17 @@ class AccountService: .sign(), ) + private def loginFailureReason(error: AccountError): String = error match + case AccountError.InvalidCredentials => "invalid_credentials" + case AccountError.UserBanned => "user_banned" + case AccountError.UsernameTaken(_) => "username_taken" + case AccountError.EmailAlreadyRegistered(_) => "email_registered" + case AccountError.UserNotFound => "user_not_found" + case AccountError.BotNotFound => "bot_not_found" + case AccountError.BotLimitExceeded => "bot_limit_exceeded" + case AccountError.NotAuthorized => "not_authorized" + case AccountError.BotBanned => "bot_banned" + def findByUsername(username: String): Option[UserAccount] = userAccountRepository.findByUsername(username) diff --git a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala index 532b4c8..fd903fc 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala @@ -18,6 +18,7 @@ import de.nowchess.account.dto.{ } import de.nowchess.account.error.ChallengeError import de.nowchess.account.repository.{ChallengeRepository, UserAccountRepository} +import io.micrometer.core.instrument.MeterRegistry import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import jakarta.transaction.Transactional @@ -48,10 +49,22 @@ class ChallengeService: @Inject var eventPublisher: EventPublisher = uninitialized + + @Inject + var meterRegistry: MeterRegistry = uninitialized // scalafix:on @Transactional def create(challengerId: UUID, destUsername: String, req: ChallengeRequest): Either[ChallengeError, Challenge] = + val result = createChallenge(challengerId, destUsername, req) + result.foreach(_ => meterRegistry.counter("nowchess.challenges.created").increment()) + result + + private def createChallenge( + challengerId: UUID, + destUsername: String, + req: ChallengeRequest, + ): Either[ChallengeError, Challenge] = for destUser <- userAccountRepository.findByUsername(destUsername).toRight(ChallengeError.UserNotFound(destUsername)) challenger <- userAccountRepository.findById(challengerId).toRight(ChallengeError.ChallengerNotFound) @@ -80,6 +93,11 @@ class ChallengeService: @Transactional def accept(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] = + val result = acceptChallenge(challengeId, userId) + result.foreach(_ => meterRegistry.counter("nowchess.challenges.accepted").increment()) + result + + private def acceptChallenge(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] = for challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound) _ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive) @@ -96,6 +114,11 @@ class ChallengeService: @Transactional def decline(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] = + val result = declineChallenge(challengeId, userId, req) + result.foreach(_ => meterRegistry.counter("nowchess.challenges.declined").increment()) + result + + private def declineChallenge(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] = for challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound) _ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive) @@ -109,6 +132,11 @@ class ChallengeService: @Transactional def cancel(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] = + val result = cancelChallenge(challengeId, userId) + result.foreach(_ => meterRegistry.counter("nowchess.challenges.cancelled").increment()) + result + + private def cancelChallenge(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] = for challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound) _ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala index 41389e8..f04ae97 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala @@ -1,13 +1,16 @@ package de.nowchess.coordinator.service +import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped import jakarta.enterprise.inject.Instance import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.api.model.GenericKubernetesResource import io.fabric8.kubernetes.client.KubernetesClient +import io.micrometer.core.instrument.{Gauge, MeterRegistry} import org.jboss.logging.Logger +import java.util.concurrent.atomic.AtomicReference import scala.compiletime.uninitialized @ApplicationScoped @@ -21,10 +24,14 @@ class AutoScaler: @Inject private var instanceRegistry: InstanceRegistry = uninitialized + + @Inject + private var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var private val log = Logger.getLogger(classOf[AutoScaler]) private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L) + private val avgLoadRef = new AtomicReference[Double](0.0) private def kubeClientOpt: Option[KubernetesClient] = if kubeClientInstance.isUnsatisfied then None @@ -33,6 +40,13 @@ class AutoScaler: private val argoApiVersion = "argoproj.io/v1alpha1" private val argoKind = "Rollout" + @PostConstruct + def initMetrics(): Unit = + Gauge + .builder("nowchess.coordinator.load.average", avgLoadRef, _.get()) + .register(meterRegistry) + () + // scalafix:off DisableSyntax.asInstanceOf private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] = Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] => @@ -48,6 +62,7 @@ class AutoScaler: val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY") if instances.nonEmpty then val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size + avgLoadRef.set(avgLoad) if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp() else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore then scaleDown() @@ -79,6 +94,7 @@ class AutoScaler: .inNamespace(config.k8sNamespace) .resource(rollout) .update() + meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment() log.infof( "Scaled up %s from %d to %d replicas", config.k8sRolloutName, @@ -91,6 +107,7 @@ class AutoScaler: } catch case ex: Exception => + meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment() log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName) def scaleDown(): Unit = @@ -120,6 +137,7 @@ class AutoScaler: .inNamespace(config.k8sNamespace) .resource(rollout) .update() + meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment() log.infof( "Scaled down %s from %d to %d replicas", config.k8sRolloutName, @@ -132,4 +150,5 @@ class AutoScaler: } catch case ex: Exception => + meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment() log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala index 40a62e9..1263772 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala @@ -5,6 +5,7 @@ import jakarta.inject.Inject import io.quarkus.redis.datasource.RedisDataSource import de.nowchess.coordinator.config.CoordinatorConfig import com.fasterxml.jackson.databind.ObjectMapper +import io.micrometer.core.instrument.MeterRegistry import scala.jdk.CollectionConverters.* import org.jboss.logging.Logger import scala.compiletime.uninitialized @@ -30,6 +31,9 @@ class CacheEvictionManager: @Inject private var objectMapper: ObjectMapper = uninitialized + @Inject + private var meterRegistry: MeterRegistry = uninitialized + private val log = Logger.getLogger(classOf[CacheEvictionManager]) private var redisPrefix = "nowchess" // scalafix:on DisableSyntax.var @@ -38,8 +42,12 @@ class CacheEvictionManager: redisPrefix = prefix def evictStaleGames: Unit = - log.info("Starting cache eviction scan") + meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record { () => + runEviction() + } + private def runEviction(): Unit = + log.info("Starting cache eviction scan") val pattern = s"$redisPrefix:game:entry:*" val keys = redis.key(classOf[String]).keys(pattern) val now = System.currentTimeMillis() @@ -56,6 +64,7 @@ class CacheEvictionManager: try coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId)) redis.key(classOf[String]).del(key) + meterRegistry.counter("nowchess.coordinator.cache.evictions").increment() log.infof("Evicted idle game %s from %s", gameId, instance.instanceId) count + 1 catch diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala index 6c166c3..15714d9 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala @@ -6,6 +6,7 @@ import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.api.model.Pod +import io.micrometer.core.instrument.MeterRegistry import io.quarkus.redis.datasource.RedisDataSource import scala.jdk.CollectionConverters.* import org.jboss.logging.Logger @@ -27,6 +28,9 @@ class HealthMonitor: @Inject private var redis: RedisDataSource = uninitialized + @Inject + private var meterRegistry: MeterRegistry = uninitialized + private val log = Logger.getLogger(classOf[HealthMonitor]) private var redisPrefix = "nowchess" // scalafix:on DisableSyntax.var @@ -39,6 +43,7 @@ class HealthMonitor: redisPrefix = prefix def checkInstanceHealth: Unit = + meterRegistry.counter("nowchess.coordinator.health.checks").increment() val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout) if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", ")) val instances = instanceRegistry.getAllInstances @@ -108,6 +113,7 @@ class HealthMonitor: case Some(pod) => val isReady = isPodReady(pod) if !isReady && inst.state == "HEALTHY" then + meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment() log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId) instanceRegistry.markInstanceDead(inst.instanceId) case None => diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala index 2d13f92..c09e96a 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala @@ -1,5 +1,6 @@ package de.nowchess.coordinator.service +import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import io.quarkus.redis.datasource.ReactiveRedisDataSource @@ -9,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.nowchess.coordinator.dto.InstanceMetadata import java.util.concurrent.ConcurrentHashMap import java.time.{Duration, Instant} +import io.micrometer.core.instrument.{Gauge, MeterRegistry} import io.smallrye.mutiny.Uni import org.jboss.logging.Logger @@ -18,12 +20,22 @@ class InstanceRegistry: @Inject private var redis: ReactiveRedisDataSource = uninitialized private var redisPrefix = "nowchess" + + @Inject + private var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var private val log = Logger.getLogger(classOf[InstanceRegistry]) private val mapper = ObjectMapper() private val instances = ConcurrentHashMap[String, InstanceMetadata]() + @PostConstruct + def initMetrics(): Unit = + Gauge + .builder("nowchess.coordinator.instances.active", instances, m => m.size().toDouble) + .register(meterRegistry) + () + def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -45,6 +57,7 @@ class InstanceRegistry: val isNew = !instances.containsKey(instanceId) instances.put(instanceId, metadata) if isNew then + meterRegistry.counter("nowchess.coordinator.instances.joined").increment() log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount) else log.debugf( @@ -68,6 +81,7 @@ class InstanceRegistry: def removeInstance(instanceId: String): Unit = instances.remove(instanceId) + meterRegistry.counter("nowchess.coordinator.instances.removed").increment() log.infof("Instance %s removed from registry", instanceId) def evictStaleInstances(maxAge: Duration): List[String] = @@ -83,6 +97,7 @@ class InstanceRegistry: .toList stale.foreach { id => instances.remove(id) + meterRegistry.counter("nowchess.coordinator.instances.evicted").increment() log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge) } stale diff --git a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala index 561de76..678c484 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala @@ -17,10 +17,12 @@ import de.nowchess.chess.observer.* import de.nowchess.api.error.GameError import de.nowchess.api.game.WinReason.{Checkmate, Resignation} import de.nowchess.api.io.{GameContextExport, GameContextImport} -import de.nowchess.api.rules.RuleSet +import de.nowchess.api.rules.{PostMoveStatus, RuleSet} +import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer} import java.time.Instant import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger /** Pure game engine that manages game state and notifies observers of state changes. All rule queries delegate to the * injected RuleSet. All user interactions go through Commands; state changes are broadcast via GameEvents. @@ -33,6 +35,7 @@ class GameEngine( initialDrawOffer: Option[Color] = None, initialRedoStack: List[Move] = Nil, initialTakebackRequest: Option[Color] = None, + private val meterRegistry: Option[MeterRegistry] = None, ) extends Observable: // Ensure that initialBoard is set correctly for threefold repetition detection private val contextWithInitialBoard = @@ -57,6 +60,17 @@ class GameEngine( @SuppressWarnings(Array("DisableSyntax.var")) private var pendingTakebackRequest: Option[Color] = initialTakebackRequest + meterRegistry.foreach { reg => + GameEngine.activeGamesCount.incrementAndGet() + reg.counter("nowchess.games.started").increment() + } + private def gamesCompletedCounter(result: String): Counter = + meterRegistry.map(_.counter("nowchess.games.completed", "result", result)).orNull + private def movesProcessedCounter: Counter = + meterRegistry.map(_.counter("nowchess.moves.processed")).orNull + private def movesDurationTimer: Timer = + meterRegistry.map(_.timer("nowchess.moves.duration")).orNull + // Start scheduler immediately for live clocks so passive expiry fires without waiting for a move. clockState.foreach(scheduleExpiryCheck) @@ -165,6 +179,8 @@ class GameEngine( pendingTakebackRequest = None stopClock() redoStack = Nil + Option(gamesCompletedCounter("resignation")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() notifyObservers(ResignEvent(currentContext, color)) } @@ -197,6 +213,8 @@ class GameEngine( pendingTakebackRequest = None stopClock() redoStack = Nil + Option(gamesCompletedCounter("draw.agreement")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() notifyObservers(DrawEvent(currentContext, DrawReason.Agreement)) } @@ -223,11 +241,15 @@ class GameEngine( currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.FiftyMoveRule))) stopClock() redoStack = Nil + Option(gamesCompletedCounter("draw.fifty_move")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() notifyObservers(DrawEvent(currentContext, DrawReason.FiftyMoveRule)) else if ruleSet.isThreefoldRepetition(currentContext) then currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.ThreefoldRepetition))) stopClock() redoStack = Nil + Option(gamesCompletedCounter("draw.threefold")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() notifyObservers(DrawEvent(currentContext, DrawReason.ThreefoldRepetition)) else notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.DrawCannotBeClaimed)) } @@ -311,9 +333,18 @@ class GameEngine( currentContext = currentContext.withResult(Some(GameResult.Draw(reason))) stopClock() redoStack = Nil + Option(gamesCompletedCounter(drawReasonTag(reason))).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() notifyObservers(DrawEvent(currentContext, reason)) } + private def drawReasonTag(reason: DrawReason): String = reason match + case DrawReason.Agreement => "draw.agreement" + case DrawReason.FiftyMoveRule => "draw.fifty_move" + case DrawReason.ThreefoldRepetition => "draw.threefold" + case DrawReason.Stalemate => "draw.stalemate" + case DrawReason.InsufficientMaterial => "draw.insufficient" + /** Inject clock state directly (for testing). */ private[engine] def injectClockState(cs: Option[ClockState]): Unit = synchronized { clockState = cs } @@ -334,6 +365,11 @@ class GameEngine( pendingDrawOffer = None pendingTakebackRequest = None redoStack = Nil + val tag = result match + case GameResult.Draw(_) => "draw.insufficient" + case _ => "timeout" + Option(gamesCompletedCounter(tag)).foreach(_.increment()) + activeGamesCount.decrementAndGet() notifyObservers(TimeFlagEvent(currentContext, flagged)) private def scheduleExpiryCheck(cs: ClockState): Unit = @@ -369,6 +405,12 @@ class GameEngine( // ──── Private helpers ──── private def executeMove(move: Move): Unit = + Option(movesDurationTimer) match + case Some(timer) => timer.record(() => executeMoveBody(move)) + case None => executeMoveBody(move) + Option(movesProcessedCounter).foreach(_.increment()) + + private def executeMoveBody(move: Move): Unit = if !isRedoing then redoStack = Nil pendingTakebackRequest = None @@ -391,28 +433,36 @@ class GameEngine( ) val status = ruleSet.postMoveStatus(currentContext) - if currentContext.result.isEmpty then - if status.isCheckmate then - val winner = currentContext.turn.opposite - currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate))) - cancelScheduled() - notifyObservers(CheckmateEvent(currentContext, winner)) - redoStack = Nil - else if status.isStalemate then - currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate))) - cancelScheduled() - notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate)) - redoStack = Nil - else if status.isInsufficientMaterial then - currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial))) - cancelScheduled() - notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial)) - redoStack = Nil - else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext)) + if currentContext.result.isEmpty then applyPostMoveStatus(status) if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext)) if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext)) + private def applyPostMoveStatus(status: PostMoveStatus): Unit = + if status.isCheckmate then + val winner = currentContext.turn.opposite + currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate))) + cancelScheduled() + Option(gamesCompletedCounter("checkmate")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() + notifyObservers(CheckmateEvent(currentContext, winner)) + redoStack = Nil + else if status.isStalemate then + currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate))) + cancelScheduled() + Option(gamesCompletedCounter("draw.stalemate")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() + notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate)) + redoStack = Nil + else if status.isInsufficientMaterial then + currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial))) + cancelScheduled() + Option(gamesCompletedCounter("draw.insufficient")).foreach(_.increment()) + GameEngine.activeGamesCount.decrementAndGet() + notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial)) + redoStack = Nil + else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext)) + private def translateMoveToNotation(move: Move, boardBefore: Board): String = move.moveType match case MoveType.CastleKingside => "O-O" @@ -526,3 +576,6 @@ class GameEngine( pendingTakebackRequest = None notifyObservers(TakebackDeclinedEvent(currentContext, color)) } + +object GameEngine: + val activeGamesCount: AtomicInteger = new AtomicInteger(0) diff --git a/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala b/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala index cbb122c..256639c 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala @@ -12,7 +12,9 @@ import de.nowchess.chess.grpc.RuleSetGrpcAdapter import de.nowchess.chess.config.RedisConfig import de.nowchess.chess.grpc.IoGrpcClientWrapper import de.nowchess.chess.resource.GameDtoMapper +import io.micrometer.core.instrument.{Gauge, MeterRegistry} import io.quarkus.redis.datasource.RedisDataSource +import jakarta.annotation.PostConstruct import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import org.eclipse.microprofile.rest.client.inject.RestClient @@ -34,12 +36,20 @@ class RedisGameRegistry extends GameRegistry: @Inject var ioClient: IoGrpcClientWrapper = uninitialized @Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized @Inject @RestClient var storeClient: StoreServiceClient = uninitialized + @Inject var meterRegistry: MeterRegistry = uninitialized // scalafix:on private val log = Logger.getLogger(classOf[RedisGameRegistry]) private val localEngines = ConcurrentHashMap[String, GameEntry]() private val rng = new SecureRandom() + @PostConstruct + def initMetrics(): Unit = + Gauge + .builder("nowchess.games.active", GameEngine.activeGamesCount, _.get().toDouble) + .register(meterRegistry) + () + private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId" def generateId(): String = @@ -58,14 +68,17 @@ class RedisGameRegistry extends GameRegistry: ) def get(gameId: String): Option[GameEntry] = - Option(localEngines.get(gameId)) match + val result = Option(localEngines.get(gameId)) match case Some(localEntry) => + meterRegistry.counter("nowchess.games.cache.hits", "source", "local").increment() readRedisDto(gameId).flatMap(dto => Try(reconstruct(dto)).toOption) match case Some(redisEntry) if !sameSnapshot(localEntry, redisEntry) => localEngines.put(gameId, redisEntry) Some(redisEntry) case _ => Some(localEntry) case None => fromRedis(gameId).orElse(fromDb(gameId)) + if result.isEmpty then meterRegistry.counter("nowchess.games.cache.misses").increment() + result def update(entry: GameEntry): Unit = localEngines.put(entry.gameId, entry) @@ -86,6 +99,7 @@ class RedisGameRegistry extends GameRegistry: } } .map { entry => + meterRegistry.counter("nowchess.games.cache.hits", "source", "redis").increment() localEngines.put(gameId, entry) log.infof("Loaded game %s from Redis cache", gameId) entry @@ -118,6 +132,7 @@ class RedisGameRegistry extends GameRegistry: (dto, reconstruct(dto)) } match case scala.util.Success((dto, entry)) => + meterRegistry.counter("nowchess.games.cache.hits", "source", "db").increment() log.infof("Loaded game %s from store service", gameId) localEngines.put(gameId, entry) redis.value(classOf[String]).setex(cacheKey(gameId), 1800L, objectMapper.writeValueAsString(dto)) diff --git a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala index a46a2b5..83214a4 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala @@ -19,6 +19,8 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *} import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub import io.grpc.stub.StreamObserver import io.grpc.Channel +import io.micrometer.core.instrument.{Gauge, MeterRegistry} +import java.util.concurrent.atomic.AtomicInteger @ApplicationScoped class InstanceHeartbeatService: @@ -32,6 +34,9 @@ class InstanceHeartbeatService: @Inject private var mapper: ObjectMapper = uninitialized + @Inject + private var meterRegistry: MeterRegistry = uninitialized + @GrpcClient("coordinator-grpc") private var channel: Channel = uninitialized @@ -51,14 +56,15 @@ class InstanceHeartbeatService: private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None private var heartbeatExecutor = Executors.newScheduledThreadPool(1) private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1) - private var subscriptionCount = 0 private var localCacheSize = 0 private var serviceActive = false private var shuttingDown = false // scalafix:on DisableSyntax.var private val redisHeartbeatPending = new AtomicBoolean(false) + private val subscriptionCount = new AtomicInteger(0) def onStart(@Observes event: StartupEvent): Unit = + Gauge.builder("nowchess.instance.subscriptions", subscriptionCount, _.get().toDouble).register(meterRegistry) if coordinatorEnabled then try shuttingDown = false @@ -90,7 +96,7 @@ class InstanceHeartbeatService: redisPrefix = prefix def setSubscriptionCount(count: Int): Unit = - subscriptionCount = count + subscriptionCount.set(count) def setLocalCacheSize(count: Int): Unit = localCacheSize = count @@ -99,13 +105,13 @@ class InstanceHeartbeatService: if coordinatorEnabled then val setKey = s"$redisPrefix:instance:$instanceId:games" redis.set(classOf[String]).sadd(setKey, gameId) - subscriptionCount += 1 + subscriptionCount.incrementAndGet() def removeGameSubscription(gameId: String): Unit = if coordinatorEnabled then val setKey = s"$redisPrefix:instance:$instanceId:games" redis.set(classOf[String]).srem(setKey, gameId) - subscriptionCount = Math.max(0, subscriptionCount - 1) + subscriptionCount.updateAndGet(c => Math.max(0, c - 1)) private def generateInstanceId(): Unit = val hostname = @@ -162,13 +168,14 @@ class InstanceHeartbeatService: .setHostname(getHostname) .setHttpPort(httpPort) .setGrpcPort(grpcPort) - .setSubscriptionCount(subscriptionCount) + .setSubscriptionCount(subscriptionCount.get()) .setLocalCacheSize(localCacheSize) .setTimestampMillis(System.currentTimeMillis()) .build() observer.onNext(frame) catch case ex: Exception => + meterRegistry.counter("nowchess.heartbeat.failures").increment() log.warnf(ex, "Failed to send heartbeat frame") } @@ -182,7 +189,7 @@ class InstanceHeartbeatService: "hostname" -> getHostname, "httpPort" -> httpPort, "grpcPort" -> grpcPort, - "subscriptionCount" -> subscriptionCount, + "subscriptionCount" -> subscriptionCount.get(), "localCacheSize" -> localCacheSize, "lastHeartbeat" -> java.time.Instant.now().toString, "state" -> "HEALTHY", diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/logic/AlphaBetaSearch.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/logic/AlphaBetaSearch.scala index cf41d70..dbd2360 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/logic/AlphaBetaSearch.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/logic/AlphaBetaSearch.scala @@ -7,6 +7,7 @@ import de.nowchess.bot.ai.Evaluation import de.nowchess.bot.util.ZobristHash import de.nowchess.api.rules.RuleSet import de.nowchess.rules.sets.DefaultRules +import io.micrometer.core.instrument.MeterRegistry import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} final class AlphaBetaSearch( @@ -14,6 +15,7 @@ final class AlphaBetaSearch( tt: TranspositionTable = TranspositionTable(), weights: Evaluation, numThreads: Int = Runtime.getRuntime.availableProcessors, + meterRegistry: MeterRegistry = null, ): private val INF = Int.MaxValue / 2 @@ -85,8 +87,8 @@ final class AlphaBetaSearch( val rootHash = ZobristHash.hash(context) @scala.annotation.tailrec - def loop(bestSoFar: Option[Move], prevScore: Int, depth: Int): Option[Move] = - if isOutOfTime then bestSoFar + def loop(bestSoFar: Option[Move], prevScore: Int, depth: Int, lastDepth: Int): (Option[Move], Int) = + if isOutOfTime then (bestSoFar, lastDepth) else val (alpha, beta) = if depth == 1 then (-INF, INF) else (prevScore - ASPIRATION_DELTA, prevScore + ASPIRATION_DELTA) @@ -99,9 +101,16 @@ final class AlphaBetaSearch( rootHash, excludedRootMoves, ) - loop(move.orElse(bestSoFar), score, depth + 1) + loop(move.orElse(bestSoFar), score, depth + 1, depth) - loop(None, 0, 1) + val (result, depthReached) = loop(None, 0, 1, 0) + recordSearchMetrics(depthReached) + result + + private def recordSearchMetrics(depthReached: Int): Unit = + if meterRegistry != null then + meterRegistry.summary("nowchess.bot.search.nodes").record(nodeCount.get().toDouble) + meterRegistry.summary("nowchess.bot.search.depth").record(depthReached.toDouble) private def isOutOfTime: Boolean = System.currentTimeMillis - timeStartMs.get >= timeLimitMs.get diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala index b797bf1..9cfd11c 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala @@ -6,6 +6,7 @@ import de.nowchess.bot.BotController import de.nowchess.bot.BotDifficulty import de.nowchess.bot.config.RedisConfig import de.nowchess.io.fen.FenParser +import io.micrometer.core.instrument.MeterRegistry import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.runtime.StartupEvent import jakarta.enterprise.context.ApplicationScoped @@ -18,10 +19,11 @@ import java.util.function.Consumer class OfficialBotService: // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized - @Inject var objectMapper: ObjectMapper = uninitialized - @Inject var botController: BotController = uninitialized + @Inject var redis: RedisDataSource = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var botController: BotController = uninitialized + @Inject var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var private val terminalStatuses = @@ -85,7 +87,10 @@ class OfficialBotService: val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium) botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot => FenParser.parseFen(fen).toOption.foreach { context => - bot(context).foreach { move => + val timer = meterRegistry.timer("nowchess.bot.move.duration", "bot", botName) + val moveOpt = timer.recordCallable(() => bot(context)) + moveOpt.flatten.foreach { move => + meterRegistry.counter("nowchess.bot.moves.computed", "bot", botName).increment() val uci = toUci(move) val c2sTopic = s"${redisConfig.prefix}:game:$gameId:c2s" val moveMsg = s"""{"type":"MOVE","uci":"$uci","playerId":"$botAccountId"}""" diff --git a/modules/store/src/main/scala/de/nowchess/store/service/GameWritebackService.scala b/modules/store/src/main/scala/de/nowchess/store/service/GameWritebackService.scala index f006ead..ce0ab9b 100644 --- a/modules/store/src/main/scala/de/nowchess/store/service/GameWritebackService.scala +++ b/modules/store/src/main/scala/de/nowchess/store/service/GameWritebackService.scala @@ -3,6 +3,7 @@ package de.nowchess.store.service import de.nowchess.api.dto.GameWritebackEventDto import de.nowchess.store.domain.GameRecord import de.nowchess.store.repository.GameRecordRepository +import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer} import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import jakarta.transaction.Transactional @@ -12,64 +13,84 @@ import java.time.Instant @ApplicationScoped class GameWritebackService: - @Inject + // scalafix:off DisableSyntax.var + @Inject var repository: GameRecordRepository = uninitialized - // scalafix:on + + @Inject + var meterRegistry: MeterRegistry = uninitialized + // scalafix:on DisableSyntax.var + + private lazy val writebackTimer: Timer = + meterRegistry.timer("nowchess.store.writeback.duration") + + private lazy val writebackSkipped: Counter = + meterRegistry.counter("nowchess.store.writeback.skipped") + + private def gamesWrittenCounter(operation: String): Counter = + meterRegistry.counter("nowchess.store.games.written", "operation", operation) @Transactional def writeBack(event: GameWritebackEventDto): Unit = + writebackTimer.record(() => doWriteBack(event)) + + private def doWriteBack(event: GameWritebackEventDto): Unit = repository.findByGameId(event.gameId) match case None => - val record = new GameRecord - record.gameId = event.gameId - record.fen = event.fen - record.pgn = event.pgn - record.moveCount = event.moveCount - record.whiteId = event.whiteId - record.whiteName = event.whiteName - record.blackId = event.blackId - record.blackName = event.blackName - record.mode = event.mode - record.resigned = event.resigned - record.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull - record.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull - record.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull - record.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull - record.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull - record.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull - record.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull - record.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull - record.clockActiveColor = event.clockActiveColor.orNull - record.pendingDrawOffer = event.pendingDrawOffer.orNull - record.result = event.result.orNull - record.terminationReason = event.terminationReason.orNull - record.createdAt = Instant.now() - record.updatedAt = Instant.now() - repository.persist(record) + createRecord(event) + gamesWrittenCounter("create").increment() case Some(r) if event.moveCount > r.moveCount || event.pgn != r.pgn => - r.fen = event.fen - r.pgn = event.pgn - r.moveCount = event.moveCount - r.whiteId = event.whiteId - r.whiteName = event.whiteName - r.blackId = event.blackId - r.blackName = event.blackName - r.mode = event.mode - r.resigned = event.resigned - r.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull - r.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull - r.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull - r.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull - r.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull - r.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull - r.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull - r.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull - r.clockActiveColor = event.clockActiveColor.orNull - r.pendingDrawOffer = event.pendingDrawOffer.orNull - r.pendingTakebackOffer = event.pendingTakebackRequest.orNull - r.result = event.result.orNull - r.terminationReason = event.terminationReason.orNull - r.updatedAt = Instant.now() - repository.merge(r) - case _ => () + updateRecord(r, event) + gamesWrittenCounter("update").increment() + case _ => + writebackSkipped.increment() + + private def createRecord(event: GameWritebackEventDto): Unit = + val record = new GameRecord + record.gameId = event.gameId + record.fen = event.fen + record.pgn = event.pgn + record.moveCount = event.moveCount + record.whiteId = event.whiteId + record.whiteName = event.whiteName + record.blackId = event.blackId + record.blackName = event.blackName + record.mode = event.mode + record.resigned = event.resigned + applyClockFields(record, event) + record.result = event.result.orNull + record.terminationReason = event.terminationReason.orNull + record.createdAt = Instant.now() + record.updatedAt = Instant.now() + repository.persist(record) + + private def updateRecord(r: GameRecord, event: GameWritebackEventDto): Unit = + r.fen = event.fen + r.pgn = event.pgn + r.moveCount = event.moveCount + r.whiteId = event.whiteId + r.whiteName = event.whiteName + r.blackId = event.blackId + r.blackName = event.blackName + r.mode = event.mode + r.resigned = event.resigned + applyClockFields(r, event) + r.pendingDrawOffer = event.pendingDrawOffer.orNull + r.pendingTakebackOffer = event.pendingTakebackRequest.orNull + r.result = event.result.orNull + r.terminationReason = event.terminationReason.orNull + r.updatedAt = Instant.now() + repository.merge(r) + + private def applyClockFields(r: GameRecord, event: GameWritebackEventDto): Unit = + r.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull + r.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull + r.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull + r.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull + r.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull + r.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull + r.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull + r.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull + r.clockActiveColor = event.clockActiveColor.orNull + r.pendingDrawOffer = event.pendingDrawOffer.orNull diff --git a/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala b/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala index 42dcace..f665994 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala @@ -1,6 +1,7 @@ package de.nowchess.ws.resource import de.nowchess.ws.config.RedisConfig +import io.micrometer.core.instrument.{Counter, Gauge, MeterRegistry} import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.pubsub.PubSubCommands import io.quarkus.websockets.next.* @@ -26,10 +27,27 @@ class GameWebSocketResource: @Inject var jwtParser: JWTParser = uninitialized + + @Inject + var meterRegistry: MeterRegistry = uninitialized // scalafix:on DisableSyntax.var private val connections = new ConcurrentHashMap[String, ConnectionMeta]() + private lazy val connectionsOpened: Counter = + meterRegistry.counter("nowchess.ws.connections.opened") + + private lazy val connectionsClosed: Counter = + meterRegistry.counter("nowchess.ws.connections.closed") + + private lazy val messagesReceived: Counter = + meterRegistry.counter("nowchess.ws.messages.received") + + private lazy val activeGauge: Unit = + Gauge + .builder("nowchess.ws.connections.active", connections, _.size().toDouble) + .register(meterRegistry) + private def s2cTopic(gameId: String): String = s"${redisConfig.prefix}:game:$gameId:s2c" @@ -38,22 +56,19 @@ class GameWebSocketResource: @OnOpen def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit = - val gameId = connection.pathParam("gameId") - val playerId = Option(handshake.header("Authorization")) - .filter(_.nonEmpty) - .flatMap(token => Try(jwtParser.parse(token)).toOption) - .map(_.getSubject) + activeGauge + val gameId = connection.pathParam("gameId") + val playerId = resolvePlayerId(handshake) log.infof("Game WebSocket opened — gameId=%s playerId=%s", gameId, playerId.getOrElse("anonymous")) val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ()) val subscriber = redis.pubsub(classOf[String]).subscribe(s2cTopic(gameId), handler) connections.put(connection.id(), ConnectionMeta(gameId, subscriber, playerId)) - val connectedMsg = playerId match - case Some(pid) => s"""{"type":"CONNECTED","gameId":"$gameId","playerId":"$pid"}""" - case None => s"""{"type":"CONNECTED","gameId":"$gameId"}""" - redis.pubsub(classOf[String]).publish(c2sTopic(gameId), connectedMsg) + connectionsOpened.increment() + publishConnected(gameId, playerId) @OnTextMessage def onTextMessage(connection: WebSocketConnection, message: String): Unit = + messagesReceived.increment() Option(connections.get(connection.id())).foreach { meta => val enriched = meta.playerId match case Some(pid) => injectPlayerId(message, pid) @@ -66,8 +81,21 @@ class GameWebSocketResource: Option(connections.remove(connection.id())).foreach { meta => log.infof("Game WebSocket closed — gameId=%s", meta.gameId) meta.subscriber.unsubscribe(s2cTopic(meta.gameId)) + connectionsClosed.increment() } + private def resolvePlayerId(handshake: HandshakeRequest): Option[String] = + Option(handshake.header("Authorization")) + .filter(_.nonEmpty) + .flatMap(token => Try(jwtParser.parse(token)).toOption) + .map(_.getSubject) + + private def publishConnected(gameId: String, playerId: Option[String]): Unit = + val connectedMsg = playerId match + case Some(pid) => s"""{"type":"CONNECTED","gameId":"$gameId","playerId":"$pid"}""" + case None => s"""{"type":"CONNECTED","gameId":"$gameId"}""" + redis.pubsub(classOf[String]).publish(c2sTopic(gameId), connectedMsg) + private def injectPlayerId(msg: String, pid: String): String = val trimmed = msg.trim if trimmed.endsWith("}") then trimmed.dropRight(1) + s""","playerId":"$pid"}"""