@@ -0,0 +1,60 @@
|
||||
// gRPC contract between the coordinator and nowchess-core instances:
// instance health reporting plus game-subscription management RPCs.
syntax = "proto3";

option java_package = "de.nowchess.coordinator.proto";
option java_multiple_files = true;
option java_outer_classname = "CoordinatorServiceProto";

service CoordinatorService {
  // Bidirectional stream: instances push heartbeat frames, the coordinator
  // pushes commands back on the same call.
  rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand);

  // Ask an instance to (re)subscribe to a batch of games (used during failover/rebalance).
  rpc BatchResubscribeGames(BatchResubscribeRequest) returns (BatchResubscribeResponse);

  // Ask an instance to drop its subscriptions for the given games.
  rpc UnsubscribeGames(UnsubscribeGamesRequest) returns (UnsubscribeGamesResponse);

  // Ask an instance to evict the given games from its local cache.
  rpc EvictGames(EvictGamesRequest) returns (EvictGamesResponse);

  // Gracefully migrate all games off the named instance.
  rpc DrainInstance(DrainInstanceRequest) returns (DrainInstanceResponse);
}

// One periodic health/load report from a core instance.
message HeartbeatFrame {
  string instanceId = 1;
  string hostname = 2;
  int32 httpPort = 3;
  int32 grpcPort = 4;
  // Number of game subscriptions currently held by the instance.
  int32 subscriptionCount = 5;
  int32 localCacheSize = 6;
  // Sender-side wall clock, epoch millis.
  int64 timestampMillis = 7;
}

// Generic command pushed from coordinator to instance.
// NOTE(review): `type` is a free-form discriminator and `payload` an opaque
// string — presumably JSON; confirm against the consumer before relying on it.
message CoordinatorCommand {
  string type = 1;
  string payload = 2;
}

message BatchResubscribeRequest {
  repeated string gameIds = 1;
}

message BatchResubscribeResponse {
  int32 subscribedCount = 1;
  repeated string failedGameIds = 2;
}

message UnsubscribeGamesRequest {
  repeated string gameIds = 1;
}

message UnsubscribeGamesResponse {
  int32 unsubscribedCount = 1;
}

message EvictGamesRequest {
  repeated string gameIds = 1;
}

message EvictGamesResponse {
  int32 evictedCount = 1;
}

message DrainInstanceRequest {
  string instanceId = 1;
}

