feat: configure logging and add OpenTelemetry support #49
@@ -4,6 +4,7 @@ import de.nowchess.account.domain.{BotAccount, OfficialBotAccount, UserAccount}
|
|||||||
import de.nowchess.account.dto.{LoginRequest, RegisterRequest}
|
import de.nowchess.account.dto.{LoginRequest, RegisterRequest}
|
||||||
import de.nowchess.account.error.AccountError
|
import de.nowchess.account.error.AccountError
|
||||||
import de.nowchess.account.repository.{BotAccountRepository, OfficialBotAccountRepository, UserAccountRepository}
|
import de.nowchess.account.repository.{BotAccountRepository, OfficialBotAccountRepository, UserAccountRepository}
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import io.quarkus.elytron.security.common.BcryptUtil
|
import io.quarkus.elytron.security.common.BcryptUtil
|
||||||
import io.smallrye.jwt.build.Jwt
|
import io.smallrye.jwt.build.Jwt
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
@@ -29,11 +30,21 @@ class AccountService:
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
var officialBotAccountRepository: OfficialBotAccountRepository = uninitialized
|
var officialBotAccountRepository: OfficialBotAccountRepository = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on
|
// scalafix:on
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
def register(req: RegisterRequest): Either[AccountError, UserAccount] =
|
def register(req: RegisterRequest): Either[AccountError, UserAccount] =
|
||||||
log.infof("Registering user %s", req.username)
|
log.infof("Registering user %s", req.username)
|
||||||
|
val result = registerAccount(req)
|
||||||
|
result match
|
||||||
|
case Right(_) => meterRegistry.counter("nowchess.users.registrations", "result", "success").increment()
|
||||||
|
case Left(_) => meterRegistry.counter("nowchess.users.registrations", "result", "failure").increment()
|
||||||
|
result
|
||||||
|
|
||||||
|
private def registerAccount(req: RegisterRequest): Either[AccountError, UserAccount] =
|
||||||
if userAccountRepository.findByUsername(req.username).isDefined then Left(AccountError.UsernameTaken(req.username))
|
if userAccountRepository.findByUsername(req.username).isDefined then Left(AccountError.UsernameTaken(req.username))
|
||||||
else if userAccountRepository.findByEmail(req.email).isDefined then
|
else if userAccountRepository.findByEmail(req.email).isDefined then
|
||||||
Left(AccountError.EmailAlreadyRegistered(req.email))
|
Left(AccountError.EmailAlreadyRegistered(req.email))
|
||||||
@@ -48,6 +59,15 @@ class AccountService:
|
|||||||
Right(account)
|
Right(account)
|
||||||
|
|
||||||
def login(req: LoginRequest): Either[AccountError, String] =
|
def login(req: LoginRequest): Either[AccountError, String] =
|
||||||
|
val result = authenticateUser(req)
|
||||||
|
result match
|
||||||
|
case Right(_) => meterRegistry.counter("nowchess.auth.logins", "result", "success").increment()
|
||||||
|
case Left(error) =>
|
||||||
|
meterRegistry.counter("nowchess.auth.logins", "result", "failure").increment()
|
||||||
|
meterRegistry.counter("nowchess.auth.login.failures", "reason", loginFailureReason(error)).increment()
|
||||||
|
result
|
||||||
|
|
||||||
|
private def authenticateUser(req: LoginRequest): Either[AccountError, String] =
|
||||||
userAccountRepository.findByUsername(req.username) match
|
userAccountRepository.findByUsername(req.username) match
|
||||||
case None =>
|
case None =>
|
||||||
log.warnf("Login failed for unknown user %s", req.username)
|
log.warnf("Login failed for unknown user %s", req.username)
|
||||||
@@ -69,6 +89,17 @@ class AccountService:
|
|||||||
.sign(),
|
.sign(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
private def loginFailureReason(error: AccountError): String = error match
|
||||||
|
case AccountError.InvalidCredentials => "invalid_credentials"
|
||||||
|
case AccountError.UserBanned => "user_banned"
|
||||||
|
case AccountError.UsernameTaken(_) => "username_taken"
|
||||||
|
case AccountError.EmailAlreadyRegistered(_) => "email_registered"
|
||||||
|
case AccountError.UserNotFound => "user_not_found"
|
||||||
|
case AccountError.BotNotFound => "bot_not_found"
|
||||||
|
case AccountError.BotLimitExceeded => "bot_limit_exceeded"
|
||||||
|
case AccountError.NotAuthorized => "not_authorized"
|
||||||
|
case AccountError.BotBanned => "bot_banned"
|
||||||
|
|
||||||
def findByUsername(username: String): Option[UserAccount] =
|
def findByUsername(username: String): Option[UserAccount] =
|
||||||
userAccountRepository.findByUsername(username)
|
userAccountRepository.findByUsername(username)
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import de.nowchess.account.dto.{
|
|||||||
}
|
}
|
||||||
import de.nowchess.account.error.ChallengeError
|
import de.nowchess.account.error.ChallengeError
|
||||||
import de.nowchess.account.repository.{ChallengeRepository, UserAccountRepository}
|
import de.nowchess.account.repository.{ChallengeRepository, UserAccountRepository}
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import jakarta.transaction.Transactional
|
import jakarta.transaction.Transactional
|
||||||
@@ -48,10 +49,22 @@ class ChallengeService:
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
var eventPublisher: EventPublisher = uninitialized
|
var eventPublisher: EventPublisher = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on
|
// scalafix:on
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
def create(challengerId: UUID, destUsername: String, req: ChallengeRequest): Either[ChallengeError, Challenge] =
|
def create(challengerId: UUID, destUsername: String, req: ChallengeRequest): Either[ChallengeError, Challenge] =
|
||||||
|
val result = createChallenge(challengerId, destUsername, req)
|
||||||
|
result.foreach(_ => meterRegistry.counter("nowchess.challenges.created").increment())
|
||||||
|
result
|
||||||
|
|
||||||
|
private def createChallenge(
|
||||||
|
challengerId: UUID,
|
||||||
|
destUsername: String,
|
||||||
|
req: ChallengeRequest,
|
||||||
|
): Either[ChallengeError, Challenge] =
|
||||||
for
|
for
|
||||||
destUser <- userAccountRepository.findByUsername(destUsername).toRight(ChallengeError.UserNotFound(destUsername))
|
destUser <- userAccountRepository.findByUsername(destUsername).toRight(ChallengeError.UserNotFound(destUsername))
|
||||||
challenger <- userAccountRepository.findById(challengerId).toRight(ChallengeError.ChallengerNotFound)
|
challenger <- userAccountRepository.findById(challengerId).toRight(ChallengeError.ChallengerNotFound)
|
||||||
@@ -80,6 +93,11 @@ class ChallengeService:
|
|||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
def accept(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
def accept(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||||
|
val result = acceptChallenge(challengeId, userId)
|
||||||
|
result.foreach(_ => meterRegistry.counter("nowchess.challenges.accepted").increment())
|
||||||
|
result
|
||||||
|
|
||||||
|
private def acceptChallenge(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||||
for
|
for
|
||||||
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
||||||
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
||||||
@@ -96,6 +114,11 @@ class ChallengeService:
|
|||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
def decline(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] =
|
def decline(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] =
|
||||||
|
val result = declineChallenge(challengeId, userId, req)
|
||||||
|
result.foreach(_ => meterRegistry.counter("nowchess.challenges.declined").increment())
|
||||||
|
result
|
||||||
|
|
||||||
|
private def declineChallenge(challengeId: UUID, userId: UUID, req: DeclineRequest): Either[ChallengeError, Challenge] =
|
||||||
for
|
for
|
||||||
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
||||||
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
||||||
@@ -109,6 +132,11 @@ class ChallengeService:
|
|||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
def cancel(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
def cancel(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||||
|
val result = cancelChallenge(challengeId, userId)
|
||||||
|
result.foreach(_ => meterRegistry.counter("nowchess.challenges.cancelled").increment())
|
||||||
|
result
|
||||||
|
|
||||||
|
private def cancelChallenge(challengeId: UUID, userId: UUID): Either[ChallengeError, Challenge] =
|
||||||
for
|
for
|
||||||
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
challenge <- challengeRepository.findById(challengeId).toRight(ChallengeError.ChallengeNotFound)
|
||||||
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
_ <- Either.cond(challenge.status == ChallengeStatus.Created, (), ChallengeError.ChallengeNotActive)
|
||||||
|
|||||||
@@ -1,13 +1,16 @@
|
|||||||
package de.nowchess.coordinator.service
|
package de.nowchess.coordinator.service
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.enterprise.inject.Instance
|
import jakarta.enterprise.inject.Instance
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
||||||
import io.fabric8.kubernetes.client.KubernetesClient
|
import io.fabric8.kubernetes.client.KubernetesClient
|
||||||
|
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
@@ -21,10 +24,14 @@ class AutoScaler:
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private var instanceRegistry: InstanceRegistry = uninitialized
|
private var instanceRegistry: InstanceRegistry = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||||
|
private val avgLoadRef = new AtomicReference[Double](0.0)
|
||||||
|
|
||||||
private def kubeClientOpt: Option[KubernetesClient] =
|
private def kubeClientOpt: Option[KubernetesClient] =
|
||||||
if kubeClientInstance.isUnsatisfied then None
|
if kubeClientInstance.isUnsatisfied then None
|
||||||
@@ -33,6 +40,13 @@ class AutoScaler:
|
|||||||
private val argoApiVersion = "argoproj.io/v1alpha1"
|
private val argoApiVersion = "argoproj.io/v1alpha1"
|
||||||
private val argoKind = "Rollout"
|
private val argoKind = "Rollout"
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
def initMetrics(): Unit =
|
||||||
|
Gauge
|
||||||
|
.builder("nowchess.coordinator.load.average", avgLoadRef, _.get())
|
||||||
|
.register(meterRegistry)
|
||||||
|
()
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.asInstanceOf
|
// scalafix:off DisableSyntax.asInstanceOf
|
||||||
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
||||||
Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] =>
|
Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] =>
|
||||||
@@ -48,6 +62,7 @@ class AutoScaler:
|
|||||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||||
if instances.nonEmpty then
|
if instances.nonEmpty then
|
||||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||||
|
avgLoadRef.set(avgLoad)
|
||||||
|
|
||||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
|
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
|
||||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore then scaleDown()
|
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore then scaleDown()
|
||||||
@@ -79,6 +94,7 @@ class AutoScaler:
|
|||||||
.inNamespace(config.k8sNamespace)
|
.inNamespace(config.k8sNamespace)
|
||||||
.resource(rollout)
|
.resource(rollout)
|
||||||
.update()
|
.update()
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment()
|
||||||
log.infof(
|
log.infof(
|
||||||
"Scaled up %s from %d to %d replicas",
|
"Scaled up %s from %d to %d replicas",
|
||||||
config.k8sRolloutName,
|
config.k8sRolloutName,
|
||||||
@@ -91,6 +107,7 @@ class AutoScaler:
|
|||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment()
|
||||||
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
||||||
|
|
||||||
def scaleDown(): Unit =
|
def scaleDown(): Unit =
|
||||||
@@ -120,6 +137,7 @@ class AutoScaler:
|
|||||||
.inNamespace(config.k8sNamespace)
|
.inNamespace(config.k8sNamespace)
|
||||||
.resource(rollout)
|
.resource(rollout)
|
||||||
.update()
|
.update()
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||||
log.infof(
|
log.infof(
|
||||||
"Scaled down %s from %d to %d replicas",
|
"Scaled down %s from %d to %d replicas",
|
||||||
config.k8sRolloutName,
|
config.k8sRolloutName,
|
||||||
@@ -132,4 +150,5 @@ class AutoScaler:
|
|||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
|
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||||
|
|||||||
+10
-1
@@ -5,6 +5,7 @@ import jakarta.inject.Inject
|
|||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
@@ -30,6 +31,9 @@ class CacheEvictionManager:
|
|||||||
@Inject
|
@Inject
|
||||||
private var objectMapper: ObjectMapper = uninitialized
|
private var objectMapper: ObjectMapper = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[CacheEvictionManager])
|
private val log = Logger.getLogger(classOf[CacheEvictionManager])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
@@ -38,8 +42,12 @@ class CacheEvictionManager:
|
|||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
def evictStaleGames: Unit =
|
def evictStaleGames: Unit =
|
||||||
log.info("Starting cache eviction scan")
|
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record { () =>
|
||||||
|
runEviction()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def runEviction(): Unit =
|
||||||
|
log.info("Starting cache eviction scan")
|
||||||
val pattern = s"$redisPrefix:game:entry:*"
|
val pattern = s"$redisPrefix:game:entry:*"
|
||||||
val keys = redis.key(classOf[String]).keys(pattern)
|
val keys = redis.key(classOf[String]).keys(pattern)
|
||||||
val now = System.currentTimeMillis()
|
val now = System.currentTimeMillis()
|
||||||
@@ -56,6 +64,7 @@ class CacheEvictionManager:
|
|||||||
try
|
try
|
||||||
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
||||||
redis.key(classOf[String]).del(key)
|
redis.key(classOf[String]).del(key)
|
||||||
|
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment()
|
||||||
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
||||||
count + 1
|
count + 1
|
||||||
catch
|
catch
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import jakarta.inject.Inject
|
|||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import io.fabric8.kubernetes.client.KubernetesClient
|
import io.fabric8.kubernetes.client.KubernetesClient
|
||||||
import io.fabric8.kubernetes.api.model.Pod
|
import io.fabric8.kubernetes.api.model.Pod
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
@@ -27,6 +28,9 @@ class HealthMonitor:
|
|||||||
@Inject
|
@Inject
|
||||||
private var redis: RedisDataSource = uninitialized
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[HealthMonitor])
|
private val log = Logger.getLogger(classOf[HealthMonitor])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
@@ -39,6 +43,7 @@ class HealthMonitor:
|
|||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
def checkInstanceHealth: Unit =
|
def checkInstanceHealth: Unit =
|
||||||
|
meterRegistry.counter("nowchess.coordinator.health.checks").increment()
|
||||||
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
||||||
if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||||
val instances = instanceRegistry.getAllInstances
|
val instances = instanceRegistry.getAllInstances
|
||||||
@@ -108,6 +113,7 @@ class HealthMonitor:
|
|||||||
case Some(pod) =>
|
case Some(pod) =>
|
||||||
val isReady = isPodReady(pod)
|
val isReady = isPodReady(pod)
|
||||||
if !isReady && inst.state == "HEALTHY" then
|
if !isReady && inst.state == "HEALTHY" then
|
||||||
|
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
|
||||||
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
||||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||||
case None =>
|
case None =>
|
||||||
|
|||||||
+15
@@ -1,5 +1,6 @@
|
|||||||
package de.nowchess.coordinator.service
|
package de.nowchess.coordinator.service
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||||
@@ -9,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
|
|||||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.time.{Duration, Instant}
|
import java.time.{Duration, Instant}
|
||||||
|
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||||
import io.smallrye.mutiny.Uni
|
import io.smallrye.mutiny.Uni
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
|
|
||||||
@@ -18,12 +20,22 @@ class InstanceRegistry:
|
|||||||
@Inject
|
@Inject
|
||||||
private var redis: ReactiveRedisDataSource = uninitialized
|
private var redis: ReactiveRedisDataSource = uninitialized
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[InstanceRegistry])
|
private val log = Logger.getLogger(classOf[InstanceRegistry])
|
||||||
private val mapper = ObjectMapper()
|
private val mapper = ObjectMapper()
|
||||||
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
|
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
def initMetrics(): Unit =
|
||||||
|
Gauge
|
||||||
|
.builder("nowchess.coordinator.instances.active", instances, m => m.size().toDouble)
|
||||||
|
.register(meterRegistry)
|
||||||
|
()
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
@@ -45,6 +57,7 @@ class InstanceRegistry:
|
|||||||
val isNew = !instances.containsKey(instanceId)
|
val isNew = !instances.containsKey(instanceId)
|
||||||
instances.put(instanceId, metadata)
|
instances.put(instanceId, metadata)
|
||||||
if isNew then
|
if isNew then
|
||||||
|
meterRegistry.counter("nowchess.coordinator.instances.joined").increment()
|
||||||
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
||||||
else
|
else
|
||||||
log.debugf(
|
log.debugf(
|
||||||
@@ -68,6 +81,7 @@ class InstanceRegistry:
|
|||||||
|
|
||||||
def removeInstance(instanceId: String): Unit =
|
def removeInstance(instanceId: String): Unit =
|
||||||
instances.remove(instanceId)
|
instances.remove(instanceId)
|
||||||
|
meterRegistry.counter("nowchess.coordinator.instances.removed").increment()
|
||||||
log.infof("Instance %s removed from registry", instanceId)
|
log.infof("Instance %s removed from registry", instanceId)
|
||||||
|
|
||||||
def evictStaleInstances(maxAge: Duration): List[String] =
|
def evictStaleInstances(maxAge: Duration): List[String] =
|
||||||
@@ -83,6 +97,7 @@ class InstanceRegistry:
|
|||||||
.toList
|
.toList
|
||||||
stale.foreach { id =>
|
stale.foreach { id =>
|
||||||
instances.remove(id)
|
instances.remove(id)
|
||||||
|
meterRegistry.counter("nowchess.coordinator.instances.evicted").increment()
|
||||||
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
||||||
}
|
}
|
||||||
stale
|
stale
|
||||||
|
|||||||
@@ -17,10 +17,12 @@ import de.nowchess.chess.observer.*
|
|||||||
import de.nowchess.api.error.GameError
|
import de.nowchess.api.error.GameError
|
||||||
import de.nowchess.api.game.WinReason.{Checkmate, Resignation}
|
import de.nowchess.api.game.WinReason.{Checkmate, Resignation}
|
||||||
import de.nowchess.api.io.{GameContextExport, GameContextImport}
|
import de.nowchess.api.io.{GameContextExport, GameContextImport}
|
||||||
import de.nowchess.api.rules.RuleSet
|
import de.nowchess.api.rules.{PostMoveStatus, RuleSet}
|
||||||
|
|
||||||
|
import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer}
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
|
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
/** Pure game engine that manages game state and notifies observers of state changes. All rule queries delegate to the
|
/** Pure game engine that manages game state and notifies observers of state changes. All rule queries delegate to the
|
||||||
* injected RuleSet. All user interactions go through Commands; state changes are broadcast via GameEvents.
|
* injected RuleSet. All user interactions go through Commands; state changes are broadcast via GameEvents.
|
||||||
@@ -33,6 +35,7 @@ class GameEngine(
|
|||||||
initialDrawOffer: Option[Color] = None,
|
initialDrawOffer: Option[Color] = None,
|
||||||
initialRedoStack: List[Move] = Nil,
|
initialRedoStack: List[Move] = Nil,
|
||||||
initialTakebackRequest: Option[Color] = None,
|
initialTakebackRequest: Option[Color] = None,
|
||||||
|
private val meterRegistry: Option[MeterRegistry] = None,
|
||||||
) extends Observable:
|
) extends Observable:
|
||||||
// Ensure that initialBoard is set correctly for threefold repetition detection
|
// Ensure that initialBoard is set correctly for threefold repetition detection
|
||||||
private val contextWithInitialBoard =
|
private val contextWithInitialBoard =
|
||||||
@@ -57,6 +60,17 @@ class GameEngine(
|
|||||||
@SuppressWarnings(Array("DisableSyntax.var"))
|
@SuppressWarnings(Array("DisableSyntax.var"))
|
||||||
private var pendingTakebackRequest: Option[Color] = initialTakebackRequest
|
private var pendingTakebackRequest: Option[Color] = initialTakebackRequest
|
||||||
|
|
||||||
|
meterRegistry.foreach { reg =>
|
||||||
|
GameEngine.activeGamesCount.incrementAndGet()
|
||||||
|
reg.counter("nowchess.games.started").increment()
|
||||||
|
}
|
||||||
|
private def gamesCompletedCounter(result: String): Counter =
|
||||||
|
meterRegistry.map(_.counter("nowchess.games.completed", "result", result)).orNull
|
||||||
|
private def movesProcessedCounter: Counter =
|
||||||
|
meterRegistry.map(_.counter("nowchess.moves.processed")).orNull
|
||||||
|
private def movesDurationTimer: Timer =
|
||||||
|
meterRegistry.map(_.timer("nowchess.moves.duration")).orNull
|
||||||
|
|
||||||
// Start scheduler immediately for live clocks so passive expiry fires without waiting for a move.
|
// Start scheduler immediately for live clocks so passive expiry fires without waiting for a move.
|
||||||
clockState.foreach(scheduleExpiryCheck)
|
clockState.foreach(scheduleExpiryCheck)
|
||||||
|
|
||||||
@@ -165,6 +179,8 @@ class GameEngine(
|
|||||||
pendingTakebackRequest = None
|
pendingTakebackRequest = None
|
||||||
stopClock()
|
stopClock()
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
|
Option(gamesCompletedCounter("resignation")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
notifyObservers(ResignEvent(currentContext, color))
|
notifyObservers(ResignEvent(currentContext, color))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -197,6 +213,8 @@ class GameEngine(
|
|||||||
pendingTakebackRequest = None
|
pendingTakebackRequest = None
|
||||||
stopClock()
|
stopClock()
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
|
Option(gamesCompletedCounter("draw.agreement")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
notifyObservers(DrawEvent(currentContext, DrawReason.Agreement))
|
notifyObservers(DrawEvent(currentContext, DrawReason.Agreement))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,11 +241,15 @@ class GameEngine(
|
|||||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.FiftyMoveRule)))
|
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.FiftyMoveRule)))
|
||||||
stopClock()
|
stopClock()
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
|
Option(gamesCompletedCounter("draw.fifty_move")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
notifyObservers(DrawEvent(currentContext, DrawReason.FiftyMoveRule))
|
notifyObservers(DrawEvent(currentContext, DrawReason.FiftyMoveRule))
|
||||||
else if ruleSet.isThreefoldRepetition(currentContext) then
|
else if ruleSet.isThreefoldRepetition(currentContext) then
|
||||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.ThreefoldRepetition)))
|
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.ThreefoldRepetition)))
|
||||||
stopClock()
|
stopClock()
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
|
Option(gamesCompletedCounter("draw.threefold")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
notifyObservers(DrawEvent(currentContext, DrawReason.ThreefoldRepetition))
|
notifyObservers(DrawEvent(currentContext, DrawReason.ThreefoldRepetition))
|
||||||
else notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.DrawCannotBeClaimed))
|
else notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.DrawCannotBeClaimed))
|
||||||
}
|
}
|
||||||
@@ -311,9 +333,18 @@ class GameEngine(
|
|||||||
currentContext = currentContext.withResult(Some(GameResult.Draw(reason)))
|
currentContext = currentContext.withResult(Some(GameResult.Draw(reason)))
|
||||||
stopClock()
|
stopClock()
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
|
Option(gamesCompletedCounter(drawReasonTag(reason))).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
notifyObservers(DrawEvent(currentContext, reason))
|
notifyObservers(DrawEvent(currentContext, reason))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def drawReasonTag(reason: DrawReason): String = reason match
|
||||||
|
case DrawReason.Agreement => "draw.agreement"
|
||||||
|
case DrawReason.FiftyMoveRule => "draw.fifty_move"
|
||||||
|
case DrawReason.ThreefoldRepetition => "draw.threefold"
|
||||||
|
case DrawReason.Stalemate => "draw.stalemate"
|
||||||
|
case DrawReason.InsufficientMaterial => "draw.insufficient"
|
||||||
|
|
||||||
/** Inject clock state directly (for testing). */
|
/** Inject clock state directly (for testing). */
|
||||||
private[engine] def injectClockState(cs: Option[ClockState]): Unit = synchronized { clockState = cs }
|
private[engine] def injectClockState(cs: Option[ClockState]): Unit = synchronized { clockState = cs }
|
||||||
|
|
||||||
@@ -334,6 +365,11 @@ class GameEngine(
|
|||||||
pendingDrawOffer = None
|
pendingDrawOffer = None
|
||||||
pendingTakebackRequest = None
|
pendingTakebackRequest = None
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
|
val tag = result match
|
||||||
|
case GameResult.Draw(_) => "draw.insufficient"
|
||||||
|
case _ => "timeout"
|
||||||
|
Option(gamesCompletedCounter(tag)).foreach(_.increment())
|
||||||
|
activeGamesCount.decrementAndGet()
|
||||||
notifyObservers(TimeFlagEvent(currentContext, flagged))
|
notifyObservers(TimeFlagEvent(currentContext, flagged))
|
||||||
|
|
||||||
private def scheduleExpiryCheck(cs: ClockState): Unit =
|
private def scheduleExpiryCheck(cs: ClockState): Unit =
|
||||||
@@ -369,6 +405,12 @@ class GameEngine(
|
|||||||
// ──── Private helpers ────
|
// ──── Private helpers ────
|
||||||
|
|
||||||
private def executeMove(move: Move): Unit =
|
private def executeMove(move: Move): Unit =
|
||||||
|
Option(movesDurationTimer) match
|
||||||
|
case Some(timer) => timer.record(() => executeMoveBody(move))
|
||||||
|
case None => executeMoveBody(move)
|
||||||
|
Option(movesProcessedCounter).foreach(_.increment())
|
||||||
|
|
||||||
|
private def executeMoveBody(move: Move): Unit =
|
||||||
if !isRedoing then
|
if !isRedoing then
|
||||||
redoStack = Nil
|
redoStack = Nil
|
||||||
pendingTakebackRequest = None
|
pendingTakebackRequest = None
|
||||||
@@ -391,28 +433,36 @@ class GameEngine(
|
|||||||
)
|
)
|
||||||
|
|
||||||
val status = ruleSet.postMoveStatus(currentContext)
|
val status = ruleSet.postMoveStatus(currentContext)
|
||||||
if currentContext.result.isEmpty then
|
if currentContext.result.isEmpty then applyPostMoveStatus(status)
|
||||||
if status.isCheckmate then
|
|
||||||
val winner = currentContext.turn.opposite
|
|
||||||
currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate)))
|
|
||||||
cancelScheduled()
|
|
||||||
notifyObservers(CheckmateEvent(currentContext, winner))
|
|
||||||
redoStack = Nil
|
|
||||||
else if status.isStalemate then
|
|
||||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate)))
|
|
||||||
cancelScheduled()
|
|
||||||
notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate))
|
|
||||||
redoStack = Nil
|
|
||||||
else if status.isInsufficientMaterial then
|
|
||||||
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial)))
|
|
||||||
cancelScheduled()
|
|
||||||
notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial))
|
|
||||||
redoStack = Nil
|
|
||||||
else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext))
|
|
||||||
|
|
||||||
if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext))
|
if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext))
|
||||||
if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext))
|
if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext))
|
||||||
|
|
||||||
|
private def applyPostMoveStatus(status: PostMoveStatus): Unit =
|
||||||
|
if status.isCheckmate then
|
||||||
|
val winner = currentContext.turn.opposite
|
||||||
|
currentContext = currentContext.withResult(Some(GameResult.Win(winner, Checkmate)))
|
||||||
|
cancelScheduled()
|
||||||
|
Option(gamesCompletedCounter("checkmate")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
|
notifyObservers(CheckmateEvent(currentContext, winner))
|
||||||
|
redoStack = Nil
|
||||||
|
else if status.isStalemate then
|
||||||
|
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.Stalemate)))
|
||||||
|
cancelScheduled()
|
||||||
|
Option(gamesCompletedCounter("draw.stalemate")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
|
notifyObservers(DrawEvent(currentContext, DrawReason.Stalemate))
|
||||||
|
redoStack = Nil
|
||||||
|
else if status.isInsufficientMaterial then
|
||||||
|
currentContext = currentContext.withResult(Some(GameResult.Draw(DrawReason.InsufficientMaterial)))
|
||||||
|
cancelScheduled()
|
||||||
|
Option(gamesCompletedCounter("draw.insufficient")).foreach(_.increment())
|
||||||
|
GameEngine.activeGamesCount.decrementAndGet()
|
||||||
|
notifyObservers(DrawEvent(currentContext, DrawReason.InsufficientMaterial))
|
||||||
|
redoStack = Nil
|
||||||
|
else if status.isCheck then notifyObservers(CheckDetectedEvent(currentContext))
|
||||||
|
|
||||||
private def translateMoveToNotation(move: Move, boardBefore: Board): String =
|
private def translateMoveToNotation(move: Move, boardBefore: Board): String =
|
||||||
move.moveType match
|
move.moveType match
|
||||||
case MoveType.CastleKingside => "O-O"
|
case MoveType.CastleKingside => "O-O"
|
||||||
@@ -526,3 +576,6 @@ class GameEngine(
|
|||||||
pendingTakebackRequest = None
|
pendingTakebackRequest = None
|
||||||
notifyObservers(TakebackDeclinedEvent(currentContext, color))
|
notifyObservers(TakebackDeclinedEvent(currentContext, color))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object GameEngine:
|
||||||
|
val activeGamesCount: AtomicInteger = new AtomicInteger(0)
|
||||||
|
|||||||
@@ -12,7 +12,9 @@ import de.nowchess.chess.grpc.RuleSetGrpcAdapter
|
|||||||
import de.nowchess.chess.config.RedisConfig
|
import de.nowchess.chess.config.RedisConfig
|
||||||
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
import de.nowchess.chess.grpc.IoGrpcClientWrapper
|
||||||
import de.nowchess.chess.resource.GameDtoMapper
|
import de.nowchess.chess.resource.GameDtoMapper
|
||||||
|
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
|
import jakarta.annotation.PostConstruct
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import org.eclipse.microprofile.rest.client.inject.RestClient
|
import org.eclipse.microprofile.rest.client.inject.RestClient
|
||||||
@@ -34,12 +36,20 @@ class RedisGameRegistry extends GameRegistry:
|
|||||||
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
|
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
|
||||||
@Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized
|
@Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized
|
||||||
@Inject @RestClient var storeClient: StoreServiceClient = uninitialized
|
@Inject @RestClient var storeClient: StoreServiceClient = uninitialized
|
||||||
|
@Inject var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on
|
// scalafix:on
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[RedisGameRegistry])
|
private val log = Logger.getLogger(classOf[RedisGameRegistry])
|
||||||
private val localEngines = ConcurrentHashMap[String, GameEntry]()
|
private val localEngines = ConcurrentHashMap[String, GameEntry]()
|
||||||
private val rng = new SecureRandom()
|
private val rng = new SecureRandom()
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
def initMetrics(): Unit =
|
||||||
|
Gauge
|
||||||
|
.builder("nowchess.games.active", GameEngine.activeGamesCount, _.get().toDouble)
|
||||||
|
.register(meterRegistry)
|
||||||
|
()
|
||||||
|
|
||||||
private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId"
|
private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId"
|
||||||
|
|
||||||
def generateId(): String =
|
def generateId(): String =
|
||||||
@@ -58,14 +68,17 @@ class RedisGameRegistry extends GameRegistry:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def get(gameId: String): Option[GameEntry] =
|
def get(gameId: String): Option[GameEntry] =
|
||||||
Option(localEngines.get(gameId)) match
|
val result = Option(localEngines.get(gameId)) match
|
||||||
case Some(localEntry) =>
|
case Some(localEntry) =>
|
||||||
|
meterRegistry.counter("nowchess.games.cache.hits", "source", "local").increment()
|
||||||
readRedisDto(gameId).flatMap(dto => Try(reconstruct(dto)).toOption) match
|
readRedisDto(gameId).flatMap(dto => Try(reconstruct(dto)).toOption) match
|
||||||
case Some(redisEntry) if !sameSnapshot(localEntry, redisEntry) =>
|
case Some(redisEntry) if !sameSnapshot(localEntry, redisEntry) =>
|
||||||
localEngines.put(gameId, redisEntry)
|
localEngines.put(gameId, redisEntry)
|
||||||
Some(redisEntry)
|
Some(redisEntry)
|
||||||
case _ => Some(localEntry)
|
case _ => Some(localEntry)
|
||||||
case None => fromRedis(gameId).orElse(fromDb(gameId))
|
case None => fromRedis(gameId).orElse(fromDb(gameId))
|
||||||
|
if result.isEmpty then meterRegistry.counter("nowchess.games.cache.misses").increment()
|
||||||
|
result
|
||||||
|
|
||||||
def update(entry: GameEntry): Unit =
|
def update(entry: GameEntry): Unit =
|
||||||
localEngines.put(entry.gameId, entry)
|
localEngines.put(entry.gameId, entry)
|
||||||
@@ -86,6 +99,7 @@ class RedisGameRegistry extends GameRegistry:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
.map { entry =>
|
.map { entry =>
|
||||||
|
meterRegistry.counter("nowchess.games.cache.hits", "source", "redis").increment()
|
||||||
localEngines.put(gameId, entry)
|
localEngines.put(gameId, entry)
|
||||||
log.infof("Loaded game %s from Redis cache", gameId)
|
log.infof("Loaded game %s from Redis cache", gameId)
|
||||||
entry
|
entry
|
||||||
@@ -118,6 +132,7 @@ class RedisGameRegistry extends GameRegistry:
|
|||||||
(dto, reconstruct(dto))
|
(dto, reconstruct(dto))
|
||||||
} match
|
} match
|
||||||
case scala.util.Success((dto, entry)) =>
|
case scala.util.Success((dto, entry)) =>
|
||||||
|
meterRegistry.counter("nowchess.games.cache.hits", "source", "db").increment()
|
||||||
log.infof("Loaded game %s from store service", gameId)
|
log.infof("Loaded game %s from store service", gameId)
|
||||||
localEngines.put(gameId, entry)
|
localEngines.put(gameId, entry)
|
||||||
redis.value(classOf[String]).setex(cacheKey(gameId), 1800L, objectMapper.writeValueAsString(dto))
|
redis.value(classOf[String]).setex(cacheKey(gameId), 1800L, objectMapper.writeValueAsString(dto))
|
||||||
|
|||||||
+13
-6
@@ -19,6 +19,8 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
|
|||||||
import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub
|
import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub
|
||||||
import io.grpc.stub.StreamObserver
|
import io.grpc.stub.StreamObserver
|
||||||
import io.grpc.Channel
|
import io.grpc.Channel
|
||||||
|
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class InstanceHeartbeatService:
|
class InstanceHeartbeatService:
|
||||||
@@ -32,6 +34,9 @@ class InstanceHeartbeatService:
|
|||||||
@Inject
|
@Inject
|
||||||
private var mapper: ObjectMapper = uninitialized
|
private var mapper: ObjectMapper = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private var meterRegistry: MeterRegistry = uninitialized
|
||||||
|
|
||||||
@GrpcClient("coordinator-grpc")
|
@GrpcClient("coordinator-grpc")
|
||||||
private var channel: Channel = uninitialized
|
private var channel: Channel = uninitialized
|
||||||
|
|
||||||
@@ -51,14 +56,15 @@ class InstanceHeartbeatService:
|
|||||||
private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None
|
private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None
|
||||||
private var heartbeatExecutor = Executors.newScheduledThreadPool(1)
|
private var heartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||||
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
|
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||||
private var subscriptionCount = 0
|
|
||||||
private var localCacheSize = 0
|
private var localCacheSize = 0
|
||||||
private var serviceActive = false
|
private var serviceActive = false
|
||||||
private var shuttingDown = false
|
private var shuttingDown = false
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
private val redisHeartbeatPending = new AtomicBoolean(false)
|
private val redisHeartbeatPending = new AtomicBoolean(false)
|
||||||
|
private val subscriptionCount = new AtomicInteger(0)
|
||||||
|
|
||||||
def onStart(@Observes event: StartupEvent): Unit =
|
def onStart(@Observes event: StartupEvent): Unit =
|
||||||
|
Gauge.builder("nowchess.instance.subscriptions", subscriptionCount, _.get().toDouble).register(meterRegistry)
|
||||||
if coordinatorEnabled then
|
if coordinatorEnabled then
|
||||||
try
|
try
|
||||||
shuttingDown = false
|
shuttingDown = false
|
||||||
@@ -90,7 +96,7 @@ class InstanceHeartbeatService:
|
|||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
|
|
||||||
def setSubscriptionCount(count: Int): Unit =
|
def setSubscriptionCount(count: Int): Unit =
|
||||||
subscriptionCount = count
|
subscriptionCount.set(count)
|
||||||
|
|
||||||
def setLocalCacheSize(count: Int): Unit =
|
def setLocalCacheSize(count: Int): Unit =
|
||||||
localCacheSize = count
|
localCacheSize = count
|
||||||
@@ -99,13 +105,13 @@ class InstanceHeartbeatService:
|
|||||||
if coordinatorEnabled then
|
if coordinatorEnabled then
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
redis.set(classOf[String]).sadd(setKey, gameId)
|
redis.set(classOf[String]).sadd(setKey, gameId)
|
||||||
subscriptionCount += 1
|
subscriptionCount.incrementAndGet()
|
||||||
|
|
||||||
def removeGameSubscription(gameId: String): Unit =
|
def removeGameSubscription(gameId: String): Unit =
|
||||||
if coordinatorEnabled then
|
if coordinatorEnabled then
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
redis.set(classOf[String]).srem(setKey, gameId)
|
redis.set(classOf[String]).srem(setKey, gameId)
|
||||||
subscriptionCount = Math.max(0, subscriptionCount - 1)
|
subscriptionCount.updateAndGet(c => Math.max(0, c - 1))
|
||||||
|
|
||||||
private def generateInstanceId(): Unit =
|
private def generateInstanceId(): Unit =
|
||||||
val hostname =
|
val hostname =
|
||||||
@@ -162,13 +168,14 @@ class InstanceHeartbeatService:
|
|||||||
.setHostname(getHostname)
|
.setHostname(getHostname)
|
||||||
.setHttpPort(httpPort)
|
.setHttpPort(httpPort)
|
||||||
.setGrpcPort(grpcPort)
|
.setGrpcPort(grpcPort)
|
||||||
.setSubscriptionCount(subscriptionCount)
|
.setSubscriptionCount(subscriptionCount.get())
|
||||||
.setLocalCacheSize(localCacheSize)
|
.setLocalCacheSize(localCacheSize)
|
||||||
.setTimestampMillis(System.currentTimeMillis())
|
.setTimestampMillis(System.currentTimeMillis())
|
||||||
.build()
|
.build()
|
||||||
observer.onNext(frame)
|
observer.onNext(frame)
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
|
meterRegistry.counter("nowchess.heartbeat.failures").increment()
|
||||||
log.warnf(ex, "Failed to send heartbeat frame")
|
log.warnf(ex, "Failed to send heartbeat frame")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,7 +189,7 @@ class InstanceHeartbeatService:
|
|||||||
"hostname" -> getHostname,
|
"hostname" -> getHostname,
|
||||||
"httpPort" -> httpPort,
|
"httpPort" -> httpPort,
|
||||||
"grpcPort" -> grpcPort,
|
"grpcPort" -> grpcPort,
|
||||||
"subscriptionCount" -> subscriptionCount,
|
"subscriptionCount" -> subscriptionCount.get(),
|
||||||
"localCacheSize" -> localCacheSize,
|
"localCacheSize" -> localCacheSize,
|
||||||
"lastHeartbeat" -> java.time.Instant.now().toString,
|
"lastHeartbeat" -> java.time.Instant.now().toString,
|
||||||
"state" -> "HEALTHY",
|
"state" -> "HEALTHY",
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import de.nowchess.bot.ai.Evaluation
|
|||||||
import de.nowchess.bot.util.ZobristHash
|
import de.nowchess.bot.util.ZobristHash
|
||||||
import de.nowchess.api.rules.RuleSet
|
import de.nowchess.api.rules.RuleSet
|
||||||
import de.nowchess.rules.sets.DefaultRules
|
import de.nowchess.rules.sets.DefaultRules
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
|
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
|
||||||
|
|
||||||
final class AlphaBetaSearch(
|
final class AlphaBetaSearch(
|
||||||
@@ -14,6 +15,7 @@ final class AlphaBetaSearch(
|
|||||||
tt: TranspositionTable = TranspositionTable(),
|
tt: TranspositionTable = TranspositionTable(),
|
||||||
weights: Evaluation,
|
weights: Evaluation,
|
||||||
numThreads: Int = Runtime.getRuntime.availableProcessors,
|
numThreads: Int = Runtime.getRuntime.availableProcessors,
|
||||||
|
meterRegistry: MeterRegistry = null,
|
||||||
):
|
):
|
||||||
|
|
||||||
private val INF = Int.MaxValue / 2
|
private val INF = Int.MaxValue / 2
|
||||||
@@ -85,8 +87,8 @@ final class AlphaBetaSearch(
|
|||||||
val rootHash = ZobristHash.hash(context)
|
val rootHash = ZobristHash.hash(context)
|
||||||
|
|
||||||
@scala.annotation.tailrec
|
@scala.annotation.tailrec
|
||||||
def loop(bestSoFar: Option[Move], prevScore: Int, depth: Int): Option[Move] =
|
def loop(bestSoFar: Option[Move], prevScore: Int, depth: Int, lastDepth: Int): (Option[Move], Int) =
|
||||||
if isOutOfTime then bestSoFar
|
if isOutOfTime then (bestSoFar, lastDepth)
|
||||||
else
|
else
|
||||||
val (alpha, beta) =
|
val (alpha, beta) =
|
||||||
if depth == 1 then (-INF, INF) else (prevScore - ASPIRATION_DELTA, prevScore + ASPIRATION_DELTA)
|
if depth == 1 then (-INF, INF) else (prevScore - ASPIRATION_DELTA, prevScore + ASPIRATION_DELTA)
|
||||||
@@ -99,9 +101,16 @@ final class AlphaBetaSearch(
|
|||||||
rootHash,
|
rootHash,
|
||||||
excludedRootMoves,
|
excludedRootMoves,
|
||||||
)
|
)
|
||||||
loop(move.orElse(bestSoFar), score, depth + 1)
|
loop(move.orElse(bestSoFar), score, depth + 1, depth)
|
||||||
|
|
||||||
loop(None, 0, 1)
|
val (result, depthReached) = loop(None, 0, 1, 0)
|
||||||
|
recordSearchMetrics(depthReached)
|
||||||
|
result
|
||||||
|
|
||||||
|
private def recordSearchMetrics(depthReached: Int): Unit =
|
||||||
|
if meterRegistry != null then
|
||||||
|
meterRegistry.summary("nowchess.bot.search.nodes").record(nodeCount.get().toDouble)
|
||||||
|
meterRegistry.summary("nowchess.bot.search.depth").record(depthReached.toDouble)
|
||||||
|
|
||||||
private def isOutOfTime: Boolean =
|
private def isOutOfTime: Boolean =
|
||||||
System.currentTimeMillis - timeStartMs.get >= timeLimitMs.get
|
System.currentTimeMillis - timeStartMs.get >= timeLimitMs.get
|
||||||
|
|||||||
+10
-5
@@ -6,6 +6,7 @@ import de.nowchess.bot.BotController
|
|||||||
import de.nowchess.bot.BotDifficulty
|
import de.nowchess.bot.BotDifficulty
|
||||||
import de.nowchess.bot.config.RedisConfig
|
import de.nowchess.bot.config.RedisConfig
|
||||||
import de.nowchess.io.fen.FenParser
|
import de.nowchess.io.fen.FenParser
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import io.quarkus.runtime.StartupEvent
|
import io.quarkus.runtime.StartupEvent
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
@@ -18,10 +19,11 @@ import java.util.function.Consumer
|
|||||||
class OfficialBotService:
|
class OfficialBotService:
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject var redis: RedisDataSource = uninitialized
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
@Inject var redisConfig: RedisConfig = uninitialized
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||||
@Inject var botController: BotController = uninitialized
|
@Inject var botController: BotController = uninitialized
|
||||||
|
@Inject var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val terminalStatuses =
|
private val terminalStatuses =
|
||||||
@@ -85,7 +87,10 @@ class OfficialBotService:
|
|||||||
val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium)
|
val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium)
|
||||||
botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot =>
|
botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot =>
|
||||||
FenParser.parseFen(fen).toOption.foreach { context =>
|
FenParser.parseFen(fen).toOption.foreach { context =>
|
||||||
bot(context).foreach { move =>
|
val timer = meterRegistry.timer("nowchess.bot.move.duration", "bot", botName)
|
||||||
|
val moveOpt = timer.recordCallable(() => bot(context))
|
||||||
|
moveOpt.flatten.foreach { move =>
|
||||||
|
meterRegistry.counter("nowchess.bot.moves.computed", "bot", botName).increment()
|
||||||
val uci = toUci(move)
|
val uci = toUci(move)
|
||||||
val c2sTopic = s"${redisConfig.prefix}:game:$gameId:c2s"
|
val c2sTopic = s"${redisConfig.prefix}:game:$gameId:c2s"
|
||||||
val moveMsg = s"""{"type":"MOVE","uci":"$uci","playerId":"$botAccountId"}"""
|
val moveMsg = s"""{"type":"MOVE","uci":"$uci","playerId":"$botAccountId"}"""
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package de.nowchess.store.service
|
|||||||
import de.nowchess.api.dto.GameWritebackEventDto
|
import de.nowchess.api.dto.GameWritebackEventDto
|
||||||
import de.nowchess.store.domain.GameRecord
|
import de.nowchess.store.domain.GameRecord
|
||||||
import de.nowchess.store.repository.GameRecordRepository
|
import de.nowchess.store.repository.GameRecordRepository
|
||||||
|
import io.micrometer.core.instrument.{Counter, MeterRegistry, Timer}
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import jakarta.transaction.Transactional
|
import jakarta.transaction.Transactional
|
||||||
@@ -12,64 +13,84 @@ import java.time.Instant
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class GameWritebackService:
|
class GameWritebackService:
|
||||||
@Inject
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
|
@Inject
|
||||||
var repository: GameRecordRepository = uninitialized
|
var repository: GameRecordRepository = uninitialized
|
||||||
// scalafix:on
|
|
||||||
|
@Inject
|
||||||
|
var meterRegistry: MeterRegistry = uninitialized
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private lazy val writebackTimer: Timer =
|
||||||
|
meterRegistry.timer("nowchess.store.writeback.duration")
|
||||||
|
|
||||||
|
private lazy val writebackSkipped: Counter =
|
||||||
|
meterRegistry.counter("nowchess.store.writeback.skipped")
|
||||||
|
|
||||||
|
private def gamesWrittenCounter(operation: String): Counter =
|
||||||
|
meterRegistry.counter("nowchess.store.games.written", "operation", operation)
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
def writeBack(event: GameWritebackEventDto): Unit =
|
def writeBack(event: GameWritebackEventDto): Unit =
|
||||||
|
writebackTimer.record(() => doWriteBack(event))
|
||||||
|
|
||||||
|
private def doWriteBack(event: GameWritebackEventDto): Unit =
|
||||||
repository.findByGameId(event.gameId) match
|
repository.findByGameId(event.gameId) match
|
||||||
case None =>
|
case None =>
|
||||||
val record = new GameRecord
|
createRecord(event)
|
||||||
record.gameId = event.gameId
|
gamesWrittenCounter("create").increment()
|
||||||
record.fen = event.fen
|
|
||||||
record.pgn = event.pgn
|
|
||||||
record.moveCount = event.moveCount
|
|
||||||
record.whiteId = event.whiteId
|
|
||||||
record.whiteName = event.whiteName
|
|
||||||
record.blackId = event.blackId
|
|
||||||
record.blackName = event.blackName
|
|
||||||
record.mode = event.mode
|
|
||||||
record.resigned = event.resigned
|
|
||||||
record.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull
|
|
||||||
record.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull
|
|
||||||
record.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull
|
|
||||||
record.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull
|
|
||||||
record.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull
|
|
||||||
record.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull
|
|
||||||
record.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull
|
|
||||||
record.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull
|
|
||||||
record.clockActiveColor = event.clockActiveColor.orNull
|
|
||||||
record.pendingDrawOffer = event.pendingDrawOffer.orNull
|
|
||||||
record.result = event.result.orNull
|
|
||||||
record.terminationReason = event.terminationReason.orNull
|
|
||||||
record.createdAt = Instant.now()
|
|
||||||
record.updatedAt = Instant.now()
|
|
||||||
repository.persist(record)
|
|
||||||
case Some(r) if event.moveCount > r.moveCount || event.pgn != r.pgn =>
|
case Some(r) if event.moveCount > r.moveCount || event.pgn != r.pgn =>
|
||||||
r.fen = event.fen
|
updateRecord(r, event)
|
||||||
r.pgn = event.pgn
|
gamesWrittenCounter("update").increment()
|
||||||
r.moveCount = event.moveCount
|
case _ =>
|
||||||
r.whiteId = event.whiteId
|
writebackSkipped.increment()
|
||||||
r.whiteName = event.whiteName
|
|
||||||
r.blackId = event.blackId
|
private def createRecord(event: GameWritebackEventDto): Unit =
|
||||||
r.blackName = event.blackName
|
val record = new GameRecord
|
||||||
r.mode = event.mode
|
record.gameId = event.gameId
|
||||||
r.resigned = event.resigned
|
record.fen = event.fen
|
||||||
r.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull
|
record.pgn = event.pgn
|
||||||
r.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull
|
record.moveCount = event.moveCount
|
||||||
r.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull
|
record.whiteId = event.whiteId
|
||||||
r.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull
|
record.whiteName = event.whiteName
|
||||||
r.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull
|
record.blackId = event.blackId
|
||||||
r.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull
|
record.blackName = event.blackName
|
||||||
r.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull
|
record.mode = event.mode
|
||||||
r.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull
|
record.resigned = event.resigned
|
||||||
r.clockActiveColor = event.clockActiveColor.orNull
|
applyClockFields(record, event)
|
||||||
r.pendingDrawOffer = event.pendingDrawOffer.orNull
|
record.result = event.result.orNull
|
||||||
r.pendingTakebackOffer = event.pendingTakebackRequest.orNull
|
record.terminationReason = event.terminationReason.orNull
|
||||||
r.result = event.result.orNull
|
record.createdAt = Instant.now()
|
||||||
r.terminationReason = event.terminationReason.orNull
|
record.updatedAt = Instant.now()
|
||||||
r.updatedAt = Instant.now()
|
repository.persist(record)
|
||||||
repository.merge(r)
|
|
||||||
case _ => ()
|
private def updateRecord(r: GameRecord, event: GameWritebackEventDto): Unit =
|
||||||
|
r.fen = event.fen
|
||||||
|
r.pgn = event.pgn
|
||||||
|
r.moveCount = event.moveCount
|
||||||
|
r.whiteId = event.whiteId
|
||||||
|
r.whiteName = event.whiteName
|
||||||
|
r.blackId = event.blackId
|
||||||
|
r.blackName = event.blackName
|
||||||
|
r.mode = event.mode
|
||||||
|
r.resigned = event.resigned
|
||||||
|
applyClockFields(r, event)
|
||||||
|
r.pendingDrawOffer = event.pendingDrawOffer.orNull
|
||||||
|
r.pendingTakebackOffer = event.pendingTakebackRequest.orNull
|
||||||
|
r.result = event.result.orNull
|
||||||
|
r.terminationReason = event.terminationReason.orNull
|
||||||
|
r.updatedAt = Instant.now()
|
||||||
|
repository.merge(r)
|
||||||
|
|
||||||
|
private def applyClockFields(r: GameRecord, event: GameWritebackEventDto): Unit =
|
||||||
|
r.limitSeconds = event.limitSeconds.map(java.lang.Integer.valueOf).orNull
|
||||||
|
r.incrementSeconds = event.incrementSeconds.map(java.lang.Integer.valueOf).orNull
|
||||||
|
r.daysPerMove = event.daysPerMove.map(java.lang.Integer.valueOf).orNull
|
||||||
|
r.whiteRemainingMs = event.whiteRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||||
|
r.blackRemainingMs = event.blackRemainingMs.map(java.lang.Long.valueOf).orNull
|
||||||
|
r.incrementMs = event.incrementMs.map(java.lang.Long.valueOf).orNull
|
||||||
|
r.clockLastTickAt = event.clockLastTickAt.map(java.lang.Long.valueOf).orNull
|
||||||
|
r.clockMoveDeadline = event.clockMoveDeadline.map(java.lang.Long.valueOf).orNull
|
||||||
|
r.clockActiveColor = event.clockActiveColor.orNull
|
||||||
|
r.pendingDrawOffer = event.pendingDrawOffer.orNull
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package de.nowchess.ws.resource
|
package de.nowchess.ws.resource
|
||||||
|
|
||||||
import de.nowchess.ws.config.RedisConfig
|
import de.nowchess.ws.config.RedisConfig
|
||||||
|
import io.micrometer.core.instrument.{Counter, Gauge, MeterRegistry}
|
||||||
import io.quarkus.redis.datasource.RedisDataSource
|
import io.quarkus.redis.datasource.RedisDataSource
|
||||||
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||||
import io.quarkus.websockets.next.*
|
import io.quarkus.websockets.next.*
|
||||||
@@ -26,10 +27,27 @@ class GameWebSocketResource:
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
var jwtParser: JWTParser = uninitialized
|
var jwtParser: JWTParser = uninitialized
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
var meterRegistry: MeterRegistry = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val connections = new ConcurrentHashMap[String, ConnectionMeta]()
|
private val connections = new ConcurrentHashMap[String, ConnectionMeta]()
|
||||||
|
|
||||||
|
private lazy val connectionsOpened: Counter =
|
||||||
|
meterRegistry.counter("nowchess.ws.connections.opened")
|
||||||
|
|
||||||
|
private lazy val connectionsClosed: Counter =
|
||||||
|
meterRegistry.counter("nowchess.ws.connections.closed")
|
||||||
|
|
||||||
|
private lazy val messagesReceived: Counter =
|
||||||
|
meterRegistry.counter("nowchess.ws.messages.received")
|
||||||
|
|
||||||
|
private lazy val activeGauge: Unit =
|
||||||
|
Gauge
|
||||||
|
.builder("nowchess.ws.connections.active", connections, _.size().toDouble)
|
||||||
|
.register(meterRegistry)
|
||||||
|
|
||||||
private def s2cTopic(gameId: String): String =
|
private def s2cTopic(gameId: String): String =
|
||||||
s"${redisConfig.prefix}:game:$gameId:s2c"
|
s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||||
|
|
||||||
@@ -38,22 +56,19 @@ class GameWebSocketResource:
|
|||||||
|
|
||||||
@OnOpen
|
@OnOpen
|
||||||
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
||||||
val gameId = connection.pathParam("gameId")
|
activeGauge
|
||||||
val playerId = Option(handshake.header("Authorization"))
|
val gameId = connection.pathParam("gameId")
|
||||||
.filter(_.nonEmpty)
|
val playerId = resolvePlayerId(handshake)
|
||||||
.flatMap(token => Try(jwtParser.parse(token)).toOption)
|
|
||||||
.map(_.getSubject)
|
|
||||||
log.infof("Game WebSocket opened — gameId=%s playerId=%s", gameId, playerId.getOrElse("anonymous"))
|
log.infof("Game WebSocket opened — gameId=%s playerId=%s", gameId, playerId.getOrElse("anonymous"))
|
||||||
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(s2cTopic(gameId), handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(s2cTopic(gameId), handler)
|
||||||
connections.put(connection.id(), ConnectionMeta(gameId, subscriber, playerId))
|
connections.put(connection.id(), ConnectionMeta(gameId, subscriber, playerId))
|
||||||
val connectedMsg = playerId match
|
connectionsOpened.increment()
|
||||||
case Some(pid) => s"""{"type":"CONNECTED","gameId":"$gameId","playerId":"$pid"}"""
|
publishConnected(gameId, playerId)
|
||||||
case None => s"""{"type":"CONNECTED","gameId":"$gameId"}"""
|
|
||||||
redis.pubsub(classOf[String]).publish(c2sTopic(gameId), connectedMsg)
|
|
||||||
|
|
||||||
@OnTextMessage
|
@OnTextMessage
|
||||||
def onTextMessage(connection: WebSocketConnection, message: String): Unit =
|
def onTextMessage(connection: WebSocketConnection, message: String): Unit =
|
||||||
|
messagesReceived.increment()
|
||||||
Option(connections.get(connection.id())).foreach { meta =>
|
Option(connections.get(connection.id())).foreach { meta =>
|
||||||
val enriched = meta.playerId match
|
val enriched = meta.playerId match
|
||||||
case Some(pid) => injectPlayerId(message, pid)
|
case Some(pid) => injectPlayerId(message, pid)
|
||||||
@@ -66,8 +81,21 @@ class GameWebSocketResource:
|
|||||||
Option(connections.remove(connection.id())).foreach { meta =>
|
Option(connections.remove(connection.id())).foreach { meta =>
|
||||||
log.infof("Game WebSocket closed — gameId=%s", meta.gameId)
|
log.infof("Game WebSocket closed — gameId=%s", meta.gameId)
|
||||||
meta.subscriber.unsubscribe(s2cTopic(meta.gameId))
|
meta.subscriber.unsubscribe(s2cTopic(meta.gameId))
|
||||||
|
connectionsClosed.increment()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def resolvePlayerId(handshake: HandshakeRequest): Option[String] =
|
||||||
|
Option(handshake.header("Authorization"))
|
||||||
|
.filter(_.nonEmpty)
|
||||||
|
.flatMap(token => Try(jwtParser.parse(token)).toOption)
|
||||||
|
.map(_.getSubject)
|
||||||
|
|
||||||
|
private def publishConnected(gameId: String, playerId: Option[String]): Unit =
|
||||||
|
val connectedMsg = playerId match
|
||||||
|
case Some(pid) => s"""{"type":"CONNECTED","gameId":"$gameId","playerId":"$pid"}"""
|
||||||
|
case None => s"""{"type":"CONNECTED","gameId":"$gameId"}"""
|
||||||
|
redis.pubsub(classOf[String]).publish(c2sTopic(gameId), connectedMsg)
|
||||||
|
|
||||||
private def injectPlayerId(msg: String, pid: String): String =
|
private def injectPlayerId(msg: String, pid: String): String =
|
||||||
val trimmed = msg.trim
|
val trimmed = msg.trim
|
||||||
if trimmed.endsWith("}") then trimmed.dropRight(1) + s""","playerId":"$pid"}"""
|
if trimmed.endsWith("}") then trimmed.dropRight(1) + s""","playerId":"$pid"}"""
|
||||||
|
|||||||
Reference in New Issue
Block a user