feat: add metrics for user registrations, logins, challenges, and game writebacks
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
This commit is contained in:
@@ -4,6 +4,7 @@ import de.nowchess.account.domain.{BotAccount, OfficialBotAccount, UserAccount}
|
||||
import de.nowchess.account.dto.{LoginRequest, RegisterRequest}
|
||||
import de.nowchess.account.error.AccountError
|
||||
import de.nowchess.account.repository.{BotAccountRepository, OfficialBotAccountRepository, UserAccountRepository}
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import io.quarkus.elytron.security.common.BcryptUtil
|
||||
import io.smallrye.jwt.build.Jwt
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
@@ -29,11 +30,21 @@ class AccountService:
|
||||
|
||||
@Inject
|
||||
var officialBotAccountRepository: OfficialBotAccountRepository = uninitialized
|
||||
|
||||
@Inject
|
||||
var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
@Transactional
|
||||
def register(req: RegisterRequest): Either[AccountError, UserAccount] =
|
||||
log.infof("Registering user %s", req.username)
|
||||
val result = registerAccount(req)
|
||||
result match
|
||||
case Right(_) => meterRegistry.counter("nowchess.users.registrations", "result", "success").increment()
|
||||
case Left(_) => meterRegistry.counter("nowchess.users.registrations", "result", "failure").increment()
|
||||
result
|
||||
|
||||
private def registerAccount(req: RegisterRequest): Either[AccountError, UserAccount] =
|
||||
if userAccountRepository.findByUsername(req.username).isDefined then Left(AccountError.UsernameTaken(req.username))
|
||||
else if userAccountRepository.findByEmail(req.email).isDefined then
|
||||
Left(AccountError.EmailAlreadyRegistered(req.email))
|
||||
@@ -48,6 +59,15 @@ class AccountService:
|
||||
Right(account)
|
||||
|
||||
def login(req: LoginRequest): Either[AccountError, String] =
|
||||
val result = authenticateUser(req)
|
||||
result match
|
||||
case Right(_) => meterRegistry.counter("nowchess.auth.logins", "result", "success").increment()
|
||||
case Left(error) =>
|
||||
meterRegistry.counter("nowchess.auth.logins", "result", "failure").increment()
|
||||
meterRegistry.counter("nowchess.auth.login.failures", "reason", loginFailureReason(error)).increment()
|
||||
result
|
||||
|
||||
private def authenticateUser(req: LoginRequest): Either[AccountError, String] =
|
||||
userAccountRepository.findByUsername(req.username) match
|
||||
case None =>
|
||||
log.warnf("Login failed for unknown user %s", req.username)
|
||||
@@ -69,6 +89,17 @@ class AccountService:
|
||||
.sign(),
|
||||
)
|
||||
|
||||
private def loginFailureReason(error: AccountError): String = error match
|
||||
case AccountError.InvalidCredentials => "invalid_credentials"
|
||||
case AccountError.UserBanned => "user_banned"
|
||||
case AccountError.UsernameTaken(_) => "username_taken"
|
||||
case AccountError.EmailAlreadyRegistered(_) => "email_registered"
|
||||
case AccountError.UserNotFound => "user_not_found"
|
||||
case AccountError.BotNotFound => "bot_not_found"
|
||||
case AccountError.BotLimitExceeded => "bot_limit_exceeded"
|
||||
case AccountError.NotAuthorized => "not_authorized"
|
||||
case AccountError.BotBanned => "bot_banned"
|
||||
|
||||
def findByUsername(username: String): Option[UserAccount] =
|
||||
userAccountRepository.findByUsername(username)
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import de.nowchess.account.dto.{
|
||||
}
|
||||
import de.nowchess.account.error.ChallengeError
|
||||
import de.nowchess.account.repository.{ChallengeRepository, UserAccountRepository}
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.transaction.Transactional
|
||||
@@ -48,10 +49,22 @@ class ChallengeService:
|
||||
|
||||
@Inject
|
||||
var eventPublisher: EventPublisher = uninitialized
|
||||
|
||||
@Inject
|
||||
var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
@Transactional
|
||||
def create(challengerId: UUID, destUsername: String, req: ChallengeRequest): Either[ChallengeError, Challenge] =
|
||||
val result = createChallenge(challengerId, destUsername, req)
|
||||
result.foreach(_ => meterRegistry.counter("nowchess.challenges.created").increment())
|
||||
result
|
||||
|
||||
private def createChallenge(
|
||||
challengerId: UUID,
|
||||
destUsername: String,
|
||||
req: ChallengeRequest,
|
||||
): Either[ChallengeError, Challenge] =
|
||||
for
|
||||
destUser <- userAccountRepository.findByUsername(destUsername).toRight(ChallengeError.UserNotFound(destUsername))
|
||||
challenger <- userAccountRepository.findById(challengerId).toRight(ChallengeError.ChallengerNotFound)
|
||||
@@ -80,6 +93,11 @@ class ChallengeService:
|
||||
|
||||
@Transactional
|
||||
def accept(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||
val result = acceptChallenge(challengeId, userId)
|
||||
result.foreach(_ => meterRegistry.counter("nowchess.challenges.accepted").increment())
|
||||
result
|
||||
|
||||
private def acceptChallenge(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||
for
|
||||
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
||||
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
||||
@@ -96,6 +114,11 @@ class ChallengeService:
|
||||
|
||||
@Transactional
|
||||
def decline(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] =
|
||||
val result = declineChallenge(challengeId, userId, req)
|
||||
result.foreach(_ => meterRegistry.counter("nowchess.challenges.declined").increment())
|
||||
result
|
||||
|
||||
private def declineChallenge(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] =
|
||||
for
|
||||
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
||||
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
||||
@@ -109,6 +132,11 @@ class ChallengeService:
|
||||
|
||||
@Transactional
|
||||
def cancel(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||
val result = cancelChallenge(challengeId, userId)
|
||||
result.foreach(_ => meterRegistry.counter("nowchess.challenges.cancelled").increment())
|
||||
result
|
||||
|
||||
private def cancelChallenge(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||
for
|
||||
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
||||
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.enterprise.inject.Instance
|
||||
import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -21,10 +24,14 @@ class AutoScaler:
|
||||
|
||||
@Inject
|
||||
private var instanceRegistry: InstanceRegistry = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||
private val avgLoadRef = new AtomicReference[Double](0.0)
|
||||
|
||||
private def kubeClientOpt: Option[KubernetesClient] =
|
||||
if kubeClientInstance.isUnsatisfied then None
|
||||
@@ -33,6 +40,13 @@ class AutoScaler:
|
||||
private val argoApiVersion = "argoproj.io/v1alpha1"
|
||||
private val argoKind = "Rollout"
|
||||
|
||||
@PostConstruct
|
||||
def initMetrics(): Unit =
|
||||
Gauge
|
||||
.builder("nowchess.coordinator.load.average", avgLoadRef, _.get())
|
||||
.register(meterRegistry)
|
||||
()
|
||||
|
||||
// scalafix:off DisableSyntax.asInstanceOf
|
||||
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
||||
Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] =>
|
||||
@@ -48,6 +62,7 @@ class AutoScaler:
|
||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||
if instances.nonEmpty then
|
||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||
avgLoadRef.set(avgLoad)
|
||||
|
||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
|
||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore then scaleDown()
|
||||
@@ -79,6 +94,7 @@ class AutoScaler:
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment()
|
||||
log.infof(
|
||||
"Scaled up %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
@@ -91,6 +107,7 @@ class AutoScaler:
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment()
|
||||
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
||||
|
||||
def scaleDown(): Unit =
|
||||
@@ -120,6 +137,7 @@ class AutoScaler:
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||
log.infof(
|
||||
"Scaled down %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
@@ -132,4 +150,5 @@ class AutoScaler:
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||
|
||||
+10
-1
@@ -5,6 +5,7 @@ import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
@@ -30,6 +31,9 @@ class CacheEvictionManager:
|
||||
@Inject
|
||||
private var objectMapper: ObjectMapper = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
private val log = Logger.getLogger(classOf[CacheEvictionManager])
|
||||
private var redisPrefix = "nowchess"
|
||||
// scalafix:on DisableSyntax.var
|
||||
@@ -38,8 +42,12 @@ class CacheEvictionManager:
|
||||
redisPrefix = prefix
|
||||
|
||||
def evictStaleGames: Unit =
|
||||
log.info("Starting cache eviction scan")
|
||||
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record { () =>
|
||||
runEviction()
|
||||
}
|
||||
|
||||
private def runEviction(): Unit =
|
||||
log.info("Starting cache eviction scan")
|
||||
val pattern = s"$redisPrefix:game:entry:*"
|
||||
val keys = redis.key(classOf[String]).keys(pattern)
|
||||
val now = System.currentTimeMillis()
|
||||
@@ -56,6 +64,7 @@ class CacheEvictionManager:
|
||||
try
|
||||
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
||||
redis.key(classOf[String]).del(key)
|
||||
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment()
|
||||
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
||||
count + 1
|
||||
catch
|
||||
|
||||
@@ -6,6 +6,7 @@ import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.fabric8.kubernetes.api.model.Pod
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import org.jboss.logging.Logger
|
||||
@@ -27,6 +28,9 @@ class HealthMonitor:
|
||||
@Inject
|
||||
private var redis: RedisDataSource = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
private val log = Logger.getLogger(classOf[HealthMonitor])
|
||||
private var redisPrefix = "nowchess"
|
||||
// scalafix:on DisableSyntax.var
|
||||
@@ -39,6 +43,7 @@ class HealthMonitor:
|
||||
redisPrefix = prefix
|
||||
|
||||
def checkInstanceHealth: Unit =
|
||||
meterRegistry.counter("nowchess.coordinator.health.checks").increment()
|
||||
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
||||
if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||
val instances = instanceRegistry.getAllInstances
|
||||
@@ -108,6 +113,7 @@ class HealthMonitor:
|
||||
case Some(pod) =>
|
||||
val isReady = isPodReady(pod)
|
||||
if !isReady && inst.state == "HEALTHY" then
|
||||
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
|
||||
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||
case None =>
|
||||
|
||||
+15
@@ -1,5 +1,6 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||
@@ -9,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.time.{Duration, Instant}
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import io.smallrye.mutiny.Uni
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
@@ -18,12 +20,22 @@ class InstanceRegistry:
|
||||
@Inject
|
||||
private var redis: ReactiveRedisDataSource = uninitialized
|
||||
private var redisPrefix = "nowchess"
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val log = Logger.getLogger(classOf[InstanceRegistry])
|
||||
private val mapper = ObjectMapper()
|
||||
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
|
||||
|
||||
@PostConstruct
|
||||
def initMetrics(): Unit =
|
||||
Gauge
|
||||
.builder("nowchess.coordinator.instances.active", instances, m => m.size().toDouble)
|
||||
.register(meterRegistry)
|
||||
()
|
||||
|
||||
def setRedisPrefix(prefix: String): Unit =
|
||||
redisPrefix = prefix
|
||||
|
||||
@@ -45,6 +57,7 @@ class InstanceRegistry:
|
||||
val isNew = !instances.containsKey(instanceId)
|
||||
instances.put(instanceId, metadata)
|
||||
if isNew then
|
||||
meterRegistry.counter("nowchess.coordinator.instances.joined").increment()
|
||||
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
||||
else
|
||||
log.debugf(
|
||||
@@ -68,6 +81,7 @@ class InstanceRegistry:
|
||||
|
||||
def removeInstance(instanceId: String): Unit =
|
||||
instances.remove(instanceId)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.removed").increment()
|
||||
log.infof("Instance %s removed from registry", instanceId)
|
||||
|
||||
def evictStaleInstances(maxAge: Duration): List[String] =
|
||||
@@ -83,6 +97,7 @@ class InstanceRegistry:
|
||||
.toList
|
||||
stale.foreach { id =>
|
||||
instances.remove(id)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.evicted").increment()
|
||||
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
||||
}
|
||||
stale
|
||||
|
||||
@@ -17,10 +17,12 @@ import de.nowchess.chess.observer.*
|
||||
import de.nowchess.api.error.GameError
|
||||
import de.nowchess.api.game.WinReason.{Checkmate, Resignation}
|
||||
import de.nowchess.api.io.{GameContextExport, GameContextImport}
|
||||
import de.nowchess.api.rules.RuleSet
|
||||
import de.nowchess.api.rules.{PostMoveStatus, RuleSet}
|
||||
|
||||
import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer}
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/** Pure game engine that manages game state and notifies observers of state changes. All rule queries delegate to the
|
||||
* injected RuleSet. All user interactions go through Commands; state changes are broadcast via GameEvents.
|
||||
@@ -33,6 +35,7 @@ class GameEngine(
|
||||
initialDrawOffer: Option[Color] = None,
|
||||
initialRedoStack: List[Move] = Nil,
|
||||
initialTakebackRequest: Option[Color] = None,
|
||||
private val meterRegistry: Option[MeterRegistry] = None,
|
||||
) extends Observable:
|
||||
// Ensure that initialBoard is set correctly for threefold repetition detection
|
||||
private val contextWithInitialBoard =
|
||||
@@ -57,6 +60,17 @@ class GameEngine(
|
||||
@SuppressWarnings(Array("DisableSyntax.var"))
|
||||
private var pendingTakebackRequest: Option[Color] = initialTakebackRequest
|
||||
|
||||
meterRegistry.foreach { reg =>
|
||||
GameEngine.activeGamesCount.incrementAndGet()
|
||||
reg.counter("nowchess.games.started").increment()
|
||||
}
|
||||
private def gamesCompletedCounter(result: String): Counter =
|
||||
meterRegistry.map(_.counter("nowchess.games.completed", "result", result)).orNull
|
||||
private def movesProcessedCounter: Counter =
|
||||
meterRegistry.map(_.counter("nowchess.moves.processed")).orNull
|
||||
private def movesDurationTimer: Timer =
|
||||
meterRegistry.map(_.timer("nowchess.moves.duration")).orNull
|
||||
|
||||
// Start scheduler immediately for live clocks so passive expiry fires without waiting for a move.
|
||||
clockState.foreach(scheduleExpiryCheck)
|
||||
|
||||
@@ -165,6 +179,8 @@ class GameEngine(
|
||||
pendingTakebackRequest = None
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("resignation")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(ResignEvent(currentContext, color))
|
||||
}
|
||||
|
||||
@@ -197,6 +213,8 @@ class GameEngine(
|
||||
pendingTakebackRequest = None
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("draw.agreement")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.Agreement))
|
||||
}
|
||||
|
||||
@@ -223,11 +241,15 @@ class GameEngine(
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.FiftyMoveRule)))
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("draw.fifty_move")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.FiftyMoveRule))
|
||||
else if ruleSet.isThreefoldRepetition(currentContext) then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.ThreefoldRepetition)))
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("draw.threefold")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.ThreefoldRepetition))
|
||||
else notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.DrawCannotBeClaimed))
|
||||
}
|
||||
@@ -311,9 +333,18 @@ class GameEngine(
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(reason)))
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter(drawReasonTag(reason))).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, reason))
|
||||
}
|
||||
|
||||
private def drawReasonTag(reason: DrawReason): String = reason match
|
||||
case DrawReason.Agreement => "draw.agreement"
|
||||
case DrawReason.FiftyMoveRule => "draw.fifty_move"
|
||||
case DrawReason.ThreefoldRepetition => "draw.threefold"
|
||||
case DrawReason.Stalemate => "draw.stalemate"
|
||||
case DrawReason.InsufficientMaterial => "draw.insufficient"
|
||||
|
||||
/** Inject clock state directly (for testing). */
|
||||
private[engine] def injectClockState(cs: Option[ClockState]): Unit = synchronized { clockState = cs }
|
||||
|
||||
@@ -334,6 +365,11 @@ class GameEngine(
|
||||
pendingDrawOffer = None
|
||||
pendingTakebackRequest = None
|
||||
redoStack = Nil
|
||||
val tag = result match
|
||||
case GameResult.Draw(_) => "draw.insufficient"
|
||||
case _ => "timeout"
|
||||
Option(gamesCompletedCounter(tag)).foreach(_.increment())
|
||||
activeGamesCount.decrementAndGet()
|
||||
notifyObservers(TimeFlagEvent(currentContext, flagged))
|
||||
|
||||
private def scheduleExpiryCheck(cs: ClockState): Unit =
|
||||
@@ -369,6 +405,12 @@ class GameEngine(
|
||||
// ──── Private helpers ────
|
||||
|
||||
private def executeMove(move: Move): Unit =
|
||||
Option(movesDurationTimer) match
|
||||
case Some(timer) => timer.record(() => executeMoveBody(move))
|
||||
case None => executeMoveBody(move)
|
||||
Option(movesProcessedCounter).foreach(_.increment())
|
||||
|
||||
private def executeMoveBody(move: Move): Unit =
|
||||
if !isRedoing then
|
||||
redoStack = Nil
|
||||
pendingTakebackRequest = None
|
||||
@@ -391,28 +433,36 @@ class GameEngine(
|
||||
)
|
||||
|
||||
val status = ruleSet.postMoveStatus(currentContext)
|
||||
if currentContext.result.isEmpty then
|
||||
if status.isCheckmate then
|
||||
val winner = currentContext.turn.opposite
|
||||
currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate)))
|
||||
cancelScheduled()
|
||||
notifyObservers(CheckmateEvent(currentContext, winner))
|
||||
redoStack = Nil
|
||||
else if status.isStalemate then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate)))
|
||||
cancelScheduled()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate))
|
||||
redoStack = Nil
|
||||
else if status.isInsufficientMaterial then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial)))
|
||||
cancelScheduled()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial))
|
||||
redoStack = Nil
|
||||
else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext))
|
||||
if currentContext.result.isEmpty then applyPostMoveStatus(status)
|
||||
|
||||
if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext))
|
||||
if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext))
|
||||
|
||||
private def applyPostMoveStatus(status: PostMoveStatus): Unit =
|
||||
if status.isCheckmate then
|
||||
val winner = currentContext.turn.opposite
|
||||
currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate)))
|
||||
cancelScheduled()
|
||||
Option(gamesCompletedCounter("checkmate")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(CheckmateEvent(currentContext, winner))
|
||||
redoStack = Nil
|
||||
else if status.isStalemate then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate)))
|
||||
cancelScheduled()
|
||||
Option(gamesCompletedCounter("draw.stalemate")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate))
|
||||
redoStack = Nil
|
||||
else if status.isInsufficientMaterial then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial)))
|
||||
cancelScheduled()
|
||||
Option(gamesCompletedCounter("draw.insufficient")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial))
|
||||
redoStack = Nil
|
||||
else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext))
|
||||
|
||||
private def translateMoveToNotation(move: Move, boardBefore: Board): String =
|
||||
move.moveType match
|
||||
case MoveType.CastleKingside => "O-O"
|
||||
@@ -526,3 +576,6 @@ class GameEngine(
|
||||
pendingTakebackRequest = None
|
||||
notifyObservers(TakebackDeclinedEvent(currentContext, color))
|
||||
}
|
||||
|
||||
object GameEngine:
|
||||
val activeGamesCount: AtomicInteger = new AtomicInteger(0)
|
||||
|
||||
@@ -12,7 +12,9 @@ import de.nowchess.chess.grpc.RuleSetGrpcAdapter
|
||||
import de.nowchess.chess.config.RedisConfig
|
||||
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||
import de.nowchess.chess.resource.GameDtoMapper
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient
|
||||
@@ -34,12 +36,20 @@ class RedisGameRegistry extends GameRegistry:
|
||||
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
|
||||
@Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized
|
||||
@Inject @RestClient var storeClient: StoreServiceClient = uninitialized
|
||||
@Inject var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
private val log = Logger.getLogger(classOf[RedisGameRegistry])
|
||||
private val localEngines = ConcurrentHashMap[String, GameEntry]()
|
||||
private val rng = new SecureRandom()
|
||||
|
||||
@PostConstruct
|
||||
def initMetrics(): Unit =
|
||||
Gauge
|
||||
.builder("nowchess.games.active", GameEngine.activeGamesCount, _.get().toDouble)
|
||||
.register(meterRegistry)
|
||||
()
|
||||
|
||||
private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId"
|
||||
|
||||
def generateId(): String =
|
||||
@@ -58,14 +68,17 @@ class RedisGameRegistry extends GameRegistry:
|
||||
)
|
||||
|
||||
def get(gameId: String): Option[GameEntry] =
|
||||
Option(localEngines.get(gameId)) match
|
||||
val result = Option(localEngines.get(gameId)) match
|
||||
case Some(localEntry) =>
|
||||
meterRegistry.counter("nowchess.games.cache.hits", "source", "local").increment()
|
||||
readRedisDto(gameId).flatMap(dto => Try(reconstruct(dto)).toOption) match
|
||||
case Some(redisEntry) if !sameSnapshot(localEntry, redisEntry) =>
|
||||
localEngines.put(gameId, redisEntry)
|
||||
Some(redisEntry)
|
||||
case _ => Some(localEntry)
|
||||
case None => fromRedis(gameId).orElse(fromDb(gameId))
|
||||
if result.isEmpty then meterRegistry.counter("nowchess.games.cache.misses").increment()
|
||||
result
|
||||
|
||||
def update(entry: GameEntry): Unit =
|
||||
localEngines.put(entry.gameId, entry)
|
||||
@@ -86,6 +99,7 @@ class RedisGameRegistry extends GameRegistry:
|
||||
}
|
||||
}
|
||||
.map { entry =>
|
||||
meterRegistry.counter("nowchess.games.cache.hits", "source", "redis").increment()
|
||||
localEngines.put(gameId, entry)
|
||||
log.infof("Loaded game %s from Redis cache", gameId)
|
||||
entry
|
||||
@@ -118,6 +132,7 @@ class RedisGameRegistry extends GameRegistry:
|
||||
(dto, reconstruct(dto))
|
||||
} match
|
||||
case scala.util.Success((dto, entry)) =>
|
||||
meterRegistry.counter("nowchess.games.cache.hits", "source", "db").increment()
|
||||
log.infof("Loaded game %s from store service", gameId)
|
||||
localEngines.put(gameId, entry)
|
||||
redis.value(classOf[String]).setex(cacheKey(gameId), 1800L, objectMapper.writeValueAsString(dto))
|
||||
|
||||
+13
-6
@@ -19,6 +19,8 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
|
||||
import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub
|
||||
import io.grpc.stub.StreamObserver
|
||||
import io.grpc.Channel
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
@ApplicationScoped
|
||||
class InstanceHeartbeatService:
|
||||
@@ -32,6 +34,9 @@ class InstanceHeartbeatService:
|
||||
@Inject
|
||||
private var mapper: ObjectMapper = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
@GrpcClient("coordinator-grpc")
|
||||
private var channel: Channel = uninitialized
|
||||
|
||||
@@ -51,14 +56,15 @@ class InstanceHeartbeatService:
|
||||
private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None
|
||||
private var heartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||
private var subscriptionCount = 0
|
||||
private var localCacheSize = 0
|
||||
private var serviceActive = false
|
||||
private var shuttingDown = false
|
||||
// scalafix:on DisableSyntax.var
|
||||
private val redisHeartbeatPending = new AtomicBoolean(false)
|
||||
private val subscriptionCount = new AtomicInteger(0)
|
||||
|
||||
def onStart(@Observes event: StartupEvent): Unit =
|
||||
Gauge.builder("nowchess.instance.subscriptions", subscriptionCount, _.get().toDouble).register(meterRegistry)
|
||||
if coordinatorEnabled then
|
||||
try
|
||||
shuttingDown = false
|
||||
@@ -90,7 +96,7 @@ class InstanceHeartbeatService:
|
||||
redisPrefix = prefix
|
||||
|
||||
def setSubscriptionCount(count: Int): Unit =
|
||||
subscriptionCount = count
|
||||
subscriptionCount.set(count)
|
||||
|
||||
def setLocalCacheSize(count: Int): Unit =
|
||||
localCacheSize = count
|
||||
@@ -99,13 +105,13 @@ class InstanceHeartbeatService:
|
||||
if coordinatorEnabled then
|
||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||
redis.set(classOf[String]).sadd(setKey, gameId)
|
||||
subscriptionCount += 1
|
||||
subscriptionCount.incrementAndGet()
|
||||
|
||||
def removeGameSubscription(gameId: String): Unit =
|
||||
if coordinatorEnabled then
|
||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||
redis.set(classOf[String]).srem(setKey, gameId)
|
||||
subscriptionCount = Math.max(0, subscriptionCount - 1)
|
||||
subscriptionCount.updateAndGet(c => Math.max(0, c - 1))
|
||||
|
||||
private def generateInstanceId(): Unit =
|
||||
val hostname =
|
||||
@@ -162,13 +168,14 @@ class InstanceHeartbeatService:
|
||||
.setHostname(getHostname)
|
||||
.setHttpPort(httpPort)
|
||||
.setGrpcPort(grpcPort)
|
||||
.setSubscriptionCount(subscriptionCount)
|
||||
.setSubscriptionCount(subscriptionCount.get())
|
||||
.setLocalCacheSize(localCacheSize)
|
||||
.setTimestampMillis(System.currentTimeMillis())
|
||||
.build()
|
||||
observer.onNext(frame)
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.heartbeat.failures").increment()
|
||||
log.warnf(ex, "Failed to send heartbeat frame")
|
||||
}
|
||||
|
||||
@@ -182,7 +189,7 @@ class InstanceHeartbeatService:
|
||||
"hostname" -> getHostname,
|
||||
"httpPort" -> httpPort,
|
||||
"grpcPort" -> grpcPort,
|
||||
"subscriptionCount" -> subscriptionCount,
|
||||
"subscriptionCount" -> subscriptionCount.get(),
|
||||
"localCacheSize" -> localCacheSize,
|
||||
"lastHeartbeat" -> java.time.Instant.now().toString,
|
||||
"state" -> "HEALTHY",
|
||||
|
||||
@@ -7,6 +7,7 @@ import de.nowchess.bot.ai.Evaluation
|
||||
import de.nowchess.bot.util.ZobristHash
|
||||
import de.nowchess.api.rules.RuleSet
|
||||
import de.nowchess.rules.sets.DefaultRules
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
|
||||
|
||||
final class AlphaBetaSearch(
|
||||
@@ -14,6 +15,7 @@ final class AlphaBetaSearch(
|
||||
tt: TranspositionTable = TranspositionTable(),
|
||||
weights: Evaluation,
|
||||
numThreads: Int = Runtime.getRuntime.availableProcessors,
|
||||
meterRegistry: MeterRegistry = null,
|
||||
):
|
||||
|
||||
private val INF = Int.MaxValue / 2
|
||||
@@ -85,8 +87,8 @@ final class AlphaBetaSearch(
|
||||
val rootHash = ZobristHash.hash(context)
|
||||
|
||||
@scala.annotation.tailrec
|
||||
def loop(bestSoFar: Option[Move], prevScore: Int, depth: Int): Option[Move] =
|
||||
if isOutOfTime then bestSoFar
|
||||
def loop(bestSoFar: Option[Move], prevScore: Int, depth: Int, lastDepth: Int): (Option[Move], Int) =
|
||||
if isOutOfTime then (bestSoFar, lastDepth)
|
||||
else
|
||||
val (alpha, beta) =
|
||||
if depth == 1 then (-INF, INF) else (prevScore - ASPIRATION_DELTA, prevScore + ASPIRATION_DELTA)
|
||||
@@ -99,9 +101,16 @@ final class AlphaBetaSearch(
|
||||
rootHash,
|
||||
excludedRootMoves,
|
||||
)
|
||||
loop(move.orElse(bestSoFar), score, depth + 1)
|
||||
loop(move.orElse(bestSoFar), score, depth + 1, depth)
|
||||
|
||||
loop(None, 0, 1)
|
||||
val (result, depthReached) = loop(None, 0, 1, 0)
|
||||
recordSearchMetrics(depthReached)
|
||||
result
|
||||
|
||||
private def recordSearchMetrics(depthReached: Int): Unit =
|
||||
if meterRegistry != null then
|
||||
meterRegistry.summary("nowchess.bot.search.nodes").record(nodeCount.get().toDouble)
|
||||
meterRegistry.summary("nowchess.bot.search.depth").record(depthReached.toDouble)
|
||||
|
||||
private def isOutOfTime: Boolean =
|
||||
System.currentTimeMillis - timeStartMs.get >= timeLimitMs.get
|
||||
|
||||
+10
-5
@@ -6,6 +6,7 @@ import de.nowchess.bot.BotController
|
||||
import de.nowchess.bot.BotDifficulty
|
||||
import de.nowchess.bot.config.RedisConfig
|
||||
import de.nowchess.io.fen.FenParser
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.runtime.StartupEvent
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
@@ -18,10 +19,11 @@ import java.util.function.Consumer
|
||||
class OfficialBotService:
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var redis: RedisDataSource = uninitialized
|
||||
@Inject var redisConfig: RedisConfig = uninitialized
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
@Inject var botController: BotController = uninitialized
|
||||
@Inject var redis: RedisDataSource = uninitialized
|
||||
@Inject var redisConfig: RedisConfig = uninitialized
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
@Inject var botController: BotController = uninitialized
|
||||
@Inject var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val terminalStatuses =
|
||||
@@ -85,7 +87,10 @@ class OfficialBotService:
|
||||
val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium)
|
||||
botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot =>
|
||||
FenParser.parseFen(fen).toOption.foreach { context =>
|
||||
bot(context).foreach { move =>
|
||||
val timer = meterRegistry.timer("nowchess.bot.move.duration", "bot", botName)
|
||||
val moveOpt = timer.recordCallable(() => bot(context))
|
||||
moveOpt.flatten.foreach { move =>
|
||||
meterRegistry.counter("nowchess.bot.moves.computed", "bot", botName).increment()
|
||||
val uci = toUci(move)
|
||||
val c2sTopic = s"${redisConfig.prefix}:game:$gameId:c2s"
|
||||
val moveMsg = s"""{"type":"MOVE","uci":"$uci","playerId":"$botAccountId"}"""
|
||||
|
||||
@@ -3,6 +3,7 @@ package de.nowchess.store.service
|
||||
import de.nowchess.api.dto.GameWritebackEventDto
|
||||
import de.nowchess.store.domain.GameRecord
|
||||
import de.nowchess.store.repository.GameRecordRepository
|
||||
import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer}
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.transaction.Transactional
|
||||
@@ -12,64 +13,84 @@ import java.time.Instant
|
||||
|
||||
@ApplicationScoped
|
||||
class GameWritebackService:
|
||||
@Inject
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject
|
||||
var repository: GameRecordRepository = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
@Inject
|
||||
var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private lazy val writebackTimer: Timer =
|
||||
meterRegistry.timer("nowchess.store.writeback.duration")
|
||||
|
||||
private lazy val writebackSkipped: Counter =
|
||||
meterRegistry.counter("nowchess.store.writeback.skipped")
|
||||
|
||||
private def gamesWrittenCounter(operation: String): Counter =
|
||||
meterRegistry.counter("nowchess.store.games.written", "operation", operation)
|
||||
|
||||
@Transactional
|
||||
def writeBack(event: GameWritebackEventDto): Unit =
|
||||
writebackTimer.record(() => doWriteBack(event))
|
||||
|
||||
private def doWriteBack(event: GameWritebackEventDto): Unit =
|
||||
repository.findByGameId(event.gameId) match
|
||||
case None =>
|
||||
val record = new GameRecord
|
||||
record.gameId = event.gameId
|
||||
record.fen = event.fen
|
||||
record.pgn = event.pgn
|
||||
record.moveCount = event.moveCount
|
||||
record.whiteId = event.whiteId
|
||||
record.whiteName = event.whiteName
|
||||
record.blackId = event.blackId
|
||||
record.blackName = event.blackName
|
||||
record.mode = event.mode
|
||||
record.resigned = event.resigned
|
||||
record.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull
|
||||
record.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull
|
||||
record.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull
|
||||
record.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||
record.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||
record.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull
|
||||
record.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull
|
||||
record.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull
|
||||
record.clockActiveColor = event.clockActiveColor.orNull
|
||||
record.pendingDrawOffer = event.pendingDrawOffer.orNull
|
||||
record.result = event.result.orNull
|
||||
record.terminationReason = event.terminationReason.orNull
|
||||
record.createdAt = Instant.now()
|
||||
record.updatedAt = Instant.now()
|
||||
repository.persist(record)
|
||||
createRecord(event)
|
||||
gamesWrittenCounter("create").increment()
|
||||
case Some(r) if event.moveCount > r.moveCount || event.pgn != r.pgn =>
|
||||
r.fen = event.fen
|
||||
r.pgn = event.pgn
|
||||
r.moveCount = event.moveCount
|
||||
r.whiteId = event.whiteId
|
||||
r.whiteName = event.whiteName
|
||||
r.blackId = event.blackId
|
||||
r.blackName = event.blackName
|
||||
r.mode = event.mode
|
||||
r.resigned = event.resigned
|
||||
r.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull
|
||||
r.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull
|
||||
r.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull
|
||||
r.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||
r.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||
r.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull
|
||||
r.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull
|
||||
r.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull
|
||||
r.clockActiveColor = event.clockActiveColor.orNull
|
||||
r.pendingDrawOffer = event.pendingDrawOffer.orNull
|
||||
r.pendingTakebackOffer = event.pendingTakebackRequest.orNull
|
||||
r.result = event.result.orNull
|
||||
r.terminationReason = event.terminationReason.orNull
|
||||
r.updatedAt = Instant.now()
|
||||
repository.merge(r)
|
||||
case _ => ()
|
||||
updateRecord(r, event)
|
||||
gamesWrittenCounter("update").increment()
|
||||
case _ =>
|
||||
writebackSkipped.increment()
|
||||
|
||||
private def createRecord(event: GameWritebackEventDto): Unit =
|
||||
val record = new GameRecord
|
||||
record.gameId = event.gameId
|
||||
record.fen = event.fen
|
||||
record.pgn = event.pgn
|
||||
record.moveCount = event.moveCount
|
||||
record.whiteId = event.whiteId
|
||||
record.whiteName = event.whiteName
|
||||
record.blackId = event.blackId
|
||||
record.blackName = event.blackName
|
||||
record.mode = event.mode
|
||||
record.resigned = event.resigned
|
||||
applyClockFields(record, event)
|
||||
record.result = event.result.orNull
|
||||
record.terminationReason = event.terminationReason.orNull
|
||||
record.createdAt = Instant.now()
|
||||
record.updatedAt = Instant.now()
|
||||
repository.persist(record)
|
||||
|
||||
private def updateRecord(r: GameRecord, event: GameWritebackEventDto): Unit =
|
||||
r.fen = event.fen
|
||||
r.pgn = event.pgn
|
||||
r.moveCount = event.moveCount
|
||||
r.whiteId = event.whiteId
|
||||
r.whiteName = event.whiteName
|
||||
r.blackId = event.blackId
|
||||
r.blackName = event.blackName
|
||||
r.mode = event.mode
|
||||
r.resigned = event.resigned
|
||||
applyClockFields(r, event)
|
||||
r.pendingDrawOffer = event.pendingDrawOffer.orNull
|
||||
r.pendingTakebackOffer = event.pendingTakebackRequest.orNull
|
||||
r.result = event.result.orNull
|
||||
r.terminationReason = event.terminationReason.orNull
|
||||
r.updatedAt = Instant.now()
|
||||
repository.merge(r)
|
||||
|
||||
private def applyClockFields(r: GameRecord, event: GameWritebackEventDto): Unit =
|
||||
r.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull
|
||||
r.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull
|
||||
r.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull
|
||||
r.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||
r.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||
r.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull
|
||||
r.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull
|
||||
r.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull
|
||||
r.clockActiveColor = event.clockActiveColor.orNull
|
||||
r.pendingDrawOffer = event.pendingDrawOffer.orNull
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package de.nowchess.ws.resource
|
||||
|
||||
import de.nowchess.ws.config.RedisConfig
|
||||
import io.micrometer.core.instrument.{Counter, Gauge, MeterRegistry}
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||
import io.quarkus.websockets.next.*
|
||||
@@ -26,10 +27,27 @@ class GameWebSocketResource:
|
||||
|
||||
@Inject
|
||||
var jwtParser: JWTParser = uninitialized
|
||||
|
||||
@Inject
|
||||
var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val connections = new ConcurrentHashMap[String, ConnectionMeta]()
|
||||
|
||||
private lazy val connectionsOpened: Counter =
|
||||
meterRegistry.counter("nowchess.ws.connections.opened")
|
||||
|
||||
private lazy val connectionsClosed: Counter =
|
||||
meterRegistry.counter("nowchess.ws.connections.closed")
|
||||
|
||||
private lazy val messagesReceived: Counter =
|
||||
meterRegistry.counter("nowchess.ws.messages.received")
|
||||
|
||||
private lazy val activeGauge: Unit =
|
||||
Gauge
|
||||
.builder("nowchess.ws.connections.active", connections, _.size().toDouble)
|
||||
.register(meterRegistry)
|
||||
|
||||
private def s2cTopic(gameId: String): String =
|
||||
s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||
|
||||
@@ -38,22 +56,19 @@ class GameWebSocketResource:
|
||||
|
||||
@OnOpen
|
||||
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
||||
val gameId = connection.pathParam("gameId")
|
||||
val playerId = Option(handshake.header("Authorization"))
|
||||
.filter(_.nonEmpty)
|
||||
.flatMap(token => Try(jwtParser.parse(token)).toOption)
|
||||
.map(_.getSubject)
|
||||
activeGauge
|
||||
val gameId = connection.pathParam("gameId")
|
||||
val playerId = resolvePlayerId(handshake)
|
||||
log.infof("Game WebSocket opened — gameId=%s playerId=%s", gameId, playerId.getOrElse("anonymous"))
|
||||
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
||||
val subscriber = redis.pubsub(classOf[String]).subscribe(s2cTopic(gameId), handler)
|
||||
connections.put(connection.id(), ConnectionMeta(gameId, subscriber, playerId))
|
||||
val connectedMsg = playerId match
|
||||
case Some(pid) => s"""{"type":"CONNECTED","gameId":"$gameId","playerId":"$pid"}"""
|
||||
case None => s"""{"type":"CONNECTED","gameId":"$gameId"}"""
|
||||
redis.pubsub(classOf[String]).publish(c2sTopic(gameId), connectedMsg)
|
||||
connectionsOpened.increment()
|
||||
publishConnected(gameId, playerId)
|
||||
|
||||
@OnTextMessage
|
||||
def onTextMessage(connection: WebSocketConnection, message: String): Unit =
|
||||
messagesReceived.increment()
|
||||
Option(connections.get(connection.id())).foreach { meta =>
|
||||
val enriched = meta.playerId match
|
||||
case Some(pid) => injectPlayerId(message, pid)
|
||||
@@ -66,8 +81,21 @@ class GameWebSocketResource:
|
||||
Option(connections.remove(connection.id())).foreach { meta =>
|
||||
log.infof("Game WebSocket closed — gameId=%s", meta.gameId)
|
||||
meta.subscriber.unsubscribe(s2cTopic(meta.gameId))
|
||||
connectionsClosed.increment()
|
||||
}
|
||||
|
||||
private def resolvePlayerId(handshake: HandshakeRequest): Option[String] =
|
||||
Option(handshake.header("Authorization"))
|
||||
.filter(_.nonEmpty)
|
||||
.flatMap(token => Try(jwtParser.parse(token)).toOption)
|
||||
.map(_.getSubject)
|
||||
|
||||
private def publishConnected(gameId: String, playerId: Option[String]): Unit =
|
||||
val connectedMsg = playerId match
|
||||
case Some(pid) => s"""{"type":"CONNECTED","gameId":"$gameId","playerId":"$pid"}"""
|
||||
case None => s"""{"type":"CONNECTED","gameId":"$gameId"}"""
|
||||
redis.pubsub(classOf[String]).publish(c2sTopic(gameId), connectedMsg)
|
||||
|
||||
private def injectPlayerId(msg: String, pid: String): String =
|
||||
val trimmed = msg.trim
|
||||
if trimmed.endsWith("}") then trimmed.dropRight(1) + s""","playerId":"$pid"}"""
|
||||
|
||||
Reference in New Issue
Block a user