message DrainInstanceResponse {
  int32 gamesMigrated = 1;
}
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
{
|
||||
"reflection": [
|
||||
{ "type": "scala.Tuple1[]" },
|
||||
{ "type": "scala.Tuple2[]" },
|
||||
{ "type": "scala.Tuple3[]" },
|
||||
{ "type": "scala.Tuple4[]" },
|
||||
{ "type": "scala.Tuple5[]" },
|
||||
{ "type": "scala.Tuple6[]" },
|
||||
{ "type": "scala.Tuple7[]" },
|
||||
{ "type": "scala.Tuple8[]" },
|
||||
{ "type": "scala.Tuple9[]" },
|
||||
{ "type": "scala.Tuple10[]" },
|
||||
{ "type": "scala.Tuple11[]" },
|
||||
{ "type": "scala.Tuple12[]" },
|
||||
{ "type": "scala.Tuple13[]" },
|
||||
{ "type": "scala.Tuple14[]" },
|
||||
{ "type": "scala.Tuple15[]" },
|
||||
{ "type": "scala.Tuple16[]" },
|
||||
{ "type": "scala.Tuple17[]" },
|
||||
{ "type": "scala.Tuple18[]" },
|
||||
{ "type": "scala.Tuple19[]" },
|
||||
{ "type": "scala.Tuple20[]" },
|
||||
{ "type": "scala.Tuple21[]" },
|
||||
{ "type": "scala.Tuple22[]" },
|
||||
{ "type": "com.fasterxml.jackson.module.scala.introspect.PropertyDescriptor[]" }
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
# Quarkus configuration for the nowchess coordinator service.
quarkus:
  application:
    name: nowchess-coordinator
  http:
    port: 8086
  redis:
    # Connection string for the Quarkus Redis client; env vars with local fallbacks.
    hosts: redis://${REDIS_HOST:localhost}:${REDIS_PORT:6379}
  grpc:
    server:
      port: 9086
  rest-client:
    # Milliseconds.
    connection-timeout: 5000
    read-timeout: 10000
  smallrye-openapi:
    info-title: NowChess Coordinator Service
    info-version: 1.0.0
    info-description: Coordination endpoints for instance health, balancing, failover, and scaling
    path: /openapi
  swagger-ui:
    always-include: true
    path: /swagger-ui

# Application-specific settings (bound via @ConfigMapping, prefix "nowchess").
nowchess:
  redis:
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    # Key-namespace prefix used for all coordinator Redis keys.
    prefix: ${REDIS_PREFIX:nowchess}

  coordinator:
    # Load-balancing targets.
    max-games-per-core: 500
    max-deviation-percent: 20
    rebalance-interval: 30s
    rebalance-min-interval: 60s
    # Instance heartbeat freshness window.
    heartbeat-ttl: 5s
    # ISO-8601 duration (200 ms) for the bidi heartbeat stream cadence.
    stream-heartbeat-interval: PT0.2S
    # Cache eviction scan.
    cache-eviction-interval: 10m
    game-idle-threshold: 45m
    # Auto-scaling (disabled by default; bounds apply when enabled).
    auto-scale-enabled: false
    scale-up-threshold: 0.8
    scale-down-threshold: 0.3
    scale-min-replicas: 2
    scale-max-replicas: 10
    # Argo Rollout targeted when scaling.
    k8s-namespace: default
    k8s-rollout-name: nowchess-core
    k8s-rollout-label-selector: "app=nowchess-core"

---
# dev profile
"%dev":
  quarkus:
    log:
      level: DEBUG
|
||||
@@ -0,0 +1,7 @@
|
||||
package de.nowchess.coordinator

import jakarta.ws.rs.core.Application
import jakarta.enterprise.context.ApplicationScoped

/** JAX-RS application marker. No classes are registered explicitly, so all
  * CDI-discovered `@Path` resources are exposed under the default base path.
  */
@ApplicationScoped
class CoordinatorApp extends Application
|
||||
@@ -0,0 +1,14 @@
|
||||
package de.nowchess.coordinator.config

import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Produces
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.KubernetesClient

/** CDI producers for beans that cannot be discovered automatically. */
@ApplicationScoped
class BeansProducer:

  /** Fabric8 Kubernetes client built from the default configuration
    * (in-cluster service account or local kubeconfig). Application-scoped,
    * so one shared client instance per application.
    */
  @Produces
  @ApplicationScoped
  def kubernetesClient: KubernetesClient =
    KubernetesClientBuilder().build()
|
||||
+55
@@ -0,0 +1,55 @@
|
||||
package de.nowchess.coordinator.config

import io.smallrye.config.ConfigMapping
import io.smallrye.config.WithName
import java.time.Duration

/** Typed view of the `nowchess.coordinator.*` configuration keys
  * (see application.yaml). Bound by SmallRye Config at startup.
  */
@ConfigMapping(prefix = "nowchess.coordinator")
trait CoordinatorConfig:
  // Target load per instance used by balancing and auto-scaling decisions.
  @WithName("max-games-per-core")
  def maxGamesPerCore: Int

  // Allowed load deviation between instances before rebalancing kicks in.
  @WithName("max-deviation-percent")
  def maxDeviationPercent: Int

  @WithName("rebalance-interval")
  def rebalanceInterval: Duration

  // Minimum spacing between two consecutive rebalances.
  @WithName("rebalance-min-interval")
  def rebalanceMinInterval: Duration

  // How long a heartbeat is considered fresh.
  @WithName("heartbeat-ttl")
  def heartbeatTtl: Duration

  @WithName("stream-heartbeat-interval")
  def streamHeartbeatInterval: Duration

  @WithName("cache-eviction-interval")
  def cacheEvictionInterval: Duration

  // Games idle longer than this are eligible for cache eviction.
  @WithName("game-idle-threshold")
  def gameIdleThreshold: Duration

  // Auto-scaling feature flag and thresholds (fractions of maxGamesPerCore).
  @WithName("auto-scale-enabled")
  def autoScaleEnabled: Boolean

  @WithName("scale-up-threshold")
  def scaleUpThreshold: Double

  @WithName("scale-down-threshold")
  def scaleDownThreshold: Double

  @WithName("scale-min-replicas")
  def scaleMinReplicas: Int

  @WithName("scale-max-replicas")
  def scaleMaxReplicas: Int

  // Argo Rollout targeted when scaling.
  @WithName("k8s-namespace")
  def k8sNamespace: String

  @WithName("k8s-rollout-name")
  def k8sRolloutName: String

  @WithName("k8s-rollout-label-selector")
  def k8sRolloutLabelSelector: String
|
||||
@@ -0,0 +1,17 @@
|
||||
package de.nowchess.coordinator.config

import com.fasterxml.jackson.core.Version
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.quarkus.jackson.ObjectMapperCustomizer
import jakarta.inject.Singleton

/** Registers jackson-module-scala on the Quarkus-managed ObjectMapper so that
  * Scala case classes and collections (de)serialize correctly.
  */
@Singleton
class JacksonConfig extends ObjectMapperCustomizer:
  def customize(mapper: ObjectMapper): Unit =
    mapper.registerModule(new DefaultScalaModule() {
      // Pins the module version explicitly instead of relying on runtime
      // detection — NOTE(review): presumably a workaround for version lookup
      // failing (e.g. in native images); confirm before changing.
      override def version(): Version =
        // scalafix:off DisableSyntax.null
        // null = no snapshot info; the Version constructor accepts it.
        new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
        // scalafix:on DisableSyntax.null
    })
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package de.nowchess.coordinator.config

import de.nowchess.coordinator.dto.InstanceMetadata
import de.nowchess.coordinator.resource.MetricsDto
import io.quarkus.runtime.annotations.RegisterForReflection

/** Marker class registering DTOs for reflection so Jackson can serialize
  * them in a GraalVM native image build.
  */
@RegisterForReflection(
  targets = Array(
    classOf[InstanceMetadata],
    classOf[MetricsDto],
  ),
)
class NativeReflectionConfig
|
||||
@@ -0,0 +1,23 @@
|
||||
package de.nowchess.coordinator.dto

import com.fasterxml.jackson.annotation.JsonProperty
import java.time.Instant

/** Snapshot of one nowchess-core instance as tracked by the coordinator.
  * Serialized to/from JSON; explicit @JsonProperty names keep the wire format
  * stable regardless of Scala field-name handling.
  */
case class InstanceMetadata(
  @JsonProperty("instanceId")
  instanceId: String,
  @JsonProperty("hostname")
  hostname: String,
  @JsonProperty("httpPort")
  httpPort: Int,
  @JsonProperty("grpcPort")
  grpcPort: Int,
  // Number of game subscriptions currently held by the instance.
  @JsonProperty("subscriptionCount")
  subscriptionCount: Int,
  @JsonProperty("localCacheSize")
  localCacheSize: Int,
  // Kept as String, not Instant — presumably ISO-8601; TODO confirm producers.
  @JsonProperty("lastHeartbeat")
  lastHeartbeat: String,
  // Lifecycle state; the codebase compares against "HEALTHY" and "DEAD".
  @JsonProperty("state")
  state: String = "HEALTHY",
)
|
||||
+101
@@ -0,0 +1,101 @@
|
||||
package de.nowchess.coordinator.grpc
|
||||
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.inject.Singleton
|
||||
import io.quarkus.grpc.GrpcService
|
||||
import scala.compiletime.uninitialized
|
||||
import de.nowchess.coordinator.service.{FailoverService, InstanceRegistry}
|
||||
import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
|
||||
import io.grpc.stub.StreamObserver
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
@GrpcService
@Singleton
class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImplBase:
  // scalafix:off DisableSyntax.var
  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized

  @Inject
  private var failoverService: FailoverService = uninitialized
  // scalafix:on DisableSyntax.var

  private val log = Logger.getLogger(classOf[CoordinatorGrpcServer])

  /** Bidirectional heartbeat stream. Each inbound frame refreshes the sender's
    * registry entry; a stream error triggers failover for the last known sender.
    */
  override def heartbeatStream(
    responseObserver: StreamObserver[CoordinatorCommand],
  ): StreamObserver[HeartbeatFrame] =
    new StreamObserver[HeartbeatFrame]:
      // scalafix:off DisableSyntax.var
      // Remembered so onError/onCompleted can attribute the drop to an instance.
      private var lastInstanceId = ""
      // scalafix:on DisableSyntax.var

      override def onNext(frame: HeartbeatFrame): Unit =
        lastInstanceId = frame.getInstanceId
        try
          instanceRegistry.updateInstanceFromRedis(frame.getInstanceId)
          log.debugf(
            "Received heartbeat from %s with %d subscriptions",
            frame.getInstanceId,
            frame.getSubscriptionCount,
          )
        catch
          case ex: Exception =>
            log.warnf(ex, "Failed to process heartbeat from %s", frame.getInstanceId)

      override def onError(t: Throwable): Unit =
        log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId)
        if lastInstanceId.nonEmpty then failoverService.onInstanceStreamDropped(lastInstanceId)

      override def onCompleted(): Unit =
        log.infof("Heartbeat stream completed for instance %s", lastInstanceId)
        // BUGFIX: half-close the server side too. Previously the command
        // stream was never completed after a graceful client shutdown,
        // leaking the RPC.
        responseObserver.onCompleted()

  /** Acknowledgement-only stub: reports the full request size as subscribed. */
  override def batchResubscribeGames(
    request: BatchResubscribeRequest,
    responseObserver: StreamObserver[BatchResubscribeResponse],
  ): Unit =
    log.infof("Batch resubscribe request for %d games", request.getGameIdsList.size())
    val response = BatchResubscribeResponse
      .newBuilder()
      .setSubscribedCount(request.getGameIdsList.size())
      .build()
    responseObserver.onNext(response)
    responseObserver.onCompleted()

  /** Acknowledgement-only stub: reports the full request size as unsubscribed. */
  override def unsubscribeGames(
    request: UnsubscribeGamesRequest,
    responseObserver: StreamObserver[UnsubscribeGamesResponse],
  ): Unit =
    log.infof("Unsubscribe request for %d games", request.getGameIdsList.size())
    val response = UnsubscribeGamesResponse
      .newBuilder()
      .setUnsubscribedCount(request.getGameIdsList.size())
      .build()
    responseObserver.onNext(response)
    responseObserver.onCompleted()

  /** Acknowledgement-only stub: reports the full request size as evicted. */
  override def evictGames(
    request: EvictGamesRequest,
    responseObserver: StreamObserver[EvictGamesResponse],
  ): Unit =
    log.infof("Evict request for %d games", request.getGameIdsList.size())
    val response = EvictGamesResponse
      .newBuilder()
      .setEvictedCount(request.getGameIdsList.size())
      .build()
    responseObserver.onNext(response)
    responseObserver.onCompleted()

  /** Drains an instance: marks it dropped (delegating the actual game
    * migration to FailoverService) and reports the subscription count it held
    * before the drain as "gamesMigrated".
    */
  override def drainInstance(
    request: DrainInstanceRequest,
    responseObserver: StreamObserver[DrainInstanceResponse],
  ): Unit =
    val instanceId = request.getInstanceId
    log.infof("Drain request for instance %s", instanceId)
    // Snapshot before failover mutates the registry.
    val gamesBefore = instanceRegistry.getInstance(instanceId).map(_.subscriptionCount).getOrElse(0)
    failoverService.onInstanceStreamDropped(instanceId)
    val response = DrainInstanceResponse.newBuilder().setGamesMigrated(gamesBefore).build()
    responseObserver.onNext(response)
    responseObserver.onCompleted()
|
||||
@@ -0,0 +1,63 @@
|
||||
package de.nowchess.coordinator.grpc
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.annotation.PreDestroy
|
||||
import org.jboss.logging.Logger
|
||||
import io.grpc.ManagedChannel
|
||||
import io.grpc.ManagedChannelBuilder
|
||||
import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@ApplicationScoped
class CoreGrpcClient:
  private val log = Logger.getLogger(classOf[CoreGrpcClient])
  // One cached plaintext channel per "host:port" target.
  private val channels = ConcurrentHashMap[String, ManagedChannel]()

  private def getChannel(host: String, port: Int): ManagedChannel =
    channels.computeIfAbsent(s"$host:$port", _ => ManagedChannelBuilder.forAddress(host, port).usePlaintext().build())

  // Drop and force-close a cached channel (called after an RPC failure,
  // since the failure may mean the channel is stale).
  private def evictStaleChannel(host: String, port: Int): Unit =
    Option(channels.remove(s"$host:$port")).foreach(_.shutdownNow())

  @PreDestroy
  def shutdown(): Unit =
    channels.values.asScala.foreach { ch =>
      ch.shutdown()
      // Give in-flight calls 5s to finish, then force-close.
      if !ch.awaitTermination(5, TimeUnit.SECONDS) then ch.shutdownNow()
    }
    channels.clear()

  /** Shared fallback wrapper for the count-returning RPCs: runs `rpc` on a
    * blocking stub for host:port; on any failure logs, evicts the cached
    * channel, and returns 0.
    */
  private def callCounting(host: String, port: Int, rpcName: String)(
    rpc: CoordinatorServiceGrpc.CoordinatorServiceBlockingStub => Int,
  ): Int =
    try rpc(CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port)))
    catch
      case ex: Exception =>
        log.warnf(ex, "%s RPC failed for %s:%d", rpcName, host, port)
        evictStaleChannel(host, port)
        0

  /** Asks the instance at host:port to resubscribe the games; returns the
    * subscribed count, or 0 on RPC failure.
    */
  def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
    callCounting(host, port, "batchResubscribeGames") { stub =>
      val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
      stub.batchResubscribeGames(request).getSubscribedCount
    }

  /** Asks the instance at host:port to unsubscribe the games; returns the
    * unsubscribed count, or 0 on RPC failure.
    */
  def unsubscribeGames(host: String, port: Int, gameIds: List[String]): Int =
    callCounting(host, port, "unsubscribeGames") { stub =>
      val request = UnsubscribeGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
      stub.unsubscribeGames(request).getUnsubscribedCount
    }

  /** Asks the instance at host:port to evict the games from its local cache;
    * returns the evicted count, or 0 on RPC failure.
    */
  def evictGames(host: String, port: Int, gameIds: List[String]): Int =
    callCounting(host, port, "evictGames") { stub =>
      val request = EvictGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
      stub.evictGames(request).getEvictedCount
    }
