refactor: clean up code formatting and improve readability across multiple files
This commit is contained in:
@@ -46,5 +46,4 @@ case class RotatedTokenDto(token: String)
|
|||||||
|
|
||||||
case class OfficialBotAccountDto(id: String, name: String, rating: Int, createdAt: String)
|
case class OfficialBotAccountDto(id: String, name: String, rating: Int, createdAt: String)
|
||||||
|
|
||||||
|
|
||||||
case class OfficialChallengeResponse(gameId: String, botName: String, difficulty: Int)
|
case class OfficialChallengeResponse(gameId: String, botName: String, difficulty: Int)
|
||||||
|
|||||||
@@ -96,4 +96,3 @@ class OfficialBotAccountRepository:
|
|||||||
def delete(botId: UUID): Unit =
|
def delete(botId: UUID): Unit =
|
||||||
em.find(classOf[OfficialBotAccount], botId) match
|
em.find(classOf[OfficialBotAccount], botId) match
|
||||||
case bot: OfficialBotAccount => em.remove(bot)
|
case bot: OfficialBotAccount => em.remove(bot)
|
||||||
|
|
||||||
|
|||||||
@@ -200,4 +200,3 @@ class AccountResource:
|
|||||||
rating = bot.rating,
|
rating = bot.rating,
|
||||||
createdAt = bot.createdAt.toString,
|
createdAt = bot.createdAt.toString,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
+44
-41
@@ -24,8 +24,8 @@ import java.util.concurrent.ThreadLocalRandom
|
|||||||
class OfficialChallengeResource:
|
class OfficialChallengeResource:
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject var accountService: AccountService = uninitialized
|
@Inject var accountService: AccountService = uninitialized
|
||||||
@Inject var jwt: JsonWebToken = uninitialized
|
@Inject var jwt: JsonWebToken = uninitialized
|
||||||
@Inject var botEventPublisher: EventPublisher = uninitialized
|
@Inject var botEventPublisher: EventPublisher = uninitialized
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@@ -43,46 +43,49 @@ class OfficialChallengeResource:
|
|||||||
@QueryParam("color") color: String,
|
@QueryParam("color") color: String,
|
||||||
): Response =
|
): Response =
|
||||||
if difficulty < 1000 || difficulty > 2800 then
|
if difficulty < 1000 || difficulty > 2800 then
|
||||||
return Response
|
Response
|
||||||
.status(Response.Status.BAD_REQUEST)
|
.status(Response.Status.BAD_REQUEST)
|
||||||
.entity(ErrorDto("difficulty must be between 1000 and 2800"))
|
.entity(ErrorDto("difficulty must be between 1000 and 2800"))
|
||||||
.build()
|
.build()
|
||||||
|
else
|
||||||
|
val normalizedColor = Option(color).map(_.toLowerCase).getOrElse("random")
|
||||||
|
normalizedColor match
|
||||||
|
case "white" | "black" | "random" =>
|
||||||
|
val userId = UUID.fromString(jwt.getSubject)
|
||||||
|
val botOpt = accountService.getOfficialBotAccounts().find(_.name == botName)
|
||||||
|
val userOpt = accountService.findById(userId)
|
||||||
|
|
||||||
val playerColor = Option(color).map(_.toLowerCase).getOrElse("random") match
|
(botOpt, userOpt) match
|
||||||
case "white" | "black" | "random" => Option(color).map(_.toLowerCase).getOrElse("random")
|
case (None, _) =>
|
||||||
case other =>
|
Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Official bot '$botName' not found")).build()
|
||||||
return Response
|
case (_, None) =>
|
||||||
.status(Response.Status.BAD_REQUEST)
|
Response.status(Response.Status.NOT_FOUND).entity(ErrorDto("User not found")).build()
|
||||||
.entity(ErrorDto(s"Invalid color: $other. Must be white, black or random"))
|
case (Some(bot), Some(user)) =>
|
||||||
.build()
|
val userIsWhite = normalizedColor match
|
||||||
|
case "white" => true
|
||||||
val userId = UUID.fromString(jwt.getSubject)
|
case "black" => false
|
||||||
val botOpt = accountService.getOfficialBotAccounts().find(_.name == botName)
|
case _ => ThreadLocalRandom.current().nextBoolean()
|
||||||
val userOpt = accountService.findById(userId)
|
val (white, black, botColor) =
|
||||||
|
if userIsWhite then
|
||||||
(botOpt, userOpt) match
|
(CorePlayerInfo(user.id.toString, user.username), CorePlayerInfo(bot.id.toString, bot.name), "black")
|
||||||
case (None, _) =>
|
else
|
||||||
Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Official bot '$botName' not found")).build()
|
(CorePlayerInfo(bot.id.toString, bot.name), CorePlayerInfo(user.id.toString, user.username), "white")
|
||||||
case (_, None) =>
|
val req = CoreCreateGameRequest(Some(white), Some(black), None, Some("Authenticated"))
|
||||||
Response.status(Response.Status.NOT_FOUND).entity(ErrorDto("User not found")).build()
|
val gameId =
|
||||||
case (Some(bot), Some(user)) =>
|
try Right(coreGameClient.createGame(req).gameId)
|
||||||
val userIsWhite = playerColor match
|
catch case _ => Left("Failed to create game")
|
||||||
case "white" => true
|
gameId match
|
||||||
case "black" => false
|
case Left(err) =>
|
||||||
case _ => ThreadLocalRandom.current().nextBoolean()
|
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ErrorDto(err)).build()
|
||||||
val (white, black, botColor) =
|
case Right(id) =>
|
||||||
if userIsWhite then
|
try botEventPublisher.publishGameStart(bot.name, id, botColor, difficulty, bot.id.toString)
|
||||||
(CorePlayerInfo(user.id.toString, user.username), CorePlayerInfo(bot.id.toString, bot.name), "black")
|
catch case ex: Exception => log.warnf(ex, "Failed to notify bot for game %s", id)
|
||||||
else
|
Response
|
||||||
(CorePlayerInfo(bot.id.toString, bot.name), CorePlayerInfo(user.id.toString, user.username), "white")
|
.status(Response.Status.CREATED)
|
||||||
val req = CoreCreateGameRequest(Some(white), Some(black), None, Some("Authenticated"))
|
.entity(OfficialChallengeResponse(id, botName, difficulty))
|
||||||
val gameId =
|
.build()
|
||||||
try Right(coreGameClient.createGame(req).gameId)
|
case other =>
|
||||||
catch case _ => Left("Failed to create game")
|
Response
|
||||||
gameId match
|
.status(Response.Status.BAD_REQUEST)
|
||||||
case Left(err) =>
|
.entity(ErrorDto(s"Invalid color: $other. Must be white, black or random"))
|
||||||
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ErrorDto(err)).build()
|
.build()
|
||||||
case Right(id) =>
|
|
||||||
try botEventPublisher.publishGameStart(bot.name, id, botColor, difficulty, bot.id.toString)
|
|
||||||
catch case ex: Exception => log.warnf(ex, "Failed to notify bot for game %s", id)
|
|
||||||
Response.status(Response.Status.CREATED).entity(OfficialChallengeResponse(id, botName, difficulty)).build()
|
|
||||||
|
|||||||
@@ -163,4 +163,3 @@ class AccountService:
|
|||||||
user.botAccounts.forEach(_.banned = false)
|
user.botAccounts.forEach(_.banned = false)
|
||||||
userAccountRepository.persist(user)
|
userAccountRepository.persist(user)
|
||||||
Right(user)
|
Right(user)
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,6 @@ import org.eclipse.microprofile.rest.client.inject.RestClient
|
|||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
|
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.time.temporal.ChronoUnit
|
import java.time.temporal.ChronoUnit
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|||||||
@@ -10,12 +10,13 @@ import scala.compiletime.uninitialized
|
|||||||
class EventPublisher:
|
class EventPublisher:
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject var redis: RedisDataSource = uninitialized
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
@Inject var redisConfig: RedisConfig = uninitialized
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
def publishGameStart(botId: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit =
|
def publishGameStart(botId: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit =
|
||||||
val event = s"""{"type":"gameStart","gameId":"$gameId","playingAs":"$playingAs","difficulty":$difficulty,"botAccountId":"$botAccountId"}"""
|
val event =
|
||||||
|
s"""{"type":"gameStart","gameId":"$gameId","playingAs":"$playingAs","difficulty":$difficulty,"botAccountId":"$botAccountId"}"""
|
||||||
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event)
|
redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event)
|
||||||
()
|
()
|
||||||
|
|
||||||
|
|||||||
+5
-5
@@ -14,16 +14,16 @@ import java.util.function.Consumer
|
|||||||
class BotRegistry:
|
class BotRegistry:
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject var redis: RedisDataSource = uninitialized
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
@Inject var redisConfig: RedisConfig = uninitialized
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val connections = ConcurrentHashMap[String, (MultiEmitter[?], PubSubCommands.RedisSubscriber)]()
|
private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]()
|
||||||
|
|
||||||
def register(botId: String, emitter: MultiEmitter[? >: String]): Unit =
|
def register(botId: String, emitter: MultiEmitter[? >: String]): Unit =
|
||||||
val channel = s"${redisConfig.prefix}:bot:$botId:events"
|
val channel = s"${redisConfig.prefix}:bot:$botId:events"
|
||||||
val handler: Consumer[String] = msg => emitter.asInstanceOf[MultiEmitter[String]].emit(msg)
|
val handler: Consumer[String] = msg => emitter.emit(msg)
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler)
|
||||||
connections.put(botId, (emitter, subscriber))
|
connections.put(botId, (emitter, subscriber))
|
||||||
()
|
()
|
||||||
|
|
||||||
|
|||||||
+2
-2
@@ -44,9 +44,9 @@ class BotEventResource:
|
|||||||
@Produces(Array(MediaType.SERVER_SENT_EVENTS))
|
@Produces(Array(MediaType.SERVER_SENT_EVENTS))
|
||||||
def streamGame(@PathParam("gameId") gameId: String): Multi[String] =
|
def streamGame(@PathParam("gameId") gameId: String): Multi[String] =
|
||||||
Multi.createFrom().emitter[String] { emitter =>
|
Multi.createFrom().emitter[String] { emitter =>
|
||||||
val topicName = s"${redisConfig.prefix}:game:$gameId:s2c"
|
val topicName = s"${redisConfig.prefix}:game:$gameId:s2c"
|
||||||
val handler: Consumer[String] = msg => emitter.emit(msg)
|
val handler: Consumer[String] = msg => emitter.emit(msg)
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(topicName, handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(topicName, handler)
|
||||||
emitter.onTermination(() => subscriber.unsubscribe(topicName))
|
emitter.onTermination(() => subscriber.unsubscribe(topicName))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer:
|
|||||||
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
||||||
// scalafix:on DisableSyntax.null
|
// scalafix:on DisableSyntax.null
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
-1
@@ -11,4 +11,3 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
class NativeReflectionConfig
|
class NativeReflectionConfig
|
||||||
|
|
||||||
|
|||||||
+2
@@ -28,7 +28,9 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
|||||||
responseObserver: StreamObserver[CoordinatorCommand],
|
responseObserver: StreamObserver[CoordinatorCommand],
|
||||||
): StreamObserver[HeartbeatFrame] =
|
): StreamObserver[HeartbeatFrame] =
|
||||||
new StreamObserver[HeartbeatFrame]:
|
new StreamObserver[HeartbeatFrame]:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
private var lastInstanceId = ""
|
private var lastInstanceId = ""
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
override def onNext(frame: HeartbeatFrame): Unit =
|
override def onNext(frame: HeartbeatFrame): Unit =
|
||||||
lastInstanceId = frame.getInstanceId
|
lastInstanceId = frame.getInstanceId
|
||||||
|
|||||||
+1
-3
@@ -16,9 +16,7 @@ class CoreGrpcClient:
|
|||||||
private val channels = ConcurrentHashMap[String, ManagedChannel]()
|
private val channels = ConcurrentHashMap[String, ManagedChannel]()
|
||||||
|
|
||||||
private def getChannel(host: String, port: Int): ManagedChannel =
|
private def getChannel(host: String, port: Int): ManagedChannel =
|
||||||
channels.computeIfAbsent(s"$host:$port", _ =>
|
channels.computeIfAbsent(s"$host:$port", _ => ManagedChannelBuilder.forAddress(host, port).usePlaintext().build())
|
||||||
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(),
|
|
||||||
)
|
|
||||||
|
|
||||||
private def evictStaleChannel(host: String, port: Int): Unit =
|
private def evictStaleChannel(host: String, port: Int): Unit =
|
||||||
Option(channels.remove(s"$host:$port")).foreach(_.shutdownNow())
|
Option(channels.remove(s"$host:$port")).foreach(_.shutdownNow())
|
||||||
|
|||||||
+1
-1
@@ -33,7 +33,7 @@ class CoordinatorResource:
|
|||||||
@Path("/instances")
|
@Path("/instances")
|
||||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
@Produces(Array(MediaType.APPLICATION_JSON))
|
||||||
def listInstances: java.util.List[InstanceMetadata] =
|
def listInstances: java.util.List[InstanceMetadata] =
|
||||||
instanceRegistry.getAllInstances.asJava.asInstanceOf[java.util.List[InstanceMetadata]]
|
instanceRegistry.getAllInstances.asJava
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path("/metrics")
|
@Path("/metrics")
|
||||||
|
|||||||
+101
-74
@@ -1,6 +1,7 @@
|
|||||||
package de.nowchess.coordinator.service
|
package de.nowchess.coordinator.service
|
||||||
|
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
|
import jakarta.enterprise.inject.Instance
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
import io.fabric8.kubernetes.api.model.GenericKubernetesResource
|
||||||
@@ -11,98 +12,124 @@ import scala.compiletime.uninitialized
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class AutoScaler:
|
class AutoScaler:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var kubeClient: KubernetesClient = null
|
private var kubeClientInstance: Instance[KubernetesClient] = uninitialized
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private var config: CoordinatorConfig = uninitialized
|
private var config: CoordinatorConfig = uninitialized
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private var instanceRegistry: InstanceRegistry = uninitialized
|
private var instanceRegistry: InstanceRegistry = uninitialized
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val log = Logger.getLogger(classOf[AutoScaler])
|
private val log = Logger.getLogger(classOf[AutoScaler])
|
||||||
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||||
|
|
||||||
|
private def kubeClientOpt: Option[KubernetesClient] =
|
||||||
|
if kubeClientInstance.isUnsatisfied then None
|
||||||
|
else Some(kubeClientInstance.get())
|
||||||
|
|
||||||
|
// scalafix:off DisableSyntax.asInstanceOf
|
||||||
|
// scalafix:off DisableSyntax.isInstanceOf
|
||||||
|
private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] =
|
||||||
|
Option(rollout.get("spec")).collect {
|
||||||
|
case m if m.isInstanceOf[java.util.Map[?, ?]] => m.asInstanceOf[java.util.Map[String, AnyRef]]
|
||||||
|
}
|
||||||
|
// scalafix:on DisableSyntax.asInstanceOf
|
||||||
|
// scalafix:on DisableSyntax.isInstanceOf
|
||||||
|
|
||||||
def checkAndScale: Unit =
|
def checkAndScale: Unit =
|
||||||
if !config.autoScaleEnabled then return
|
if config.autoScaleEnabled then
|
||||||
|
val now = System.currentTimeMillis()
|
||||||
|
val last = lastScaleTime.get()
|
||||||
|
if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then
|
||||||
|
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||||
|
if instances.nonEmpty then
|
||||||
|
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
||||||
|
|
||||||
val now = System.currentTimeMillis()
|
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
|
||||||
val last = lastScaleTime.get()
|
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
|
||||||
if now - last < 120000 then return
|
then scaleDown()
|
||||||
if !lastScaleTime.compareAndSet(last, now) then return
|
|
||||||
|
|
||||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
|
||||||
if instances.isEmpty then return
|
|
||||||
|
|
||||||
val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size
|
|
||||||
|
|
||||||
if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp()
|
|
||||||
else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas
|
|
||||||
then scaleDown()
|
|
||||||
|
|
||||||
def scaleUp(): Unit =
|
def scaleUp(): Unit =
|
||||||
log.info("Scaling up Argo Rollout")
|
log.info("Scaling up Argo Rollout")
|
||||||
if kubeClient == null then
|
kubeClientOpt match
|
||||||
log.warn("Kubernetes client not available, cannot scale")
|
case None =>
|
||||||
return
|
log.warn("Kubernetes client not available, cannot scale")
|
||||||
|
case Some(kube) =>
|
||||||
|
try
|
||||||
|
Option(
|
||||||
|
kube
|
||||||
|
.resources(classOf[GenericKubernetesResource])
|
||||||
|
.inNamespace(config.k8sNamespace)
|
||||||
|
.withName(config.k8sRolloutName)
|
||||||
|
.get(),
|
||||||
|
).foreach { rollout =>
|
||||||
|
rolloutSpec(rollout).foreach { spec =>
|
||||||
|
spec.get("replicas") match
|
||||||
|
case replicas: Integer =>
|
||||||
|
val currentReplicas = replicas.intValue()
|
||||||
|
val maxReplicas = config.scaleMaxReplicas
|
||||||
|
|
||||||
try
|
if currentReplicas < maxReplicas then
|
||||||
val rollout = kubeClient
|
spec.put("replicas", String.valueOf(currentReplicas + 1))
|
||||||
.resources(classOf[GenericKubernetesResource])
|
kube
|
||||||
.inNamespace(config.k8sNamespace)
|
.resources(classOf[GenericKubernetesResource])
|
||||||
.withName(config.k8sRolloutName)
|
.inNamespace(config.k8sNamespace)
|
||||||
.get()
|
.withName(config.k8sRolloutName)
|
||||||
|
.update()
|
||||||
if rollout != null then
|
log.infof(
|
||||||
val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]]
|
"Scaled up %s from %d to %d replicas",
|
||||||
val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue()
|
config.k8sRolloutName,
|
||||||
val maxReplicas = config.scaleMaxReplicas
|
currentReplicas,
|
||||||
|
currentReplicas + 1,
|
||||||
if currentReplicas < maxReplicas then
|
)
|
||||||
spec.put("replicas", currentReplicas + 1)
|
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
|
||||||
kubeClient
|
case _ => ()
|
||||||
.resources(classOf[GenericKubernetesResource])
|
}
|
||||||
.inNamespace(config.k8sNamespace)
|
}
|
||||||
.withName(config.k8sRolloutName)
|
catch
|
||||||
.createOrReplace(rollout)
|
case ex: Exception =>
|
||||||
log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, currentReplicas, currentReplicas + 1)
|
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
||||||
else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName)
|
|
||||||
catch
|
|
||||||
case ex: Exception =>
|
|
||||||
log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName)
|
|
||||||
|
|
||||||
def scaleDown(): Unit =
|
def scaleDown(): Unit =
|
||||||
log.info("Scaling down Argo Rollout")
|
log.info("Scaling down Argo Rollout")
|
||||||
if kubeClient == null then
|
kubeClientOpt match
|
||||||
log.warn("Kubernetes client not available, cannot scale")
|
case None =>
|
||||||
return
|
log.warn("Kubernetes client not available, cannot scale")
|
||||||
|
case Some(kube) =>
|
||||||
|
try
|
||||||
|
Option(
|
||||||
|
kube
|
||||||
|
.resources(classOf[GenericKubernetesResource])
|
||||||
|
.inNamespace(config.k8sNamespace)
|
||||||
|
.withName(config.k8sRolloutName)
|
||||||
|
.get(),
|
||||||
|
).foreach { rollout =>
|
||||||
|
rolloutSpec(rollout).foreach { spec =>
|
||||||
|
spec.get("replicas") match
|
||||||
|
case replicas: Integer =>
|
||||||
|
val currentReplicas = replicas.intValue()
|
||||||
|
val minReplicas = config.scaleMinReplicas
|
||||||
|
|
||||||
try
|
if currentReplicas > minReplicas then
|
||||||
val rollout = kubeClient
|
spec.put("replicas", String.valueOf(currentReplicas - 1))
|
||||||
.resources(classOf[GenericKubernetesResource])
|
kube
|
||||||
.inNamespace(config.k8sNamespace)
|
.resources(classOf[GenericKubernetesResource])
|
||||||
.withName(config.k8sRolloutName)
|
.inNamespace(config.k8sNamespace)
|
||||||
.get()
|
.withName(config.k8sRolloutName)
|
||||||
|
.update()
|
||||||
if rollout != null then
|
log.infof(
|
||||||
val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]]
|
"Scaled down %s from %d to %d replicas",
|
||||||
val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue()
|
config.k8sRolloutName,
|
||||||
val minReplicas = config.scaleMinReplicas
|
currentReplicas,
|
||||||
|
currentReplicas - 1,
|
||||||
if currentReplicas > minReplicas then
|
)
|
||||||
spec.put("replicas", currentReplicas - 1)
|
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
||||||
kubeClient
|
case _ => ()
|
||||||
.resources(classOf[GenericKubernetesResource])
|
}
|
||||||
.inNamespace(config.k8sNamespace)
|
}
|
||||||
.withName(config.k8sRolloutName)
|
catch
|
||||||
.createOrReplace(rollout)
|
case ex: Exception =>
|
||||||
log.infof(
|
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
||||||
"Scaled down %s from %d to %d replicas",
|
|
||||||
config.k8sRolloutName,
|
|
||||||
currentReplicas,
|
|
||||||
currentReplicas - 1,
|
|
||||||
)
|
|
||||||
else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName)
|
|
||||||
catch
|
|
||||||
case ex: Exception =>
|
|
||||||
log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName)
|
|
||||||
|
|||||||
+16
-13
@@ -14,6 +14,7 @@ import de.nowchess.coordinator.grpc.CoreGrpcClient
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class CacheEvictionManager:
|
class CacheEvictionManager:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var redis: RedisDataSource = uninitialized
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
|
||||||
@@ -31,6 +32,7 @@ class CacheEvictionManager:
|
|||||||
|
|
||||||
private val log = Logger.getLogger(classOf[CacheEvictionManager])
|
private val log = Logger.getLogger(classOf[CacheEvictionManager])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
@@ -38,44 +40,45 @@ class CacheEvictionManager:
|
|||||||
def evictStaleGames: Unit =
|
def evictStaleGames: Unit =
|
||||||
log.info("Starting cache eviction scan")
|
log.info("Starting cache eviction scan")
|
||||||
|
|
||||||
val pattern = s"$redisPrefix:game:entry:*"
|
val pattern = s"$redisPrefix:game:entry:*"
|
||||||
val keys = redis.key(classOf[String]).keys(pattern)
|
val keys = redis.key(classOf[String]).keys(pattern)
|
||||||
|
|
||||||
val now = System.currentTimeMillis()
|
val now = System.currentTimeMillis()
|
||||||
val idleThresholdMs = config.gameIdleThreshold.toMillis
|
val idleThresholdMs = config.gameIdleThreshold.toMillis
|
||||||
|
|
||||||
var evictedCount = 0
|
val evictedCount = keys.asScala.foldLeft(0) { (count, key) =>
|
||||||
keys.asScala.foreach { key =>
|
|
||||||
try
|
try
|
||||||
val value = redis.value(classOf[String]).get(key)
|
Option(redis.value(classOf[String]).get(key)).fold(count) { value =>
|
||||||
if value != null then
|
|
||||||
val gameId = key.stripPrefix(s"$redisPrefix:game:entry:")
|
val gameId = key.stripPrefix(s"$redisPrefix:game:entry:")
|
||||||
val lastUpdated = extractLastUpdatedTimestamp(value)
|
val lastUpdated = extractLastUpdatedTimestamp(value)
|
||||||
|
|
||||||
if lastUpdated > 0 && (now - lastUpdated) > idleThresholdMs then
|
if lastUpdated > 0 && (now - lastUpdated) > idleThresholdMs then
|
||||||
findInstanceWithGame(gameId).foreach { instance =>
|
findInstanceWithGame(gameId).fold(count) { instance =>
|
||||||
try
|
try
|
||||||
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId))
|
||||||
redis.key(classOf[String]).del(key)
|
redis.key(classOf[String]).del(key)
|
||||||
evictedCount += 1
|
|
||||||
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
log.infof("Evicted idle game %s from %s", gameId, instance.instanceId)
|
||||||
|
count + 1
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to evict game %s", gameId)
|
log.warnf(ex, "Failed to evict game %s", gameId)
|
||||||
|
count
|
||||||
}
|
}
|
||||||
|
else count
|
||||||
|
}
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Error processing game key %s", key)
|
log.warnf(ex, "Error processing game key %s", key)
|
||||||
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
log.infof("Cache eviction scan completed, evicted %d games", evictedCount)
|
log.infof("Cache eviction scan completed, evicted %d games", evictedCount)
|
||||||
|
|
||||||
private def extractLastUpdatedTimestamp(json: String): Long =
|
private def extractLastUpdatedTimestamp(json: String): Long =
|
||||||
Try {
|
Try {
|
||||||
val parsed = objectMapper.readTree(json)
|
val parsed = objectMapper.readTree(json)
|
||||||
val lastHeartbeat = parsed.get("lastHeartbeat")
|
Option(parsed.get("lastHeartbeat"))
|
||||||
if lastHeartbeat != null && lastHeartbeat.isTextual then Instant.parse(lastHeartbeat.asText()).toEpochMilli
|
.filter(_.isTextual)
|
||||||
else 0L
|
.fold(0L)(lh => Instant.parse(lh.asText()).toEpochMilli)
|
||||||
}.getOrElse(0L)
|
}.getOrElse(0L)
|
||||||
|
|
||||||
private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] =
|
private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] =
|
||||||
|
|||||||
+30
-18
@@ -11,6 +11,7 @@ import de.nowchess.coordinator.grpc.CoreGrpcClient
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class FailoverService:
|
class FailoverService:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var redis: RedisDataSource = uninitialized
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
|
||||||
@@ -22,6 +23,7 @@ class FailoverService:
|
|||||||
|
|
||||||
private val log = Logger.getLogger(classOf[FailoverService])
|
private val log = Logger.getLogger(classOf[FailoverService])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
@@ -58,32 +60,42 @@ class FailoverService:
|
|||||||
healthyInstances: List[InstanceMetadata],
|
healthyInstances: List[InstanceMetadata],
|
||||||
deadInstanceId: String,
|
deadInstanceId: String,
|
||||||
): Unit =
|
): Unit =
|
||||||
if gameIds.isEmpty || healthyInstances.isEmpty then return
|
if gameIds.nonEmpty && healthyInstances.nonEmpty then
|
||||||
|
val batchSize = math.max(1, gameIds.size / healthyInstances.size)
|
||||||
|
val batches = gameIds.grouped(batchSize).toList
|
||||||
|
|
||||||
val batchSize = math.max(1, gameIds.size / healthyInstances.size)
|
batches.zipWithIndex.foreach { case (batch, idx) =>
|
||||||
val batches = gameIds.grouped(batchSize).toList
|
if !tryMigrateBatch(batch, idx, healthyInstances, deadInstanceId) then
|
||||||
|
log.errorf(
|
||||||
|
"Failed to migrate batch of %d games from %s to any healthy instance",
|
||||||
|
batch.size,
|
||||||
|
deadInstanceId,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
batches.zipWithIndex.foreach { case (batch, idx) =>
|
@scala.annotation.tailrec
|
||||||
var migrated = false
|
private def tryMigrateBatch(
|
||||||
var attempt = 0
|
batch: List[String],
|
||||||
while !migrated && attempt < healthyInstances.size do
|
batchIdx: Int,
|
||||||
val target = healthyInstances((idx + attempt) % healthyInstances.size)
|
instances: List[InstanceMetadata],
|
||||||
attempt += 1
|
deadId: String,
|
||||||
|
attempt: Int = 0,
|
||||||
|
): Boolean =
|
||||||
|
if attempt >= instances.size then false
|
||||||
|
else
|
||||||
|
val target = instances((batchIdx + attempt) % instances.size)
|
||||||
|
val success =
|
||||||
try
|
try
|
||||||
val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
|
val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
|
||||||
if subscribed > 0 then
|
if subscribed > 0 then
|
||||||
log.infof("Migrated %d games from %s to %s", subscribed, deadInstanceId, target.instanceId)
|
log.infof("Migrated %d games from %s to %s", subscribed, deadId, target.instanceId)
|
||||||
migrated = true
|
true
|
||||||
|
else false
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to migrate batch to %s, trying next", target.instanceId)
|
log.warnf(ex, "Failed to migrate batch to %s, trying next", target.instanceId)
|
||||||
if !migrated then
|
false
|
||||||
log.errorf(
|
if success then true else tryMigrateBatch(batch, batchIdx, instances, deadId, attempt + 1)
|
||||||
"Failed to migrate batch of %d games from %s to any healthy instance",
|
|
||||||
batch.size,
|
|
||||||
deadInstanceId,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def cleanupDeadInstance(instanceId: String): Unit =
|
private def cleanupDeadInstance(instanceId: String): Unit =
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
|
|||||||
+45
-44
@@ -1,6 +1,7 @@
|
|||||||
package de.nowchess.coordinator.service
|
package de.nowchess.coordinator.service
|
||||||
|
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
|
import jakarta.enterprise.inject.Instance
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import de.nowchess.coordinator.config.CoordinatorConfig
|
import de.nowchess.coordinator.config.CoordinatorConfig
|
||||||
import io.fabric8.kubernetes.client.KubernetesClient
|
import io.fabric8.kubernetes.client.KubernetesClient
|
||||||
@@ -13,8 +14,9 @@ import java.time.Instant
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class HealthMonitor:
|
class HealthMonitor:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var kubeClient: KubernetesClient = null
|
private var kubeClientInstance: Instance[KubernetesClient] = uninitialized
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private var config: CoordinatorConfig = uninitialized
|
private var config: CoordinatorConfig = uninitialized
|
||||||
@@ -27,6 +29,11 @@ class HealthMonitor:
|
|||||||
|
|
||||||
private val log = Logger.getLogger(classOf[HealthMonitor])
|
private val log = Logger.getLogger(classOf[HealthMonitor])
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private def kubeClientOpt: Option[KubernetesClient] =
|
||||||
|
if kubeClientInstance.isUnsatisfied then None
|
||||||
|
else Some(kubeClientInstance.get())
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
@@ -55,10 +62,9 @@ class HealthMonitor:
|
|||||||
false
|
false
|
||||||
|
|
||||||
private def checkK8sPodStatus(instanceId: String): Boolean =
|
private def checkK8sPodStatus(instanceId: String): Boolean =
|
||||||
if kubeClient == null then true
|
kubeClientOpt.fold(true) { kube =>
|
||||||
else
|
|
||||||
try
|
try
|
||||||
val pods = kubeClient
|
val pods = kube
|
||||||
.pods()
|
.pods()
|
||||||
.inNamespace(config.k8sNamespace)
|
.inNamespace(config.k8sNamespace)
|
||||||
.withLabel(config.k8sRolloutLabelSelector)
|
.withLabel(config.k8sRolloutLabelSelector)
|
||||||
@@ -74,49 +80,44 @@ class HealthMonitor:
|
|||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.debugf(ex, "K8s pod status check failed for %s", instanceId)
|
log.debugf(ex, "K8s pod status check failed for %s", instanceId)
|
||||||
true
|
true
|
||||||
|
}
|
||||||
|
|
||||||
def watchK8sPods: Unit =
|
def watchK8sPods: Unit =
|
||||||
if kubeClient == null then
|
kubeClientOpt match
|
||||||
log.debug("Kubernetes client not available for pod watch")
|
case None =>
|
||||||
return
|
log.debug("Kubernetes client not available for pod watch")
|
||||||
|
case Some(kube) =>
|
||||||
|
try
|
||||||
|
val pods = kube
|
||||||
|
.pods()
|
||||||
|
.inNamespace(config.k8sNamespace)
|
||||||
|
.withLabel(config.k8sRolloutLabelSelector)
|
||||||
|
.list()
|
||||||
|
.getItems
|
||||||
|
.asScala
|
||||||
|
|
||||||
try
|
val instances = instanceRegistry.getAllInstances
|
||||||
val pods = kubeClient
|
instances.foreach { inst =>
|
||||||
.pods()
|
val matchingPod = pods.find { pod =>
|
||||||
.inNamespace(config.k8sNamespace)
|
pod.getMetadata.getName.contains(inst.instanceId)
|
||||||
.withLabel(config.k8sRolloutLabelSelector)
|
}
|
||||||
.list()
|
|
||||||
.getItems
|
|
||||||
.asScala
|
|
||||||
|
|
||||||
val instances = instanceRegistry.getAllInstances
|
matchingPod match
|
||||||
instances.foreach { inst =>
|
case Some(pod) =>
|
||||||
val matchingPod = pods.find { pod =>
|
val isReady = isPodReady(pod)
|
||||||
pod.getMetadata.getName.contains(inst.instanceId)
|
if !isReady && inst.state == "HEALTHY" then
|
||||||
}
|
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
||||||
|
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||||
matchingPod match
|
case None =>
|
||||||
case Some(pod) =>
|
if inst.state == "HEALTHY" then
|
||||||
val isReady = isPodReady(pod)
|
log.warnf("No pod found for instance %s, marking dead", inst.instanceId)
|
||||||
if !isReady && inst.state == "HEALTHY" then
|
instanceRegistry.markInstanceDead(inst.instanceId)
|
||||||
log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId)
|
}
|
||||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
catch
|
||||||
case None =>
|
case ex: Exception =>
|
||||||
if inst.state == "HEALTHY" then
|
log.warnf(ex, "Failed to watch k8s pods")
|
||||||
log.warnf("No pod found for instance %s, marking dead", inst.instanceId)
|
|
||||||
instanceRegistry.markInstanceDead(inst.instanceId)
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
case ex: Exception =>
|
|
||||||
log.warnf(ex, "Failed to watch k8s pods")
|
|
||||||
|
|
||||||
private def isPodReady(pod: Pod): Boolean =
|
private def isPodReady(pod: Pod): Boolean =
|
||||||
val status = pod.getStatus
|
Option(pod.getStatus)
|
||||||
if status == null then false
|
.flatMap(s => Option(s.getConditions))
|
||||||
else
|
.exists(_.asScala.exists(cond => cond.getType == "Ready" && cond.getStatus == "True"))
|
||||||
val conditions = status.getConditions
|
|
||||||
if conditions == null then false
|
|
||||||
else
|
|
||||||
conditions.asScala.exists { cond =>
|
|
||||||
cond.getType == "Ready" && cond.getStatus == "True"
|
|
||||||
}
|
|
||||||
|
|||||||
+8
-6
@@ -11,12 +11,14 @@ import java.util.concurrent.ConcurrentHashMap
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class InstanceRegistry:
|
class InstanceRegistry:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var redis: RedisDataSource = uninitialized
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
private var redisPrefix = "nowchess"
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
private val mapper = ObjectMapper()
|
private val mapper = ObjectMapper()
|
||||||
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
|
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
|
||||||
private var redisPrefix = "nowchess"
|
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
@@ -28,13 +30,13 @@ class InstanceRegistry:
|
|||||||
instances.values.asScala.toList
|
instances.values.asScala.toList
|
||||||
|
|
||||||
def updateInstanceFromRedis(instanceId: String): Unit =
|
def updateInstanceFromRedis(instanceId: String): Unit =
|
||||||
val key = s"$redisPrefix:instances:$instanceId"
|
val key = s"$redisPrefix:instances:$instanceId"
|
||||||
val value = redis.value(classOf[String]).get(key)
|
Option(redis.value(classOf[String]).get(key)).foreach { value =>
|
||||||
if value != null then
|
|
||||||
try
|
try
|
||||||
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
||||||
instances.put(instanceId, metadata)
|
instances.put(instanceId, metadata)
|
||||||
catch case _: Exception => ()
|
catch case _: Exception => ()
|
||||||
|
}
|
||||||
|
|
||||||
def markInstanceDead(instanceId: String): Unit =
|
def markInstanceDead(instanceId: String): Unit =
|
||||||
instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD"))
|
instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD"))
|
||||||
|
|||||||
+41
-42
@@ -12,6 +12,7 @@ import de.nowchess.coordinator.grpc.CoreGrpcClient
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class LoadBalancer:
|
class LoadBalancer:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var config: CoordinatorConfig = uninitialized
|
private var config: CoordinatorConfig = uninitialized
|
||||||
|
|
||||||
@@ -27,6 +28,7 @@ class LoadBalancer:
|
|||||||
private val log = Logger.getLogger(classOf[LoadBalancer])
|
private val log = Logger.getLogger(classOf[LoadBalancer])
|
||||||
private val lastRebalanceTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
private val lastRebalanceTime = new java.util.concurrent.atomic.AtomicLong(0L)
|
||||||
private var redisPrefix = "nowchess"
|
private var redisPrefix = "nowchess"
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
@@ -34,22 +36,22 @@ class LoadBalancer:
|
|||||||
def shouldRebalance: Boolean =
|
def shouldRebalance: Boolean =
|
||||||
val now = System.currentTimeMillis()
|
val now = System.currentTimeMillis()
|
||||||
val minInterval = config.rebalanceMinInterval.toMillis
|
val minInterval = config.rebalanceMinInterval.toMillis
|
||||||
if now - lastRebalanceTime.get() < minInterval then return false
|
if now - lastRebalanceTime.get() < minInterval then false
|
||||||
|
else
|
||||||
|
val instances = instanceRegistry.getAllInstances
|
||||||
|
if instances.isEmpty then false
|
||||||
|
else
|
||||||
|
val loads = instances.map(_.subscriptionCount)
|
||||||
|
val maxLoad = loads.max
|
||||||
|
val minLoad = loads.min
|
||||||
|
val avgLoad = loads.sum.toDouble / loads.size
|
||||||
|
|
||||||
val instances = instanceRegistry.getAllInstances
|
val exceededMax = maxLoad > config.maxGamesPerCore
|
||||||
if instances.isEmpty then return false
|
val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad
|
||||||
|
val exceededDeviation =
|
||||||
|
maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50
|
||||||
|
|
||||||
val loads = instances.map(_.subscriptionCount)
|
exceededMax || exceededDeviation
|
||||||
val maxLoad = loads.max
|
|
||||||
val minLoad = loads.min
|
|
||||||
val avgLoad = loads.sum.toDouble / loads.size
|
|
||||||
|
|
||||||
val exceededMax = maxLoad > config.maxGamesPerCore
|
|
||||||
val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad
|
|
||||||
val exceededDeviation =
|
|
||||||
maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50
|
|
||||||
|
|
||||||
exceededMax || exceededDeviation
|
|
||||||
|
|
||||||
def rebalance: Unit =
|
def rebalance: Unit =
|
||||||
log.info("Starting rebalance")
|
log.info("Starting rebalance")
|
||||||
@@ -59,34 +61,32 @@ class LoadBalancer:
|
|||||||
try
|
try
|
||||||
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY")
|
||||||
|
|
||||||
if instances.size < 2 then
|
if instances.size < 2 then log.info("Not enough healthy instances for rebalance")
|
||||||
log.info("Not enough healthy instances for rebalance")
|
else
|
||||||
return
|
val loads = instances.map(_.subscriptionCount)
|
||||||
|
val avgLoad = loads.sum.toDouble / loads.size
|
||||||
|
|
||||||
val loads = instances.map(_.subscriptionCount)
|
val overloaded = instances
|
||||||
val avgLoad = loads.sum.toDouble / loads.size
|
.filter(_.subscriptionCount > config.maxGamesPerCore)
|
||||||
|
.sortBy[Int](_.subscriptionCount)
|
||||||
|
.reverse
|
||||||
|
val underloaded = instances
|
||||||
|
.filter(_.subscriptionCount < avgLoad * 0.8)
|
||||||
|
.sortBy(_.subscriptionCount)
|
||||||
|
|
||||||
val overloaded = instances
|
if underloaded.isEmpty then log.info("No underloaded instances available for rebalance")
|
||||||
.filter(_.subscriptionCount > config.maxGamesPerCore)
|
else
|
||||||
.sortBy[Int](_.subscriptionCount)
|
val allBatches = overloaded.flatMap { over =>
|
||||||
.reverse
|
val excess = math.max(0, over.subscriptionCount - avgLoad.toInt)
|
||||||
val underloaded = instances
|
val gamesToMove = getGamesToMove(over.instanceId, excess)
|
||||||
.filter(_.subscriptionCount < avgLoad * 0.8)
|
if gamesToMove.isEmpty then List.empty
|
||||||
.sortBy(_.subscriptionCount)
|
else
|
||||||
|
val batchSize = math.max(1, (gamesToMove.size + underloaded.size - 1) / underloaded.size)
|
||||||
|
gamesToMove.grouped(batchSize).toList.map((over, _))
|
||||||
|
}
|
||||||
|
|
||||||
if underloaded.isEmpty then
|
allBatches.zipWithIndex.foreach { case ((over, batch), idx) =>
|
||||||
log.info("No underloaded instances available for rebalance")
|
val target = underloaded(idx % underloaded.size)
|
||||||
return
|
|
||||||
|
|
||||||
var targetIdx = 0
|
|
||||||
overloaded.foreach { over =>
|
|
||||||
val excess = math.max(0, over.subscriptionCount - avgLoad.toInt)
|
|
||||||
val gamesToMove = getGamesToMove(over.instanceId, excess)
|
|
||||||
if gamesToMove.nonEmpty then
|
|
||||||
val batchSize = math.max(1, (gamesToMove.size + underloaded.size - 1) / underloaded.size)
|
|
||||||
gamesToMove.grouped(batchSize).foreach { batch =>
|
|
||||||
val target = underloaded(targetIdx % underloaded.size)
|
|
||||||
targetIdx += 1
|
|
||||||
try
|
try
|
||||||
coreGrpcClient.unsubscribeGames(over.hostname, over.grpcPort, batch)
|
coreGrpcClient.unsubscribeGames(over.hostname, over.grpcPort, batch)
|
||||||
val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
|
val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch)
|
||||||
@@ -97,10 +97,9 @@ class LoadBalancer:
|
|||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to move games from %s to %s", over.instanceId, target.instanceId)
|
log.warnf(ex, "Failed to move games from %s to %s", over.instanceId, target.instanceId)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
val elapsed = System.currentTimeMillis() - startTime
|
val elapsed = System.currentTimeMillis() - startTime
|
||||||
log.infof("Rebalance completed in %dms", elapsed)
|
log.infof("Rebalance completed in %dms", elapsed)
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Rebalance failed")
|
log.warnf(ex, "Rebalance failed")
|
||||||
|
|||||||
@@ -12,8 +12,10 @@ import scala.jdk.CollectionConverters.*
|
|||||||
@GrpcService
|
@GrpcService
|
||||||
@Singleton
|
@Singleton
|
||||||
class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServiceImplBase:
|
class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServiceImplBase:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var gameSubscriberManager: GameRedisSubscriberManager = uninitialized
|
private var gameSubscriberManager: GameRedisSubscriberManager = uninitialized
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
override def batchResubscribeGames(
|
override def batchResubscribeGames(
|
||||||
request: BatchResubscribeRequest,
|
request: BatchResubscribeRequest,
|
||||||
|
|||||||
@@ -62,14 +62,14 @@ class GameRedisPublisher(
|
|||||||
case GameResult.Draw(_) => "draw"
|
case GameResult.Draw(_) => "draw"
|
||||||
},
|
},
|
||||||
terminationReason = entry.engine.context.result.map {
|
terminationReason = entry.engine.context.result.map {
|
||||||
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
|
case GameResult.Win(_, WinReason.Checkmate) => "checkmate"
|
||||||
case GameResult.Win(_, WinReason.Resignation) => "resignation"
|
case GameResult.Win(_, WinReason.Resignation) => "resignation"
|
||||||
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
|
case GameResult.Win(_, WinReason.TimeControl) => "timeout"
|
||||||
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
|
case GameResult.Draw(DrawReason.Stalemate) => "stalemate"
|
||||||
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
|
case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material"
|
||||||
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
|
case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move"
|
||||||
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
|
case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition"
|
||||||
case GameResult.Draw(DrawReason.Agreement) => "agreement"
|
case GameResult.Draw(DrawReason.Agreement) => "agreement"
|
||||||
},
|
},
|
||||||
redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci),
|
redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci),
|
||||||
pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase),
|
pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase),
|
||||||
|
|||||||
+20
-27
@@ -14,6 +14,7 @@ import io.quarkus.redis.datasource.RedisDataSource
|
|||||||
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
import io.quarkus.redis.datasource.pubsub.PubSubCommands
|
||||||
import jakarta.annotation.PreDestroy
|
import jakarta.annotation.PreDestroy
|
||||||
import jakarta.enterprise.context.ApplicationScoped
|
import jakarta.enterprise.context.ApplicationScoped
|
||||||
|
import jakarta.enterprise.inject.Instance
|
||||||
import jakarta.inject.Inject
|
import jakarta.inject.Inject
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
@@ -24,14 +25,18 @@ import java.util.function.Consumer
|
|||||||
class GameRedisSubscriberManager:
|
class GameRedisSubscriberManager:
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.var
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject var redis: RedisDataSource = uninitialized
|
@Inject var redis: RedisDataSource = uninitialized
|
||||||
@Inject var registry: GameRegistry = uninitialized
|
@Inject var registry: GameRegistry = uninitialized
|
||||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||||
@Inject var redisConfig: RedisConfig = uninitialized
|
@Inject var redisConfig: RedisConfig = uninitialized
|
||||||
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
|
@Inject var ioClient: IoGrpcClientWrapper = uninitialized
|
||||||
@Inject var heartbeatService: InstanceHeartbeatService = null
|
@Inject var heartbeatServiceInstance: Instance[InstanceHeartbeatService] = uninitialized
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
|
private def heartbeatServiceOpt: Option[InstanceHeartbeatService] =
|
||||||
|
if heartbeatServiceInstance.isUnsatisfied then None
|
||||||
|
else Some(heartbeatServiceInstance.get())
|
||||||
|
|
||||||
private val c2sListeners = new ConcurrentHashMap[String, PubSubCommands.RedisSubscriber]()
|
private val c2sListeners = new ConcurrentHashMap[String, PubSubCommands.RedisSubscriber]()
|
||||||
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
|
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
|
||||||
|
|
||||||
@@ -44,7 +49,7 @@ class GameRedisSubscriberManager:
|
|||||||
def subscribeGame(gameId: String): Unit =
|
def subscribeGame(gameId: String): Unit =
|
||||||
try
|
try
|
||||||
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
|
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler)
|
||||||
c2sListeners.put(gameId, subscriber)
|
c2sListeners.put(gameId, subscriber)
|
||||||
|
|
||||||
val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json)
|
val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json)
|
||||||
@@ -61,7 +66,7 @@ class GameRedisSubscriberManager:
|
|||||||
s2cObservers.put(gameId, obs)
|
s2cObservers.put(gameId, obs)
|
||||||
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
registry.get(gameId).foreach(_.engine.subscribe(obs))
|
||||||
|
|
||||||
if heartbeatService != null then heartbeatService.addGameSubscription(gameId)
|
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
|
||||||
catch
|
catch
|
||||||
case e: Exception =>
|
case e: Exception =>
|
||||||
System.err.println(s"Warning: Redis subscription failed for game $gameId: ${e.getMessage}")
|
System.err.println(s"Warning: Redis subscription failed for game $gameId: ${e.getMessage}")
|
||||||
@@ -75,7 +80,7 @@ class GameRedisSubscriberManager:
|
|||||||
registry.get(gameId).foreach(_.engine.unsubscribe(obs))
|
registry.get(gameId).foreach(_.engine.unsubscribe(obs))
|
||||||
}
|
}
|
||||||
|
|
||||||
if heartbeatService != null then heartbeatService.removeGameSubscription(gameId)
|
heartbeatServiceOpt.foreach(_.removeGameSubscription(gameId))
|
||||||
|
|
||||||
private def handleC2sMessage(gameId: String, msg: String): Unit =
|
private def handleC2sMessage(gameId: String, msg: String): Unit =
|
||||||
parseC2sMessage(msg) match
|
parseC2sMessage(msg) match
|
||||||
@@ -121,28 +126,16 @@ class GameRedisSubscriberManager:
|
|||||||
}
|
}
|
||||||
|
|
||||||
def batchResubscribeGames(gameIds: java.util.List[String]): Int =
|
def batchResubscribeGames(gameIds: java.util.List[String]): Int =
|
||||||
var count = 0
|
gameIds.forEach(subscribeGame)
|
||||||
gameIds.forEach { gameId =>
|
gameIds.size()
|
||||||
subscribeGame(gameId)
|
|
||||||
count += 1
|
|
||||||
}
|
|
||||||
count
|
|
||||||
|
|
||||||
def unsubscribeGames(gameIds: java.util.List[String]): Int =
|
def unsubscribeGames(gameIds: java.util.List[String]): Int =
|
||||||
var count = 0
|
gameIds.forEach(unsubscribeGame)
|
||||||
gameIds.forEach { gameId =>
|
gameIds.size()
|
||||||
unsubscribeGame(gameId)
|
|
||||||
count += 1
|
|
||||||
}
|
|
||||||
count
|
|
||||||
|
|
||||||
def evictGames(gameIds: java.util.List[String]): Int =
|
def evictGames(gameIds: java.util.List[String]): Int =
|
||||||
var count = 0
|
gameIds.forEach(unsubscribeGame)
|
||||||
gameIds.forEach { gameId =>
|
gameIds.size()
|
||||||
unsubscribeGame(gameId)
|
|
||||||
count += 1
|
|
||||||
}
|
|
||||||
count
|
|
||||||
|
|
||||||
def drainInstance(): Int =
|
def drainInstance(): Int =
|
||||||
val gameIds = new java.util.ArrayList(c2sListeners.keySet())
|
val gameIds = new java.util.ArrayList(c2sListeners.keySet())
|
||||||
|
|||||||
@@ -82,8 +82,7 @@ class GameResource:
|
|||||||
|
|
||||||
private def assertIsNotBot(): Unit =
|
private def assertIsNotBot(): Unit =
|
||||||
val botType = Option(jwt.getClaim[AnyRef]("type")).map(_.toString).getOrElse("")
|
val botType = Option(jwt.getClaim[AnyRef]("type")).map(_.toString).getOrElse("")
|
||||||
if Set("bot", "official-bot").contains(botType) then
|
if Set("bot", "official-bot").contains(botType) then throw ForbiddenException("Only bots can make moves")
|
||||||
throw ForbiddenException("Only bots can make moves")
|
|
||||||
|
|
||||||
// scalafix:on DisableSyntax.throw
|
// scalafix:on DisableSyntax.throw
|
||||||
|
|
||||||
|
|||||||
+38
-46
@@ -20,6 +20,7 @@ import io.grpc.Channel
|
|||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
class InstanceHeartbeatService:
|
class InstanceHeartbeatService:
|
||||||
|
// scalafix:off DisableSyntax.var
|
||||||
@Inject
|
@Inject
|
||||||
private var redis: RedisDataSource = uninitialized
|
private var redis: RedisDataSource = uninitialized
|
||||||
|
|
||||||
@@ -36,51 +37,46 @@ class InstanceHeartbeatService:
|
|||||||
private var coordinatorEnabled: Boolean = true
|
private var coordinatorEnabled: Boolean = true
|
||||||
|
|
||||||
private var coordinatorStub: CoordinatorServiceStub = uninitialized
|
private var coordinatorStub: CoordinatorServiceStub = uninitialized
|
||||||
|
private val log = Logger.getLogger(classOf[InstanceHeartbeatService])
|
||||||
private val log = Logger.getLogger(classOf[InstanceHeartbeatService])
|
private val mapper = ObjectMapper()
|
||||||
private val mapper = ObjectMapper()
|
private var instanceId = ""
|
||||||
|
private var redisPrefix = "nowchess"
|
||||||
private var instanceId = ""
|
|
||||||
private var redisPrefix = "nowchess"
|
|
||||||
private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None
|
private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None
|
||||||
private var heartbeatExecutor = Executors.newScheduledThreadPool(1)
|
private var heartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||||
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
|
private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1)
|
||||||
private var subscriptionCount = 0
|
private var subscriptionCount = 0
|
||||||
private var localCacheSize = 0
|
private var localCacheSize = 0
|
||||||
private var serviceActive = false
|
private var serviceActive = false
|
||||||
private var shuttingDown = false
|
private var shuttingDown = false
|
||||||
|
// scalafix:on DisableSyntax.var
|
||||||
|
|
||||||
def onStart(@Observes event: StartupEvent): Unit =
|
def onStart(@Observes event: StartupEvent): Unit =
|
||||||
if !coordinatorEnabled then
|
if coordinatorEnabled then
|
||||||
log.info("Coordinator support disabled via config; skipping heartbeat service startup")
|
try
|
||||||
return
|
shuttingDown = false
|
||||||
|
generateInstanceId()
|
||||||
try
|
initializeHeartbeatStream()
|
||||||
shuttingDown = false
|
scheduleHeartbeats()
|
||||||
generateInstanceId()
|
serviceActive = true
|
||||||
initializeHeartbeatStream()
|
log.infof("Instance heartbeat service started with ID: %s", instanceId)
|
||||||
scheduleHeartbeats()
|
catch
|
||||||
serviceActive = true
|
case ex: Exception =>
|
||||||
log.infof("Instance heartbeat service started with ID: %s", instanceId)
|
serviceActive = false
|
||||||
catch
|
log.errorf(ex, "Failed to start instance heartbeat service")
|
||||||
case ex: Exception =>
|
else log.info("Coordinator support disabled via config; skipping heartbeat service startup")
|
||||||
serviceActive = false
|
|
||||||
log.errorf(ex, "Failed to start instance heartbeat service")
|
|
||||||
|
|
||||||
def onShutdown(@Observes event: ShutdownEvent): Unit =
|
def onShutdown(@Observes event: ShutdownEvent): Unit =
|
||||||
shuttingDown = true
|
shuttingDown = true
|
||||||
|
|
||||||
if !serviceActive then
|
if serviceActive then
|
||||||
log.info("Instance heartbeat service stopped")
|
try
|
||||||
return
|
cleanup()
|
||||||
|
serviceActive = false
|
||||||
try
|
log.info("Instance heartbeat service stopped")
|
||||||
cleanup()
|
catch
|
||||||
serviceActive = false
|
case ex: Exception =>
|
||||||
log.info("Instance heartbeat service stopped")
|
log.errorf(ex, "Error during heartbeat service shutdown")
|
||||||
catch
|
else log.info("Instance heartbeat service stopped")
|
||||||
case ex: Exception =>
|
|
||||||
log.errorf(ex, "Error during heartbeat service shutdown")
|
|
||||||
|
|
||||||
def setRedisPrefix(prefix: String): Unit =
|
def setRedisPrefix(prefix: String): Unit =
|
||||||
redisPrefix = prefix
|
redisPrefix = prefix
|
||||||
@@ -92,18 +88,16 @@ class InstanceHeartbeatService:
|
|||||||
localCacheSize = count
|
localCacheSize = count
|
||||||
|
|
||||||
def addGameSubscription(gameId: String): Unit =
|
def addGameSubscription(gameId: String): Unit =
|
||||||
if !coordinatorEnabled then return
|
if coordinatorEnabled then
|
||||||
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
redis.set(classOf[String]).sadd(setKey, gameId)
|
||||||
redis.set(classOf[String]).sadd(setKey, gameId)
|
subscriptionCount += 1
|
||||||
subscriptionCount += 1
|
|
||||||
|
|
||||||
def removeGameSubscription(gameId: String): Unit =
|
def removeGameSubscription(gameId: String): Unit =
|
||||||
if !coordinatorEnabled then return
|
if coordinatorEnabled then
|
||||||
|
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
||||||
val setKey = s"$redisPrefix:instance:$instanceId:games"
|
redis.set(classOf[String]).srem(setKey, gameId)
|
||||||
redis.set(classOf[String]).srem(setKey, gameId)
|
subscriptionCount = Math.max(0, subscriptionCount - 1)
|
||||||
subscriptionCount = Math.max(0, subscriptionCount - 1)
|
|
||||||
|
|
||||||
private def generateInstanceId(): Unit =
|
private def generateInstanceId(): Unit =
|
||||||
val hostname =
|
val hostname =
|
||||||
@@ -137,7 +131,6 @@ class InstanceHeartbeatService:
|
|||||||
streamObserver = None
|
streamObserver = None
|
||||||
|
|
||||||
private def scheduleHeartbeats(): Unit =
|
private def scheduleHeartbeats(): Unit =
|
||||||
// Send heartbeat every 200ms
|
|
||||||
heartbeatExecutor.scheduleAtFixedRate(
|
heartbeatExecutor.scheduleAtFixedRate(
|
||||||
() => sendHeartbeat(),
|
() => sendHeartbeat(),
|
||||||
0,
|
0,
|
||||||
@@ -145,7 +138,6 @@ class InstanceHeartbeatService:
|
|||||||
TimeUnit.MILLISECONDS,
|
TimeUnit.MILLISECONDS,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Refresh Redis TTL every 2s
|
|
||||||
redisHeartbeatExecutor.scheduleAtFixedRate(
|
redisHeartbeatExecutor.scheduleAtFixedRate(
|
||||||
() => refreshRedisHeartbeat(),
|
() => refreshRedisHeartbeat(),
|
||||||
0,
|
0,
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import io.quarkus.grpc.GrpcService
|
|||||||
|
|
||||||
import scala.jdk.CollectionConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
|
|
||||||
// scalafix:off DisableSyntax.throw
|
|
||||||
@GrpcService
|
@GrpcService
|
||||||
class IoGrpcService extends IoServiceGrpc.IoServiceImplBase:
|
class IoGrpcService extends IoServiceGrpc.IoServiceImplBase:
|
||||||
|
|
||||||
@@ -59,4 +58,3 @@ class IoGrpcService extends IoServiceGrpc.IoServiceImplBase:
|
|||||||
private def respond[T](obs: StreamObserver[T], value: T): Unit =
|
private def respond[T](obs: StreamObserver[T], value: T): Unit =
|
||||||
obs.onNext(value)
|
obs.onNext(value)
|
||||||
obs.onCompleted()
|
obs.onCompleted()
|
||||||
// scalafix:on DisableSyntax.throw
|
|
||||||
|
|||||||
@@ -11,9 +11,9 @@ import de.nowchess.rules.sets.DefaultRules
|
|||||||
|
|
||||||
object ClassicalBot:
|
object ClassicalBot:
|
||||||
def apply(
|
def apply(
|
||||||
difficulty: BotDifficulty,
|
difficulty: BotDifficulty,
|
||||||
rules: RuleSet = DefaultRules,
|
rules: RuleSet = DefaultRules,
|
||||||
book: Option[PolyglotBook] = None,
|
book: Option[PolyglotBook] = None,
|
||||||
): Bot =
|
): Bot =
|
||||||
val search = AlphaBetaSearch(rules, weights = EvaluationClassic)
|
val search = AlphaBetaSearch(rules, weights = EvaluationClassic)
|
||||||
val timeBudgetMs = 1000L
|
val timeBudgetMs = 1000L
|
||||||
|
|||||||
@@ -14,12 +14,12 @@ import de.nowchess.rules.sets.DefaultRules
|
|||||||
|
|
||||||
object HybridBot:
|
object HybridBot:
|
||||||
def apply(
|
def apply(
|
||||||
difficulty: BotDifficulty,
|
difficulty: BotDifficulty,
|
||||||
rules: RuleSet = DefaultRules,
|
rules: RuleSet = DefaultRules,
|
||||||
book: Option[PolyglotBook] = None,
|
book: Option[PolyglotBook] = None,
|
||||||
nnueEvaluation: Evaluation = EvaluationNNUE,
|
nnueEvaluation: Evaluation = EvaluationNNUE,
|
||||||
classicalEvaluation: Evaluation = EvaluationClassic,
|
classicalEvaluation: Evaluation = EvaluationClassic,
|
||||||
vetoReporter: String => Unit = println(_),
|
vetoReporter: String => Unit = println(_),
|
||||||
): Bot =
|
): Bot =
|
||||||
val search = AlphaBetaSearch(rules, TranspositionTable(), classicalEvaluation)
|
val search = AlphaBetaSearch(rules, TranspositionTable(), classicalEvaluation)
|
||||||
context =>
|
context =>
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ import de.nowchess.rules.sets.DefaultRules
|
|||||||
|
|
||||||
object NNUEBot:
|
object NNUEBot:
|
||||||
def apply(
|
def apply(
|
||||||
difficulty: BotDifficulty,
|
difficulty: BotDifficulty,
|
||||||
rules: RuleSet = DefaultRules,
|
rules: RuleSet = DefaultRules,
|
||||||
book: Option[PolyglotBook] = None,
|
book: Option[PolyglotBook] = None,
|
||||||
): Bot =
|
): Bot =
|
||||||
val search = AlphaBetaSearch(rules, weights = EvaluationNNUE)
|
val search = AlphaBetaSearch(rules, weights = EvaluationNNUE)
|
||||||
context =>
|
context =>
|
||||||
|
|||||||
+6
-4
@@ -16,16 +16,18 @@ class OfficialBotChallengeResource:
|
|||||||
@POST
|
@POST
|
||||||
@Path("/{botId}")
|
@Path("/{botId}")
|
||||||
def challengeWithDifficulty(
|
def challengeWithDifficulty(
|
||||||
@PathParam("botId") botId: String,
|
@PathParam("botId") botId: String,
|
||||||
@QueryParam("difficulty") difficulty: Int
|
@QueryParam("difficulty") difficulty: Int,
|
||||||
): Response =
|
): Response =
|
||||||
DifficultyMapper.fromElo(difficulty) match
|
DifficultyMapper.fromElo(difficulty) match
|
||||||
case None =>
|
case None =>
|
||||||
Response.status(Response.Status.BAD_REQUEST)
|
Response
|
||||||
|
.status(Response.Status.BAD_REQUEST)
|
||||||
.entity(s"""{"error":"difficulty must be between 1000 and 2800"}""")
|
.entity(s"""{"error":"difficulty must be between 1000 and 2800"}""")
|
||||||
.build()
|
.build()
|
||||||
case Some(botDifficulty) =>
|
case Some(botDifficulty) =>
|
||||||
// TODO: wire to account service challenge creation + bot routing
|
// TODO: wire to account service challenge creation + bot routing
|
||||||
Response.status(Response.Status.CREATED)
|
Response
|
||||||
|
.status(Response.Status.CREATED)
|
||||||
.entity(s"""{"botId":"$botId","difficulty":$difficulty,"status":"pending"}""")
|
.entity(s"""{"botId":"$botId","difficulty":$difficulty,"status":"pending"}""")
|
||||||
.build()
|
.build()
|
||||||
|
|||||||
+14
-2
@@ -46,7 +46,13 @@ class OfficialBotService:
|
|||||||
watchGame(botName, gameId, playingAs, difficulty, botAccountId)
|
watchGame(botName, gameId, playingAs, difficulty, botAccountId)
|
||||||
catch case _: Exception => ()
|
catch case _: Exception => ()
|
||||||
|
|
||||||
private def watchGame(botName: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit =
|
private def watchGame(
|
||||||
|
botName: String,
|
||||||
|
gameId: String,
|
||||||
|
playingAs: String,
|
||||||
|
difficulty: Int,
|
||||||
|
botAccountId: String,
|
||||||
|
): Unit =
|
||||||
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
|
val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg)
|
||||||
redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
|
redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler)
|
||||||
()
|
()
|
||||||
@@ -69,7 +75,13 @@ class OfficialBotService:
|
|||||||
computeAndSendMove(botName, gameId, fen, difficulty, botAccountId)
|
computeAndSendMove(botName, gameId, fen, difficulty, botAccountId)
|
||||||
catch case _: Exception => ()
|
catch case _: Exception => ()
|
||||||
|
|
||||||
private def computeAndSendMove(botName: String, gameId: String, fen: String, difficulty: Int, botAccountId: String): Unit =
|
private def computeAndSendMove(
|
||||||
|
botName: String,
|
||||||
|
gameId: String,
|
||||||
|
fen: String,
|
||||||
|
difficulty: Int,
|
||||||
|
botAccountId: String,
|
||||||
|
): Unit =
|
||||||
val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium)
|
val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium)
|
||||||
botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot =>
|
botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot =>
|
||||||
FenParser.parseFen(fen).toOption.foreach { context =>
|
FenParser.parseFen(fen).toOption.foreach { context =>
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import org.scalatest.matchers.should.Matchers
|
|||||||
import de.nowchess.rules.sets.DefaultRules
|
import de.nowchess.rules.sets.DefaultRules
|
||||||
|
|
||||||
class ClassicalBotTest extends AnyFunSuite with Matchers:
|
class ClassicalBotTest extends AnyFunSuite with Matchers:
|
||||||
|
|
||||||
|
|
||||||
test("nextMove on initial position returns a move"):
|
test("nextMove on initial position returns a move"):
|
||||||
val bot = ClassicalBot(BotDifficulty.Easy)
|
val bot = ClassicalBot(BotDifficulty.Easy)
|
||||||
|
|||||||
@@ -127,4 +127,5 @@ tasks.withType(ScalaCompile::class).configureEach {
|
|||||||
|
|
||||||
tasks.named("compileScoverageJava").configure {
|
tasks.named("compileScoverageJava").configure {
|
||||||
dependsOn(tasks.named("quarkusGenerateCode"))
|
dependsOn(tasks.named("quarkusGenerateCode"))
|
||||||
|
dependsOn(tasks.named("compileQuarkusGeneratedSourcesJava"))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ class RuleGrpcService extends RuleServiceGrpc.RuleServiceImplBase:
|
|||||||
resp.onNext(
|
resp.onNext(
|
||||||
ProtoMoveList
|
ProtoMoveList
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.addAllMoves(moves.map(ProtoMapper.toProtoMove).asInstanceOf[java.util.List[ProtoMove]])
|
.addAllMoves(toJavaMoveList(moves))
|
||||||
.build(),
|
.build(),
|
||||||
)
|
)
|
||||||
resp.onCompleted()
|
resp.onCompleted()
|
||||||
|
|||||||
@@ -22,6 +22,6 @@ class InternalAuthFilter extends ContainerRequestFilter:
|
|||||||
|
|
||||||
override def filter(ctx: ContainerRequestContext): Unit =
|
override def filter(ctx: ContainerRequestContext): Unit =
|
||||||
if authEnabled then
|
if authEnabled then
|
||||||
val header = ctx.getHeaderString("X-Internal-Secret")
|
val header = Option(ctx.getHeaderString("X-Internal-Secret"))
|
||||||
if header == null || header != secret then
|
if header.isEmpty || header.get.equals(secret) then
|
||||||
ctx.abortWith(Response.status(Response.Status.UNAUTHORIZED).build())
|
ctx.abortWith(Response.status(Response.Status.UNAUTHORIZED).build())
|
||||||
|
|||||||
+1
-2
@@ -29,5 +29,4 @@ class InternalGrpcAuthInterceptor extends ServerInterceptor:
|
|||||||
if authEnabled && token != secret then
|
if authEnabled && token != secret then
|
||||||
call.close(Status.UNAUTHENTICATED.withDescription("Missing or invalid internal secret"), new Metadata())
|
call.close(Status.UNAUTHENTICATED.withDescription("Missing or invalid internal secret"), new Metadata())
|
||||||
new ServerCall.Listener[Req] {}
|
new ServerCall.Listener[Req] {}
|
||||||
else
|
else next.startCall(call, headers)
|
||||||
next.startCall(call, headers)
|
|
||||||
|
|||||||
@@ -15,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer:
|
|||||||
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
||||||
// scalafix:on DisableSyntax.null
|
// scalafix:on DisableSyntax.null
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -11,4 +11,3 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
class NativeReflectionConfig
|
class NativeReflectionConfig
|
||||||
|
|
||||||
|
|||||||
+3
-3
@@ -36,9 +36,9 @@ class GameRecordRepository:
|
|||||||
|
|
||||||
def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] =
|
def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] =
|
||||||
em.createQuery(
|
em.createQuery(
|
||||||
"SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC",
|
"SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC",
|
||||||
classOf[GameRecord],
|
classOf[GameRecord],
|
||||||
).setParameter("id", playerId)
|
).setParameter("id", playerId)
|
||||||
.setFirstResult(offset)
|
.setFirstResult(offset)
|
||||||
.setMaxResults(limit)
|
.setMaxResults(limit)
|
||||||
.getResultList
|
.getResultList
|
||||||
|
|||||||
@@ -28,10 +28,10 @@ class StoreGameResource:
|
|||||||
@Path("/running/{playerId}")
|
@Path("/running/{playerId}")
|
||||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
@Produces(Array(MediaType.APPLICATION_JSON))
|
||||||
def getRunning(
|
def getRunning(
|
||||||
@PathParam("playerId") playerId: String,
|
@PathParam("playerId") playerId: String,
|
||||||
@QueryParam("offset") @DefaultValue("0") offset: Int,
|
@QueryParam("offset") @DefaultValue("0") offset: Int,
|
||||||
@QueryParam("limit") @DefaultValue("20") limit: Int,
|
@QueryParam("limit") @DefaultValue("20") limit: Int,
|
||||||
): Response =
|
): Response =
|
||||||
Response.ok(repository.findByPlayerIdRunning(playerId, offset, limit)).build()
|
Response.ok(repository.findByPlayerIdRunning(playerId, offset, limit)).build()
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
|
|||||||
@@ -15,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer:
|
|||||||
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
|
||||||
// scalafix:on DisableSyntax.null
|
// scalafix:on DisableSyntax.null
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ class GameWebSocketResource:
|
|||||||
|
|
||||||
@OnOpen
|
@OnOpen
|
||||||
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit =
|
||||||
val gameId = connection.pathParam("gameId")
|
val gameId = connection.pathParam("gameId")
|
||||||
val playerId = Option(handshake.header("Authorization"))
|
val playerId = Option(handshake.header("Authorization"))
|
||||||
.filter(_.nonEmpty)
|
.filter(_.nonEmpty)
|
||||||
.flatMap(token => Try(jwtParser.parse(token)).toOption)
|
.flatMap(token => Try(jwtParser.parse(token)).toOption)
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ class UserWebSocketResource:
|
|||||||
case None => connection.close().subscribe().`with`(_ => (), _ => ())
|
case None => connection.close().subscribe().`with`(_ => (), _ => ())
|
||||||
case Some(userId) =>
|
case Some(userId) =>
|
||||||
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ())
|
||||||
val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler)
|
val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler)
|
||||||
connections.put(connection.id(), (userId, subscriber))
|
connections.put(connection.id(), (userId, subscriber))
|
||||||
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
|
val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}"""
|
||||||
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
|
connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())
|
||||||
|
|||||||
Reference in New Issue
Block a user