feat: add metrics for user registrations, logins, challenges, and game writebacks
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
This commit is contained in:
@@ -17,10 +17,12 @@ import de.nowchess.chess.observer.*
|
||||
import de.nowchess.api.error.GameError
|
||||
import de.nowchess.api.game.WinReason.{Checkmate, Resignation}
|
||||
import de.nowchess.api.io.{GameContextExport, GameContextImport}
|
||||
import de.nowchess.api.rules.RuleSet
|
||||
import de.nowchess.api.rules.{PostMoveStatus, RuleSet}
|
||||
|
||||
import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer}
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/** Pure game engine that manages game state and notifies observers of state changes. All rule queries delegate to the
|
||||
* injected RuleSet. All user interactions go through Commands; state changes are broadcast via GameEvents.
|
||||
@@ -33,6 +35,7 @@ class GameEngine(
|
||||
initialDrawOffer: Option[Color] = None,
|
||||
initialRedoStack: List[Move] = Nil,
|
||||
initialTakebackRequest: Option[Color] = None,
|
||||
private val meterRegistry: Option[MeterRegistry] = None,
|
||||
) extends Observable:
|
||||
// Ensure that initialBoard is set correctly for threefold repetition detection
|
||||
private val contextWithInitialBoard =
|
||||
@@ -57,6 +60,17 @@ class GameEngine(
|
||||
@SuppressWarnings(Array("DisableSyntax.var"))
|
||||
private var pendingTakebackRequest: Option[Color] = initialTakebackRequest
|
||||
|
||||
meterRegistry.foreach { reg =>
|
||||
GameEngine.activeGamesCount.incrementAndGet()
|
||||
reg.counter("nowchess.games.started").increment()
|
||||
}
|
||||
private def gamesCompletedCounter(result: String): Counter =
|
||||
meterRegistry.map(_.counter("nowchess.games.completed", "result", result)).orNull
|
||||
private def movesProcessedCounter: Counter =
|
||||
meterRegistry.map(_.counter("nowchess.moves.processed")).orNull
|
||||
private def movesDurationTimer: Timer =
|
||||
meterRegistry.map(_.timer("nowchess.moves.duration")).orNull
|
||||
|
||||
// Start scheduler immediately for live clocks so passive expiry fires without waiting for a move.
|
||||
clockState.foreach(scheduleExpiryCheck)
|
||||
|
||||
@@ -165,6 +179,8 @@ class GameEngine(
|
||||
pendingTakebackRequest = None
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("resignation")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(ResignEvent(currentContext, color))
|
||||
}
|
||||
|
||||
@@ -197,6 +213,8 @@ class GameEngine(
|
||||
pendingTakebackRequest = None
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("draw.agreement")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.Agreement))
|
||||
}
|
||||
|
||||
@@ -223,11 +241,15 @@ class GameEngine(
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.FiftyMoveRule)))
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("draw.fifty_move")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.FiftyMoveRule))
|
||||
else if ruleSet.isThreefoldRepetition(currentContext) then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.ThreefoldRepetition)))
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter("draw.threefold")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.ThreefoldRepetition))
|
||||
else notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.DrawCannotBeClaimed))
|
||||
}
|
||||
@@ -311,9 +333,18 @@ class GameEngine(
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(reason)))
|
||||
stopClock()
|
||||
redoStack = Nil
|
||||
Option(gamesCompletedCounter(drawReasonTag(reason))).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, reason))
|
||||
}
|
||||
|
||||
private def drawReasonTag(reason: DrawReason): String = reason match
|
||||
case DrawReason.Agreement => "draw.agreement"
|
||||
case DrawReason.FiftyMoveRule => "draw.fifty_move"
|
||||
case DrawReason.ThreefoldRepetition => "draw.threefold"
|
||||
case DrawReason.Stalemate => "draw.stalemate"
|
||||
case DrawReason.InsufficientMaterial => "draw.insufficient"
|
||||
|
||||
/** Inject clock state directly (for testing). */
|
||||
private[engine] def injectClockState(cs: Option[ClockState]): Unit = synchronized { clockState = cs }
|
||||
|
||||
@@ -334,6 +365,11 @@ class GameEngine(
|
||||
pendingDrawOffer = None
|
||||
pendingTakebackRequest = None
|
||||
redoStack = Nil
|
||||
val tag = result match
|
||||
case GameResult.Draw(_) => "draw.insufficient"
|
||||
case _ => "timeout"
|
||||
Option(gamesCompletedCounter(tag)).foreach(_.increment())
|
||||
activeGamesCount.decrementAndGet()
|
||||
notifyObservers(TimeFlagEvent(currentContext, flagged))
|
||||
|
||||
private def scheduleExpiryCheck(cs: ClockState): Unit =
|
||||
@@ -369,6 +405,12 @@ class GameEngine(
|
||||
// ──── Private helpers ────
|
||||
|
||||
private def executeMove(move: Move): Unit =
|
||||
Option(movesDurationTimer) match
|
||||
case Some(timer) => timer.record(() => executeMoveBody(move))
|
||||
case None => executeMoveBody(move)
|
||||
Option(movesProcessedCounter).foreach(_.increment())
|
||||
|
||||
private def executeMoveBody(move: Move): Unit =
|
||||
if !isRedoing then
|
||||
redoStack = Nil
|
||||
pendingTakebackRequest = None
|
||||
@@ -391,28 +433,36 @@ class GameEngine(
|
||||
)
|
||||
|
||||
val status = ruleSet.postMoveStatus(currentContext)
|
||||
if currentContext.result.isEmpty then
|
||||
if status.isCheckmate then
|
||||
val winner = currentContext.turn.opposite
|
||||
currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate)))
|
||||
cancelScheduled()
|
||||
notifyObservers(CheckmateEvent(currentContext, winner))
|
||||
redoStack = Nil
|
||||
else if status.isStalemate then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate)))
|
||||
cancelScheduled()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate))
|
||||
redoStack = Nil
|
||||
else if status.isInsufficientMaterial then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial)))
|
||||
cancelScheduled()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial))
|
||||
redoStack = Nil
|
||||
else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext))
|
||||
if currentContext.result.isEmpty then applyPostMoveStatus(status)
|
||||
|
||||
if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext))
|
||||
if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext))
|
||||
|
||||
private def applyPostMoveStatus(status: PostMoveStatus): Unit =
|
||||
if status.isCheckmate then
|
||||
val winner = currentContext.turn.opposite
|
||||
currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate)))
|
||||
cancelScheduled()
|
||||
Option(gamesCompletedCounter("checkmate")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(CheckmateEvent(currentContext, winner))
|
||||
redoStack = Nil
|
||||
else if status.isStalemate then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate)))
|
||||
cancelScheduled()
|
||||
Option(gamesCompletedCounter("draw.stalemate")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate))
|
||||
redoStack = Nil
|
||||
else if status.isInsufficientMaterial then
|
||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial)))
|
||||
cancelScheduled()
|
||||
Option(gamesCompletedCounter("draw.insufficient")).foreach(_.increment())
|
||||
GameEngine.activeGamesCount.decrementAndGet()
|
||||
notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial))
|
||||
redoStack = Nil
|
||||
else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext))
|
||||
|
||||
private def translateMoveToNotation(move: Move, boardBefore: Board): String =
|
||||
move.moveType match
|
||||
case MoveType.CastleKingside => "O-O"
|
||||
@@ -526,3 +576,6 @@ class GameEngine(
|
||||
pendingTakebackRequest = None
|
||||
notifyObservers(TakebackDeclinedEvent(currentContext, color))
|
||||
}
|
||||
|
||||
object GameEngine:
|
||||
val activeGamesCount: AtomicInteger = new AtomicInteger(0)
|
||||
|
||||
@@ -12,7 +12,9 @@ import de.nowchess.chess.grpc.RuleSetGrpcAdapter
|
||||
import de.nowchess.chess.config.RedisConfig
|
||||
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||
import de.nowchess.chess.resource.GameDtoMapper
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient
|
||||
@@ -34,12 +36,20 @@ class RedisGameRegistry extends GameRegistry:
|
||||
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
|
||||
@Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized
|
||||
@Inject @RestClient var storeClient: StoreServiceClient = uninitialized
|
||||
@Inject var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
private val log = Logger.getLogger(classOf[RedisGameRegistry])
|
||||
private val localEngines = ConcurrentHashMap[String, GameEntry]()
|
||||
private val rng = new SecureRandom()
|
||||
|
||||
@PostConstruct
|
||||
def initMetrics(): Unit =
|
||||
Gauge
|
||||
.builder("nowchess.games.active", GameEngine.activeGamesCount, _.get().toDouble)
|
||||
.register(meterRegistry)
|
||||
()
|
||||
|
||||
private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId"
|
||||
|
||||
def generateId(): String =
|
||||
@@ -58,14 +68,17 @@ class RedisGameRegistry extends GameRegistry:
|
||||
)
|
||||
|
||||
def get(gameId: String): Option[GameEntry] =
|
||||
Option(localEngines.get(gameId)) match
|
||||
val result = Option(localEngines.get(gameId)) match
|
||||
case Some(localEntry) =>
|
||||
meterRegistry.counter("nowchess.games.cache.hits", "source", "local").increment()
|
||||
readRedisDto(gameId).flatMap(dto => Try(reconstruct(dto)).toOption) match
|
||||
case Some(redisEntry) if !sameSnapshot(localEntry, redisEntry) =>
|
||||
localEngines.put(gameId, redisEntry)
|
||||
Some(redisEntry)
|
||||
case _ => Some(localEntry)
|
||||
case None => fromRedis(gameId).orElse(fromDb(gameId))
|
||||
if result.isEmpty then meterRegistry.counter("nowchess.games.cache.misses").increment()
|
||||
result
|
||||
|
||||
def update(entry: GameEntry): Unit =
|
||||
localEngines.put(entry.gameId, entry)
|
||||
@@ -86,6 +99,7 @@ class RedisGameRegistry extends GameRegistry:
|
||||
}
|
||||
}
|
||||
.map { entry =>
|
||||
meterRegistry.counter("nowchess.games.cache.hits", "source", "redis").increment()
|
||||
localEngines.put(gameId, entry)
|
||||
log.infof("Loaded game %s from Redis cache", gameId)
|
||||
entry
|
||||
@@ -118,6 +132,7 @@ class RedisGameRegistry extends GameRegistry:
|
||||
(dto, reconstruct(dto))
|
||||
} match
|
||||
case scala.util.Success((dto, entry)) =>
|
||||
meterRegistry.counter("nowchess.games.cache.hits", "source", "db").increment()
|
||||
log.infof("Loaded game %s from store service", gameId)
|
||||
localEngines.put(gameId, entry)
|
||||
redis.value(classOf[String]).setex(cacheKey(gameId), 1800L, objectMapper.writeValueAsString(dto))
|
||||
|
||||
+13
-6
@@ -19,6 +19,8 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
|
||||
import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub
|
||||
import io.grpc.stub.StreamObserver
|
||||
import io.grpc.Channel
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
@ApplicationScoped
|
||||
class InstanceHeartbeatService:
|
||||
@@ -32,6 +34,9 @@ class InstanceHeartbeatService:
|
||||
@Inject
|
||||
private var mapper: ObjectMapper = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
@GrpcClient("coordinator-grpc")
|
||||
private var channel: Channel = uninitialized
|
||||
|
||||
@@ -51,14 +56,15 @@ class InstanceHeartbeatService:
|
||||
private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None
|
||||
private var heartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||
private var subscriptionCount = 0
|
||||
private var localCacheSize = 0
|
||||
private var serviceActive = false
|
||||
private var shuttingDown = false
|
||||
// scalafix:on DisableSyntax.var
|
||||
private val redisHeartbeatPending = new AtomicBoolean(false)
|
||||
private val subscriptionCount = new AtomicInteger(0)
|
||||
|
||||
def onStart(@Observes event: StartupEvent): Unit =
|
||||
Gauge.builder("nowchess.instance.subscriptions", subscriptionCount, _.get().toDouble).register(meterRegistry)
|
||||
if coordinatorEnabled then
|
||||
try
|
||||
shuttingDown = false
|
||||
@@ -90,7 +96,7 @@ class InstanceHeartbeatService:
|
||||
redisPrefix = prefix
|
||||
|
||||
def setSubscriptionCount(count: Int): Unit =
|
||||
subscriptionCount = count
|
||||
subscriptionCount.set(count)
|
||||
|
||||
def setLocalCacheSize(count: Int): Unit =
|
||||
localCacheSize = count
|
||||
@@ -99,13 +105,13 @@ class InstanceHeartbeatService:
|
||||
if coordinatorEnabled then
|
||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||
redis.set(classOf[String]).sadd(setKey, gameId)
|
||||
subscriptionCount += 1
|
||||
subscriptionCount.incrementAndGet()
|
||||
|
||||
def removeGameSubscription(gameId: String): Unit =
|
||||
if coordinatorEnabled then
|
||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||
redis.set(classOf[String]).srem(setKey, gameId)
|
||||
subscriptionCount = Math.max(0, subscriptionCount - 1)
|
||||
subscriptionCount.updateAndGet(c => Math.max(0, c - 1))
|
||||
|
||||
private def generateInstanceId(): Unit =
|
||||
val hostname =
|
||||
@@ -162,13 +168,14 @@ class InstanceHeartbeatService:
|
||||
.setHostname(getHostname)
|
||||
.setHttpPort(httpPort)
|
||||
.setGrpcPort(grpcPort)
|
||||
.setSubscriptionCount(subscriptionCount)
|
||||
.setSubscriptionCount(subscriptionCount.get())
|
||||
.setLocalCacheSize(localCacheSize)
|
||||
.setTimestampMillis(System.currentTimeMillis())
|
||||
.build()
|
||||
observer.onNext(frame)
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.heartbeat.failures").increment()
|
||||
log.warnf(ex, "Failed to send heartbeat frame")
|
||||
}
|
||||
|
||||
@@ -182,7 +189,7 @@ class InstanceHeartbeatService:
|
||||
"hostname" -> getHostname,
|
||||
"httpPort" -> httpPort,
|
||||
"grpcPort" -> grpcPort,
|
||||
"subscriptionCount" -> subscriptionCount,
|
||||
"subscriptionCount" -> subscriptionCount.get(),
|
||||
"localCacheSize" -> localCacheSize,
|
||||
"lastHeartbeat" -> java.time.Instant.now().toString,
|
||||
"state" -> "HEALTHY",
|
||||
|
||||
Reference in New Issue
Block a user