|
||||
+101
@@ -0,0 +1,101 @@
|
||||
package de.nowchess.coordinator.resource
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.*
|
||||
import jakarta.ws.rs.core.MediaType
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import de.nowchess.coordinator.service.{AutoScaler, FailoverService, InstanceRegistry, LoadBalancer}
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
@Path("/api/coordinator")
@ApplicationScoped
class CoordinatorResource:
  // scalafix:off DisableSyntax.var
  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized

  @Inject
  private var loadBalancer: LoadBalancer = uninitialized

  @Inject
  private var autoScaler: AutoScaler = uninitialized

  @Inject
  private var failoverService: FailoverService = uninitialized
  // scalafix:on DisableSyntax.var

  private val log = Logger.getLogger(classOf[CoordinatorResource])

  /** Lists all registered core instances. */
  @GET
  @Path("/instances")
  @Produces(Array(MediaType.APPLICATION_JSON))
  def listInstances: java.util.List[InstanceMetadata] =
    instanceRegistry.getAllInstances.asJava

  /** Aggregated load metrics across all registered instances. */
  @GET
  @Path("/metrics")
  @Produces(Array(MediaType.APPLICATION_JSON))
  def getMetrics: MetricsDto =
    val instances = instanceRegistry.getAllInstances
    val loads = instances.map(_.subscriptionCount)
    val totalGames = loads.sum
    // Reuse totalGames instead of summing loads a second time.
    val avgLoad = if instances.nonEmpty then totalGames.toDouble / instances.size else 0.0
    val maxLoad = if loads.nonEmpty then loads.max else 0
    val minLoad = if loads.nonEmpty then loads.min else 0

    MetricsDto(
      totalInstances = instances.size,
      healthyInstances = instances.count(_.state == "HEALTHY"),
      deadInstances = instances.count(_.state == "DEAD"),
      totalGames = totalGames,
      avgGamesPerCore = avgLoad,
      maxGamesPerCore = maxLoad,
      minGamesPerCore = minLoad,
      instances = instances,
    )

  /** Manually triggers a rebalance run. */
  @POST
  @Path("/rebalance")
  @Produces(Array(MediaType.APPLICATION_JSON))
  def triggerRebalance: scala.collection.Map[String, String] =
    log.info("Manual rebalance triggered")
    loadBalancer.rebalance
    Map("status" -> "rebalance_started")

  /** Manually triggers failover for one instance (treated as if its stream dropped). */
  @POST
  @Path("/failover/{instanceId}")
  @Produces(Array(MediaType.APPLICATION_JSON))
  def triggerFailover(@PathParam("instanceId") instanceId: String): scala.collection.Map[String, String] =
    log.infof("Manual failover triggered for instance %s", instanceId)
    failoverService.onInstanceStreamDropped(instanceId)
    Map("status" -> "failover_started", "instanceId" -> instanceId)

  /** Manually scales the rollout up by one replica (bounds enforced by AutoScaler). */
  @POST
  @Path("/scale-up")
  @Produces(Array(MediaType.APPLICATION_JSON))
  def triggerScaleUp: scala.collection.Map[String, String] =
    log.info("Manual scale up triggered")
    autoScaler.scaleUp()
    Map("status" -> "scale_up_started")

  /** Manually scales the rollout down by one replica (bounds enforced by AutoScaler). */
  @POST
  @Path("/scale-down")
  @Produces(Array(MediaType.APPLICATION_JSON))
  def triggerScaleDown: scala.collection.Map[String, String] =
    log.info("Manual scale down triggered")
    autoScaler.scaleDown()
    Map("status" -> "scale_down_started")
