feat(coordinator): enhance instance management with game migration count and configurable ports

This commit is contained in:
2026-04-26 20:51:17 +02:00
parent 57e6e5d200
commit 64d5afa4d1
4 changed files with 19 additions and 11 deletions
@@ -426,7 +426,7 @@ class GameEngine(
if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext)) if currentContext.halfMoveClock >= 100 then notifyObservers(FiftyMoveRuleAvailableEvent(currentContext))
if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext)) if status.isThreefoldRepetition then notifyObservers(ThreefoldRepetitionAvailableEvent(currentContext))
else requestBotMoveIfNeeded() requestBotMoveIfNeeded()
private def translateMoveToNotation(move: Move, boardBefore: Board): String = private def translateMoveToNotation(move: Move, boardBefore: Board): String =
move.moveType match move.moveType match
@@ -55,10 +55,10 @@ class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServic
request: DrainInstanceRequest, request: DrainInstanceRequest,
responseObserver: StreamObserver[DrainInstanceResponse], responseObserver: StreamObserver[DrainInstanceResponse],
): Unit = ): Unit =
gameSubscriberManager.drainInstance() val migrated = gameSubscriberManager.drainInstance()
val response = DrainInstanceResponse val response = DrainInstanceResponse
.newBuilder() .newBuilder()
.setGamesMigrated(0) .setGamesMigrated(migrated)
.build() .build()
responseObserver.onNext(response) responseObserver.onNext(response)
responseObserver.onCompleted() responseObserver.onCompleted()
@@ -133,9 +133,11 @@ class GameRedisSubscriberManager:
} }
count count
def drainInstance(): Unit = def drainInstance(): Int =
val gameIds = new java.util.ArrayList(c2sListeners.keySet()) val gameIds = new java.util.ArrayList(c2sListeners.keySet())
val count = gameIds.size()
gameIds.forEach(unsubscribeGame) gameIds.forEach(unsubscribeGame)
count
@PreDestroy @PreDestroy
def cleanup(): Unit = def cleanup(): Unit =
@@ -6,9 +6,9 @@ import jakarta.inject.Inject
import io.quarkus.runtime.StartupEvent import io.quarkus.runtime.StartupEvent
import io.quarkus.runtime.ShutdownEvent import io.quarkus.runtime.ShutdownEvent
import io.quarkus.grpc.GrpcClient import io.quarkus.grpc.GrpcClient
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.redisson.api.RedissonClient import org.redisson.api.RedissonClient
import scala.annotation.nowarn import scala.annotation.nowarn
import scala.concurrent.duration.*
import scala.compiletime.uninitialized import scala.compiletime.uninitialized
import java.util.concurrent.{Executors, TimeUnit} import java.util.concurrent.{Executors, TimeUnit}
import java.net.InetAddress import java.net.InetAddress
@@ -18,7 +18,6 @@ import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *}
import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub
import io.grpc.stub.StreamObserver import io.grpc.stub.StreamObserver
import io.grpc.Channel import io.grpc.Channel
import scala.jdk.FutureConverters.*
@ApplicationScoped @ApplicationScoped
class InstanceHeartbeatService: class InstanceHeartbeatService:
@@ -28,6 +27,12 @@ class InstanceHeartbeatService:
@GrpcClient("coordinator-grpc") @GrpcClient("coordinator-grpc")
private var channel: Channel = uninitialized private var channel: Channel = uninitialized
@ConfigProperty(name = "quarkus.http.port", defaultValue = "8080")
private var httpPort: Int = 0
@ConfigProperty(name = "quarkus.grpc.server.port", defaultValue = "9000")
private var grpcPort: Int = 0
private var coordinatorStub: CoordinatorServiceStub = uninitialized private var coordinatorStub: CoordinatorServiceStub = uninitialized
private val log = Logger.getLogger(classOf[InstanceHeartbeatService]) private val log = Logger.getLogger(classOf[InstanceHeartbeatService])
@@ -97,7 +102,8 @@ class InstanceHeartbeatService:
override def onError(t: Throwable): Unit = override def onError(t: Throwable): Unit =
log.warnf(t, "Heartbeat stream error") log.warnf(t, "Heartbeat stream error")
() // Placeholder for reconnect logic streamObserver = None
heartbeatExecutor.schedule((() => initializeHeartbeatStream()): Runnable, 5, TimeUnit.SECONDS)
override def onCompleted: Unit = override def onCompleted: Unit =
log.info("Heartbeat stream completed") log.info("Heartbeat stream completed")
@@ -133,8 +139,8 @@ class InstanceHeartbeatService:
.newBuilder() .newBuilder()
.setInstanceId(instanceId) .setInstanceId(instanceId)
.setHostname(getHostname) .setHostname(getHostname)
.setHttpPort(8080) // Placeholder, should be configurable .setHttpPort(httpPort)
.setGrpcPort(9080) // Placeholder .setGrpcPort(grpcPort)
.setSubscriptionCount(subscriptionCount) .setSubscriptionCount(subscriptionCount)
.setLocalCacheSize(localCacheSize) .setLocalCacheSize(localCacheSize)
.setTimestampMillis(System.currentTimeMillis()) .setTimestampMillis(System.currentTimeMillis())
@@ -153,8 +159,8 @@ class InstanceHeartbeatService:
val metadata = Map( val metadata = Map(
"instanceId" -> instanceId, "instanceId" -> instanceId,
"hostname" -> getHostname, "hostname" -> getHostname,
"httpPort" -> 8080, "httpPort" -> httpPort,
"grpcPort" -> 9080, "grpcPort" -> grpcPort,
"subscriptionCount" -> subscriptionCount, "subscriptionCount" -> subscriptionCount,
"localCacheSize" -> localCacheSize, "localCacheSize" -> localCacheSize,
"lastHeartbeat" -> java.time.Instant.now().toString, "lastHeartbeat" -> java.time.Instant.now().toString,