From 106118ad2931bfecce9596e1a56ad1c53c478b05 Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 29 Apr 2026 08:38:27 +0200 Subject: [PATCH] refactor: clean up code formatting and improve readability across multiple files --- .../scala/de/nowchess/account/dto/Dtos.scala | 1 - .../repository/AccountRepository.scala | 1 - .../account/resource/AccountResource.scala | 1 - .../resource/OfficialChallengeResource.scala | 85 +++++---- .../account/service/AccountService.scala | 1 - .../account/service/ChallengeService.scala | 1 - .../account/service/EventPublisher.scala | 5 +- .../botplatform/registry/BotRegistry.scala | 10 +- .../resource/BotEventResource.scala | 4 +- .../coordinator/config/JacksonConfig.scala | 1 - .../config/NativeReflectionConfig.scala | 1 - .../grpc/CoordinatorGrpcServer.scala | 2 + .../coordinator/grpc/CoreGrpcClient.scala | 4 +- .../resource/CoordinatorResource.scala | 2 +- .../coordinator/service/AutoScaler.scala | 175 ++++++++++-------- .../service/CacheEvictionManager.scala | 29 +-- .../coordinator/service/FailoverService.scala | 48 +++-- .../coordinator/service/HealthMonitor.scala | 89 ++++----- .../service/InstanceRegistry.scala | 14 +- .../coordinator/service/LoadBalancer.scala | 83 ++++----- .../grpc/CoordinatorServiceHandler.scala | 2 + .../chess/redis/GameRedisPublisher.scala | 16 +- .../redis/GameRedisSubscriberManager.scala | 47 ++--- .../chess/resource/GameResource.scala | 3 +- .../service/InstanceHeartbeatService.scala | 84 ++++----- .../de/nowchess/io/grpc/IoGrpcService.scala | 2 - .../de/nowchess/bot/bots/ClassicalBot.scala | 6 +- .../de/nowchess/bot/bots/HybridBot.scala | 12 +- .../scala/de/nowchess/bot/bots/NNUEBot.scala | 6 +- .../OfficialBotChallengeResource.scala | 10 +- .../bot/service/OfficialBotService.scala | 16 +- .../de/nowchess/bot/ClassicalBotTest.scala | 1 - modules/rule/build.gradle.kts | 1 + .../nowchess/rules/grpc/RuleGrpcService.scala | 2 +- .../security/InternalAuthFilter.scala | 4 +- .../InternalGrpcAuthInterceptor.scala | 3 +- .../nowchess/store/config/JacksonConfig.scala | 1 - .../store/config/NativeReflectionConfig.scala | 1 - .../repository/GameRecordRepository.scala | 6 +- .../store/resource/StoreGameResource.scala | 8 +- .../de/nowchess/ws/config/JacksonConfig.scala | 1 - .../ws/resource/GameWebSocketResource.scala | 2 +- .../ws/resource/UserWebSocketResource.scala | 2 +- 43 files changed, 414 insertions(+), 379 deletions(-) diff --git a/modules/account/src/main/scala/de/nowchess/account/dto/Dtos.scala b/modules/account/src/main/scala/de/nowchess/account/dto/Dtos.scala index d95c08e..6876803 100644 --- a/modules/account/src/main/scala/de/nowchess/account/dto/Dtos.scala +++ b/modules/account/src/main/scala/de/nowchess/account/dto/Dtos.scala @@ -46,5 +46,4 @@ case class RotatedTokenDto(token: String) case class OfficialBotAccountDto(id: String, name: String, rating: Int, createdAt: String) - case class OfficialChallengeResponse(gameId: String, botName: String, difficulty: Int) diff --git a/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala b/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala index 6374381..3d6803f 100644 --- a/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala +++ b/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala @@ -96,4 +96,3 @@ class OfficialBotAccountRepository: def delete(botId: UUID): Unit = em.find(classOf[OfficialBotAccount], botId) match case bot: OfficialBotAccount => em.remove(bot) - diff --git a/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala b/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala index bf4a3f7..c24d634 100644 --- a/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala +++ b/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala @@ -200,4 +200,3 @@ class AccountResource: rating = bot.rating, createdAt = bot.createdAt.toString, ) - diff --git a/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala b/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala index cc7a877..635dbe7 100644 --- a/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala +++ b/modules/account/src/main/scala/de/nowchess/account/resource/OfficialChallengeResource.scala @@ -24,8 +24,8 @@ import java.util.concurrent.ThreadLocalRandom class OfficialChallengeResource: // scalafix:off DisableSyntax.var - @Inject var accountService: AccountService = uninitialized - @Inject var jwt: JsonWebToken = uninitialized + @Inject var accountService: AccountService = uninitialized + @Inject var jwt: JsonWebToken = uninitialized @Inject var botEventPublisher: EventPublisher = uninitialized @Inject @@ -43,46 +43,49 @@ class OfficialChallengeResource: @QueryParam("color") color: String, ): Response = if difficulty < 1000 || difficulty > 2800 then - return Response + Response .status(Response.Status.BAD_REQUEST) .entity(ErrorDto("difficulty must be between 1000 and 2800")) .build() + else + val normalizedColor = Option(color).map(_.toLowerCase).getOrElse("random") + normalizedColor match + case "white" | "black" | "random" => + val userId = UUID.fromString(jwt.getSubject) + val botOpt = accountService.getOfficialBotAccounts().find(_.name == botName) + val userOpt = accountService.findById(userId) - val playerColor = Option(color).map(_.toLowerCase).getOrElse("random") match - case "white" | "black" | "random" => Option(color).map(_.toLowerCase).getOrElse("random") - case other => - return Response - .status(Response.Status.BAD_REQUEST) - .entity(ErrorDto(s"Invalid color: $other. Must be white, black or random")) - .build() - - val userId = UUID.fromString(jwt.getSubject) - val botOpt = accountService.getOfficialBotAccounts().find(_.name == botName) - val userOpt = accountService.findById(userId) - - (botOpt, userOpt) match - case (None, _) => - Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Official bot '$botName' not found")).build() - case (_, None) => - Response.status(Response.Status.NOT_FOUND).entity(ErrorDto("User not found")).build() - case (Some(bot), Some(user)) => - val userIsWhite = playerColor match - case "white" => true - case "black" => false - case _ => ThreadLocalRandom.current().nextBoolean() - val (white, black, botColor) = - if userIsWhite then - (CorePlayerInfo(user.id.toString, user.username), CorePlayerInfo(bot.id.toString, bot.name), "black") - else - (CorePlayerInfo(bot.id.toString, bot.name), CorePlayerInfo(user.id.toString, user.username), "white") - val req = CoreCreateGameRequest(Some(white), Some(black), None, Some("Authenticated")) - val gameId = - try Right(coreGameClient.createGame(req).gameId) - catch case _ => Left("Failed to create game") - gameId match - case Left(err) => - Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ErrorDto(err)).build() - case Right(id) => - try botEventPublisher.publishGameStart(bot.name, id, botColor, difficulty, bot.id.toString) - catch case ex: Exception => log.warnf(ex, "Failed to notify bot for game %s", id) - Response.status(Response.Status.CREATED).entity(OfficialChallengeResponse(id, botName, difficulty)).build() + (botOpt, userOpt) match + case (None, _) => + Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Official bot '$botName' not found")).build() + case (_, None) => + Response.status(Response.Status.NOT_FOUND).entity(ErrorDto("User not found")).build() + case (Some(bot), Some(user)) => + val userIsWhite = normalizedColor match + case "white" => true + case "black" => false + case _ => ThreadLocalRandom.current().nextBoolean() + val (white, black, botColor) = + if userIsWhite then + (CorePlayerInfo(user.id.toString, user.username), CorePlayerInfo(bot.id.toString, bot.name), "black") + else + (CorePlayerInfo(bot.id.toString, bot.name), CorePlayerInfo(user.id.toString, user.username), "white") + val req = CoreCreateGameRequest(Some(white), Some(black), None, Some("Authenticated")) + val gameId = + try Right(coreGameClient.createGame(req).gameId) + catch case _ => Left("Failed to create game") + gameId match + case Left(err) => + Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ErrorDto(err)).build() + case Right(id) => + try botEventPublisher.publishGameStart(bot.name, id, botColor, difficulty, bot.id.toString) + catch case ex: Exception => log.warnf(ex, "Failed to notify bot for game %s", id) + Response + .status(Response.Status.CREATED) + .entity(OfficialChallengeResponse(id, botName, difficulty)) + .build() + case other => + Response + .status(Response.Status.BAD_REQUEST) + .entity(ErrorDto(s"Invalid color: $other. Must be white, black or random")) + .build() diff --git a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala index 21a3489..71d5bb7 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala @@ -163,4 +163,3 @@ class AccountService: user.botAccounts.forEach(_.banned = false) userAccountRepository.persist(user) Right(user) - diff --git a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala index ab7693d..532b4c8 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala @@ -25,7 +25,6 @@ import org.eclipse.microprofile.rest.client.inject.RestClient import org.jboss.logging.Logger import scala.compiletime.uninitialized - import java.time.Instant import java.time.temporal.ChronoUnit import java.util.UUID diff --git a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala index 6e33f57..fc508a7 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/EventPublisher.scala @@ -10,12 +10,13 @@ import scala.compiletime.uninitialized class EventPublisher: // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized + @Inject var redis: RedisDataSource = uninitialized @Inject var redisConfig: RedisConfig = uninitialized // scalafix:on DisableSyntax.var def publishGameStart(botId: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit = - val event = s"""{"type":"gameStart","gameId":"$gameId","playingAs":"$playingAs","difficulty":$difficulty,"botAccountId":"$botAccountId"}""" + val event = + s"""{"type":"gameStart","gameId":"$gameId","playingAs":"$playingAs","difficulty":$difficulty,"botAccountId":"$botAccountId"}""" redis.pubsub(classOf[String]).publish(s"${redisConfig.prefix}:bot:$botId:events", event) () diff --git a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala index 287c761..909ee3f 100644 --- a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala +++ b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/registry/BotRegistry.scala @@ -14,16 +14,16 @@ import java.util.function.Consumer class BotRegistry: // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized + @Inject var redis: RedisDataSource = uninitialized @Inject var redisConfig: RedisConfig = uninitialized // scalafix:on DisableSyntax.var - private val connections = ConcurrentHashMap[String, (MultiEmitter[?], PubSubCommands.RedisSubscriber)]() + private val connections = ConcurrentHashMap[String, (MultiEmitter[? >: String], PubSubCommands.RedisSubscriber)]() def register(botId: String, emitter: MultiEmitter[? >: String]): Unit = - val channel = s"${redisConfig.prefix}:bot:$botId:events" - val handler: Consumer[String] = msg => emitter.asInstanceOf[MultiEmitter[String]].emit(msg) - val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler) + val channel = s"${redisConfig.prefix}:bot:$botId:events" + val handler: Consumer[String] = msg => emitter.emit(msg) + val subscriber = redis.pubsub(classOf[String]).subscribe(channel, handler) connections.put(botId, (emitter, subscriber)) () diff --git a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/resource/BotEventResource.scala b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/resource/BotEventResource.scala index 3875265..0264832 100644 --- a/modules/bot-platform/src/main/scala/de/nowchess/botplatform/resource/BotEventResource.scala +++ b/modules/bot-platform/src/main/scala/de/nowchess/botplatform/resource/BotEventResource.scala @@ -44,9 +44,9 @@ class BotEventResource: @Produces(Array(MediaType.SERVER_SENT_EVENTS)) def streamGame(@PathParam("gameId") gameId: String): Multi[String] = Multi.createFrom().emitter[String] { emitter => - val topicName = s"${redisConfig.prefix}:game:$gameId:s2c" + val topicName = s"${redisConfig.prefix}:game:$gameId:s2c" val handler: Consumer[String] = msg => emitter.emit(msg) - val subscriber = redis.pubsub(classOf[String]).subscribe(topicName, handler) + val subscriber = redis.pubsub(classOf[String]).subscribe(topicName, handler) emitter.onTermination(() => subscriber.unsubscribe(topicName)) } diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/JacksonConfig.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/JacksonConfig.scala index 8abeeed..7fecbea 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/JacksonConfig.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/JacksonConfig.scala @@ -15,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer: new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala") // scalafix:on DisableSyntax.null }) - diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/NativeReflectionConfig.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/NativeReflectionConfig.scala index 2e6d6da..f49df79 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/NativeReflectionConfig.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/NativeReflectionConfig.scala @@ -11,4 +11,3 @@ import io.quarkus.runtime.annotations.RegisterForReflection ), ) class NativeReflectionConfig - diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala index a781c01..754241c 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala @@ -28,7 +28,9 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp responseObserver: StreamObserver[CoordinatorCommand], ): StreamObserver[HeartbeatFrame] = new StreamObserver[HeartbeatFrame]: + // scalafix:off DisableSyntax.var private var lastInstanceId = "" + // scalafix:on DisableSyntax.var override def onNext(frame: HeartbeatFrame): Unit = lastInstanceId = frame.getInstanceId diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala index 88489ab..1be6081 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala @@ -16,9 +16,7 @@ class CoreGrpcClient: private val channels = ConcurrentHashMap[String, ManagedChannel]() private def getChannel(host: String, port: Int): ManagedChannel = - channels.computeIfAbsent(s"$host:$port", _ => - ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(), - ) + channels.computeIfAbsent(s"$host:$port", _ => ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()) private def evictStaleChannel(host: String, port: Int): Unit = Option(channels.remove(s"$host:$port")).foreach(_.shutdownNow()) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala index 5735459..f02a7ef 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala @@ -33,7 +33,7 @@ class CoordinatorResource: @Path("/instances") @Produces(Array(MediaType.APPLICATION_JSON)) def listInstances: java.util.List[InstanceMetadata] = - instanceRegistry.getAllInstances.asJava.asInstanceOf[java.util.List[InstanceMetadata]] + instanceRegistry.getAllInstances.asJava @GET @Path("/metrics") diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala index 4d0010e..5a074c8 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala @@ -1,6 +1,7 @@ package de.nowchess.coordinator.service import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.inject.Instance import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.api.model.GenericKubernetesResource @@ -11,98 +12,124 @@ import scala.compiletime.uninitialized @ApplicationScoped class AutoScaler: + // scalafix:off DisableSyntax.var @Inject - private var kubeClient: KubernetesClient = null + private var kubeClientInstance: Instance[KubernetesClient] = uninitialized @Inject private var config: CoordinatorConfig = uninitialized @Inject private var instanceRegistry: InstanceRegistry = uninitialized + // scalafix:on DisableSyntax.var private val log = Logger.getLogger(classOf[AutoScaler]) private val lastScaleTime = new java.util.concurrent.atomic.AtomicLong(0L) + private def kubeClientOpt: Option[KubernetesClient] = + if kubeClientInstance.isUnsatisfied then None + else Some(kubeClientInstance.get()) + + // scalafix:off DisableSyntax.asInstanceOf + // scalafix:off DisableSyntax.isInstanceOf + private def rolloutSpec(rollout: GenericKubernetesResource): Option[java.util.Map[String, AnyRef]] = + Option(rollout.get("spec")).collect { + case m if m.isInstanceOf[java.util.Map[?, ?]] => m.asInstanceOf[java.util.Map[String, AnyRef]] + } + // scalafix:on DisableSyntax.asInstanceOf + // scalafix:on DisableSyntax.isInstanceOf + def checkAndScale: Unit = - if !config.autoScaleEnabled then return + if config.autoScaleEnabled then + val now = System.currentTimeMillis() + val last = lastScaleTime.get() + if now - last >= 120000 && lastScaleTime.compareAndSet(last, now) then + val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY") + if instances.nonEmpty then + val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size - val now = System.currentTimeMillis() - val last = lastScaleTime.get() - if now - last < 120000 then return - if !lastScaleTime.compareAndSet(last, now) then return - - val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY") - if instances.isEmpty then return - - val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size - - if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp() - else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas - then scaleDown() + if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp() + else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas + then scaleDown() def scaleUp(): Unit = log.info("Scaling up Argo Rollout") - if kubeClient == null then - log.warn("Kubernetes client not available, cannot scale") - return + kubeClientOpt match + case None => + log.warn("Kubernetes client not available, cannot scale") + case Some(kube) => + try + Option( + kube + .resources(classOf[GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .get(), + ).foreach { rollout => + rolloutSpec(rollout).foreach { spec => + spec.get("replicas") match + case replicas: Integer => + val currentReplicas = replicas.intValue() + val maxReplicas = config.scaleMaxReplicas - try - val rollout = kubeClient - .resources(classOf[GenericKubernetesResource]) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get() - - if rollout != null then - val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]] - val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue() - val maxReplicas = config.scaleMaxReplicas - - if currentReplicas < maxReplicas then - spec.put("replicas", currentReplicas + 1) - kubeClient - .resources(classOf[GenericKubernetesResource]) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .createOrReplace(rollout) - log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, currentReplicas, currentReplicas + 1) - else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName) - catch - case ex: Exception => - log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName) + if currentReplicas < maxReplicas then + spec.put("replicas", String.valueOf(currentReplicas + 1)) + kube + .resources(classOf[GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .update() + log.infof( + "Scaled up %s from %d to %d replicas", + config.k8sRolloutName, + currentReplicas, + currentReplicas + 1, + ) + else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName) + case _ => () + } + } + catch + case ex: Exception => + log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName) def scaleDown(): Unit = log.info("Scaling down Argo Rollout") - if kubeClient == null then - log.warn("Kubernetes client not available, cannot scale") - return + kubeClientOpt match + case None => + log.warn("Kubernetes client not available, cannot scale") + case Some(kube) => + try + Option( + kube + .resources(classOf[GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .get(), + ).foreach { rollout => + rolloutSpec(rollout).foreach { spec => + spec.get("replicas") match + case replicas: Integer => + val currentReplicas = replicas.intValue() + val minReplicas = config.scaleMinReplicas - try - val rollout = kubeClient - .resources(classOf[GenericKubernetesResource]) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get() - - if rollout != null then - val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]] - val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue() - val minReplicas = config.scaleMinReplicas - - if currentReplicas > minReplicas then - spec.put("replicas", currentReplicas - 1) - kubeClient - .resources(classOf[GenericKubernetesResource]) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .createOrReplace(rollout) - log.infof( - "Scaled down %s from %d to %d replicas", - config.k8sRolloutName, - currentReplicas, - currentReplicas - 1, - ) - else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) - catch - case ex: Exception => - log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName) + if currentReplicas > minReplicas then + spec.put("replicas", String.valueOf(currentReplicas - 1)) + kube + .resources(classOf[GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .update() + log.infof( + "Scaled down %s from %d to %d replicas", + config.k8sRolloutName, + currentReplicas, + currentReplicas - 1, + ) + else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) + case _ => () + } + } + catch + case ex: Exception => + log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala index c45aa33..40a62e9 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala @@ -14,6 +14,7 @@ import de.nowchess.coordinator.grpc.CoreGrpcClient @ApplicationScoped class CacheEvictionManager: + // scalafix:off DisableSyntax.var @Inject private var redis: RedisDataSource = uninitialized @@ -31,6 +32,7 @@ class CacheEvictionManager: private val log = Logger.getLogger(classOf[CacheEvictionManager]) private var redisPrefix = "nowchess" + // scalafix:on DisableSyntax.var def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -38,44 +40,45 @@ class CacheEvictionManager: def evictStaleGames: Unit = log.info("Starting cache eviction scan") - val pattern = s"$redisPrefix:game:entry:*" - val keys = redis.key(classOf[String]).keys(pattern) - + val pattern = s"$redisPrefix:game:entry:*" + val keys = redis.key(classOf[String]).keys(pattern) val now = System.currentTimeMillis() val idleThresholdMs = config.gameIdleThreshold.toMillis - var evictedCount = 0 - keys.asScala.foreach { key => + val evictedCount = keys.asScala.foldLeft(0) { (count, key) => try - val value = redis.value(classOf[String]).get(key) - if value != null then + Option(redis.value(classOf[String]).get(key)).fold(count) { value => val gameId = key.stripPrefix(s"$redisPrefix:game:entry:") val lastUpdated = extractLastUpdatedTimestamp(value) if lastUpdated > 0 && (now - lastUpdated) > idleThresholdMs then - findInstanceWithGame(gameId).foreach { instance => + findInstanceWithGame(gameId).fold(count) { instance => try coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId)) redis.key(classOf[String]).del(key) - evictedCount += 1 log.infof("Evicted idle game %s from %s", gameId, instance.instanceId) + count + 1 catch case ex: Exception => log.warnf(ex, "Failed to evict game %s", gameId) + count } + else count + } catch case ex: Exception => log.warnf(ex, "Error processing game key %s", key) + count } log.infof("Cache eviction scan completed, evicted %d games", evictedCount) private def extractLastUpdatedTimestamp(json: String): Long = Try { - val parsed = objectMapper.readTree(json) - val lastHeartbeat = parsed.get("lastHeartbeat") - if lastHeartbeat != null && lastHeartbeat.isTextual then Instant.parse(lastHeartbeat.asText()).toEpochMilli - else 0L + val parsed = objectMapper.readTree(json) + Option(parsed.get("lastHeartbeat")) + .filter(_.isTextual) + .fold(0L)(lh => Instant.parse(lh.asText()).toEpochMilli) }.getOrElse(0L) private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] = diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala index dbcd965..fb31858 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala @@ -11,6 +11,7 @@ import de.nowchess.coordinator.grpc.CoreGrpcClient @ApplicationScoped class FailoverService: + // scalafix:off DisableSyntax.var @Inject private var redis: RedisDataSource = uninitialized @@ -22,6 +23,7 @@ class FailoverService: private val log = Logger.getLogger(classOf[FailoverService]) private var redisPrefix = "nowchess" + // scalafix:on DisableSyntax.var def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -58,32 +60,42 @@ class FailoverService: healthyInstances: List[InstanceMetadata], deadInstanceId: String, ): Unit = - if gameIds.isEmpty || healthyInstances.isEmpty then return + if gameIds.nonEmpty && healthyInstances.nonEmpty then + val batchSize = math.max(1, gameIds.size / healthyInstances.size) + val batches = gameIds.grouped(batchSize).toList - val batchSize = math.max(1, gameIds.size / healthyInstances.size) - val batches = gameIds.grouped(batchSize).toList + batches.zipWithIndex.foreach { case (batch, idx) => + if !tryMigrateBatch(batch, idx, healthyInstances, deadInstanceId) then + log.errorf( + "Failed to migrate batch of %d games from %s to any healthy instance", + batch.size, + deadInstanceId, + ) + } - batches.zipWithIndex.foreach { case (batch, idx) => - var migrated = false - var attempt = 0 - while !migrated && attempt < healthyInstances.size do - val target = healthyInstances((idx + attempt) % healthyInstances.size) - attempt += 1 + @scala.annotation.tailrec + private def tryMigrateBatch( + batch: List[String], + batchIdx: Int, + instances: List[InstanceMetadata], + deadId: String, + attempt: Int = 0, + ): Boolean = + if attempt >= instances.size then false + else + val target = instances((batchIdx + attempt) % instances.size) + val success = try val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch) if subscribed > 0 then - log.infof("Migrated %d games from %s to %s", subscribed, deadInstanceId, target.instanceId) - migrated = true + log.infof("Migrated %d games from %s to %s", subscribed, deadId, target.instanceId) + true + else false catch case ex: Exception => log.warnf(ex, "Failed to migrate batch to %s, trying next", target.instanceId) - if !migrated then - log.errorf( - "Failed to migrate batch of %d games from %s to any healthy instance", - batch.size, - deadInstanceId, - ) - } + false + if success then true else tryMigrateBatch(batch, batchIdx, instances, deadId, attempt + 1) private def cleanupDeadInstance(instanceId: String): Unit = val setKey = s"$redisPrefix:instance:$instanceId:games" diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala index 03c9361..4a031a7 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala @@ -1,6 +1,7 @@ package de.nowchess.coordinator.service import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.inject.Instance import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.client.KubernetesClient @@ -13,8 +14,9 @@ import java.time.Instant @ApplicationScoped class HealthMonitor: + // scalafix:off DisableSyntax.var @Inject - private var kubeClient: KubernetesClient = null + private var kubeClientInstance: Instance[KubernetesClient] = uninitialized @Inject private var config: CoordinatorConfig = uninitialized @@ -27,6 +29,11 @@ class HealthMonitor: private val log = Logger.getLogger(classOf[HealthMonitor]) private var redisPrefix = "nowchess" + // scalafix:on DisableSyntax.var + + private def kubeClientOpt: Option[KubernetesClient] = + if kubeClientInstance.isUnsatisfied then None + else Some(kubeClientInstance.get()) def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -55,10 +62,9 @@ class HealthMonitor: false private def checkK8sPodStatus(instanceId: String): Boolean = - if kubeClient == null then true - else + kubeClientOpt.fold(true) { kube => try - val pods = kubeClient + val pods = kube .pods() .inNamespace(config.k8sNamespace) .withLabel(config.k8sRolloutLabelSelector) @@ -74,49 +80,44 @@ class HealthMonitor: case ex: Exception => log.debugf(ex, "K8s pod status check failed for %s", instanceId) true + } def watchK8sPods: Unit = - if kubeClient == null then - log.debug("Kubernetes client not available for pod watch") - return + kubeClientOpt match + case None => + log.debug("Kubernetes client not available for pod watch") + case Some(kube) => + try + val pods = kube + .pods() + .inNamespace(config.k8sNamespace) + .withLabel(config.k8sRolloutLabelSelector) + .list() + .getItems + .asScala - try - val pods = kubeClient - .pods() - .inNamespace(config.k8sNamespace) - .withLabel(config.k8sRolloutLabelSelector) - .list() - .getItems - .asScala + val instances = instanceRegistry.getAllInstances + instances.foreach { inst => + val matchingPod = pods.find { pod => + pod.getMetadata.getName.contains(inst.instanceId) + } - val instances = instanceRegistry.getAllInstances - instances.foreach { inst => - val matchingPod = pods.find { pod => - pod.getMetadata.getName.contains(inst.instanceId) - } - - matchingPod match - case Some(pod) => - val isReady = isPodReady(pod) - if !isReady && inst.state == "HEALTHY" then - log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId) - instanceRegistry.markInstanceDead(inst.instanceId) - case None => - if inst.state == "HEALTHY" then - log.warnf("No pod found for instance %s, marking dead", inst.instanceId) - instanceRegistry.markInstanceDead(inst.instanceId) - } - catch - case ex: Exception => - log.warnf(ex, "Failed to watch k8s pods") + matchingPod match + case Some(pod) => + val isReady = isPodReady(pod) + if !isReady && inst.state == "HEALTHY" then + log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId) + instanceRegistry.markInstanceDead(inst.instanceId) + case None => + if inst.state == "HEALTHY" then + log.warnf("No pod found for instance %s, marking dead", inst.instanceId) + instanceRegistry.markInstanceDead(inst.instanceId) + } + catch + case ex: Exception => + log.warnf(ex, "Failed to watch k8s pods") private def isPodReady(pod: Pod): Boolean = - val status = pod.getStatus - if status == null then false - else - val conditions = status.getConditions - if conditions == null then false - else - conditions.asScala.exists { cond => - cond.getType == "Ready" && cond.getStatus == "True" - } + Option(pod.getStatus) + .flatMap(s => Option(s.getConditions)) + .exists(_.asScala.exists(cond => cond.getType == "Ready" && cond.getStatus == "True")) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala index af1d706..69e9db2 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala @@ -11,12 +11,14 @@ import java.util.concurrent.ConcurrentHashMap @ApplicationScoped class InstanceRegistry: + // scalafix:off DisableSyntax.var @Inject private var redis: RedisDataSource = uninitialized + private var redisPrefix = "nowchess" + // scalafix:on DisableSyntax.var - private val mapper = ObjectMapper() - private val instances = ConcurrentHashMap[String, InstanceMetadata]() - private var redisPrefix = "nowchess" + private val mapper = ObjectMapper() + private val instances = ConcurrentHashMap[String, InstanceMetadata]() def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -28,13 +30,13 @@ class InstanceRegistry: instances.values.asScala.toList def updateInstanceFromRedis(instanceId: String): Unit = - val key = s"$redisPrefix:instances:$instanceId" - val value = redis.value(classOf[String]).get(key) - if value != null then + val key = s"$redisPrefix:instances:$instanceId" + Option(redis.value(classOf[String]).get(key)).foreach { value => try val metadata = mapper.readValue(value, classOf[InstanceMetadata]) instances.put(instanceId, metadata) catch case _: Exception => () + } def markInstanceDead(instanceId: String): Unit = instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD")) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala index 7dcbdf2..4c37df5 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala @@ -12,6 +12,7 @@ import de.nowchess.coordinator.grpc.CoreGrpcClient @ApplicationScoped class LoadBalancer: + // scalafix:off DisableSyntax.var @Inject private var config: CoordinatorConfig = uninitialized @@ -27,6 +28,7 @@ class LoadBalancer: private val log = Logger.getLogger(classOf[LoadBalancer]) private val lastRebalanceTime = new java.util.concurrent.atomic.AtomicLong(0L) private var redisPrefix = "nowchess" + // scalafix:on DisableSyntax.var def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -34,22 +36,22 @@ class LoadBalancer: def shouldRebalance: Boolean = val now = System.currentTimeMillis() val minInterval = config.rebalanceMinInterval.toMillis - if now - lastRebalanceTime.get() < minInterval then return false + if now - lastRebalanceTime.get() < minInterval then false + else + val instances = instanceRegistry.getAllInstances + if instances.isEmpty then false + else + val loads = instances.map(_.subscriptionCount) + val maxLoad = loads.max + val minLoad = loads.min + val avgLoad = loads.sum.toDouble / loads.size - val instances = instanceRegistry.getAllInstances - if instances.isEmpty then return false + val exceededMax = maxLoad > config.maxGamesPerCore + val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad + val exceededDeviation = + maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50 - val loads = instances.map(_.subscriptionCount) - val maxLoad = loads.max - val minLoad = loads.min - val avgLoad = loads.sum.toDouble / loads.size - - val exceededMax = maxLoad > config.maxGamesPerCore - val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad - val exceededDeviation = - maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50 - - exceededMax || exceededDeviation + exceededMax || exceededDeviation def rebalance: Unit = log.info("Starting rebalance") @@ -59,34 +61,32 @@ class LoadBalancer: try val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY") - if instances.size < 2 then - log.info("Not enough healthy instances for rebalance") - return + if instances.size < 2 then log.info("Not enough healthy instances for rebalance") + else + val loads = instances.map(_.subscriptionCount) + val avgLoad = loads.sum.toDouble / loads.size - val loads = instances.map(_.subscriptionCount) - val avgLoad = loads.sum.toDouble / loads.size + val overloaded = instances + .filter(_.subscriptionCount > config.maxGamesPerCore) + .sortBy[Int](_.subscriptionCount) + .reverse + val underloaded = instances + .filter(_.subscriptionCount < avgLoad * 0.8) + .sortBy(_.subscriptionCount) - val overloaded = instances - .filter(_.subscriptionCount > config.maxGamesPerCore) - .sortBy[Int](_.subscriptionCount) - .reverse - val underloaded = instances - .filter(_.subscriptionCount < avgLoad * 0.8) - .sortBy(_.subscriptionCount) + if underloaded.isEmpty then log.info("No underloaded instances available for rebalance") + else + val allBatches = overloaded.flatMap { over => + val excess = math.max(0, over.subscriptionCount - avgLoad.toInt) + val gamesToMove = getGamesToMove(over.instanceId, excess) + if gamesToMove.isEmpty then List.empty + else + val batchSize = math.max(1, (gamesToMove.size + underloaded.size - 1) / underloaded.size) + gamesToMove.grouped(batchSize).toList.map((over, _)) + } - if underloaded.isEmpty then - log.info("No underloaded instances available for rebalance") - return - - var targetIdx = 0 - overloaded.foreach { over => - val excess = math.max(0, over.subscriptionCount - avgLoad.toInt) - val gamesToMove = getGamesToMove(over.instanceId, excess) - if gamesToMove.nonEmpty then - val batchSize = math.max(1, (gamesToMove.size + underloaded.size - 1) / underloaded.size) - gamesToMove.grouped(batchSize).foreach { batch => - val target = underloaded(targetIdx % underloaded.size) - targetIdx += 1 + allBatches.zipWithIndex.foreach { case ((over, batch), idx) => + val target = underloaded(idx % underloaded.size) try coreGrpcClient.unsubscribeGames(over.hostname, over.grpcPort, batch) val subscribed = coreGrpcClient.batchResubscribeGames(target.hostname, target.grpcPort, batch) @@ -97,10 +97,9 @@ class LoadBalancer: case ex: Exception => log.warnf(ex, "Failed to move games from %s to %s", over.instanceId, target.instanceId) } - } - val elapsed = System.currentTimeMillis() - startTime - log.infof("Rebalance completed in %dms", elapsed) + val elapsed = System.currentTimeMillis() - startTime + log.infof("Rebalance completed in %dms", elapsed) catch case ex: Exception => log.warnf(ex, "Rebalance failed") diff --git a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala index c5cd4a9..05e2e9c 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala @@ -12,8 +12,10 @@ import scala.jdk.CollectionConverters.* @GrpcService @Singleton class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServiceImplBase: + // scalafix:off DisableSyntax.var @Inject private var gameSubscriberManager: GameRedisSubscriberManager = uninitialized + // scalafix:on DisableSyntax.var override def batchResubscribeGames( request: BatchResubscribeRequest, diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala index 5ac4f98..fc42ccd 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala @@ -62,14 +62,14 @@ class GameRedisPublisher( case GameResult.Draw(_) => "draw" }, terminationReason = entry.engine.context.result.map { - case GameResult.Win(_, WinReason.Checkmate) => "checkmate" - case GameResult.Win(_, WinReason.Resignation) => "resignation" - case GameResult.Win(_, WinReason.TimeControl) => "timeout" - case GameResult.Draw(DrawReason.Stalemate) => "stalemate" - case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material" - case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move" - case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition" - case GameResult.Draw(DrawReason.Agreement) => "agreement" + case GameResult.Win(_, WinReason.Checkmate) => "checkmate" + case GameResult.Win(_, WinReason.Resignation) => "resignation" + case GameResult.Win(_, WinReason.TimeControl) => "timeout" + case GameResult.Draw(DrawReason.Stalemate) => "stalemate" + case GameResult.Draw(DrawReason.InsufficientMaterial) => "insufficient_material" + case GameResult.Draw(DrawReason.FiftyMoveRule) => "fifty_move" + case GameResult.Draw(DrawReason.ThreefoldRepetition) => "repetition" + case GameResult.Draw(DrawReason.Agreement) => "agreement" }, redoStack = entry.engine.redoStackMoves.map(GameDtoMapper.moveToUci), pendingTakebackRequest = entry.engine.pendingTakebackRequestBy.map(_.label.toLowerCase), diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index 60b78ee..05888ea 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -14,6 +14,7 @@ import io.quarkus.redis.datasource.RedisDataSource import io.quarkus.redis.datasource.pubsub.PubSubCommands import jakarta.annotation.PreDestroy import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.inject.Instance import jakarta.inject.Inject import scala.compiletime.uninitialized import scala.util.Try @@ -24,14 +25,18 @@ import java.util.function.Consumer class GameRedisSubscriberManager: // scalafix:off DisableSyntax.var - @Inject var redis: RedisDataSource = uninitialized - @Inject var registry: GameRegistry = uninitialized - @Inject var objectMapper: ObjectMapper = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized - @Inject var ioClient: IoGrpcClientWrapper = uninitialized - @Inject var heartbeatService: InstanceHeartbeatService = null + @Inject var redis: RedisDataSource = uninitialized + @Inject var registry: GameRegistry = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var ioClient: IoGrpcClientWrapper = uninitialized + @Inject var heartbeatServiceInstance: Instance[InstanceHeartbeatService] = uninitialized // scalafix:on DisableSyntax.var + private def heartbeatServiceOpt: Option[InstanceHeartbeatService] = + if heartbeatServiceInstance.isUnsatisfied then None + else Some(heartbeatServiceInstance.get()) + private val c2sListeners = new ConcurrentHashMap[String, PubSubCommands.RedisSubscriber]() private val s2cObservers = new ConcurrentHashMap[String, Observer]() @@ -44,7 +49,7 @@ class GameRedisSubscriberManager: def subscribeGame(gameId: String): Unit = try val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg) - val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler) + val subscriber = redis.pubsub(classOf[String]).subscribe(c2sTopic(gameId), handler) c2sListeners.put(gameId, subscriber) val writebackFn: String => Unit = json => redis.pubsub(classOf[String]).publish("game-writeback", json) @@ -61,7 +66,7 @@ class GameRedisSubscriberManager: s2cObservers.put(gameId, obs) registry.get(gameId).foreach(_.engine.subscribe(obs)) - if heartbeatService != null then heartbeatService.addGameSubscription(gameId) + heartbeatServiceOpt.foreach(_.addGameSubscription(gameId)) catch case e: Exception => System.err.println(s"Warning: Redis subscription failed for game $gameId: ${e.getMessage}") @@ -75,7 +80,7 @@ class GameRedisSubscriberManager: registry.get(gameId).foreach(_.engine.unsubscribe(obs)) } - if heartbeatService != null then heartbeatService.removeGameSubscription(gameId) + heartbeatServiceOpt.foreach(_.removeGameSubscription(gameId)) private def handleC2sMessage(gameId: String, msg: String): Unit = parseC2sMessage(msg) match @@ -121,28 +126,16 @@ class GameRedisSubscriberManager: } def batchResubscribeGames(gameIds: java.util.List[String]): Int = - var count = 0 - gameIds.forEach { gameId => - subscribeGame(gameId) - count += 1 - } - count + gameIds.forEach(subscribeGame) + gameIds.size() def unsubscribeGames(gameIds: java.util.List[String]): Int = - var count = 0 - gameIds.forEach { gameId => - unsubscribeGame(gameId) - count += 1 - } - count + gameIds.forEach(unsubscribeGame) + gameIds.size() def evictGames(gameIds: java.util.List[String]): Int = - var count = 0 - gameIds.forEach { gameId => - unsubscribeGame(gameId) - count += 1 - } - count + gameIds.forEach(unsubscribeGame) + gameIds.size() def drainInstance(): Int = val gameIds = new java.util.ArrayList(c2sListeners.keySet()) diff --git a/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala b/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala index 1d9aead..2a83dd6 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala @@ -82,8 +82,7 @@ class GameResource: private def assertIsNotBot(): Unit = val botType = Option(jwt.getClaim[AnyRef]("type")).map(_.toString).getOrElse("") - if Set("bot", "official-bot").contains(botType) then - throw ForbiddenException("Only bots can make moves") + if Set("bot", "official-bot").contains(botType) then throw ForbiddenException("Only bots can make moves") // scalafix:on DisableSyntax.throw diff --git a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala index af4ddda..a98bcc7 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala @@ -20,6 +20,7 @@ import io.grpc.Channel @ApplicationScoped class InstanceHeartbeatService: + // scalafix:off DisableSyntax.var @Inject private var redis: RedisDataSource = uninitialized @@ -36,51 +37,46 @@ class InstanceHeartbeatService: private var coordinatorEnabled: Boolean = true private var coordinatorStub: CoordinatorServiceStub = uninitialized - - private val log = Logger.getLogger(classOf[InstanceHeartbeatService]) - private val mapper = ObjectMapper() - - private var instanceId = "" - private var redisPrefix = "nowchess" + private val log = Logger.getLogger(classOf[InstanceHeartbeatService]) + private val mapper = ObjectMapper() + private var instanceId = "" + private var redisPrefix = "nowchess" private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None private var heartbeatExecutor = Executors.newScheduledThreadPool(1) private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1) private var subscriptionCount = 0 private var localCacheSize = 0 - private var serviceActive = false - private var shuttingDown = false + private var serviceActive = false + private var shuttingDown = false + // scalafix:on DisableSyntax.var def onStart(@Observes event: StartupEvent): Unit = - if !coordinatorEnabled then - log.info("Coordinator support disabled via config; skipping heartbeat service startup") - return - - try - shuttingDown = false - generateInstanceId() - initializeHeartbeatStream() - scheduleHeartbeats() - serviceActive = true - log.infof("Instance heartbeat service started with ID: %s", instanceId) - catch - case ex: Exception => - serviceActive = false - log.errorf(ex, "Failed to start instance heartbeat service") + if coordinatorEnabled then + try + shuttingDown = false + generateInstanceId() + initializeHeartbeatStream() + scheduleHeartbeats() + serviceActive = true + log.infof("Instance heartbeat service started with ID: %s", instanceId) + catch + case ex: Exception => + serviceActive = false + log.errorf(ex, "Failed to start instance heartbeat service") + else log.info("Coordinator support disabled via config; skipping heartbeat service startup") def onShutdown(@Observes event: ShutdownEvent): Unit = shuttingDown = true - if !serviceActive then - log.info("Instance heartbeat service stopped") - return - - try - cleanup() - serviceActive = false - log.info("Instance heartbeat service stopped") - catch - case ex: Exception => - log.errorf(ex, "Error during heartbeat service shutdown") + if serviceActive then + try + cleanup() + serviceActive = false + log.info("Instance heartbeat service stopped") + catch + case ex: Exception => + log.errorf(ex, "Error during heartbeat service shutdown") + else log.info("Instance heartbeat service stopped") def setRedisPrefix(prefix: String): Unit = redisPrefix = prefix @@ -92,18 +88,16 @@ class InstanceHeartbeatService: localCacheSize = count def addGameSubscription(gameId: String): Unit = - if !coordinatorEnabled then return - - val setKey = s"$redisPrefix:instance:$instanceId:games" - redis.set(classOf[String]).sadd(setKey, gameId) - subscriptionCount += 1 + if coordinatorEnabled then + val setKey = s"$redisPrefix:instance:$instanceId:games" + redis.set(classOf[String]).sadd(setKey, gameId) + subscriptionCount += 1 def removeGameSubscription(gameId: String): Unit = - if !coordinatorEnabled then return - - val setKey = s"$redisPrefix:instance:$instanceId:games" - redis.set(classOf[String]).srem(setKey, gameId) - subscriptionCount = Math.max(0, subscriptionCount - 1) + if coordinatorEnabled then + val setKey = s"$redisPrefix:instance:$instanceId:games" + redis.set(classOf[String]).srem(setKey, gameId) + subscriptionCount = Math.max(0, subscriptionCount - 1) private def generateInstanceId(): Unit = val hostname = @@ -137,7 +131,6 @@ class InstanceHeartbeatService: streamObserver = None private def scheduleHeartbeats(): Unit = - // Send heartbeat every 200ms heartbeatExecutor.scheduleAtFixedRate( () => sendHeartbeat(), 0, @@ -145,7 +138,6 @@ class InstanceHeartbeatService: TimeUnit.MILLISECONDS, ) - // Refresh Redis TTL every 2s redisHeartbeatExecutor.scheduleAtFixedRate( () => refreshRedisHeartbeat(), 0, diff --git a/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala b/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala index c567020..17a17b6 100644 --- a/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala +++ b/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala @@ -9,7 +9,6 @@ import io.quarkus.grpc.GrpcService import scala.jdk.CollectionConverters.* -// scalafix:off DisableSyntax.throw @GrpcService class IoGrpcService extends IoServiceGrpc.IoServiceImplBase: @@ -59,4 +58,3 @@ class IoGrpcService extends IoServiceGrpc.IoServiceImplBase: private def respond[T](obs: StreamObserver[T], value: T): Unit = obs.onNext(value) obs.onCompleted() -// scalafix:on DisableSyntax.throw diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/bots/ClassicalBot.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/bots/ClassicalBot.scala index c5e638a..ed4427a 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/bots/ClassicalBot.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/bots/ClassicalBot.scala @@ -11,9 +11,9 @@ import de.nowchess.rules.sets.DefaultRules object ClassicalBot: def apply( - difficulty: BotDifficulty, - rules: RuleSet = DefaultRules, - book: Option[PolyglotBook] = None, + difficulty: BotDifficulty, + rules: RuleSet = DefaultRules, + book: Option[PolyglotBook] = None, ): Bot = val search = AlphaBetaSearch(rules, weights = EvaluationClassic) val timeBudgetMs = 1000L diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/bots/HybridBot.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/bots/HybridBot.scala index 0184847..2884e88 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/bots/HybridBot.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/bots/HybridBot.scala @@ -14,12 +14,12 @@ import de.nowchess.rules.sets.DefaultRules object HybridBot: def apply( - difficulty: BotDifficulty, - rules: RuleSet = DefaultRules, - book: Option[PolyglotBook] = None, - nnueEvaluation: Evaluation = EvaluationNNUE, - classicalEvaluation: Evaluation = EvaluationClassic, - vetoReporter: String => Unit = println(_), + difficulty: BotDifficulty, + rules: RuleSet = DefaultRules, + book: Option[PolyglotBook] = None, + nnueEvaluation: Evaluation = EvaluationNNUE, + classicalEvaluation: Evaluation = EvaluationClassic, + vetoReporter: String => Unit = println(_), ): Bot = val search = AlphaBetaSearch(rules, TranspositionTable(), classicalEvaluation) context => diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/bots/NNUEBot.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/bots/NNUEBot.scala index d61a46e..4a27000 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/bots/NNUEBot.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/bots/NNUEBot.scala @@ -12,9 +12,9 @@ import de.nowchess.rules.sets.DefaultRules object NNUEBot: def apply( - difficulty: BotDifficulty, - rules: RuleSet = DefaultRules, - book: Option[PolyglotBook] = None, + difficulty: BotDifficulty, + rules: RuleSet = DefaultRules, + book: Option[PolyglotBook] = None, ): Bot = val search = AlphaBetaSearch(rules, weights = EvaluationNNUE) context => diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/resource/OfficialBotChallengeResource.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/resource/OfficialBotChallengeResource.scala index fd68c9d..7ec7ab6 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/resource/OfficialBotChallengeResource.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/resource/OfficialBotChallengeResource.scala @@ -16,16 +16,18 @@ class OfficialBotChallengeResource: @POST @Path("/{botId}") def challengeWithDifficulty( - @PathParam("botId") botId: String, - @QueryParam("difficulty") difficulty: Int + @PathParam("botId") botId: String, + @QueryParam("difficulty") difficulty: Int, ): Response = DifficultyMapper.fromElo(difficulty) match case None => - Response.status(Response.Status.BAD_REQUEST) + Response + .status(Response.Status.BAD_REQUEST) .entity(s"""{"error":"difficulty must be between 1000 and 2800"}""") .build() case Some(botDifficulty) => // TODO: wire to account service challenge creation + bot routing - Response.status(Response.Status.CREATED) + Response + .status(Response.Status.CREATED) .entity(s"""{"botId":"$botId","difficulty":$difficulty,"status":"pending"}""") .build() diff --git a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala index 4f40e0c..b797bf1 100644 --- a/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala +++ b/modules/official-bots/src/main/scala/de/nowchess/bot/service/OfficialBotService.scala @@ -46,7 +46,13 @@ class OfficialBotService: watchGame(botName, gameId, playingAs, difficulty, botAccountId) catch case _: Exception => () - private def watchGame(botName: String, gameId: String, playingAs: String, difficulty: Int, botAccountId: String): Unit = + private def watchGame( + botName: String, + gameId: String, + playingAs: String, + difficulty: Int, + botAccountId: String, + ): Unit = val handler: Consumer[String] = msg => handleGameEvent(botName, gameId, playingAs, difficulty, botAccountId, msg) redis.pubsub(classOf[String]).subscribe(s"${redisConfig.prefix}:game:$gameId:s2c", handler) () @@ -69,7 +75,13 @@ class OfficialBotService: computeAndSendMove(botName, gameId, fen, difficulty, botAccountId) catch case _: Exception => () - private def computeAndSendMove(botName: String, gameId: String, fen: String, difficulty: Int, botAccountId: String): Unit = + private def computeAndSendMove( + botName: String, + gameId: String, + fen: String, + difficulty: Int, + botAccountId: String, + ): Unit = val level = DifficultyMapper.fromElo(difficulty).getOrElse(BotDifficulty.Medium) botController.getBot(botName).orElse(botController.getBot(level.toString.toLowerCase)).foreach { bot => FenParser.parseFen(fen).toOption.foreach { context => diff --git a/modules/official-bots/src/test/scala/de/nowchess/bot/ClassicalBotTest.scala b/modules/official-bots/src/test/scala/de/nowchess/bot/ClassicalBotTest.scala index 71b070e..8e33a8f 100644 --- a/modules/official-bots/src/test/scala/de/nowchess/bot/ClassicalBotTest.scala +++ b/modules/official-bots/src/test/scala/de/nowchess/bot/ClassicalBotTest.scala @@ -11,7 +11,6 @@ import org.scalatest.matchers.should.Matchers import de.nowchess.rules.sets.DefaultRules class ClassicalBotTest extends AnyFunSuite with Matchers: - test("nextMove on initial position returns a move"): val bot = ClassicalBot(BotDifficulty.Easy) diff --git a/modules/rule/build.gradle.kts b/modules/rule/build.gradle.kts index 91455b5..4297761 100644 --- a/modules/rule/build.gradle.kts +++ b/modules/rule/build.gradle.kts @@ -127,4 +127,5 @@ tasks.withType(ScalaCompile::class).configureEach { tasks.named("compileScoverageJava").configure { dependsOn(tasks.named("quarkusGenerateCode")) + dependsOn(tasks.named("compileQuarkusGeneratedSourcesJava")) } diff --git a/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala b/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala index 44771b5..4649735 100644 --- a/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala +++ b/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala @@ -25,7 +25,7 @@ class RuleGrpcService extends RuleServiceGrpc.RuleServiceImplBase: resp.onNext( ProtoMoveList .newBuilder() - .addAllMoves(moves.map(ProtoMapper.toProtoMove).asInstanceOf[java.util.List[ProtoMove]]) + .addAllMoves(toJavaMoveList(moves)) .build(), ) resp.onCompleted() diff --git a/modules/security/src/main/scala/de/nowchess/security/InternalAuthFilter.scala b/modules/security/src/main/scala/de/nowchess/security/InternalAuthFilter.scala index c2cf102..d713f42 100644 --- a/modules/security/src/main/scala/de/nowchess/security/InternalAuthFilter.scala +++ b/modules/security/src/main/scala/de/nowchess/security/InternalAuthFilter.scala @@ -22,6 +22,6 @@ class InternalAuthFilter extends ContainerRequestFilter: override def filter(ctx: ContainerRequestContext): Unit = if authEnabled then - val header = ctx.getHeaderString("X-Internal-Secret") - if header == null || header != secret then + val header = Option(ctx.getHeaderString("X-Internal-Secret")) + if header.isEmpty || header.get.equals(secret) then ctx.abortWith(Response.status(Response.Status.UNAUTHORIZED).build()) diff --git a/modules/security/src/main/scala/de/nowchess/security/InternalGrpcAuthInterceptor.scala b/modules/security/src/main/scala/de/nowchess/security/InternalGrpcAuthInterceptor.scala index 1b2b698..9f1a7e8 100644 --- a/modules/security/src/main/scala/de/nowchess/security/InternalGrpcAuthInterceptor.scala +++ b/modules/security/src/main/scala/de/nowchess/security/InternalGrpcAuthInterceptor.scala @@ -29,5 +29,4 @@ class InternalGrpcAuthInterceptor extends ServerInterceptor: if authEnabled && token != secret then call.close(Status.UNAUTHENTICATED.withDescription("Missing or invalid internal secret"), new Metadata()) new ServerCall.Listener[Req] {} - else - next.startCall(call, headers) + else next.startCall(call, headers) diff --git a/modules/store/src/main/scala/de/nowchess/store/config/JacksonConfig.scala b/modules/store/src/main/scala/de/nowchess/store/config/JacksonConfig.scala index c6c5c16..5042bb2 100644 --- a/modules/store/src/main/scala/de/nowchess/store/config/JacksonConfig.scala +++ b/modules/store/src/main/scala/de/nowchess/store/config/JacksonConfig.scala @@ -15,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer: new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala") // scalafix:on DisableSyntax.null }) - diff --git a/modules/store/src/main/scala/de/nowchess/store/config/NativeReflectionConfig.scala b/modules/store/src/main/scala/de/nowchess/store/config/NativeReflectionConfig.scala index 6e552a0..16e06c7 100644 --- a/modules/store/src/main/scala/de/nowchess/store/config/NativeReflectionConfig.scala +++ b/modules/store/src/main/scala/de/nowchess/store/config/NativeReflectionConfig.scala @@ -11,4 +11,3 @@ import io.quarkus.runtime.annotations.RegisterForReflection ), ) class NativeReflectionConfig - diff --git a/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala b/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala index f012685..e180f96 100644 --- a/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala +++ b/modules/store/src/main/scala/de/nowchess/store/repository/GameRecordRepository.scala @@ -36,9 +36,9 @@ class GameRecordRepository: def findByPlayerIdRunning(playerId: String, offset: Int, limit: Int): List[GameRecord] = em.createQuery( - "SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC", - classOf[GameRecord], - ).setParameter("id", playerId) + "SELECT g FROM GameRecord g WHERE g.whiteId = :id OR g.blackId = :id AND g.result = null ORDER BY g.updatedAt DESC", + classOf[GameRecord], + ).setParameter("id", playerId) .setFirstResult(offset) .setMaxResults(limit) .getResultList diff --git a/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala b/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala index 35b3331..c4f8457 100644 --- a/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala +++ b/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala @@ -28,10 +28,10 @@ class StoreGameResource: @Path("/running/{playerId}") @Produces(Array(MediaType.APPLICATION_JSON)) def getRunning( - @PathParam("playerId") playerId: String, - @QueryParam("offset") @DefaultValue("0") offset: Int, - @QueryParam("limit") @DefaultValue("20") limit: Int, - ): Response = + @PathParam("playerId") playerId: String, + @QueryParam("offset") @DefaultValue("0") offset: Int, + @QueryParam("limit") @DefaultValue("20") limit: Int, + ): Response = Response.ok(repository.findByPlayerIdRunning(playerId, offset, limit)).build() @GET diff --git a/modules/ws/src/main/scala/de/nowchess/ws/config/JacksonConfig.scala b/modules/ws/src/main/scala/de/nowchess/ws/config/JacksonConfig.scala index eee4318..3b8adaf 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/config/JacksonConfig.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/config/JacksonConfig.scala @@ -15,4 +15,3 @@ class JacksonConfig extends ObjectMapperCustomizer: new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala") // scalafix:on DisableSyntax.null }) - diff --git a/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala b/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala index 3821a07..204502c 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala @@ -35,7 +35,7 @@ class GameWebSocketResource: @OnOpen def onOpen(connection: WebSocketConnection, handshake: HandshakeRequest): Unit = - val gameId = connection.pathParam("gameId") + val gameId = connection.pathParam("gameId") val playerId = Option(handshake.header("Authorization")) .filter(_.nonEmpty) .flatMap(token => Try(jwtParser.parse(token)).toOption) diff --git a/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala b/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala index 9fb3cb8..0c68a7a 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/resource/UserWebSocketResource.scala @@ -41,7 +41,7 @@ class UserWebSocketResource: case None => connection.close().subscribe().`with`(_ => (), _ => ()) case Some(userId) => val handler: Consumer[String] = msg => connection.sendText(msg).subscribe().`with`(_ => (), _ => ()) - val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler) + val subscriber = redis.pubsub(classOf[String]).subscribe(userTopic(userId), handler) connections.put(connection.id(), (userId, subscriber)) val connectedMsg = s"""{"type":"CONNECTED","userId":"$userId"}""" connection.sendText(connectedMsg).subscribe().`with`(_ => (), _ => ())