|
||||
|
||||
/** Aggregated load metrics returned by GET /api/coordinator/metrics. */
case class MetricsDto(
  totalInstances: Int,
  healthyInstances: Int,
  deadInstances: Int,
  // Sum of subscriptionCount over all instances.
  totalGames: Int,
  avgGamesPerCore: Double,
  maxGamesPerCore: Int,
  minGamesPerCore: Int,
  instances: List[InstanceMetadata],
)
|
||||
@@ -0,0 +1,135 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.enterprise.inject.Instance
|
||||
import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import org.jboss.logging.Logger
|
||||
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@ApplicationScoped
class AutoScaler:
  // scalafix:off DisableSyntax.var
  @Inject
  private var kubeClientInstance: Instance[KubernetesClient] = uninitialized

  @Inject
  private var config: CoordinatorConfig = uninitialized

  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized
  // scalafix:on DisableSyntax.var

  private val log = Logger.getLogger(classOf[AutoScaler])
  // Epoch millis of the last scale decision; enforces the 2-minute cooldown in checkAndScale.
  private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)

  // The KubernetesClient bean is optional (e.g. running outside a cluster).
  private def kubeClientOpt: Option[KubernetesClient] =
    if kubeClientInstance.isUnsatisfied then None
    else Some(kubeClientInstance.get())

  // scalafix:off DisableSyntax.asInstanceOf
  // scalafix:off DisableSyntax.isInstanceOf
  // Extracts the untyped "spec" map from a generic Rollout resource, if present.
  private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
    Option(rollout.get("spec")).collect {
      case m if m.isInstanceOf[java.util.Map[?, ?]] => m.asInstanceOf[java.util.Map[String, AnyRef]]
    }
  // scalafix:on DisableSyntax.asInstanceOf
  // scalafix:on DisableSyntax.isInstanceOf

  /** Periodic check: if auto-scaling is enabled and the 2-minute cooldown has
    * elapsed (claimed atomically via CAS so concurrent callers don't double-scale),
    * compares average healthy-instance load against the configured thresholds.
    */
  def checkAndScale: Unit =
    if config.autoScaleEnabled then
      val now = System.currentTimeMillis()
      val last = lastScaleTime.get()
      if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
        val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
        if instances.nonEmpty then
          val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size

          if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
          else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
          then scaleDown()

  /** Increments the Argo Rollout's spec.replicas by one, capped at scale-max-replicas. */
  def scaleUp(): Unit =
    log.info("Scaling up Argo Rollout")
    kubeClientOpt match
      case None =>
        log.warn("Kubernetes client not available, cannot scale")
      case Some(kube) =>
        try
          Option(
            kube
              .resources(classOf[GenericKubernetesResource])
              .inNamespace(config.k8sNamespace)
              .withName(config.k8sRolloutName)
              .get(),
          ).foreach { rollout =>
            rolloutSpec(rollout).foreach { spec =>
              spec.get("replicas") match
                case replicas: Integer =>
                  val currentReplicas = replicas.intValue()
                  val maxReplicas = config.scaleMaxReplicas

                  if currentReplicas < maxReplicas then
                    // BUGFIX: replicas must stay an integer. Writing it as a
                    // String (String.valueOf) produced an invalid spec.replicas
                    // and broke the Integer match on the next read.
                    spec.put("replicas", Integer.valueOf(currentReplicas + 1))
                    // NOTE(review): update() is not handed the mutated rollout
                    // object — verify the mutation is actually sent (fabric8
                    // may require kube.resource(rollout).update()).
                    kube
                      .resources(classOf[GenericKubernetesResource])
                      .inNamespace(config.k8sNamespace)
                      .withName(config.k8sRolloutName)
                      .update()
                    log.infof(
                      "Scaled up %s from %d to %d replicas",
                      config.k8sRolloutName,
                      currentReplicas,
                      currentReplicas + 1,
                    )
                  else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
                case _ => ()
            }
          }
        catch
          case ex: Exception =>
            log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)

  /** Decrements the Argo Rollout's spec.replicas by one, floored at scale-min-replicas. */
  def scaleDown(): Unit =
    log.info("Scaling down Argo Rollout")
    kubeClientOpt match
      case None =>
        log.warn("Kubernetes client not available, cannot scale")
      case Some(kube) =>
        try
          Option(
            kube
              .resources(classOf[GenericKubernetesResource])
              .inNamespace(config.k8sNamespace)
              .withName(config.k8sRolloutName)
              .get(),
          ).foreach { rollout =>
            rolloutSpec(rollout).foreach { spec =>
              spec.get("replicas") match
                case replicas: Integer =>
                  val currentReplicas = replicas.intValue()
                  val minReplicas = config.scaleMinReplicas

                  if currentReplicas > minReplicas then
                    // BUGFIX: keep replicas an Integer (see scaleUp).
                    spec.put("replicas", Integer.valueOf(currentReplicas - 1))
                    kube
                      .resources(classOf[GenericKubernetesResource])
                      .inNamespace(config.k8sNamespace)
                      .withName(config.k8sRolloutName)
                      .update()
                    log.infof(
                      "Scaled down %s from %d to %d replicas",
                      config.k8sRolloutName,
                      currentReplicas,
                      currentReplicas - 1,
                    )
                  else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
                case _ => ()
            }
          }
        catch
          case ex: Exception =>
            log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||
