feat: add initialization metrics for various services
This commit is contained in:
@@ -45,6 +45,10 @@ class AutoScaler:
|
||||
Gauge
|
||||
.builder("nowchess.coordinator.load.average", avgLoadRef, _.get())
|
||||
.register(meterRegistry)
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment(0)
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment(0)
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment(0)
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment(0)
|
||||
()
|
||||
|
||||
// scalafix:off DisableSyntax.asInstanceOf
|
||||
|
||||
+7
@@ -1,5 +1,6 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
@@ -11,6 +12,7 @@ import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.util.Try
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit
|
||||
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -41,6 +43,11 @@ class CacheEvictionManager:
|
||||
def setRedisPrefix(prefix: String): Unit =
|
||||
redisPrefix = prefix
|
||||
|
||||
@PostConstruct
|
||||
def initializeMetrics(): Unit =
|
||||
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record(0L, TimeUnit.MILLISECONDS)
|
||||
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment(0)
|
||||
|
||||
def evictStaleGames: Unit =
|
||||
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record((() => runEviction()): Runnable)
|
||||
|
||||
|
||||
+37
-1
@@ -1,5 +1,6 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.enterprise.inject.Instance
|
||||
import jakarta.inject.Inject
|
||||
@@ -42,16 +43,24 @@ class HealthMonitor:
|
||||
def setRedisPrefix(prefix: String): Unit =
|
||||
redisPrefix = prefix
|
||||
|
||||
@PostConstruct
|
||||
def initializeMetrics(): Unit =
|
||||
meterRegistry.counter("nowchess.coordinator.health.checks").increment(0)
|
||||
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment(0)
|
||||
|
||||
def checkInstanceHealth: Unit =
|
||||
meterRegistry.counter("nowchess.coordinator.health.checks").increment()
|
||||
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
||||
if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||
if evicted.nonEmpty then
|
||||
log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||
evicted.foreach(deleteK8sPod)
|
||||
val instances = instanceRegistry.getAllInstances
|
||||
instances.foreach { inst =>
|
||||
val isHealthy = checkHealth(inst.instanceId)
|
||||
if !isHealthy && inst.state == "HEALTHY" then
|
||||
log.warnf("Instance %s marked unhealthy", inst.instanceId)
|
||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||
deleteK8sPod(inst.instanceId)
|
||||
}
|
||||
|
||||
private def checkHealth(instanceId: String): Boolean =
|
||||
@@ -116,6 +125,7 @@ class HealthMonitor:
|
||||
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
|
||||
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||
deleteK8sPod(inst.instanceId)
|
||||
case None =>
|
||||
log.warnf("No pod found for instance %s, evicting from registry", inst.instanceId)
|
||||
instanceRegistry.removeInstance(inst.instanceId)
|
||||
@@ -128,3 +138,29 @@ class HealthMonitor:
|
||||
Option(pod.getStatus)
|
||||
.flatMap(s => Option(s.getConditions))
|
||||
.exists(_.asScala.exists(cond => cond.getType == "Ready" && cond.getStatus == "True"))
|
||||
|
||||
private def deleteK8sPod(instanceId: String): Unit =
|
||||
kubeClientOpt match
|
||||
case None =>
|
||||
log.debugf("Kubernetes client not available, skipping pod deletion for %s", instanceId)
|
||||
case Some(kube) =>
|
||||
try
|
||||
val pods = kube
|
||||
.pods()
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.withLabel(config.k8sRolloutLabelSelector)
|
||||
.list()
|
||||
.getItems
|
||||
.asScala
|
||||
|
||||
pods.find(pod => pod.getMetadata.getName.contains(instanceId)) match
|
||||
case Some(pod) =>
|
||||
val podName = pod.getMetadata.getName
|
||||
kube.pods().inNamespace(config.k8sNamespace).withName(podName).delete()
|
||||
meterRegistry.counter("nowchess.coordinator.pods.deleted").increment()
|
||||
log.infof("Deleted pod %s for dead instance %s", podName, instanceId)
|
||||
case None =>
|
||||
log.debugf("No pod found for instance %s, skipping deletion", instanceId)
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.warnf(ex, "Failed to delete pod for instance %s", instanceId)
|
||||
|
||||
+3
@@ -34,6 +34,9 @@ class InstanceRegistry:
|
||||
Gauge
|
||||
.builder("nowchess.coordinator.instances.active", instances, m => m.size().toDouble)
|
||||
.register(meterRegistry)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.joined").increment(0)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.removed").increment(0)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.evicted").increment(0)
|
||||
()
|
||||
|
||||
def setRedisPrefix(prefix: String): Unit =
|
||||
|
||||
Reference in New Issue
Block a user