From 64d5afa4d1529c98f486d59dcb770f119bad7823 Mon Sep 17 00:00:00 2001 From: Janis Date: Sun, 26 Apr 2026 20:51:17 +0200 Subject: [PATCH] feat(coordinator): enhance instance management with game migration count and configurable ports --- .../de/nowchess/chess/engine/GameEngine.scala | 2 +- .../grpc/CoordinatorServiceHandler.scala | 4 ++-- .../redis/GameRedisSubscriberManager.scala | 4 +++- .../service/InstanceHeartbeatService.scala | 20 ++++++++++++------- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala index 291dee5..3aaea33 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala @@ -426,7 +426,7 @@ class GameEngine( if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext)) if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext)) - else requestBotMoveIfNeeded() + requestBotMoveIfNeeded() private def translateMoveToNotation(move: Move, boardBefore: Board): String = move.moveType match diff --git a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala index c258313..c5cd4a9 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala @@ -55,10 +55,10 @@ class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServic request: DrainInstanceRequest, responseObserver: StreamObserver[DrainInstanceResponse], ): Unit = - gameSubscriberManager.drainInstance() + val migrated = gameSubscriberManager.drainInstance() val response = DrainInstanceResponse .newBuilder() - .setGamesMigrated(0) + .setGamesMigrated(migrated) .build() responseObserver.onNext(response) responseObserver.onCompleted() diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index 22ac44e..cdd1b6d 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -133,9 +133,11 @@ class GameRedisSubscriberManager: } count - def drainInstance(): Unit = + def drainInstance(): Int = val gameIds = new java.util.ArrayList(c2sListeners.keySet()) + val count = gameIds.size() gameIds.forEach(unsubscribeGame) + count @PreDestroy def cleanup(): Unit = diff --git a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala index 573bbdb..dfb5bd0 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala @@ -6,9 +6,9 @@ import jakarta.inject.Inject import io.quarkus.runtime.StartupEvent import io.quarkus.runtime.ShutdownEvent import io.quarkus.grpc.GrpcClient +import org.eclipse.microprofile.config.inject.ConfigProperty import org.redisson.api.RedissonClient import scala.annotation.nowarn -import scala.concurrent.duration.* import scala.compiletime.uninitialized import java.util.concurrent.{Executors, TimeUnit} import java.net.InetAddress @@ -18,7 +18,6 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *} import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub import io.grpc.stub.StreamObserver import io.grpc.Channel -import scala.jdk.FutureConverters.* @ApplicationScoped class InstanceHeartbeatService: @@ -28,6 +27,12 @@ class InstanceHeartbeatService: @GrpcClient("coordinator-grpc") private var channel: Channel = uninitialized + @ConfigProperty(name = "quarkus.http.port", defaultValue = "8080") + private var httpPort: Int = 0 + + @ConfigProperty(name = "quarkus.grpc.server.port", defaultValue = "9000") + private var grpcPort: Int = 0 + private var coordinatorStub: CoordinatorServiceStub = uninitialized private val log = Logger.getLogger(classOf[InstanceHeartbeatService]) @@ -97,7 +102,8 @@ class InstanceHeartbeatService: override def onError(t: Throwable): Unit = log.warnf(t, "Heartbeat stream error") - () // Placeholder for reconnect logic + streamObserver = None + heartbeatExecutor.schedule((() => initializeHeartbeatStream()): Runnable, 5, TimeUnit.SECONDS) override def onCompleted: Unit = log.info("Heartbeat stream completed") @@ -133,8 +139,8 @@ class InstanceHeartbeatService: .newBuilder() .setInstanceId(instanceId) .setHostname(getHostname) - .setHttpPort(8080) // Placeholder, should be configurable - .setGrpcPort(9080) // Placeholder + .setHttpPort(httpPort) + .setGrpcPort(grpcPort) .setSubscriptionCount(subscriptionCount) .setLocalCacheSize(localCacheSize) .setTimestampMillis(System.currentTimeMillis()) @@ -153,8 +159,8 @@ class InstanceHeartbeatService: val metadata = Map( "instanceId" -> instanceId, "hostname" -> getHostname, - "httpPort" -> 8080, - "grpcPort" -> 9080, + "httpPort" -> httpPort, + "grpcPort" -> grpcPort, "subscriptionCount" -> subscriptionCount, "localCacheSize" -> localCacheSize, "lastHeartbeat" -> java.time.Instant.now().toString,