+93
@@ -0,0 +1,93 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.util.Try
|
||||
import java.time.Instant
|
||||
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
||||
|
||||
/** Periodically scans game-entry keys in Redis and evicts games that have
  * been idle longer than the configured threshold, both from the owning core
  * instance's local cache (via gRPC) and from Redis itself.
  */
@ApplicationScoped
class CacheEvictionManager:
  // scalafix:off DisableSyntax.var
  @Inject
  private var redis: RedisDataSource = uninitialized

  @Inject
  private var config: CoordinatorConfig = uninitialized

  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized

  @Inject
  private var coreGrpcClient: CoreGrpcClient = uninitialized

  @Inject
  private var objectMapper: ObjectMapper = uninitialized

  private val log = Logger.getLogger(classOf[CacheEvictionManager])
  // Redis key namespace; overridable via setRedisPrefix.
  private var redisPrefix = "nowchess"
  // scalafix:on DisableSyntax.var

  def setRedisPrefix(prefix: String): Unit =
    redisPrefix = prefix

  /** One eviction scan over all `<prefix>:game:entry:*` keys.
    * Per-key failures are logged and skipped; the scan always completes.
    */
  def evictStaleGames: Unit =
    log.info("Starting cache eviction scan")

    val pattern = s"$redisPrefix:game:entry:*"
    // NOTE(review): this issues Redis KEYS, which is O(N) and blocks the
    // server; consider SCAN for large keyspaces.
    val keys = redis.key(classOf[String]).keys(pattern)
    val now = System.currentTimeMillis()
    val idleThresholdMs = config.gameIdleThreshold.toMillis

    // Fold keeps a running count of successfully evicted games.
    val evictedCount = keys.asScala.foldLeft(0) { (count, key) =>
      try
        // Key may have expired between KEYS and GET; fold over None keeps count.
        Option(redis.value(classOf[String]).get(key)).fold(count) { value =>
          val gameId = key.stripPrefix(s"$redisPrefix:game:entry:")
          val lastUpdated = extractLastUpdatedTimestamp(value)

          // lastUpdated == 0 means "unknown"; such entries are never evicted.
          if lastUpdated > 0 && (now - lastUpdated) > idleThresholdMs then
            findInstanceWithGame(gameId).fold(count) { instance =>
              try
                // Evict from the owning instance first, then drop the Redis entry.
                coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
                redis.key(classOf[String]).del(key)
                log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
                count + 1
              catch
                case ex: Exception =>
                  log.warnf(ex, "Failed to evict game %s", gameId)
                  count
            }
          else count
        }
      catch
        case ex: Exception =>
          log.warnf(ex, "Error processing game key %s", key)
          count
    }

    log.infof("Cache eviction scan completed, evicted %d games", evictedCount)

  // Parses the entry JSON and returns the "lastHeartbeat" field as epoch
  // millis (ISO-8601 expected); 0 on any parse failure or missing field.
  private def extractLastUpdatedTimestamp(json: String): Long =
    Try {
      val parsed = objectMapper.readTree(json)
      Option(parsed.get("lastHeartbeat"))
        .filter(_.isTextual)
        .fold(0L)(lh => Instant.parse(lh.asText()).toEpochMilli)
    }.getOrElse(0L)

  // Linear search over registered instances: the owner is the instance whose
  // per-instance games set in Redis contains gameId. None on lookup failure.
  private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] =
    try
      instanceRegistry.getAllInstances.find { instance =>
        val setKey = s"$redisPrefix:instance:${instance.instanceId}:games"
        redis.set(classOf[String]).sismember(setKey, gameId)
      }
    catch
      case ex: Exception =>
        log.debugf(ex, "Failed to find instance for game %s", gameId)
        None
