feat: configure logging and add OpenTelemetry support (#49)
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
Reviewed-on: #49
This commit was merged in pull request #49.
This commit is contained in:
@@ -1,13 +1,16 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.enterprise.inject.Instance
|
||||
import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -21,10 +24,14 @@ class AutoScaler:
|
||||
|
||||
@Inject
|
||||
private var instanceRegistry: InstanceRegistry = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||
private val avgLoadRef = new AtomicReference[Double](0.0)
|
||||
|
||||
private def kubeClientOpt: Option[KubernetesClient] =
|
||||
if kubeClientInstance.isUnsatisfied then None
|
||||
@@ -33,6 +40,13 @@ class AutoScaler:
|
||||
private val argoApiVersion = "argoproj.io/v1alpha1"
|
||||
private val argoKind = "Rollout"
|
||||
|
||||
@PostConstruct
|
||||
def initMetrics(): Unit =
|
||||
Gauge
|
||||
.builder("nowchess.coordinator.load.average", avgLoadRef, _.get())
|
||||
.register(meterRegistry)
|
||||
()
|
||||
|
||||
// scalafix:off DisableSyntax.asInstanceOf
|
||||
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
||||
Option(rollout.get[AnyRef]("spec")).collect { case m: java.util.Map[?, ?] =>
|
||||
@@ -48,6 +62,7 @@ class AutoScaler:
|
||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||
if instances.nonEmpty then
|
||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||
avgLoadRef.set(avgLoad)
|
||||
|
||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
|
||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore then scaleDown()
|
||||
@@ -79,6 +94,7 @@ class AutoScaler:
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "up").increment()
|
||||
log.infof(
|
||||
"Scaled up %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
@@ -91,6 +107,7 @@ class AutoScaler:
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "up").increment()
|
||||
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
||||
|
||||
def scaleDown(): Unit =
|
||||
@@ -120,6 +137,7 @@ class AutoScaler:
|
||||
.inNamespace(config.k8sNamespace)
|
||||
.resource(rollout)
|
||||
.update()
|
||||
meterRegistry.counter("nowchess.coordinator.scale.events", "direction", "down").increment()
|
||||
log.infof(
|
||||
"Scaled down %s from %d to %d replicas",
|
||||
config.k8sRolloutName,
|
||||
@@ -132,4 +150,5 @@ class AutoScaler:
|
||||
}
|
||||
catch
|
||||
case ex: Exception =>
|
||||
meterRegistry.counter("nowchess.coordinator.scale.failures", "direction", "down").increment()
|
||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||
|
||||
+10
-1
@@ -5,6 +5,7 @@ import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
@@ -30,6 +31,9 @@ class CacheEvictionManager:
|
||||
@Inject
|
||||
private var objectMapper: ObjectMapper = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
private val log = Logger.getLogger(classOf[CacheEvictionManager])
|
||||
private var redisPrefix = "nowchess"
|
||||
// scalafix:on DisableSyntax.var
|
||||
@@ -38,8 +42,12 @@ class CacheEvictionManager:
|
||||
redisPrefix = prefix
|
||||
|
||||
def evictStaleGames: Unit =
|
||||
log.info("Starting cache eviction scan")
|
||||
meterRegistry.timer("nowchess.coordinator.cache.eviction.duration").record { () =>
|
||||
runEviction()
|
||||
}
|
||||
|
||||
private def runEviction(): Unit =
|
||||
log.info("Starting cache eviction scan")
|
||||
val pattern = s"$redisPrefix:game:entry:*"
|
||||
val keys = redis.key(classOf[String]).keys(pattern)
|
||||
val now = System.currentTimeMillis()
|
||||
@@ -56,6 +64,7 @@ class CacheEvictionManager:
|
||||
try
|
||||
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
||||
redis.key(classOf[String]).del(key)
|
||||
meterRegistry.counter("nowchess.coordinator.cache.evictions").increment()
|
||||
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
||||
count + 1
|
||||
catch
|
||||
|
||||
@@ -6,6 +6,7 @@ import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.fabric8.kubernetes.api.model.Pod
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import org.jboss.logging.Logger
|
||||
@@ -27,6 +28,9 @@ class HealthMonitor:
|
||||
@Inject
|
||||
private var redis: RedisDataSource = uninitialized
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
|
||||
private val log = Logger.getLogger(classOf[HealthMonitor])
|
||||
private var redisPrefix = "nowchess"
|
||||
// scalafix:on DisableSyntax.var
|
||||
@@ -39,6 +43,7 @@ class HealthMonitor:
|
||||
redisPrefix = prefix
|
||||
|
||||
def checkInstanceHealth: Unit =
|
||||
meterRegistry.counter("nowchess.coordinator.health.checks").increment()
|
||||
val evicted = instanceRegistry.evictStaleInstances(config.instanceDeadTimeout)
|
||||
if evicted.nonEmpty then log.warnf("Evicted %d stale instances: %s", evicted.size, evicted.mkString(", "))
|
||||
val instances = instanceRegistry.getAllInstances
|
||||
@@ -108,6 +113,7 @@ class HealthMonitor:
|
||||
case Some(pod) =>
|
||||
val isReady = isPodReady(pod)
|
||||
if !isReady && inst.state == "HEALTHY" then
|
||||
meterRegistry.counter("nowchess.coordinator.pods.unhealthy").increment()
|
||||
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||
case None =>
|
||||
|
||||
+15
@@ -1,5 +1,6 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||
@@ -9,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.time.{Duration, Instant}
|
||||
import io.micrometer.core.instrument.{Gauge, MeterRegistry}
|
||||
import io.smallrye.mutiny.Uni
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
@@ -18,12 +20,22 @@ class InstanceRegistry:
|
||||
@Inject
|
||||
private var redis: ReactiveRedisDataSource = uninitialized
|
||||
private var redisPrefix = "nowchess"
|
||||
|
||||
@Inject
|
||||
private var meterRegistry: MeterRegistry = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val log = Logger.getLogger(classOf[InstanceRegistry])
|
||||
private val mapper = ObjectMapper()
|
||||
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
|
||||
|
||||
@PostConstruct
|
||||
def initMetrics(): Unit =
|
||||
Gauge
|
||||
.builder("nowchess.coordinator.instances.active", instances, m => m.size().toDouble)
|
||||
.register(meterRegistry)
|
||||
()
|
||||
|
||||
def setRedisPrefix(prefix: String): Unit =
|
||||
redisPrefix = prefix
|
||||
|
||||
@@ -45,6 +57,7 @@ class InstanceRegistry:
|
||||
val isNew = !instances.containsKey(instanceId)
|
||||
instances.put(instanceId, metadata)
|
||||
if isNew then
|
||||
meterRegistry.counter("nowchess.coordinator.instances.joined").increment()
|
||||
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
||||
else
|
||||
log.debugf(
|
||||
@@ -68,6 +81,7 @@ class InstanceRegistry:
|
||||
|
||||
def removeInstance(instanceId: String): Unit =
|
||||
instances.remove(instanceId)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.removed").increment()
|
||||
log.infof("Instance %s removed from registry", instanceId)
|
||||
|
||||
def evictStaleInstances(maxAge: Duration): List[String] =
|
||||
@@ -83,6 +97,7 @@ class InstanceRegistry:
|
||||
.toList
|
||||
stale.foreach { id =>
|
||||
instances.remove(id)
|
||||
meterRegistry.counter("nowchess.coordinator.instances.evicted").increment()
|
||||
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
||||
}
|
||||
stale
|
||||
|
||||
Reference in New Issue
Block a user