|
||||
+103
@@ -0,0 +1,103 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.compiletime.uninitialized
|
||||
import org.jboss.logging.Logger
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
||||
|
||||
/** Handles instance failure: marks the instance dead, redistributes its games
  * across the remaining healthy instances in batches, and cleans up its Redis
  * games set.
  */
@ApplicationScoped
class FailoverService:
  // scalafix:off DisableSyntax.var
  @Inject
  private var redis: RedisDataSource = uninitialized

  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized

  @Inject
  private var coreGrpcClient: CoreGrpcClient = uninitialized

  private val log = Logger.getLogger(classOf[FailoverService])
  // Redis key namespace; overridable via setRedisPrefix.
  private var redisPrefix = "nowchess"
  // scalafix:on DisableSyntax.var

  def setRedisPrefix(prefix: String): Unit =
    redisPrefix = prefix

  /** Full failover for one instance. Order matters: mark dead first (so it is
    * excluded from the healthy-target list), then migrate its games, then
    * delete its games set last (so a partial failure does not lose the list).
    */
  def onInstanceStreamDropped(instanceId: String): Unit =
    log.infof("Instance %s stream dropped, triggering failover", instanceId)

    val startTime = System.currentTimeMillis()
    instanceRegistry.markInstanceDead(instanceId)

    val gameIds = getOrphanedGames(instanceId)
    log.infof("Found %d orphaned games for instance %s", gameIds.size, instanceId)

    if gameIds.nonEmpty then
      // Least-loaded instances first, so batches land on the emptiest nodes.
      val healthyInstances = instanceRegistry.getAllInstances
        .filter(_.state == "HEALTHY")
        .sortBy(_.subscriptionCount)

      if healthyInstances.nonEmpty then
        distributeGames(gameIds, healthyInstances, instanceId)

        val elapsed = System.currentTimeMillis() - startTime
        log.infof("Failover completed in %dms for instance %s", elapsed, instanceId)
      else log.warnf("No healthy instances available for failover of %s", instanceId)

    cleanupDeadInstance(instanceId)

  // Games the dead instance was subscribed to, from its Redis set.
  private def getOrphanedGames(instanceId: String): List[String] =
    val setKey = s"$redisPrefix:instance:$instanceId:games"
    redis.set(classOf[String]).smembers(setKey).asScala.toList

  // Splits the orphaned games into roughly instance-count batches and assigns
  // batch i to healthy instance i (round-robin via index), with per-batch retry.
  private def distributeGames(
    gameIds: List[String],
    healthyInstances: List[InstanceMetadata],
    deadInstanceId: String,
  ): Unit =
    if gameIds.nonEmpty && healthyInstances.nonEmpty then
      val batchSize = math.max(1, gameIds.size / healthyInstances.size)
      val batches = gameIds.grouped(batchSize).toList

      batches.zipWithIndex.foreach { case (batch, idx) =>
        if !tryMigrateBatch(batch, idx, healthyInstances, deadInstanceId) then
          log.errorf(
            "Failed to migrate batch of %d games from %s to any healthy instance",
            batch.size,
            deadInstanceId,
          )
      }

  // Tries each healthy instance in turn (starting from batchIdx, wrapping
  // around) until one accepts the batch; false once all have been tried.
  @scala.annotation.tailrec
  private def tryMigrateBatch(
    batch: List[String],
    batchIdx: Int,
    instances: List[InstanceMetadata],
    deadId: String,
    attempt: Int = 0,
  ): Boolean =
    if attempt >= instances.size then false
    else
      val target = instances((batchIdx + attempt) % instances.size)
      val success =
        try
          // subscribed == 0 is treated as failure and retried on the next instance.
          val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
          if subscribed > 0 then
            log.infof("Migrated %d games from %s to %s", subscribed, deadId, target.instanceId)
            true
          else false
        catch
          case ex: Exception =>
            log.warnf(ex, "Failed to migrate batch to %s, trying next", target.instanceId)
            false
      if success then true else tryMigrateBatch(batch, batchIdx, instances, deadId, attempt + 1)

  // Removes the dead instance's games set from Redis.
  private def cleanupDeadInstance(instanceId: String): Unit =
    val setKey = s"$redisPrefix:instance:$instanceId:games"
    redis.key(classOf[String]).del(setKey)
    log.infof("Cleaned up games set for instance %s", instanceId)
|
||||
+123
@@ -0,0 +1,123 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.enterprise.inject.Instance
|
||||
import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.fabric8.kubernetes.api.model.Pod
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import java.time.Instant
|
||||
|
||||
/** Monitors the health of registered core instances.
  *
  * An instance counts as healthy only when BOTH its Redis heartbeat key is
  * still alive (positive TTL) AND — if a Kubernetes client is available — a
  * ready pod whose name contains the instance id exists. Unhealthy instances
  * are marked dead in the InstanceRegistry; game migration happens elsewhere.
  */
@ApplicationScoped
class HealthMonitor:
  import scala.util.control.NonFatal

  // scalafix:off DisableSyntax.var
  // var fields are required here: Quarkus CDI performs field injection after
  // construction, so these references cannot be vals.
  @Inject
  private var kubeClientInstance: Instance[KubernetesClient] = uninitialized

  @Inject
  private var config: CoordinatorConfig = uninitialized

  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized

  @Inject
  private var redis: RedisDataSource = uninitialized

  private val log = Logger.getLogger(classOf[HealthMonitor])
  private var redisPrefix = "nowchess"
  // scalafix:on DisableSyntax.var

  /** The Kubernetes client, when one is configured (None outside a cluster). */
  private def kubeClientOpt: Option[KubernetesClient] =
    if kubeClientInstance.isUnsatisfied then None
    else Some(kubeClientInstance.get())

  /** Overrides the Redis key prefix (primarily for tests). */
  def setRedisPrefix(prefix: String): Unit =
    redisPrefix = prefix

  /** Checks every registered instance and marks newly unhealthy ones dead.
    * Only instances currently in state "HEALTHY" transition; already-dead
    * instances are left alone.
    */
  def checkInstanceHealth: Unit =
    val instances = instanceRegistry.getAllInstances
    instances.foreach { inst =>
      val isHealthy = checkHealth(inst.instanceId)
      if !isHealthy && inst.state == "HEALTHY" then
        log.warnf("Instance %s marked unhealthy", inst.instanceId)
        instanceRegistry.markInstanceDead(inst.instanceId)
    }

  /** Healthy = live Redis heartbeat AND (when k8s is available) a ready pod. */
  private def checkHealth(instanceId: String): Boolean =
    val redisHealthy = checkRedisHeartbeat(instanceId)
    val k8sHealthy = checkK8sPodStatus(instanceId)
    redisHealthy && k8sHealthy

  /** True while the instance's heartbeat key has a positive remaining TTL.
    * A missing key (pttl == -2) or a key without expiry (pttl == -1) both
    * count as unhealthy — assumes heartbeats are always written with a TTL.
    * Redis errors also count as unhealthy (fail-closed).
    */
  private def checkRedisHeartbeat(instanceId: String): Boolean =
    try
      val key = s"$redisPrefix:instances:$instanceId"
      redis.key(classOf[String]).pttl(key) > 0
    catch
      // NonFatal (not Exception) so interrupts and fatal VM errors propagate.
      case NonFatal(ex) =>
        log.debugf(ex, "Redis heartbeat check failed for %s", instanceId)
        false

  /** True when a ready pod whose name contains the instance id exists.
    * Without a k8s client, or when the API call fails, this returns true
    * (fail-open) so the Redis heartbeat alone decides instance health.
    */
  private def checkK8sPodStatus(instanceId: String): Boolean =
    kubeClientOpt.fold(true) { kube =>
      try
        val pods = kube
          .pods()
          .inNamespace(config.k8sNamespace)
          .withLabel(config.k8sRolloutLabelSelector)
          .list()
          .getItems
          .asScala

        pods.exists { pod =>
          val podName = pod.getMetadata.getName
          podName.contains(instanceId) && isPodReady(pod)
        }
      catch
        case NonFatal(ex) =>
          log.debugf(ex, "K8s pod status check failed for %s", instanceId)
          true
    }

  /** Reconciles the registry against the current pod list: any HEALTHY
    * instance whose pod is missing or not ready is marked dead. No-op when no
    * Kubernetes client is available; listing errors are logged and ignored.
    */
  def watchK8sPods: Unit =
    kubeClientOpt match
      case None =>
        log.debug("Kubernetes client not available for pod watch")
      case Some(kube) =>
        try
          val pods = kube
            .pods()
            .inNamespace(config.k8sNamespace)
            .withLabel(config.k8sRolloutLabelSelector)
            .list()
            .getItems
            .asScala

          val instances = instanceRegistry.getAllInstances
          instances.foreach { inst =>
            // Pods are matched by substring: pod name must contain the instance id.
            val matchingPod = pods.find { pod =>
              pod.getMetadata.getName.contains(inst.instanceId)
            }

            matchingPod match
              case Some(pod) =>
                val isReady = isPodReady(pod)
                if !isReady && inst.state == "HEALTHY" then
                  log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
                  instanceRegistry.markInstanceDead(inst.instanceId)
              case None =>
                if inst.state == "HEALTHY" then
                  log.warnf("No pod found for instance %s, marking dead", inst.instanceId)
                  instanceRegistry.markInstanceDead(inst.instanceId)
          }
        catch
          case NonFatal(ex) =>
            log.warnf(ex, "Failed to watch k8s pods")

  /** A pod is ready when its status carries a "Ready" condition whose value is "True". */
  private def isPodReady(pod: Pod): Boolean =
    Option(pod.getStatus)
      .flatMap(s => Option(s.getConditions))
      .exists(_.asScala.exists(cond => cond.getType == "Ready" && cond.getStatus == "True"))
|
||||
+47
@@ -0,0 +1,47 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.compiletime.uninitialized
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/** In-memory registry of known core instances, refreshed from Redis on demand.
  *
  * Backed by a ConcurrentHashMap, so lookups and updates are thread-safe
  * without external locking. Redis is only read in updateInstanceFromRedis;
  * every other operation works purely on the local map.
  */
@ApplicationScoped
class InstanceRegistry:
  // scalafix:off DisableSyntax.var
  // var is required: Quarkus CDI injects fields after construction.
  @Inject
  private var redis: RedisDataSource = uninitialized
  private var redisPrefix = "nowchess"
  // scalafix:on DisableSyntax.var

  // Fully qualified to avoid touching this file's import list; same logging
  // style as the sibling coordinator services.
  private val log = org.jboss.logging.Logger.getLogger(classOf[InstanceRegistry])
  private val mapper = ObjectMapper()
  private val instances = ConcurrentHashMap[String, InstanceMetadata]()

  /** Overrides the Redis key prefix (primarily for tests). */
  def setRedisPrefix(prefix: String): Unit =
    redisPrefix = prefix

  /** Looks up a single instance; None when the id is unknown. */
  def getInstance(instanceId: String): Option[InstanceMetadata] =
    Option(instances.get(instanceId))

  /** Snapshot of all currently known instances. */
  def getAllInstances: List[InstanceMetadata] =
    instances.values.asScala.toList

  /** Reloads one instance's metadata from its Redis key.
    * A missing key leaves the local entry untouched. Malformed JSON is logged
    * at debug and skipped — previously it was silently swallowed, which made
    * bad payloads invisible.
    */
  def updateInstanceFromRedis(instanceId: String): Unit =
    val key = s"$redisPrefix:instances:$instanceId"
    Option(redis.value(classOf[String]).get(key)).foreach { value =>
      try
        val metadata = mapper.readValue(value, classOf[InstanceMetadata])
        instances.put(instanceId, metadata)
      catch
        case scala.util.control.NonFatal(ex) =>
          log.debugf(ex, "Ignoring malformed instance metadata for %s", instanceId)
    }

  /** Marks an instance DEAD in the local map; unknown ids are a no-op. */
  def markInstanceDead(instanceId: String): Unit =
    instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD"))
    ()

  /** Removes an instance from the local map; unknown ids are a no-op. */
  def removeInstance(instanceId: String): Unit =
    instances.remove(instanceId)
    ()
|
||||
+127
@@ -0,0 +1,127 @@
|
||||
package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.concurrent.duration.*
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import de.nowchess.coordinator.grpc.CoreGrpcClient
|
||||
|
||||
/** Decides when games should be rebalanced across core instances and performs
  * the moves: unsubscribe batches on overloaded instances, resubscribe them on
  * underloaded ones, and mirror the ownership change in the per-instance
  * Redis game sets.
  */
@ApplicationScoped
class LoadBalancer:
  // scalafix:off DisableSyntax.var
  // var fields are required for Quarkus CDI field injection.
  @Inject
  private var config: CoordinatorConfig = uninitialized

  @Inject
  private var instanceRegistry: InstanceRegistry = uninitialized

  @Inject
  private var redis: RedisDataSource = uninitialized

  @Inject
  private var coreGrpcClient: CoreGrpcClient = uninitialized

  private val log = Logger.getLogger(classOf[LoadBalancer])
  // Epoch millis of the last rebalance start; rate-limits shouldRebalance.
  private val lastRebalanceTime = new java.util.concurrent.atomic.AtomicLong(0L)
  private var redisPrefix = "nowchess"
  // scalafix:on DisableSyntax.var

  /** Overrides the Redis key prefix (primarily for tests). */
  def setRedisPrefix(prefix: String): Unit =
    redisPrefix = prefix

  /** True when a rebalance is due: the minimum interval since the last run has
    * elapsed AND either some instance exceeds maxGamesPerCore, or the max load
    * deviates from the average by more than maxDeviationPercent while the
    * absolute max-min spread is above 50 games.
    */
  def shouldRebalance: Boolean =
    val now = System.currentTimeMillis()
    val minInterval = config.rebalanceMinInterval.toMillis
    if now - lastRebalanceTime.get() < minInterval then false
    else
      val instances = instanceRegistry.getAllInstances
      if instances.isEmpty then false
      else
        val loads = instances.map(_.subscriptionCount)
        val maxLoad = loads.max
        val minLoad = loads.min
        val avgLoad = loads.sum.toDouble / loads.size

        val exceededMax = maxLoad > config.maxGamesPerCore
        // NOTE(review): when avgLoad == 0 this is NaN; harmless today because
        // every comparison against NaN is false, but worth confirming.
        val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad
        val exceededDeviation =
          maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50

        exceededMax || exceededDeviation

  /** Moves game batches from overloaded instances (> maxGamesPerCore) to
    * underloaded ones (< 80% of average load), assigning batches to targets
    * round-robin. Failures on individual batches are logged and the remaining
    * batches are still attempted. Requires at least 2 healthy instances.
    */
  def rebalance: Unit =
    log.info("Starting rebalance")
    val startTime = System.currentTimeMillis()
    lastRebalanceTime.set(startTime)

    try
      val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")

      if instances.size < 2 then log.info("Not enough healthy instances for rebalance")
      else
        val loads = instances.map(_.subscriptionCount)
        val avgLoad = loads.sum.toDouble / loads.size

        // Most-loaded first, so the worst offenders are relieved first.
        val overloaded = instances
          .filter(_.subscriptionCount > config.maxGamesPerCore)
          .sortBy[Int](_.subscriptionCount)
          .reverse
        val underloaded = instances
          .filter(_.subscriptionCount < avgLoad * 0.8)
          .sortBy(_.subscriptionCount)

        if underloaded.isEmpty then log.info("No underloaded instances available for rebalance")
        else
          // One (sourceInstance, gameBatch) pair per batch; batch size is the
          // ceiling of gamesToMove / underloaded count, so each source spreads
          // its excess across all underloaded targets.
          val allBatches = overloaded.flatMap { over =>
            val excess = math.max(0, over.subscriptionCount - avgLoad.toInt)
            val gamesToMove = getGamesToMove(over.instanceId, excess)
            if gamesToMove.isEmpty then List.empty
            else
              val batchSize = math.max(1, (gamesToMove.size + underloaded.size - 1) / underloaded.size)
              gamesToMove.grouped(batchSize).toList.map((over, _))
          }

          allBatches.zipWithIndex.foreach { case ((over, batch), idx) =>
            val target = underloaded(idx % underloaded.size)
            try
              // NOTE(review): if the resubscribe below throws or returns 0
              // after the unsubscribe succeeded, the batch ends up subscribed
              // nowhere — confirm whether the core re-subscribes on demand.
              coreGrpcClient.unsubscribeGames(over.hostname, over.grpcPort, batch)
              val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
              if subscribed > 0 then
                updateRedisGameSets(over.instanceId, target.instanceId, batch)
                log.infof("Moved %d games from %s to %s", subscribed, over.instanceId, target.instanceId)
            catch
              case ex: Exception =>
                log.warnf(ex, "Failed to move games from %s to %s", over.instanceId, target.instanceId)
          }

      val elapsed = System.currentTimeMillis() - startTime
      log.infof("Rebalance completed in %dms", elapsed)
    catch
      case ex: Exception =>
        log.warnf(ex, "Rebalance failed")

  /** Fetches up to count game ids currently owned by an instance from its
    * Redis games set; returns an empty list on Redis errors.
    * NOTE(review): smembers has no ordering guarantee, so take(count) picks an
    * arbitrary subset of the instance's games — presumably acceptable here.
    */
  private def getGamesToMove(instanceId: String, count: Int): List[String] =
    try
      val setKey = s"$redisPrefix:instance:$instanceId:games"
      redis.set(classOf[String]).smembers(setKey).asScala.toList.take(count)
    catch
      case ex: Exception =>
        log.debugf(ex, "Failed to get games for %s", instanceId)
        List()

  /** Mirrors a completed migration in Redis: removes each game id from the
    * source instance's set and adds it to the target's. Best effort — one
    * srem/sadd pair per game, so a failure mid-way leaves the sets partially
    * updated; the error is logged and swallowed.
    */
  private def updateRedisGameSets(fromInstanceId: String, toInstanceId: String, gameIds: List[String]): Unit =
    try
      val fromKey = s"$redisPrefix:instance:$fromInstanceId:games"
      val toKey = s"$redisPrefix:instance:$toInstanceId:games"

      gameIds.foreach { gameId =>
        redis.set(classOf[String]).srem(fromKey, gameId)
        redis.set(classOf[String]).sadd(toKey, gameId)
      }
    catch
      case ex: Exception =>
        log.warnf(ex, "Failed to update Redis game sets")
|
||||
Reference in New Issue
Block a user