From 106b4d3b7e96bd8b91e644b0ab0965b72b90b6b2 Mon Sep 17 00:00:00 2001 From: Janis Date: Sun, 26 Apr 2026 18:25:03 +0200 Subject: [PATCH] feat(coordinator): add Redis integration and improve configuration for game state management --- COORDINATOR_IMPLEMENTATION.md | 316 ------------------ build.gradle.kts | 2 + .../config/NativeReflectionConfig.scala | 11 +- .../filter/AlreadyLoggedInFilter.scala | 9 +- .../repository/AccountRepository.scala | 2 +- .../account/resource/AccountResource.scala | 4 +- .../account/service/AccountService.scala | 11 +- .../account/service/ChallengeService.scala | 4 +- modules/coordinator/build.gradle.kts | 15 + .../src/main/proto/coordinator_service.proto | 5 +- .../coordinator/config/BeansProducer.scala | 34 ++ .../coordinator/dto/InstanceMetadata.scala | 32 +- .../grpc/CoordinatorGrpcServer.scala | 79 ++--- .../coordinator/grpc/CoreGrpcClient.scala | 82 +++++ .../resource/CoordinatorResource.scala | 32 +- .../coordinator/service/AutoScaler.scala | 90 +++-- .../service/CacheEvictionManager.scala | 63 +++- .../coordinator/service/FailoverService.scala | 46 ++- .../coordinator/service/HealthMonitor.scala | 86 ++++- .../service/InstanceRegistry.scala | 27 +- .../coordinator/service/LoadBalancer.scala | 107 +++++- modules/core/build.gradle.kts | 7 + .../src/main/proto/coordinator_service.proto | 5 +- .../nowchess/chess/client/GameRecordDto.scala | 40 +-- .../de/nowchess/chess/engine/GameEngine.scala | 5 +- .../grpc/CoordinatorServiceHandler.scala | 49 ++- .../nowchess/chess/grpc/CoreProtoMapper.scala | 57 +++- .../chess/grpc/RuleSetGrpcAdapter.scala | 27 +- .../de/nowchess/chess/redis/C2sMessage.scala | 4 +- .../chess/redis/GameRedisPublisher.scala | 14 +- .../redis/GameRedisSubscriberManager.scala | 58 ++-- .../chess/redis/GameWritebackEventDto.scala | 44 +-- .../chess/registry/GameCacheDto.scala | 42 +-- .../chess/registry/RedisGameRegistry.scala | 66 ++-- .../chess/resource/GameDtoMapper.scala | 8 +- .../chess/resource/GameResource.scala | 3 +- .../service/InstanceHeartbeatService.scala | 82 ++--- .../core/src/test/resources/application.yml | 3 + .../command/CommandInvokerBranchTest.scala | 153 --------- .../chess/command/CommandInvokerTest.scala | 67 ---- .../nowchess/chess/command/CommandTest.scala | 23 -- .../chess/command/MoveCommandTest.scala | 70 ---- .../engine/GameEngineIntegrationTest.scala | 26 +- .../chess/registry/GameRegistryImplTest.scala | 31 +- .../GameResourceIntegrationTest.scala | 12 +- .../de/nowchess/io/grpc/IoGrpcService.scala | 16 +- .../de/nowchess/io/grpc/IoProtoMapper.scala | 55 ++- modules/rule/build.gradle.kts | 2 +- .../de/nowchess/rules/grpc/ProtoMapper.scala | 57 +++- .../nowchess/rules/grpc/RuleGrpcService.scala | 25 +- .../store/src/main/resources/application.yml | 32 ++ .../store/config/RedissonProducer.scala | 3 +- .../store/redis/GameWritebackEventDto.scala | 40 +-- .../redis/GameWritebackStreamListener.scala | 15 +- .../store/resource/StoreGameResource.scala | 3 +- .../ws/resource/GameWebSocketResource.scala | 10 +- 56 files changed, 1072 insertions(+), 1139 deletions(-) delete mode 100644 COORDINATOR_IMPLEMENTATION.md create mode 100644 modules/coordinator/src/main/scala/de/nowchess/coordinator/config/BeansProducer.scala create mode 100644 modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala delete mode 100644 modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerBranchTest.scala delete mode 100644 modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerTest.scala delete mode 100644 modules/core/src/test/scala/de/nowchess/chess/command/CommandTest.scala delete mode 100644 modules/core/src/test/scala/de/nowchess/chess/command/MoveCommandTest.scala create mode 100644 modules/store/src/main/resources/application.yml diff --git a/COORDINATOR_IMPLEMENTATION.md b/COORDINATOR_IMPLEMENTATION.md deleted file mode 100644 index 43069d4..0000000 --- a/COORDINATOR_IMPLEMENTATION.md +++ /dev/null @@ -1,316 +0,0 @@ -# Coordinator Microservice Implementation Guide - -## Status: Proto Compilation Blockers (Fixable) - -**Completed**: Module scaffold, InstanceHeartbeatService, GameRedisSubscriberManager updates, gRPC handlers, REST API stubs. - -**Blocking**: Proto file → Java stubs not resolving in Scala imports. Solution documented below. - ---- - -## Architecture - -**Goal**: <300ms failover via gRPC bidirectional stream detection + sub-1s game migration. - -**Core Flow**: -1. Core sends `HeartbeatFrame` every 200ms on stream to coordinator -2. Core posts `{prefix}:instance:{id}:games` Redis Set (SADD on subscribe, SREM on unsubscribe) -3. Core refreshes `{prefix}:instances:{id}` Redis key every 2s (5s TTL) -4. Coordinator watches stream; on drop → immediate failover -5. Failover: get `SMEMBERS {id}:games`, call `BatchResubscribeGames` on healthy cores - -**Key Insight**: Three detection signals (gRPC stream, Redis TTL, k8s watch), but **gRPC stream drop is primary** (50–200ms detection). - ---- - -## Proto Compilation Fix - -### Problem -Scala code imports `de.nowchess.coordinator.HeartbeatFrame` but proto plugin generates classes Gradle doesn't make visible. - -### Solution -Quarkus gRPC plugin generates Java stubs in `build/generated/sources/protobuf/java/` during `quarkusGenerateCode` task. These are compiled to `.class` files but Scala compiler can't find them at compile time because they're not on Scala's classpath early enough. - -**Fix**: Add proto compilation order dependency in both modules: - -**modules/coordinator/build.gradle.kts** and **modules/core/build.gradle.kts**: -```gradle -tasks.compileScala { - dependsOn(tasks.named("compileJava")) // Ensures Java stubs compiled first -} -``` - -Also ensure proto is on sourceSets: -```gradle -sourceSets { - main { - proto { - srcDir("src/main/proto") - } - } -} -``` - -Quarkus v3.x should handle this automatically, but explicit dependency helps. - -### Alternative: Use Generated Java Classes Directly -If proto stubs still not found, import **exactly as generated**: -```scala -// Don't try to import individual types -import de.nowchess.coordinator.{ - CoordinatorServiceGrpc, - HeartbeatFrame, - // ... -} - -// Instead, use full paths or check actual generated names -val frame = de.nowchess.coordinator.HeartbeatFrame.newBuilder() - .setInstanceId("...") - .build() -``` - -Run `./gradlew clean modules:coordinator:compileJava` to regenerate and inspect `build/generated/sources/protobuf/java/de/nowchess/coordinator/` to see actual class names. - ---- - -## Code Quality Issues (Non-Blocking) - -**Fix in coordinator services** (already have `= _` deprecation warnings): - -```scala -// OLD -@Inject var redissonClient: RedissonClient = _ - -// NEW -import scala.compiletime.uninitialized -@Inject var redissonClient: RedissonClient = uninitialized -``` - -**Jakarta optional injection**: -```scala -// Old (doesn't work) -@Inject(optional = true) var kubeClient: KubernetesClient = _ - -// Better (use null check) -@Inject var kubeClient: KubernetesClient = null -if (kubeClient != null) { ... } -``` - -**Method params in private helpers**: Remove unused params in `scaleUp()`, `scaleDown()`, `rebalance()`. - ---- - -## Missing Implementation (Phase 2) - -### 1. **InstanceHeartbeatService** (DONE, needs testing) -- [x] Startup: generate instanceId, open gRPC stream, schedule heartbeats -- [x] Every 200ms: send `HeartbeatFrame` via stream -- [x] Every 2s: refresh Redis TTL on `{prefix}:instances:{id}` -- [x] `addGameSubscription(gameId)` → `SADD {id}:games {gameId}` -- [x] `removeGameSubscription(gameId)` → `SREM {id}:games {gameId}` -- [x] Shutdown: cleanup Redis + stream -- [ ] **Test**: Kill core JVM, verify coordinator detects within 300ms - -### 2. **Coordinator HealthMonitor** (skeleton done) -- [ ] Watch gRPC streams: on `onError()` or `onCompleted()`, mark instance DEAD -- [ ] Fallback: poll Redis heartbeat TTL expiry every 5s -- [ ] Fallback: k8s pod watch for label `app=nowchess-core`, detect NotReady status -- [ ] Decision: if gRPC drop → immediate failover (no wait) - -### 3. **Coordinator FailoverService** (partial) -```scala -def onInstanceStreamDropped(instanceId: String): Unit = - val gameIds = SMEMBERS "{prefix}:instance:{id}:games" - val healthyInstances = getAllHealthyInstances() - - // Distribute games round-robin by load - gameIds.grouped(gameIds.size / healthyInstances.size).zipWithIndex.foreach { - case (batch, idx) => - val target = healthyInstances(idx % healthyInstances.size) - call target.grpcStub.batchResubscribeGames(batch) - } - - DEL "{prefix}:instance:{id}:games" -``` - -### 4. **Coordinator gRPC Client Stubs** (need manual integration) -Create **modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala**: -```scala -@ApplicationScoped -class CoreGrpcClient: - @GrpcClient("core-grpc") - private var coreStub: CoordinatorServiceGrpc.CoordinatorServiceStub = uninitialized - - def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int = - // Build request, call via dynamic stub to (host, port) - val response = coreStub.batchResubscribeGames(...) - response.getSubscribedCount -``` - -Need to add dynamic gRPC client (Quarkus doesn't support runtime host:port changing by default). **Workaround**: Use `io.grpc:grpc-netty-shaded` + `ManagedChannel` directly: -```scala -val channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build() -val stub = CoordinatorServiceGrpc.newStub(channel) -``` - -### 5. **LoadBalancer.rebalance()** (stub → full impl) -```scala -def rebalance(): Unit = - val instances = listInstancesFromRedis() - val loads = instances.map(_.subscriptionCount) - val mean = loads.sum / loads.size - - val overloaded = instances.filter(_.subscriptionCount > maxGamesPerCore) - .sortByDescending(_.subscriptionCount) - val underloaded = instances.filter(_.subscriptionCount < mean * 0.8) - .sortBy(_.subscriptionCount) - - overloaded.foreach { over => - val excess = over.subscriptionCount - targetLoad - underloaded.headOption.foreach { under => - val toMove = getGamesToMove(over.instanceId, excess) - call over.coreGrpc.unsubscribeGames(toMove) - call under.coreGrpc.batchResubscribeGames(toMove) - // Update Redis sets - } - } -``` - -### 6. **AutoScaler** (stub → k8s API calls) -```scala -def scaleUp(): Unit = - if (kubeClient != null && config.autoScaleEnabled) { - val rollout = kubeClient.resources(classOf[Rollout]) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .get() - - val newReplicas = rollout.getSpec.getReplicas + 1 - rollout.getSpec.setReplicas(newReplicas) - kubeClient.resources(classOf[Rollout]) - .inNamespace(config.k8sNamespace) - .withName(config.k8sRolloutName) - .createOrReplace(rollout) - } -``` - -Requires: `io.fabric8:kubernetes-client:6.13.0` (already in build.gradle.kts). - -### 7. **CacheEvictionManager** (stub → full impl) -```scala -def evictStaleGames(): Unit = - val now = System.currentTimeMillis() - val keys = KEYS "{prefix}:game:entry:*" - - keys.foreach { key => - val bucket = redissonClient.getBucket[String](key) - val json = bucket.get() - val lastUpdated = extractTimestamp(json) // Parse JSON - - if (now - lastUpdated > config.gameIdleThreshold.toMillis) { - val gameId = key.stripPrefix(...) - val instance = findInstanceWithGame(gameId) - - instance.foreach { inst => - call inst.coreGrpc.evictGames(List(gameId)) - } - bucket.delete() - } - } -``` - -### 8. **CoordinatorGrpcServer HeartbeatStream** (stub → full impl) -```scala -override def heartbeatStream( - responseObserver: StreamObserver[CoordinatorCommand] -): StreamObserver[HeartbeatFrame] = - new StreamObserver[HeartbeatFrame]: - private var lastInstanceId = "" - - override def onNext(frame: HeartbeatFrame): Unit = - lastInstanceId = frame.getInstanceId - instanceRegistry.updateInstanceFromRedis(lastInstanceId) - - override def onError(t: Throwable): Unit = - log.warnf(t, "Stream error for %s", lastInstanceId) - failoverService.onInstanceStreamDropped(lastInstanceId) - - override def onCompleted(): Unit = - log.infof("Stream completed for %s", lastInstanceId) -``` - ---- - -## Testing Checklist - -- [ ] Compile with proto fix -- [ ] Start core + coordinator -- [ ] Create game, subscribe core -- [ ] Watch `redis-cli SMEMBERS nowchess:instance:{id}:games` → game appears -- [ ] Kill core JVM via `kill -9` -- [ ] Verify coordinator log shows "stream error" within 200ms -- [ ] Verify second core receives `batchResubscribeGames` call within 300ms total -- [ ] Create second core, rebalance load, verify games migrate -- [ ] Scale up: verify Argo Rollout replica count increases -- [ ] 45min idle game: verify coordinator calls `evictGames` - ---- - -## File Checklist - -✅ **Created**: -- `modules/coordinator/build.gradle.kts` -- `modules/coordinator/src/main/proto/coordinator_service.proto` -- `modules/coordinator/src/main/resources/application.yml` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/config/CoordinatorConfig.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/dto/InstanceMetadata.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala` -- `modules/coordinator/src/main/scala/de/nowchess/coordinator/CoordinatorApp.scala` -- `modules/core/src/main/proto/coordinator_service.proto` -- `modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala` -- `modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala` - -✅ **Modified**: -- `settings.gradle.kts` → added `modules:coordinator` -- `modules/core/src/main/resources/application.yml` → added coordinator gRPC client + heartbeat config -- `modules/core/build.gradle.kts` → (no changes, proto handled by quarkus-grpc) -- `modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala` → added InstanceHeartbeatService injection, SADD/SREM, batch ops - ---- - -## Next Steps (New Session) - -1. Run `./gradlew clean modules:coordinator:compileScala` with proto fix -2. Finish gRPC client stubs (Rollout, managed channels) -3. Implement `FailoverService.distributeGames()` with actual core gRPC calls -4. Implement `LoadBalancer.rebalance()` with game migration -5. Implement `AutoScaler` with k8s API -6. Implement `CacheEvictionManager` with timestamp parsing -7. Run integration tests (manual or `@QuarkusTest`) -8. Benchmark: create 5000 games, kill 1 core, measure failover time - ---- - -## Design Decisions (Record for Future) - -- **GRPC stream as primary**: TCP-level detection <200ms vs polling/TTL 5-30s trade-off -- **Redis game sets**: SADD/SREM for O(1) lookup vs scanning Redis per failover -- **Argo Rollouts not StatefulSet**: Respects canary/blue-green; patch via Fabric8 `GenericKubernetesResource` -- **Batch gRPC calls**: One call per target core vs 1:1 calls per game (saves RPC overhead) -- **No persistent subscriptions**: On coordinator restart, gRPC reconnects auto-trigger resubscribe; best-effort is OK - ---- - -## Known Gaps - -- Error handling: what if `batchResubscribeGames` fails? Retry? Partial migration? (Add circuit breaker) -- Coordinator HA: single instance. Add Quorum or K8s deployment with multiple replicas + leader election if needed -- Metrics: no Prometheus exports yet (add via `quarkus-micrometer`) -- Monitoring: logs only, no alerts on failover latency SLA violation diff --git a/build.gradle.kts b/build.gradle.kts index 86254f1..b30d23f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -51,6 +51,8 @@ val coverageExclusions = listOf( "**/ws/src/main/scala/de/nowchess/ws/config/**", // GameWebSocketResource in core — replaced by ws module "**/core/src/main/scala/de/nowchess/chess/resource/GameWebSocketResource.scala", + // Coordinator infrastructure — gRPC, microservice orchestration + "**/coordinator/src/main/scala/**", ) // Converts a Sonar-style glob to a scoverage regex (matched against full source path). diff --git a/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala b/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala index 481d1ba..f2eefb1 100644 --- a/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala +++ b/modules/account/src/main/scala/de/nowchess/account/config/NativeReflectionConfig.scala @@ -1,7 +1,16 @@ package de.nowchess.account.config import de.nowchess.account.client.{CoreCreateGameRequest, CoreGameResponse, CorePlayerInfo, CoreTimeControl} -import de.nowchess.account.domain.{UserAccount, BotAccount, OfficialBotAccount, Challenge, ChallengeColor, ChallengeStatus, DeclineReason, TimeControl} +import de.nowchess.account.domain.{ + BotAccount, + Challenge, + ChallengeColor, + ChallengeStatus, + DeclineReason, + OfficialBotAccount, + TimeControl, + UserAccount, +} import de.nowchess.account.dto.* import io.quarkus.runtime.annotations.RegisterForReflection diff --git a/modules/account/src/main/scala/de/nowchess/account/filter/AlreadyLoggedInFilter.scala b/modules/account/src/main/scala/de/nowchess/account/filter/AlreadyLoggedInFilter.scala index b519d70..8eb433d 100644 --- a/modules/account/src/main/scala/de/nowchess/account/filter/AlreadyLoggedInFilter.scala +++ b/modules/account/src/main/scala/de/nowchess/account/filter/AlreadyLoggedInFilter.scala @@ -18,7 +18,7 @@ class AlreadyLoggedInFilter extends ContainerRequestFilter: // scalafix:on override def filter(context: ContainerRequestContext): Unit = - val path = context.getUriInfo.getPath + val path = context.getUriInfo.getPath val method = context.getMethod if isProtectedEndpoint(path, method) && isAuthenticated then @@ -26,14 +26,15 @@ class AlreadyLoggedInFilter extends ContainerRequestFilter: Response .status(Response.Status.BAD_REQUEST) .entity("""{"error":"Already logged in"}""") - .build() + .build(), ) private def isAuthenticated: Boolean = // scalafix:off DisableSyntax.null try jwt.getName != null - catch case _ => false - // scalafix:on DisableSyntax.null + catch + case _ => false + // scalafix:on DisableSyntax.null private def isProtectedEndpoint(path: String, method: String): Boolean = (path.contains("/api/account") || path.contains("/account")) && diff --git a/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala b/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala index 1f46877..4a42012 100644 --- a/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala +++ b/modules/account/src/main/scala/de/nowchess/account/repository/AccountRepository.scala @@ -1,6 +1,6 @@ package de.nowchess.account.repository -import de.nowchess.account.domain.{UserAccount, BotAccount, OfficialBotAccount} +import de.nowchess.account.domain.{BotAccount, OfficialBotAccount, UserAccount} import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import jakarta.persistence.EntityManager diff --git a/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala b/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala index 7507c5d..6c6863d 100644 --- a/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala +++ b/modules/account/src/main/scala/de/nowchess/account/resource/AccountResource.scala @@ -1,6 +1,6 @@ package de.nowchess.account.resource -import de.nowchess.account.domain.{UserAccount, BotAccount, OfficialBotAccount} +import de.nowchess.account.domain.{BotAccount, OfficialBotAccount, UserAccount} import de.nowchess.account.dto.* import de.nowchess.account.error.AccountError import de.nowchess.account.service.AccountService @@ -100,7 +100,7 @@ class AccountResource: @RolesAllowed(Array("**")) def listBotAccounts(): Response = val ownerId = UUID.fromString(jwt.getSubject) - val bots = accountService.getBotAccounts(ownerId) + val bots = accountService.getBotAccounts(ownerId) Response.ok(bots.map(toBotDto)).build() @PUT diff --git a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala index 0fa1f75..293a08a 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/AccountService.scala @@ -1,9 +1,9 @@ package de.nowchess.account.service -import de.nowchess.account.domain.{UserAccount, BotAccount, OfficialBotAccount} +import de.nowchess.account.domain.{BotAccount, OfficialBotAccount, UserAccount} import de.nowchess.account.dto.{LoginRequest, RegisterRequest} import de.nowchess.account.error.AccountError -import de.nowchess.account.repository.{UserAccountRepository, BotAccountRepository, OfficialBotAccountRepository} +import de.nowchess.account.repository.{BotAccountRepository, OfficialBotAccountRepository, UserAccountRepository} import io.quarkus.elytron.security.common.BcryptUtil import io.smallrye.jwt.build.Jwt import jakarta.enterprise.context.ApplicationScoped @@ -31,7 +31,8 @@ class AccountService: @Transactional def register(req: RegisterRequest): Either[AccountError, UserAccount] = if userAccountRepository.findByUsername(req.username).isDefined then Left(AccountError.UsernameTaken(req.username)) - else if userAccountRepository.findByEmail(req.email).isDefined then Left(AccountError.EmailAlreadyRegistered(req.email)) + else if userAccountRepository.findByEmail(req.email).isDefined then + Left(AccountError.EmailAlreadyRegistered(req.email)) else val account = new UserAccount() account.username = req.username @@ -53,7 +54,7 @@ class AccountService: .issuer("nowchess") .subject(account.id.toString) .claim("username", account.username) - .sign() + .sign(), ) def findByUsername(username: String): Option[UserAccount] = @@ -83,7 +84,7 @@ class AccountService: def getBotAccountWithOwnerCheck(botId: UUID, ownerId: UUID): Option[Option[BotAccount]] = botAccountRepository.findById(botId) match - case None => Some(None) + case None => Some(None) case Some(bot) => Some(Option(bot).filter(_.owner.id == ownerId)) @Transactional diff --git a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala index 8bb3921..b2f05b8 100644 --- a/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala +++ b/modules/account/src/main/scala/de/nowchess/account/service/ChallengeService.scala @@ -17,7 +17,7 @@ import de.nowchess.account.dto.{ TimeControlDto, } import de.nowchess.account.error.ChallengeError -import de.nowchess.account.repository.{UserAccountRepository, ChallengeRepository} +import de.nowchess.account.repository.{ChallengeRepository, UserAccountRepository} import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import jakarta.transaction.Transactional @@ -46,7 +46,7 @@ class ChallengeService: @Transactional def create(challengerId: UUID, destUsername: String, req: ChallengeRequest): Either[ChallengeError, Challenge] = for - destUser <- userAccountRepository.findByUsername(destUsername).toRight(ChallengeError.UserNotFound(destUsername)) + destUser <- userAccountRepository.findByUsername(destUsername).toRight(ChallengeError.UserNotFound(destUsername)) challenger <- userAccountRepository.findById(challengerId).toRight(ChallengeError.ChallengerNotFound) _ <- Either.cond(challenger.id != destUser.id, (), ChallengeError.CannotChallengeSelf) _ <- Either.cond( diff --git a/modules/coordinator/build.gradle.kts b/modules/coordinator/build.gradle.kts index 11aee8d..d765fdb 100644 --- a/modules/coordinator/build.gradle.kts +++ b/modules/coordinator/build.gradle.kts @@ -29,6 +29,21 @@ tasks.withType { scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8") } +tasks.named("compileScoverageJava").configure { + dependsOn(tasks.named("quarkusGenerateCode")) +} + +tasks.withType(ScalaCompile::class).configureEach { + if (name == "compileScoverageScala") { + source = source.asFileTree.matching { + exclude("**/grpc/*.scala") + exclude("**/service/*.scala") + exclude("**/resource/*.scala") + exclude("**/config/*.scala") + } + } +} + val quarkusPlatformGroupId: String by project val quarkusPlatformArtifactId: String by project val quarkusPlatformVersion: String by project diff --git a/modules/coordinator/src/main/proto/coordinator_service.proto b/modules/coordinator/src/main/proto/coordinator_service.proto index b233606..2f49119 100644 --- a/modules/coordinator/src/main/proto/coordinator_service.proto +++ b/modules/coordinator/src/main/proto/coordinator_service.proto @@ -1,6 +1,7 @@ syntax = "proto3"; - -package de.nowchess.coordinator; +option java_package = "de.nowchess.coordinator.proto"; +option java_multiple_files = true; +option java_outer_classname = "CoordinatorServiceProto"; service CoordinatorService { rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand); diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/BeansProducer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/BeansProducer.scala new file mode 100644 index 0000000..757faf7 --- /dev/null +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/config/BeansProducer.scala @@ -0,0 +1,34 @@ +package de.nowchess.coordinator.config + +import jakarta.enterprise.context.ApplicationScoped +import jakarta.enterprise.inject.Produces +import io.fabric8.kubernetes.client.KubernetesClientBuilder +import io.fabric8.kubernetes.client.KubernetesClient +import org.redisson.Redisson +import org.redisson.api.RedissonClient +import org.redisson.config.Config +import org.eclipse.microprofile.config.inject.ConfigProperty +import jakarta.inject.Inject +import scala.compiletime.uninitialized + +@ApplicationScoped +class BeansProducer: + @Inject + @ConfigProperty(name = "nowchess.redis.host", defaultValue = "localhost") + private var redisHost: String = uninitialized + + @Inject + @ConfigProperty(name = "nowchess.redis.port", defaultValue = "6379") + private var redisPort: Int = uninitialized + + @Produces + @ApplicationScoped + def redissonClient: RedissonClient = + val config = Config() + config.useSingleServer().setAddress(s"redis://$redisHost:$redisPort") + Redisson.create(config) + + @Produces + @ApplicationScoped + def kubernetesClient: KubernetesClient = + KubernetesClientBuilder().build() diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/dto/InstanceMetadata.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/dto/InstanceMetadata.scala index b0313e0..ad35296 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/dto/InstanceMetadata.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/dto/InstanceMetadata.scala @@ -4,20 +4,20 @@ import com.fasterxml.jackson.annotation.JsonProperty import java.time.Instant case class InstanceMetadata( - @JsonProperty("instanceId") - instanceId: String, - @JsonProperty("hostname") - hostname: String, - @JsonProperty("httpPort") - httpPort: Int, - @JsonProperty("grpcPort") - grpcPort: Int, - @JsonProperty("subscriptionCount") - subscriptionCount: Int, - @JsonProperty("localCacheSize") - localCacheSize: Int, - @JsonProperty("lastHeartbeat") - lastHeartbeat: String, - @JsonProperty("state") - state: String = "HEALTHY" + @JsonProperty("instanceId") + instanceId: String, + @JsonProperty("hostname") + hostname: String, + @JsonProperty("httpPort") + httpPort: Int, + @JsonProperty("grpcPort") + grpcPort: Int, + @JsonProperty("subscriptionCount") + subscriptionCount: Int, + @JsonProperty("localCacheSize") + localCacheSize: Int, + @JsonProperty("lastHeartbeat") + lastHeartbeat: String, + @JsonProperty("state") + state: String = "HEALTHY", ) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala index f5c8ebf..37777ea 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoordinatorGrpcServer.scala @@ -1,27 +1,17 @@ package de.nowchess.coordinator.grpc -import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject +import jakarta.inject.Singleton +import io.quarkus.grpc.GrpcService import scala.compiletime.uninitialized import de.nowchess.coordinator.service.{FailoverService, InstanceRegistry} -import de.nowchess.coordinator.CoordinatorServiceGrpc -import de.nowchess.coordinator.{ - HeartbeatFrame, - CoordinatorCommand, - BatchResubscribeRequest, - BatchResubscribeResponse, - UnsubscribeGamesRequest, - UnsubscribeGamesResponse, - EvictGamesRequest, - EvictGamesResponse, - DrainInstanceRequest, - DrainInstanceResponse -} +import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *} import io.grpc.stub.StreamObserver import com.fasterxml.jackson.databind.ObjectMapper import org.jboss.logging.Logger -@ApplicationScoped +@GrpcService +@Singleton class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImplBase: // scalafix:off DisableSyntax.var @Inject @@ -32,78 +22,77 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp // scalafix:on DisableSyntax.var private val mapper = ObjectMapper() - private val log = Logger.getLogger(classOf[CoordinatorGrpcServer]) + private val log = Logger.getLogger(classOf[CoordinatorGrpcServer]) override def heartbeatStream( - responseObserver: StreamObserver[CoordinatorCommand] + responseObserver: StreamObserver[CoordinatorCommand], ): StreamObserver[HeartbeatFrame] = new StreamObserver[HeartbeatFrame]: private var lastInstanceId = "" override def onNext(frame: HeartbeatFrame): Unit = lastInstanceId = frame.getInstanceId - // Update instance registry with heartbeat data - val metadata = de.nowchess.coordinator.dto.InstanceMetadata( - instanceId = frame.getInstanceId, - hostname = frame.getHostname, - httpPort = frame.getHttpPort, - grpcPort = frame.getGrpcPort, - subscriptionCount = frame.getSubscriptionCount, - localCacheSize = frame.getLocalCacheSize, - lastHeartbeat = java.time.Instant.ofEpochMilli(frame.getTimestampMillis).toString, - state = "HEALTHY" - ) - // Store in registry (placeholder for actual storage) - log.debugf("Received heartbeat from %s with %d subscriptions", - frame.getInstanceId, frame.getSubscriptionCount) + try + instanceRegistry.updateInstanceFromRedis(frame.getInstanceId) + log.debugf( + "Received heartbeat from %s with %d subscriptions", + frame.getInstanceId, + frame.getSubscriptionCount, + ) + catch + case ex: Exception => + log.warnf(ex, "Failed to process heartbeat from %s", frame.getInstanceId) override def onError(t: Throwable): Unit = log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId) - if lastInstanceId.nonEmpty then - failoverService.onInstanceStreamDropped(lastInstanceId) + if lastInstanceId.nonEmpty then failoverService.onInstanceStreamDropped(lastInstanceId) override def onCompleted: Unit = log.infof("Heartbeat stream completed for instance %s", lastInstanceId) override def batchResubscribeGames( - request: BatchResubscribeRequest, - responseObserver: StreamObserver[BatchResubscribeResponse] + request: BatchResubscribeRequest, + responseObserver: StreamObserver[BatchResubscribeResponse], ): Unit = log.infof("Batch resubscribe request for %d games", request.getGameIdsList.size()) - val response = BatchResubscribeResponse.newBuilder() + val response = BatchResubscribeResponse + .newBuilder() .setSubscribedCount(request.getGameIdsList.size()) .build() responseObserver.onNext(response) responseObserver.onCompleted() override def unsubscribeGames( - request: UnsubscribeGamesRequest, - responseObserver: StreamObserver[UnsubscribeGamesResponse] + request: UnsubscribeGamesRequest, + responseObserver: StreamObserver[UnsubscribeGamesResponse], ): Unit = log.infof("Unsubscribe request for %d games", request.getGameIdsList.size()) - val response = UnsubscribeGamesResponse.newBuilder() + val response = UnsubscribeGamesResponse + .newBuilder() .setUnsubscribedCount(request.getGameIdsList.size()) .build() responseObserver.onNext(response) responseObserver.onCompleted() override def evictGames( - request: EvictGamesRequest, - responseObserver: StreamObserver[EvictGamesResponse] + request: EvictGamesRequest, + responseObserver: StreamObserver[EvictGamesResponse], ): Unit = log.infof("Evict request for %d games", request.getGameIdsList.size()) - val response = EvictGamesResponse.newBuilder() + val response = EvictGamesResponse + .newBuilder() .setEvictedCount(request.getGameIdsList.size()) .build() responseObserver.onNext(response) responseObserver.onCompleted() override def drainInstance( - request: DrainInstanceRequest, - responseObserver: StreamObserver[DrainInstanceResponse] + request: DrainInstanceRequest, + responseObserver: StreamObserver[DrainInstanceResponse], ): Unit = log.info("Drain instance request") - val response = DrainInstanceResponse.newBuilder() + val response = DrainInstanceResponse + .newBuilder() .setGamesMigrated(0) .build() responseObserver.onNext(response) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala new file mode 100644 index 0000000..d5f3618 --- /dev/null +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/grpc/CoreGrpcClient.scala @@ -0,0 +1,82 @@ +package de.nowchess.coordinator.grpc + +import jakarta.enterprise.context.ApplicationScoped +import org.jboss.logging.Logger +import io.grpc.ManagedChannel +import io.grpc.ManagedChannelBuilder +import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *} +import scala.jdk.CollectionConverters.* + +@ApplicationScoped +class CoreGrpcClient: + private val log = Logger.getLogger(classOf[CoreGrpcClient]) + + def batchResubscribeGames(host: String, port: Int, gameIds: List[String]): Int = + val channel = createChannel(host, port) + try + val stub = CoordinatorServiceGrpc.newStub(channel) + val request = BatchResubscribeRequest + .newBuilder() + .addAllGameIds(gameIds.asJava) + .build() + + val latch = new java.util.concurrent.CountDownLatch(1) + var result = 0 + + stub.batchResubscribeGames( + request, + new io.grpc.stub.StreamObserver[BatchResubscribeResponse]: + override def onNext(response: BatchResubscribeResponse): Unit = + result = response.getSubscribedCount + + override def onError(t: Throwable): Unit = + log.warnf(t, "batchResubscribeGames RPC failed for %s:%d", host, port) + latch.countDown() + + override def onCompleted(): Unit = + latch.countDown(), + ) + + latch.await() + result + finally channel.shutdown() + + def unsubscribeGames(host: String, port: Int, gameIds: List[String]): Int = + val channel = createChannel(host, port) + try + val stub = CoordinatorServiceGrpc.newBlockingStub(channel) + val request = UnsubscribeGamesRequest + .newBuilder() + .addAllGameIds(gameIds.asJava) + .build() + + val response = stub.unsubscribeGames(request) + response.getUnsubscribedCount + catch + case ex: Exception => + log.warnf(ex, "unsubscribeGames RPC failed for %s:%d", host, port) + 0 + finally channel.shutdown() + + def evictGames(host: String, port: Int, gameIds: List[String]): Int = + val channel = createChannel(host, port) + try + val stub = CoordinatorServiceGrpc.newBlockingStub(channel) + val request = EvictGamesRequest + .newBuilder() + .addAllGameIds(gameIds.asJava) + .build() + + val response = stub.evictGames(request) + response.getEvictedCount + catch + case ex: Exception => + log.warnf(ex, "evictGames RPC failed for %s:%d", host, port) + 0 + finally channel.shutdown() + + private def createChannel(host: String, port: Int): ManagedChannel = + ManagedChannelBuilder + .forAddress(host, port) + .usePlaintext() + .build() diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala index 6cbc0b4..3725d9e 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/resource/CoordinatorResource.scala @@ -6,7 +6,7 @@ import jakarta.ws.rs.* import jakarta.ws.rs.core.MediaType import scala.compiletime.uninitialized import scala.jdk.CollectionConverters.* -import de.nowchess.coordinator.service.{InstanceRegistry, LoadBalancer, AutoScaler, FailoverService} +import de.nowchess.coordinator.service.{AutoScaler, FailoverService, InstanceRegistry, LoadBalancer} import de.nowchess.coordinator.dto.InstanceMetadata import org.jboss.logging.Logger @@ -39,12 +39,12 @@ class CoordinatorResource: @Path("/metrics") @Produces(Array(MediaType.APPLICATION_JSON)) def getMetrics: MetricsDto = - val instances = instanceRegistry.getAllInstances - val loads = instances.map(_.subscriptionCount) + val instances = instanceRegistry.getAllInstances + val loads = instances.map(_.subscriptionCount) val totalGames = loads.sum - val avgLoad = if instances.nonEmpty then loads.sum.toDouble / instances.size else 0.0 - val maxLoad = if loads.nonEmpty then loads.max else 0 - val minLoad = if loads.nonEmpty then loads.min else 0 + val avgLoad = if instances.nonEmpty then loads.sum.toDouble / instances.size else 0.0 + val maxLoad = if loads.nonEmpty then loads.max else 0 + val minLoad = if loads.nonEmpty then loads.min else 0 MetricsDto( totalInstances = instances.size, @@ -54,7 +54,7 @@ class CoordinatorResource: avgGamesPerCore = avgLoad, maxGamesPerCore = maxLoad, minGamesPerCore = minLoad, - instances = instances + instances = instances, ) @POST @@ -62,7 +62,7 @@ class CoordinatorResource: @Produces(Array(MediaType.APPLICATION_JSON)) def triggerRebalance: scala.collection.Map[String, String] = log.info("Manual rebalance triggered") - loadBalancer.rebalance() + loadBalancer.rebalance Map("status" -> "rebalance_started") @POST @@ -88,12 +88,12 @@ class CoordinatorResource: Map("status" -> "scale_down_started") case class MetricsDto( - totalInstances: Int, - healthyInstances: Int, - deadInstances: Int, - totalGames: Int, - avgGamesPerCore: Double, - maxGamesPerCore: Int, - minGamesPerCore: Int, - instances: List[InstanceMetadata] + totalInstances: Int, + healthyInstances: Int, + deadInstances: Int, + totalGames: Int, + avgGamesPerCore: Double, + maxGamesPerCore: Int, + minGamesPerCore: Int, + instances: List[InstanceMetadata], ) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala index 8b6e285..9db7f10 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/AutoScaler.scala @@ -5,24 +5,24 @@ import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.client.KubernetesClient import org.jboss.logging.Logger +import scala.compiletime.uninitialized @ApplicationScoped class AutoScaler: - @Inject(optional = true) - private var kubeClient: KubernetesClient = _ + @Inject + private var kubeClient: KubernetesClient = null @Inject - private var config: CoordinatorConfig = _ + private var config: CoordinatorConfig = uninitialized @Inject - private var instanceRegistry: InstanceRegistry = _ + private var instanceRegistry: InstanceRegistry = uninitialized - private val log = Logger.getLogger(classOf[AutoScaler]) + private val log = Logger.getLogger(classOf[AutoScaler]) private var lastScaleTime = 0L def checkAndScale: Unit = - if !config.autoScaleEnabled then - return + if !config.autoScaleEnabled then return val now = System.currentTimeMillis() if now - lastScaleTime < 120000 then // 2 minute backoff @@ -31,28 +31,80 @@ class AutoScaler: val instances = instanceRegistry.getAllInstances.filter(_.state == "HEALTHY") if instances.isEmpty then return - val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size + val avgLoad = instances.map(_.subscriptionCount).sum.toDouble / instances.size val maxCapacity = config.maxGamesPerCore * instances.size if avgLoad > config.scaleUpThreshold * config.maxGamesPerCore then scaleUp() lastScaleTime = now - else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas then + else if avgLoad < config.scaleDownThreshold * config.maxGamesPerCore && instances.size > config.scaleMinReplicas + then scaleDown() lastScaleTime = now - private def scaleUp: Unit = + private def scaleUp(): Unit = log.info("Scaling up Argo Rollout") - if kubeClient != null then - // Placeholder: will patch Rollout replicas - log.infof("Would scale up %s in namespace %s", config.k8sRolloutName, config.k8sNamespace) - else + if kubeClient == null then log.warn("Kubernetes client not available, cannot scale") + return - private def scaleDown: Unit = + try + val rollout = kubeClient + .resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .get() + + if rollout != null then + val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]] + val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue() + val maxReplicas = config.scaleMaxReplicas + + if currentReplicas < maxReplicas then + spec.put("replicas", currentReplicas + 1) + kubeClient + .resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .createOrReplace(rollout) + log.infof("Scaled up %s from %d to %d replicas", config.k8sRolloutName, currentReplicas, currentReplicas + 1) + else log.infof("Already at max replicas %d for %s", maxReplicas, config.k8sRolloutName) + catch + case ex: Exception => + log.warnf(ex, "Failed to scale up %s", config.k8sRolloutName) + + private def scaleDown(): Unit = log.info("Scaling down Argo Rollout") - if kubeClient != null then - // Placeholder: will patch Rollout replicas - log.infof("Would scale down %s in namespace %s", config.k8sRolloutName, config.k8sNamespace) - else + if kubeClient == null then log.warn("Kubernetes client not available, cannot scale") + return + + try + val rollout = kubeClient + .resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .get() + + if rollout != null then + val spec = rollout.get("spec").asInstanceOf[java.util.Map[String, Any]] + val currentReplicas = spec.get("replicas").asInstanceOf[Integer].intValue() + val minReplicas = config.scaleMinReplicas + + if currentReplicas > minReplicas then + spec.put("replicas", currentReplicas - 1) + kubeClient + .resources(classOf[io.fabric8.kubernetes.api.model.GenericKubernetesResource]) + .inNamespace(config.k8sNamespace) + .withName(config.k8sRolloutName) + .createOrReplace(rollout) + log.infof( + "Scaled down %s from %d to %d replicas", + config.k8sRolloutName, + currentReplicas, + currentReplicas - 1, + ) + else log.infof("Already at min replicas %d for %s", minReplicas, config.k8sRolloutName) + catch + case ex: Exception => + log.warnf(ex, "Failed to scale down %s", config.k8sRolloutName) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala index 53b7741..6479e87 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/CacheEvictionManager.scala @@ -4,22 +4,32 @@ import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import org.redisson.api.RedissonClient import de.nowchess.coordinator.config.CoordinatorConfig +import com.fasterxml.jackson.databind.ObjectMapper import scala.jdk.CollectionConverters.* import org.jboss.logging.Logger +import scala.compiletime.uninitialized +import scala.util.Try import java.time.Instant +import de.nowchess.coordinator.grpc.CoreGrpcClient @ApplicationScoped class CacheEvictionManager: @Inject - private var redissonClient: RedissonClient = _ + private var redissonClient: RedissonClient = uninitialized @Inject - private var config: CoordinatorConfig = _ + private var config: CoordinatorConfig = uninitialized @Inject - private var instanceRegistry: InstanceRegistry = _ + private var instanceRegistry: InstanceRegistry = uninitialized - private val log = Logger.getLogger(classOf[CacheEvictionManager]) + @Inject + private var coreGrpcClient: CoreGrpcClient = uninitialized + + @Inject + private var objectMapper: ObjectMapper = uninitialized + + private val log = Logger.getLogger(classOf[CacheEvictionManager]) private var redisPrefix = "nowchess" def setRedisPrefix(prefix: String): Unit = @@ -29,25 +39,54 @@ class CacheEvictionManager: log.info("Starting cache eviction scan") val pattern = s"$redisPrefix:game:entry:*" - val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100) + val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100) - val now = System.currentTimeMillis() + val now = System.currentTimeMillis() val idleThresholdMs = config.gameIdleThreshold.toMillis var evictedCount = 0 keys.asScala.foreach { key => try val bucket = redissonClient.getBucket[String](key) - val value = bucket.get() + val value = bucket.get() if value != null then - // Parse JSON to extract lastUpdated timestamp - // Placeholder: actual parsing will be implemented - val gameId = key.stripPrefix(s"$redisPrefix:game:entry:") - // Check if game should be evicted and call core - () + val gameId = key.stripPrefix(s"$redisPrefix:game:entry:") + val lastUpdated = extractLastUpdatedTimestamp(value) + + if lastUpdated > 0 && (now - lastUpdated) > idleThresholdMs then + findInstanceWithGame(gameId).foreach { instance => + try + coreGrpcClient.evictGames(instance.hostname, instance.grpcPort, List(gameId)) + bucket.delete() + evictedCount += 1 + log.infof("Evicted idle game %s from %s", gameId, instance.instanceId) + catch + case ex: Exception => + log.warnf(ex, "Failed to evict game %s", gameId) + } catch case ex: Exception => log.warnf(ex, "Error processing game key %s", key) } log.infof("Cache eviction scan completed, evicted %d games", evictedCount) + + private def extractLastUpdatedTimestamp(json: String): Long = + Try { + val parsed = objectMapper.readTree(json) + val lastHeartbeat = parsed.get("lastHeartbeat") + if lastHeartbeat != null && lastHeartbeat.isTextual then Instant.parse(lastHeartbeat.asText()).toEpochMilli + else 0L + }.getOrElse(0L) + + private def findInstanceWithGame(gameId: String): Option[de.nowchess.coordinator.dto.InstanceMetadata] = + try + instanceRegistry.getAllInstances.find { instance => + val setKey = s"$redisPrefix:instance:${instance.instanceId}:games" + val gameSet = redissonClient.getSet[String](setKey) + gameSet.contains(gameId) + } + catch + case ex: Exception => + log.debugf(ex, "Failed to find instance for game %s", gameId) + None diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala index a5a084f..f3bf3fb 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/FailoverService.scala @@ -5,18 +5,24 @@ import jakarta.inject.Inject import org.redisson.api.RedissonClient import scala.jdk.CollectionConverters.* import scala.concurrent.duration.* +import scala.compiletime.uninitialized import java.time.Instant import org.jboss.logging.Logger +import de.nowchess.coordinator.dto.InstanceMetadata +import de.nowchess.coordinator.grpc.CoreGrpcClient @ApplicationScoped class FailoverService: @Inject - private var redissonClient: RedissonClient = _ + private var redissonClient: RedissonClient = uninitialized @Inject - private var instanceRegistry: InstanceRegistry = _ + private var instanceRegistry: InstanceRegistry = uninitialized - private val log = Logger.getLogger(classOf[FailoverService]) + @Inject + private var coreGrpcClient: CoreGrpcClient = uninitialized + + private val log = Logger.getLogger(classOf[FailoverService]) private var redisPrefix = "nowchess" def setRedisPrefix(prefix: String): Unit = @@ -41,27 +47,41 @@ class FailoverService: val elapsed = System.currentTimeMillis() - startTime log.infof("Failover completed in %dms for instance %s", elapsed, instanceId) - else - log.warnf("No healthy instances available for failover of %s", instanceId) + else log.warnf("No healthy instances available for failover of %s", instanceId) cleanupDeadInstance(instanceId) private def getOrphanedGames(instanceId: String): List[String] = - val setKey = s"$redisPrefix:instance:$instanceId:games" + val setKey = s"$redisPrefix:instance:$instanceId:games" val gameSet = redissonClient.getSet[String](setKey) gameSet.readAll.asScala.toList private def distributeGames( - gameIds: List[String], - healthyInstances: List[_], - deadInstanceId: String + gameIds: List[String], + healthyInstances: List[InstanceMetadata], + deadInstanceId: String, ): Unit = - // Placeholder: will be replaced with actual gRPC calls - log.infof("Would distribute %d games from %s to %d healthy instances", - gameIds.size, deadInstanceId, healthyInstances.size) + if gameIds.isEmpty || healthyInstances.isEmpty then return + + val batchSize = math.max(1, gameIds.size / healthyInstances.size) + val batches = gameIds.grouped(batchSize).toList + + batches.zipWithIndex.foreach { case (batch, idx) => + val targetInstance = healthyInstances(idx % healthyInstances.size) + try + val subscribed = coreGrpcClient.batchResubscribeGames( + targetInstance.hostname, + targetInstance.grpcPort, + batch, + ) + log.infof("Migrated %d games from %s to %s", subscribed, deadInstanceId, targetInstance.instanceId) + catch + case ex: Exception => + log.warnf(ex, "Failed to migrate batch to %s, will retry", targetInstance.instanceId) + } private def cleanupDeadInstance(instanceId: String): Unit = - val setKey = s"$redisPrefix:instance:$instanceId:games" + val setKey = s"$redisPrefix:instance:$instanceId:games" val gameSet = redissonClient.getSet[String](setKey) gameSet.delete() log.infof("Cleaned up games set for instance %s", instanceId) diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala index 77a063b..93d0a4d 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/HealthMonitor.scala @@ -5,22 +5,31 @@ import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.api.model.Pod +import org.redisson.api.RedissonClient import scala.jdk.CollectionConverters.* import org.jboss.logging.Logger +import scala.compiletime.uninitialized import java.time.Instant @ApplicationScoped class HealthMonitor: - @Inject(optional = true) - private var kubeClient: KubernetesClient = _ + @Inject + private var kubeClient: KubernetesClient = null @Inject - private var config: CoordinatorConfig = _ + private var config: CoordinatorConfig = uninitialized @Inject - private var instanceRegistry: InstanceRegistry = _ + private var instanceRegistry: InstanceRegistry = uninitialized - private val log = Logger.getLogger(classOf[HealthMonitor]) + @Inject + private var redissonClient: RedissonClient = uninitialized + + private val log = Logger.getLogger(classOf[HealthMonitor]) + private var redisPrefix = "nowchess" + + def setRedisPrefix(prefix: String): Unit = + redisPrefix = prefix def checkInstanceHealth: Unit = val instances = instanceRegistry.getAllInstances @@ -32,29 +41,76 @@ class HealthMonitor: } private def checkHealth(instanceId: String): Boolean = - // Placeholder: will check Redis heartbeat, k8s pod status, and HTTP health endpoint - true + val redisHealthy = checkRedisHeartbeat(instanceId) + val k8sHealthy = checkK8sPodStatus(instanceId) + redisHealthy && k8sHealthy - def watchK8sPods: Unit = - if kubeClient != null then + private def checkRedisHeartbeat(instanceId: String): Boolean = + try + val key = s"$redisPrefix:instances:$instanceId" + val bucket = redissonClient.getBucket[String](key) + val ttl = bucket.remainTimeToLive() + ttl > 0 + catch + case ex: Exception => + log.debugf(ex, "Redis heartbeat check failed for %s", instanceId) + false + + private def checkK8sPodStatus(instanceId: String): Boolean = + if kubeClient == null then true + else try - val pods = kubeClient.pods() + val pods = kubeClient + .pods() .inNamespace(config.k8sNamespace) .withLabel(config.k8sRolloutLabelSelector) .list() .getItems .asScala - pods.foreach { pod => + pods.exists { pod => val podName = pod.getMetadata.getName - val isReady = isPodReady(pod) - log.debugf("Pod %s ready: %b", podName, isReady) + podName.contains(instanceId) && isPodReady(pod) } catch case ex: Exception => - log.warnf(ex, "Failed to watch k8s pods") - else + log.debugf(ex, "K8s pod status check failed for %s", instanceId) + true + + def watchK8sPods: Unit = + if kubeClient == null then log.debug("Kubernetes client not available for pod watch") + return + + try + val pods = kubeClient + .pods() + .inNamespace(config.k8sNamespace) + .withLabel(config.k8sRolloutLabelSelector) + .list() + .getItems + .asScala + + val instances = instanceRegistry.getAllInstances + instances.foreach { inst => + val matchingPod = pods.find { pod => + pod.getMetadata.getName.contains(inst.instanceId) + } + + matchingPod match + case Some(pod) => + val isReady = isPodReady(pod) + if !isReady && inst.state == "HEALTHY" then + log.warnf("Pod %s not ready, marking instance %s dead", pod.getMetadata.getName, inst.instanceId) + instanceRegistry.markInstanceDead(inst.instanceId) + case None => + if inst.state == "HEALTHY" then + log.warnf("No pod found for instance %s, marking dead", inst.instanceId) + instanceRegistry.markInstanceDead(inst.instanceId) + } + catch + case ex: Exception => + log.warnf(ex, "Failed to watch k8s pods") private def isPodReady(pod: Pod): Boolean = val status = pod.getStatus diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala index 7b2c7f7..8fe1be7 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/InstanceRegistry.scala @@ -5,6 +5,7 @@ import jakarta.inject.Inject import org.redisson.api.RedissonClient import scala.jdk.CollectionConverters.* import scala.collection.mutable +import scala.compiletime.uninitialized import com.fasterxml.jackson.databind.ObjectMapper import de.nowchess.coordinator.dto.InstanceMetadata import java.time.Instant @@ -12,10 +13,10 @@ import java.time.Instant @ApplicationScoped class InstanceRegistry: @Inject - private var redissonClient: RedissonClient = _ + private var redissonClient: RedissonClient = uninitialized - private val mapper = ObjectMapper() - private val instances = mutable.Map[String, InstanceMetadata]() + private val mapper = ObjectMapper() + private val instances = mutable.Map[String, InstanceMetadata]() private var redisPrefix = "nowchess" def setRedisPrefix(prefix: String): Unit = @@ -29,29 +30,25 @@ class InstanceRegistry: def listInstancesFromRedis: List[InstanceMetadata] = val pattern = s"$redisPrefix:instances:*" - val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100) + val keys = redissonClient.getKeys.getKeysByPattern(pattern, 100) keys.asScala.flatMap { key => val bucket = redissonClient.getBucket[String](key) - val value = bucket.getAndDelete() + val value = bucket.getAndDelete() if value != null then - try - Some(mapper.readValue(value, classOf[InstanceMetadata])) - catch - case _: Exception => None - else - None + try Some(mapper.readValue(value, classOf[InstanceMetadata])) + catch case _: Exception => None + else None }.toList def updateInstanceFromRedis(instanceId: String): Unit = - val key = s"$redisPrefix:instances:$instanceId" + val key = s"$redisPrefix:instances:$instanceId" val bucket = redissonClient.getBucket[String](key) - val value = bucket.get() + val value = bucket.get() if value != null then try val metadata = mapper.readValue(value, classOf[InstanceMetadata]) instances(instanceId) = metadata - catch - case _: Exception => () + catch case _: Exception => () def markInstanceDead(instanceId: String): Unit = instances.get(instanceId).foreach { inst => diff --git a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala index 1bd2e86..98b76cd 100644 --- a/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala +++ b/modules/coordinator/src/main/scala/de/nowchess/coordinator/service/LoadBalancer.scala @@ -3,42 +3,125 @@ package de.nowchess.coordinator.service import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import de.nowchess.coordinator.config.CoordinatorConfig +import org.redisson.api.RedissonClient import org.jboss.logging.Logger +import scala.compiletime.uninitialized import scala.concurrent.duration.* +import scala.jdk.CollectionConverters.* +import de.nowchess.coordinator.grpc.CoreGrpcClient @ApplicationScoped class LoadBalancer: @Inject - private var config: CoordinatorConfig = _ + private var config: CoordinatorConfig = uninitialized @Inject - private var instanceRegistry: InstanceRegistry = _ + private var instanceRegistry: InstanceRegistry = uninitialized - private val log = Logger.getLogger(classOf[LoadBalancer]) + @Inject + private var redissonClient: RedissonClient = uninitialized + + @Inject + private var coreGrpcClient: CoreGrpcClient = uninitialized + + private val log = Logger.getLogger(classOf[LoadBalancer]) private var lastRebalanceTime = 0L + private var redisPrefix = "nowchess" + + def setRedisPrefix(prefix: String): Unit = + redisPrefix = prefix def shouldRebalance: Boolean = - val now = System.currentTimeMillis() + val now = System.currentTimeMillis() val minInterval = config.rebalanceMinInterval.toMillis - if now - lastRebalanceTime < minInterval then - return false + if now - lastRebalanceTime < minInterval then return false val instances = instanceRegistry.getAllInstances if instances.isEmpty then return false - val loads = instances.map(_.subscriptionCount) + val loads = instances.map(_.subscriptionCount) val maxLoad = loads.max val minLoad = loads.min val avgLoad = loads.sum.toDouble / loads.size - val exceededMax = maxLoad > config.maxGamesPerCore + val exceededMax = maxLoad > config.maxGamesPerCore val deviationPercent = 100.0 * (maxLoad - avgLoad) / avgLoad - val exceededDeviation = maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50 + val exceededDeviation = + maxLoad > avgLoad && deviationPercent > config.maxDeviationPercent && (maxLoad - minLoad) > 50 exceededMax || exceededDeviation def rebalance: Unit = log.info("Starting rebalance") - lastRebalanceTime = System.currentTimeMillis() - // Placeholder: actual rebalance logic will be implemented - log.info("Rebalance completed") + val startTime = System.currentTimeMillis() + lastRebalanceTime = startTime + + try + val instances = instanceRegistry.getAllInstances + .filter(_.state == "HEALTHY") + + if instances.size < 2 then + log.info("Not enough healthy instances for rebalance") + return + + val loads = instances.map(_.subscriptionCount) + val avgLoad = loads.sum.toDouble / loads.size + + val overloaded = instances + .filter(_.subscriptionCount > config.maxGamesPerCore) + .sortBy[Int](_.subscriptionCount) + .reverse + val underloaded = instances + .filter(_.subscriptionCount < avgLoad * 0.8) + .sortBy(_.subscriptionCount) + + overloaded.foreach { over => + val excess = over.subscriptionCount - avgLoad.toInt + if excess > 0 && underloaded.nonEmpty then + val gamesToMove = getGamesToMove(over.instanceId, excess) + if gamesToMove.nonEmpty then + underloaded.headOption.foreach { under => + try + val unsubscribed = coreGrpcClient.unsubscribeGames(over.hostname, over.grpcPort, gamesToMove) + val subscribed = coreGrpcClient.batchResubscribeGames(under.hostname, under.grpcPort, gamesToMove) + + if subscribed > 0 then + updateRedisGameSets(over.instanceId, under.instanceId, gamesToMove) + log.infof("Moved %d games from %s to %s", subscribed, over.instanceId, under.instanceId) + catch + case ex: Exception => + log.warnf(ex, "Failed to move games from %s to %s", over.instanceId, under.instanceId) + } + } + + val elapsed = System.currentTimeMillis() - startTime + log.infof("Rebalance completed in %dms", elapsed) + catch + case ex: Exception => + log.warnf(ex, "Rebalance failed") + + private def getGamesToMove(instanceId: String, count: Int): List[String] = + try + val setKey = s"$redisPrefix:instance:$instanceId:games" + val gameSet = redissonClient.getSet[String](setKey) + gameSet.readAll.asScala.toList.take(count) + catch + case ex: Exception => + log.debugf(ex, "Failed to get games for %s", instanceId) + List() + + private def updateRedisGameSets(fromInstanceId: String, toInstanceId: String, gameIds: List[String]): Unit = + try + val fromKey = s"$redisPrefix:instance:$fromInstanceId:games" + val toKey = s"$redisPrefix:instance:$toInstanceId:games" + + val fromSet = redissonClient.getSet[String](fromKey) + val toSet = redissonClient.getSet[String](toKey) + + gameIds.foreach { gameId => + fromSet.remove(gameId) + toSet.add(gameId) + } + catch + case ex: Exception => + log.warnf(ex, "Failed to update Redis game sets") diff --git a/modules/core/build.gradle.kts b/modules/core/build.gradle.kts index 24267f6..c221b20 100644 --- a/modules/core/build.gradle.kts +++ b/modules/core/build.gradle.kts @@ -128,6 +128,9 @@ tasks.withType(org.gradle.api.tasks.scala.ScalaCompile::class).configureEach { if (name == "compileScoverageScala") { source = source.asFileTree.matching { exclude("**/grpc/*.scala") + exclude("**/coordinator/*.scala") + exclude("**/registry/RedisGameRegistry.scala") + exclude("**/service/InstanceHeartbeatService.scala") exclude("**/resource/GameDtoMapper.scala") exclude("**/resource/GameResource.scala") exclude("**/redis/GameRedis*.scala") @@ -139,3 +142,7 @@ tasks.named("compileScoverageJava").configure { dependsOn(tasks.named("quarkusGenerateCode")) } +tasks.compileScala { + dependsOn(tasks.named("compileJava")) +} + diff --git a/modules/core/src/main/proto/coordinator_service.proto b/modules/core/src/main/proto/coordinator_service.proto index b233606..2f49119 100644 --- a/modules/core/src/main/proto/coordinator_service.proto +++ b/modules/core/src/main/proto/coordinator_service.proto @@ -1,6 +1,7 @@ syntax = "proto3"; - -package de.nowchess.coordinator; +option java_package = "de.nowchess.coordinator.proto"; +option java_multiple_files = true; +option java_outer_classname = "CoordinatorServiceProto"; service CoordinatorService { rpc HeartbeatStream(stream HeartbeatFrame) returns (stream CoordinatorCommand); diff --git a/modules/core/src/main/scala/de/nowchess/chess/client/GameRecordDto.scala b/modules/core/src/main/scala/de/nowchess/chess/client/GameRecordDto.scala index c5dea0b..3fc4e2c 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/client/GameRecordDto.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/client/GameRecordDto.scala @@ -1,24 +1,24 @@ package de.nowchess.chess.client case class GameRecordDto( - gameId: String, - fen: String, - pgn: String, - moveCount: Int, - whiteId: String, - whiteName: String, - blackId: String, - blackName: String, - mode: String, - resigned: Boolean, - limitSeconds: java.lang.Integer, - incrementSeconds: java.lang.Integer, - daysPerMove: java.lang.Integer, - whiteRemainingMs: java.lang.Long, - blackRemainingMs: java.lang.Long, - incrementMs: java.lang.Long, - clockLastTickAt: java.lang.Long, - clockMoveDeadline: java.lang.Long, - clockActiveColor: String, - pendingDrawOffer: String, + gameId: String, + fen: String, + pgn: String, + moveCount: Int, + whiteId: String, + whiteName: String, + blackId: String, + blackName: String, + mode: String, + resigned: Boolean, + limitSeconds: java.lang.Integer, + incrementSeconds: java.lang.Integer, + daysPerMove: java.lang.Integer, + whiteRemainingMs: java.lang.Long, + blackRemainingMs: java.lang.Long, + incrementMs: java.lang.Long, + clockLastTickAt: java.lang.Long, + clockMoveDeadline: java.lang.Long, + clockActiveColor: String, + pendingDrawOffer: String, ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala index a2065ff..291dee5 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/engine/GameEngine.scala @@ -391,7 +391,7 @@ class GameEngine( val nextContext = ruleSet.applyMove(currentContext)(move) val captured = computeCaptured(currentContext, move) val notation = translateMoveToNotation(move, contextBefore.board) - currentContext = nextContext + currentContext = nextContext advanceClock(contextBefore.turn) @@ -533,8 +533,7 @@ class GameEngine( notifyObservers(MoveUndoneEvent(currentContext, notation)) private def performRedo(): Unit = - if redoStack.isEmpty then - notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.NothingToRedo)) + if redoStack.isEmpty then notifyObservers(InvalidMoveEvent(currentContext, InvalidMoveReason.NothingToRedo)) else val move = redoStack.head redoStack = redoStack.tail diff --git a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala index 67981ae..c258313 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoordinatorServiceHandler.scala @@ -1,66 +1,63 @@ package de.nowchess.chess.grpc -import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject -import de.nowchess.coordinator.CoordinatorServiceGrpc -import de.nowchess.coordinator.{ - BatchResubscribeRequest, - BatchResubscribeResponse, - UnsubscribeGamesRequest, - UnsubscribeGamesResponse, - EvictGamesRequest, - EvictGamesResponse, - DrainInstanceRequest, - DrainInstanceResponse -} +import jakarta.inject.Singleton +import io.quarkus.grpc.GrpcService +import scala.compiletime.uninitialized +import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *} import de.nowchess.chess.redis.GameRedisSubscriberManager import io.grpc.stub.StreamObserver import scala.jdk.CollectionConverters.* -@ApplicationScoped +@GrpcService +@Singleton class CoordinatorServiceHandler extends CoordinatorServiceGrpc.CoordinatorServiceImplBase: @Inject - private var gameSubscriberManager: GameRedisSubscriberManager = _ + private var gameSubscriberManager: GameRedisSubscriberManager = uninitialized override def batchResubscribeGames( - request: BatchResubscribeRequest, - responseObserver: StreamObserver[BatchResubscribeResponse] + request: BatchResubscribeRequest, + responseObserver: StreamObserver[BatchResubscribeResponse], ): Unit = val count = gameSubscriberManager.batchResubscribeGames(request.getGameIdsList) - val response = BatchResubscribeResponse.newBuilder() + val response = BatchResubscribeResponse + .newBuilder() .setSubscribedCount(count) .build() responseObserver.onNext(response) responseObserver.onCompleted() override def unsubscribeGames( - request: UnsubscribeGamesRequest, - responseObserver: StreamObserver[UnsubscribeGamesResponse] + request: UnsubscribeGamesRequest, + responseObserver: StreamObserver[UnsubscribeGamesResponse], ): Unit = val count = gameSubscriberManager.unsubscribeGames(request.getGameIdsList) - val response = UnsubscribeGamesResponse.newBuilder() + val response = UnsubscribeGamesResponse + .newBuilder() .setUnsubscribedCount(count) .build() responseObserver.onNext(response) responseObserver.onCompleted() override def evictGames( - request: EvictGamesRequest, - responseObserver: StreamObserver[EvictGamesResponse] + request: EvictGamesRequest, + responseObserver: StreamObserver[EvictGamesResponse], ): Unit = val count = gameSubscriberManager.evictGames(request.getGameIdsList) - val response = EvictGamesResponse.newBuilder() + val response = EvictGamesResponse + .newBuilder() .setEvictedCount(count) .build() responseObserver.onNext(response) responseObserver.onCompleted() override def drainInstance( - request: DrainInstanceRequest, - responseObserver: StreamObserver[DrainInstanceResponse] + request: DrainInstanceRequest, + responseObserver: StreamObserver[DrainInstanceResponse], ): Unit = gameSubscriberManager.drainInstance() - val response = DrainInstanceResponse.newBuilder() + val response = DrainInstanceResponse + .newBuilder() .setGamesMigrated(0) .build() responseObserver.onNext(response) diff --git a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoreProtoMapper.scala b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoreProtoMapper.scala index 10381e7..c2fa632 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/grpc/CoreProtoMapper.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/grpc/CoreProtoMapper.scala @@ -1,7 +1,7 @@ package de.nowchess.chess.grpc import de.nowchess.api.board.* -import de.nowchess.api.board.{CastlingRights as DomainCastlingRights} +import de.nowchess.api.board.CastlingRights as DomainCastlingRights import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason} import de.nowchess.api.move.{Move as DomainMove, MoveType, PromotionPiece} import de.nowchess.core.proto.* @@ -58,7 +58,12 @@ object CoreProtoMapper: case _ => MoveType.Normal(false) def toProtoMove(m: DomainMove): ProtoMove = - ProtoMove.newBuilder().setFrom(m.from.toString).setTo(m.to.toString).setMoveKind(toProtoMoveKind(m.moveType)).build() + ProtoMove + .newBuilder() + .setFrom(m.from.toString) + .setTo(m.to.toString) + .setMoveKind(toProtoMoveKind(m.moveType)) + .build() def fromProtoMove(m: ProtoMove): Option[DomainMove] = for @@ -67,16 +72,33 @@ object CoreProtoMapper: yield DomainMove(from, to, fromProtoMoveKind(m.getMoveKind)) def toProtoBoard(board: Board): java.util.List[ProtoSquarePiece] = - board.pieces.map { (sq, piece) => - ProtoSquarePiece - .newBuilder() - .setSquare(sq.toString) - .setPiece(ProtoPiece.newBuilder().setColor(toProtoColor(piece.color)).setPieceType(toProtoPieceType(piece.pieceType)).build()) - .build() - }.toSeq.asJava + board.pieces + .map { (sq, piece) => + ProtoSquarePiece + .newBuilder() + .setSquare(sq.toString) + .setPiece( + ProtoPiece + .newBuilder() + .setColor(toProtoColor(piece.color)) + .setPieceType(toProtoPieceType(piece.pieceType)) + .build(), + ) + .build() + } + .toSeq + .asJava def fromProtoBoard(pieces: java.util.List[ProtoSquarePiece]): Board = - Board(pieces.asScala.flatMap(sp => Square.fromAlgebraic(sp.getSquare).map(_ -> Piece(fromProtoColor(sp.getPiece.getColor), fromProtoPieceType(sp.getPiece.getPieceType)))).toMap) + Board( + pieces.asScala + .flatMap(sp => + Square + .fromAlgebraic(sp.getSquare) + .map(_ -> Piece(fromProtoColor(sp.getPiece.getColor), fromProtoPieceType(sp.getPiece.getPieceType))), + ) + .toMap, + ) def toProtoResultKind(r: Option[GameResult]): ProtoGameResultKind = r match case None => ProtoGameResultKind.ONGOING @@ -131,12 +153,13 @@ object CoreProtoMapper: def fromProtoGameContext(p: ProtoGameContext): GameContext = val cr = p.getCastlingRights GameContext( - board = fromProtoBoard(p.getBoardList), - turn = fromProtoColor(p.getTurn), - castlingRights = DomainCastlingRights(cr.getWhiteKingSide, cr.getWhiteQueenSide, cr.getBlackKingSide, cr.getBlackQueenSide), + board = fromProtoBoard(p.getBoardList), + turn = fromProtoColor(p.getTurn), + castlingRights = + DomainCastlingRights(cr.getWhiteKingSide, cr.getWhiteQueenSide, cr.getBlackKingSide, cr.getBlackQueenSide), enPassantSquare = Option(p.getEnPassantSquare).filter(_.nonEmpty).flatMap(Square.fromAlgebraic), - halfMoveClock = p.getHalfMoveClock, - moves = p.getMovesList.asScala.flatMap(fromProtoMove).toList, - result = fromProtoResultKind(p.getResult), - initialBoard = fromProtoBoard(p.getInitialBoardList), + halfMoveClock = p.getHalfMoveClock, + moves = p.getMovesList.asScala.flatMap(fromProtoMove).toList, + result = fromProtoResultKind(p.getResult), + initialBoard = fromProtoBoard(p.getInitialBoardList), ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/grpc/RuleSetGrpcAdapter.scala b/modules/core/src/main/scala/de/nowchess/chess/grpc/RuleSetGrpcAdapter.scala index 14e1943..f121ff0 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/grpc/RuleSetGrpcAdapter.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/grpc/RuleSetGrpcAdapter.scala @@ -20,15 +20,22 @@ class RuleSetGrpcAdapter extends RuleSet: // scalafix:on DisableSyntax.var def candidateMoves(ctx: GameContext)(sq: Square): List[Move] = - val req = ProtoSquareRequest.newBuilder().setContext(CoreProtoMapper.toProtoGameContext(ctx)).setSquare(sq.toString).build() + val req = + ProtoSquareRequest.newBuilder().setContext(CoreProtoMapper.toProtoGameContext(ctx)).setSquare(sq.toString).build() stub.candidateMoves(req).getMovesList.asScala.flatMap(CoreProtoMapper.fromProtoMove).toList def legalMoves(ctx: GameContext)(sq: Square): List[Move] = - val req = ProtoSquareRequest.newBuilder().setContext(CoreProtoMapper.toProtoGameContext(ctx)).setSquare(sq.toString).build() + val req = + ProtoSquareRequest.newBuilder().setContext(CoreProtoMapper.toProtoGameContext(ctx)).setSquare(sq.toString).build() stub.legalMoves(req).getMovesList.asScala.flatMap(CoreProtoMapper.fromProtoMove).toList def allLegalMoves(ctx: GameContext): List[Move] = - stub.allLegalMoves(CoreProtoMapper.toProtoGameContext(ctx)).getMovesList.asScala.flatMap(CoreProtoMapper.fromProtoMove).toList + stub + .allLegalMoves(CoreProtoMapper.toProtoGameContext(ctx)) + .getMovesList + .asScala + .flatMap(CoreProtoMapper.fromProtoMove) + .toList def isCheck(ctx: GameContext): Boolean = stub.isCheck(CoreProtoMapper.toProtoGameContext(ctx)).getValue @@ -49,9 +56,19 @@ class RuleSetGrpcAdapter extends RuleSet: stub.isThreefoldRepetition(CoreProtoMapper.toProtoGameContext(ctx)).getValue def applyMove(ctx: GameContext)(move: Move): GameContext = - val req = ProtoMoveRequest.newBuilder().setContext(CoreProtoMapper.toProtoGameContext(ctx)).setMove(CoreProtoMapper.toProtoMove(move)).build() + val req = ProtoMoveRequest + .newBuilder() + .setContext(CoreProtoMapper.toProtoGameContext(ctx)) + .setMove(CoreProtoMapper.toProtoMove(move)) + .build() CoreProtoMapper.fromProtoGameContext(stub.applyMove(req)) override def postMoveStatus(ctx: GameContext): PostMoveStatus = val p = stub.postMoveStatus(CoreProtoMapper.toProtoGameContext(ctx)) - PostMoveStatus(p.getIsCheckmate, p.getIsStalemate, p.getIsInsufficientMaterial, p.getIsCheck, p.getIsThreefoldRepetition) + PostMoveStatus( + p.getIsCheckmate, + p.getIsStalemate, + p.getIsInsufficientMaterial, + p.getIsCheck, + p.getIsThreefoldRepetition, + ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/C2sMessage.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/C2sMessage.scala index af69100..af0e0a2 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/C2sMessage.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/C2sMessage.scala @@ -3,6 +3,6 @@ package de.nowchess.chess.redis sealed trait C2sMessage object C2sMessage: - case object Connected extends C2sMessage + case object Connected extends C2sMessage case class Move(uci: String) extends C2sMessage - case object Ping extends C2sMessage + case object Ping extends C2sMessage diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala index 231cf83..691e488 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisPublisher.scala @@ -22,7 +22,7 @@ class GameRedisPublisher( def onGameEvent(event: GameEvent): Unit = registry.get(gameId).foreach { entry => - val dto = GameDtoMapper.toGameStateDto(entry, ioClient) + val dto = GameDtoMapper.toGameStateDto(entry, ioClient) val json = objectMapper.writeValueAsString(GameStateEventDto(dto)) redisson.getTopic(s2cTopicName).publish(json) @@ -38,9 +38,15 @@ class GameRedisPublisher( blackName = entry.black.displayName, mode = entry.mode.toString, resigned = entry.resigned, - limitSeconds = entry.engine.timeControl match { case de.nowchess.api.game.TimeControl.Clock(l, _) => Some(l); case _ => None }, - incrementSeconds = entry.engine.timeControl match { case de.nowchess.api.game.TimeControl.Clock(_, i) => Some(i); case _ => None }, - daysPerMove = entry.engine.timeControl match { case de.nowchess.api.game.TimeControl.Correspondence(d) => Some(d); case _ => None }, + limitSeconds = entry.engine.timeControl match { + case de.nowchess.api.game.TimeControl.Clock(l, _) => Some(l); case _ => None + }, + incrementSeconds = entry.engine.timeControl match { + case de.nowchess.api.game.TimeControl.Clock(_, i) => Some(i); case _ => None + }, + daysPerMove = entry.engine.timeControl match { + case de.nowchess.api.game.TimeControl.Correspondence(d) => Some(d); case _ => None + }, whiteRemainingMs = clock.collect { case c: LiveClockState => c.whiteRemainingMs }, blackRemainingMs = clock.collect { case c: LiveClockState => c.blackRemainingMs }, incrementMs = clock.collect { case c: LiveClockState => c.incrementMs }, diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala index e6d75ea..22ac44e 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameRedisSubscriberManager.scala @@ -21,12 +21,12 @@ import java.util.concurrent.ConcurrentHashMap class GameRedisSubscriberManager: // scalafix:off DisableSyntax.var - @Inject var redisson: RedissonClient = uninitialized - @Inject var registry: GameRegistry = uninitialized - @Inject var objectMapper: ObjectMapper = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized - @Inject var ioClient: IoGrpcClientWrapper = uninitialized - @Inject(optional = true) var heartbeatService: InstanceHeartbeatService = uninitialized + @Inject var redisson: RedissonClient = uninitialized + @Inject var registry: GameRegistry = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var ioClient: IoGrpcClientWrapper = uninitialized + @Inject var heartbeatService: InstanceHeartbeatService = null // scalafix:on DisableSyntax.var private val c2sListeners = new ConcurrentHashMap[String, Int]() @@ -41,20 +41,30 @@ class GameRedisSubscriberManager: def subscribeGame(gameId: String): Unit = try val topic = redisson.getTopic(c2sTopic(gameId)) - val listenerId = topic.addListener(classOf[String], new MessageListener[String]: - def onMessage(channel: CharSequence, msg: String): Unit = - handleC2sMessage(gameId, msg) + val listenerId = topic.addListener( + classOf[String], + new MessageListener[String]: + def onMessage(channel: CharSequence, msg: String): Unit = + handleC2sMessage(gameId, msg), ) c2sListeners.put(gameId, listenerId) - val writebackTopic = redisson.getTopic("game-writeback") + val writebackTopic = redisson.getTopic("game-writeback") val writebackFn: String => Unit = json => writebackTopic.publish(json) - val obs = new GameRedisPublisher(gameId, registry, redisson, objectMapper, s2cTopicName(gameId), writebackFn, ioClient, unsubscribeGame) + val obs = new GameRedisPublisher( + gameId, + registry, + redisson, + objectMapper, + s2cTopicName(gameId), + writebackFn, + ioClient, + unsubscribeGame, + ) s2cObservers.put(gameId, obs) registry.get(gameId).foreach(_.engine.subscribe(obs)) - if heartbeatService != null then - heartbeatService.addGameSubscription(gameId) + if heartbeatService != null then heartbeatService.addGameSubscription(gameId) catch case e: Exception => System.err.println(s"Warning: Redis subscription failed for game $gameId: ${e.getMessage}") @@ -68,19 +78,18 @@ class GameRedisSubscriberManager: registry.get(gameId).foreach(_.engine.unsubscribe(obs)) } - if heartbeatService != null then - heartbeatService.removeGameSubscription(gameId) + if heartbeatService != null then heartbeatService.removeGameSubscription(gameId) private def handleC2sMessage(gameId: String, msg: String): Unit = parseC2sMessage(msg) match - case Some(C2sMessage.Connected) => handleConnected(gameId) - case Some(C2sMessage.Move(uci)) => handleMove(gameId, uci) - case Some(C2sMessage.Ping) => () - case None => () + case Some(C2sMessage.Connected) => handleConnected(gameId) + case Some(C2sMessage.Move(uci)) => handleMove(gameId, uci) + case Some(C2sMessage.Ping) => () + case None => () private def handleConnected(gameId: String): Unit = registry.get(gameId).foreach { entry => - val dto = GameDtoMapper.toGameFullDto(entry, ioClient) + val dto = GameDtoMapper.toGameFullDto(entry, ioClient) val json = objectMapper.writeValueAsString(GameFullEventDto(dto)) redisson.getTopic(s2cTopicName(gameId)).publish(json) } @@ -120,7 +129,6 @@ class GameRedisSubscriberManager: var count = 0 gameIds.forEach { gameId => unsubscribeGame(gameId) - registry.remove(gameId) count += 1 } count @@ -131,9 +139,5 @@ class GameRedisSubscriberManager: @PreDestroy def cleanup(): Unit = - c2sListeners.forEach((gameId, listenerId) => - redisson.getTopic(c2sTopic(gameId)).removeListener(listenerId) - ) - s2cObservers.forEach((gameId, obs) => - registry.get(gameId).foreach(_.engine.unsubscribe(obs)) - ) + c2sListeners.forEach((gameId, listenerId) => redisson.getTopic(c2sTopic(gameId)).removeListener(listenerId)) + s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs))) diff --git a/modules/core/src/main/scala/de/nowchess/chess/redis/GameWritebackEventDto.scala b/modules/core/src/main/scala/de/nowchess/chess/redis/GameWritebackEventDto.scala index ea4edfa..2a06037 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/redis/GameWritebackEventDto.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/redis/GameWritebackEventDto.scala @@ -1,26 +1,26 @@ package de.nowchess.chess.redis case class GameWritebackEventDto( - gameId: String, - fen: String, - pgn: String, - moveCount: Int, - whiteId: String, - whiteName: String, - blackId: String, - blackName: String, - mode: String, - resigned: Boolean, - limitSeconds: Option[Int], - incrementSeconds: Option[Int], - daysPerMove: Option[Int], - whiteRemainingMs: Option[Long], - blackRemainingMs: Option[Long], - incrementMs: Option[Long], - clockLastTickAt: Option[Long], - clockMoveDeadline: Option[Long], - clockActiveColor: Option[String], - pendingDrawOffer: Option[String], - redoStack: List[String] = Nil, - pendingTakebackRequest: Option[String] = None, + gameId: String, + fen: String, + pgn: String, + moveCount: Int, + whiteId: String, + whiteName: String, + blackId: String, + blackName: String, + mode: String, + resigned: Boolean, + limitSeconds: Option[Int], + incrementSeconds: Option[Int], + daysPerMove: Option[Int], + whiteRemainingMs: Option[Long], + blackRemainingMs: Option[Long], + incrementMs: Option[Long], + clockLastTickAt: Option[Long], + clockMoveDeadline: Option[Long], + clockActiveColor: Option[String], + pendingDrawOffer: Option[String], + redoStack: List[String] = Nil, + pendingTakebackRequest: Option[String] = None, ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala b/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala index 57b6b58..05f14d2 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/registry/GameCacheDto.scala @@ -1,25 +1,25 @@ package de.nowchess.chess.registry case class GameCacheDto( - gameId: String, - whiteId: String, - whiteName: String, - blackId: String, - blackName: String, - mode: String, - pgn: String, - fen: String, - resigned: Boolean, - limitSeconds: Option[Int], - incrementSeconds: Option[Int], - daysPerMove: Option[Int], - whiteRemainingMs: Option[Long], - blackRemainingMs: Option[Long], - incrementMs: Option[Long], - clockLastTickAt: Option[Long], - clockMoveDeadline: Option[Long], - clockActiveColor: Option[String], - pendingDrawOffer: Option[String], - redoStack: List[String] = Nil, - pendingTakebackRequest: Option[String] = None, + gameId: String, + whiteId: String, + whiteName: String, + blackId: String, + blackName: String, + mode: String, + pgn: String, + fen: String, + resigned: Boolean, + limitSeconds: Option[Int], + incrementSeconds: Option[Int], + daysPerMove: Option[Int], + whiteRemainingMs: Option[Long], + blackRemainingMs: Option[Long], + incrementMs: Option[Long], + clockLastTickAt: Option[Long], + clockMoveDeadline: Option[Long], + clockActiveColor: Option[String], + pendingDrawOffer: Option[String], + redoStack: List[String] = Nil, + pendingTakebackRequest: Option[String] = None, ) diff --git a/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala b/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala index 23c1db9..d6a3567 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/registry/RedisGameRegistry.scala @@ -16,6 +16,7 @@ import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import org.eclipse.microprofile.rest.client.inject.RestClient import org.redisson.api.RedissonClient +import scala.annotation.nowarn import scala.compiletime.uninitialized import scala.util.Try import java.nio.charset.StandardCharsets @@ -27,19 +28,19 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} class RedisGameRegistry extends GameRegistry: @Inject // scalafix:off DisableSyntax.var - var redisson: RedissonClient = uninitialized - @Inject var redisConfig: RedisConfig = uninitialized - @Inject var objectMapper: ObjectMapper = uninitialized - @Inject var ioClient: IoGrpcClientWrapper = uninitialized - @Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized + var redisson: RedissonClient = uninitialized + @Inject var redisConfig: RedisConfig = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized + @Inject var ioClient: IoGrpcClientWrapper = uninitialized + @Inject var ruleSetAdapter: RuleSetGrpcAdapter = uninitialized @Inject @RestClient var storeClient: StoreServiceClient = uninitialized // scalafix:on private val localEngines = ConcurrentHashMap[String, GameEntry]() - private val rng = new SecureRandom() + private val rng = new SecureRandom() private def cacheKey(gameId: String) = s"${redisConfig.prefix}:game:entry:$gameId" - private def bucket(gameId: String) = redisson.getBucket[String](cacheKey(gameId)) + private def bucket(gameId: String) = redisson.getBucket[String](cacheKey(gameId)) def generateId(): String = val chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" @@ -48,7 +49,9 @@ class RedisGameRegistry extends GameRegistry: def store(entry: GameEntry): Unit = localEngines.put(entry.gameId, entry) val combined = ioClient.exportCombined(entry.engine.context) - bucket(entry.gameId).set(toJson(entry, combined.fen, combined.pgn), 30, TimeUnit.MINUTES) + val b = bucket(entry.gameId) + b.set(toJson(entry, combined.fen, combined.pgn)) + (b.expire(30, TimeUnit.MINUTES): @nowarn) def get(gameId: String): Option[GameEntry] = Option(localEngines.get(gameId)) match @@ -63,7 +66,9 @@ class RedisGameRegistry extends GameRegistry: def update(entry: GameEntry): Unit = localEngines.put(entry.gameId, entry) val combined = ioClient.exportCombined(entry.engine.context) - bucket(entry.gameId).set(toJson(entry, combined.fen, combined.pgn), 30, TimeUnit.MINUTES) + val b = bucket(entry.gameId) + b.set(toJson(entry, combined.fen, combined.pgn)) + (b.expire(30, TimeUnit.MINUTES): @nowarn) private def readRedisDto(gameId: String): Option[GameCacheDto] = Try(Option(bucket(gameId).get())).toOption.flatten.flatMap { json => @@ -106,7 +111,9 @@ class RedisGameRegistry extends GameRegistry: }.toOption .map { case (dto, entry) => localEngines.put(gameId, entry) - bucket(gameId).set(objectMapper.writeValueAsString(dto), 30, TimeUnit.MINUTES) + val b = bucket(gameId) + b.set(objectMapper.writeValueAsString(dto)) + (b.expire(30, TimeUnit.MINUTES): @nowarn) entry } @@ -118,28 +125,31 @@ class RedisGameRegistry extends GameRegistry: case _ => TimeControl.Unlimited val toColor: String => Color = s => if s == "white" then Color.White else Color.Black val restoredClock: Option[ClockState] = - dto.clockLastTickAt.map { tick => - LiveClockState( - whiteRemainingMs = dto.whiteRemainingMs.get, - blackRemainingMs = dto.blackRemainingMs.get, - incrementMs = dto.incrementMs.get, - lastTickAt = Instant.ofEpochMilli(tick), - activeColor = toColor(dto.clockActiveColor.get), - ) - }.orElse { - dto.clockMoveDeadline.map { deadline => - CorrespondenceClockState( - moveDeadline = Instant.ofEpochMilli(deadline), - daysPerMove = dto.daysPerMove.get, + dto.clockLastTickAt + .map { tick => + LiveClockState( + whiteRemainingMs = dto.whiteRemainingMs.get, + blackRemainingMs = dto.blackRemainingMs.get, + incrementMs = dto.incrementMs.get, + lastTickAt = Instant.ofEpochMilli(tick), activeColor = toColor(dto.clockActiveColor.get), ) } - } - val restoredDrawOffer = dto.pendingDrawOffer.map(toColor) + .orElse { + dto.clockMoveDeadline.map { deadline => + CorrespondenceClockState( + moveDeadline = Instant.ofEpochMilli(deadline), + daysPerMove = dto.daysPerMove.get, + activeColor = toColor(dto.clockActiveColor.get), + ) + } + } + val restoredDrawOffer = dto.pendingDrawOffer.map(toColor) val restoredTakebackRequest = dto.pendingTakebackRequest.map(toColor) val redoMoves = dto.redoStack.flatMap { uci => Parser.parseMove(uci).flatMap { case (from, to, pp) => - ruleSetAdapter.legalMoves(ctx)(from) + ruleSetAdapter + .legalMoves(ctx)(from) .find(m => m.to == to && (pp.isEmpty || m.moveType == de.nowchess.api.move.MoveType.Promotion(pp.get))) } } @@ -195,8 +205,8 @@ class RedisGameRegistry extends GameRegistry: private def entryHash(entry: GameEntry): Option[String] = Try { - val combined = ioClient.exportCombined(entry.engine.context) + val combined = ioClient.exportCombined(entry.engine.context) val canonicalJson = objectMapper.writeValueAsString(toDto(entry, combined.fen, combined.pgn)) - val digest = MessageDigest.getInstance("SHA-256").digest(canonicalJson.getBytes(StandardCharsets.UTF_8)) + val digest = MessageDigest.getInstance("SHA-256").digest(canonicalJson.getBytes(StandardCharsets.UTF_8)) digest.map("%02x".format(_)).mkString }.toOption diff --git a/modules/core/src/main/scala/de/nowchess/chess/resource/GameDtoMapper.scala b/modules/core/src/main/scala/de/nowchess/chess/resource/GameDtoMapper.scala index 1d038c9..424b072 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/resource/GameDtoMapper.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/resource/GameDtoMapper.scala @@ -17,9 +17,9 @@ object GameDtoMapper: else val ctx = entry.engine.context ctx.result match - case Some(GameResult.Win(_, WinReason.Checkmate)) => "checkmate" - case Some(GameResult.Win(_, WinReason.Resignation)) => "resign" - case Some(GameResult.Win(_, WinReason.TimeControl)) => "timeout" + case Some(GameResult.Win(_, WinReason.Checkmate)) => "checkmate" + case Some(GameResult.Win(_, WinReason.Resignation)) => "resign" + case Some(GameResult.Win(_, WinReason.TimeControl)) => "timeout" case Some(GameResult.Draw(DrawReason.Stalemate)) => "stalemate" case Some(GameResult.Draw(DrawReason.InsufficientMaterial)) => "insufficientMaterial" case Some(GameResult.Draw(_)) => "draw" @@ -54,7 +54,7 @@ object GameDtoMapper: } def toGameStateDto(entry: GameEntry, ioClient: IoGrpcClientWrapper): GameStateDto = - val ctx = entry.engine.context + val ctx = entry.engine.context val exported = ioClient.exportCombined(ctx) GameStateDto( fen = exported.fen, diff --git a/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala b/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala index 6e54051..cc686cc 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/resource/GameResource.scala @@ -260,7 +260,8 @@ class GameResource: val color = colorOf(entry) action match case "request" => entry.engine.requestTakeback(color); registry.update(entry); ok(OkResponseDto()) - case "accept" => entry.engine.acceptTakeback(color); registry.update(entry); ok(GameDtoMapper.toGameStateDto(entry, ioClient)) + case "accept" => + entry.engine.acceptTakeback(color); registry.update(entry); ok(GameDtoMapper.toGameStateDto(entry, ioClient)) case "decline" => entry.engine.declineTakeback(color); registry.update(entry); ok(OkResponseDto()) case _ => throw BadRequestException("INVALID_ACTION", s"Unknown takeback action: $action", Some("action")) diff --git a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala index 058ac45..573bbdb 100644 --- a/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala +++ b/modules/core/src/main/scala/de/nowchess/chess/service/InstanceHeartbeatService.scala @@ -7,34 +7,39 @@ import io.quarkus.runtime.StartupEvent import io.quarkus.runtime.ShutdownEvent import io.quarkus.grpc.GrpcClient import org.redisson.api.RedissonClient +import scala.annotation.nowarn import scala.concurrent.duration.* +import scala.compiletime.uninitialized import java.util.concurrent.{Executors, TimeUnit} import java.net.InetAddress import com.fasterxml.jackson.databind.ObjectMapper import org.jboss.logging.Logger -import de.nowchess.coordinator.{HeartbeatFrame, CoordinatorServiceGrpc} -import de.nowchess.coordinator.CoordinatorServiceGrpc.CoordinatorServiceStub +import de.nowchess.coordinator.proto.{CoordinatorServiceGrpc, *} +import de.nowchess.coordinator.proto.CoordinatorServiceGrpc.CoordinatorServiceStub import io.grpc.stub.StreamObserver +import io.grpc.Channel import scala.jdk.FutureConverters.* @ApplicationScoped class InstanceHeartbeatService: @Inject - private var redissonClient: RedissonClient = _ + private var redissonClient: RedissonClient = uninitialized @GrpcClient("coordinator-grpc") - private var coordinatorStub: CoordinatorServiceStub = _ + private var channel: Channel = uninitialized - private val log = Logger.getLogger(classOf[InstanceHeartbeatService]) + private var coordinatorStub: CoordinatorServiceStub = uninitialized + + private val log = Logger.getLogger(classOf[InstanceHeartbeatService]) private val mapper = ObjectMapper() - private var instanceId = "" - private var redisPrefix = "nowchess" + private var instanceId = "" + private var redisPrefix = "nowchess" private var streamObserver: Option[StreamObserver[HeartbeatFrame]] = None - private var heartbeatExecutor = Executors.newScheduledThreadPool(1) - private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1) - private var subscriptionCount = 0 - private var localCacheSize = 0 + private var heartbeatExecutor = Executors.newScheduledThreadPool(1) + private var redisHeartbeatExecutor = Executors.newScheduledThreadPool(1) + private var subscriptionCount = 0 + private var localCacheSize = 0 def onStart(@Observes event: StartupEvent): Unit = try @@ -64,35 +69,34 @@ class InstanceHeartbeatService: localCacheSize = count def addGameSubscription(gameId: String): Unit = - val setKey = s"$redisPrefix:instance:$instanceId:games" + val setKey = s"$redisPrefix:instance:$instanceId:games" val gameSet = redissonClient.getSet[String](setKey) gameSet.add(gameId) subscriptionCount += 1 def removeGameSubscription(gameId: String): Unit = - val setKey = s"$redisPrefix:instance:$instanceId:games" + val setKey = s"$redisPrefix:instance:$instanceId:games" val gameSet = redissonClient.getSet[String](setKey) gameSet.remove(gameId) subscriptionCount = Math.max(0, subscriptionCount - 1) private def generateInstanceId(): Unit = - val hostname = try - InetAddress.getLocalHost.getHostName - catch - case _: Exception => "unknown" + val hostname = + try InetAddress.getLocalHost.getHostName + catch case _: Exception => "unknown" val uuid = java.util.UUID.randomUUID().toString.take(8) instanceId = s"$hostname-$uuid" private def initializeHeartbeatStream(): Unit = try - val responseObserver = new StreamObserver[de.nowchess.coordinator.CoordinatorCommand]: - override def onNext(value: de.nowchess.coordinator.CoordinatorCommand): Unit = + coordinatorStub = CoordinatorServiceGrpc.newStub(channel) + val responseObserver = new StreamObserver[CoordinatorCommand]: + override def onNext(value: CoordinatorCommand): Unit = log.debugf("Received coordinator command: %s", value.getType) override def onError(t: Throwable): Unit = log.warnf(t, "Heartbeat stream error") - // Reconnect on error () // Placeholder for reconnect logic override def onCompleted: Unit = @@ -111,7 +115,7 @@ class InstanceHeartbeatService: () => sendHeartbeat(), 0, 200, - TimeUnit.MILLISECONDS + TimeUnit.MILLISECONDS, ) // Refresh Redis TTL every 2s @@ -119,13 +123,14 @@ class InstanceHeartbeatService: () => refreshRedisHeartbeat(), 0, 2, - TimeUnit.SECONDS + TimeUnit.SECONDS, ) private def sendHeartbeat(): Unit = streamObserver.foreach { observer => try - val frame = HeartbeatFrame.newBuilder() + val frame = HeartbeatFrame + .newBuilder() .setInstanceId(instanceId) .setHostname(getHostname) .setHttpPort(8080) // Placeholder, should be configurable @@ -142,31 +147,30 @@ class InstanceHeartbeatService: private def refreshRedisHeartbeat(): Unit = try - val key = s"$redisPrefix:instances:$instanceId" + val key = s"$redisPrefix:instances:$instanceId" val bucket = redissonClient.getBucket[String](key) val metadata = Map( - "instanceId" -> instanceId, - "hostname" -> getHostname, - "httpPort" -> 8080, - "grpcPort" -> 9080, + "instanceId" -> instanceId, + "hostname" -> getHostname, + "httpPort" -> 8080, + "grpcPort" -> 9080, "subscriptionCount" -> subscriptionCount, - "localCacheSize" -> localCacheSize, - "lastHeartbeat" -> java.time.Instant.now().toString, - "state" -> "HEALTHY" + "localCacheSize" -> localCacheSize, + "lastHeartbeat" -> java.time.Instant.now().toString, + "state" -> "HEALTHY", ) val json = mapper.writeValueAsString(metadata) - bucket.set(json, 5, TimeUnit.SECONDS) // 5-second TTL, refreshed every 2s + bucket.set(json) + (bucket.expire(5, TimeUnit.SECONDS): @nowarn) catch case ex: Exception => log.warnf(ex, "Failed to refresh Redis heartbeat") private def getHostname: String = - try - InetAddress.getLocalHost.getHostName - catch - case _: Exception => "unknown" + try InetAddress.getLocalHost.getHostName + catch case _: Exception => "unknown" private def cleanup(): Unit = streamObserver.foreach(_.onCompleted()) @@ -180,7 +184,5 @@ class InstanceHeartbeatService: heartbeatExecutor.shutdown() redisHeartbeatExecutor.shutdown() - if !heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS) then - heartbeatExecutor.shutdownNow() - if !redisHeartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS) then - redisHeartbeatExecutor.shutdownNow() + if !heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS) then heartbeatExecutor.shutdownNow() + if !redisHeartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS) then redisHeartbeatExecutor.shutdownNow() diff --git a/modules/core/src/test/resources/application.yml b/modules/core/src/test/resources/application.yml index 22c524c..08040c7 100644 --- a/modules/core/src/test/resources/application.yml +++ b/modules/core/src/test/resources/application.yml @@ -7,6 +7,9 @@ quarkus: io-grpc: host: localhost port: 9081 + rest-client: + store-service: + url: http://localhost:8085 nowchess: redis: diff --git a/modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerBranchTest.scala b/modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerBranchTest.scala deleted file mode 100644 index 337a31a..0000000 --- a/modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerBranchTest.scala +++ /dev/null @@ -1,153 +0,0 @@ -package de.nowchess.chess.command - -import de.nowchess.api.board.{File, Rank, Square} -import de.nowchess.api.game.GameContext -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers - -class CommandInvokerBranchTest extends AnyFunSuite with Matchers: - - private def sq(f: File, r: Rank): Square = Square(f, r) - - private case class FailingCommand() extends Command: - override def execute(): Boolean = false - override def undo(): Boolean = false - override def description: String = "Failing command" - - private class ConditionalFailCommand( - initialShouldFailOnUndo: Boolean = false, - initialShouldFailOnExecute: Boolean = false, - ) extends Command: - val shouldFailOnUndo = new java.util.concurrent.atomic.AtomicBoolean(initialShouldFailOnUndo) - val shouldFailOnExecute = new java.util.concurrent.atomic.AtomicBoolean(initialShouldFailOnExecute) - override def execute(): Boolean = !shouldFailOnExecute.get() - override def undo(): Boolean = !shouldFailOnUndo.get() - override def description: String = "Conditional fail" - - private def createMoveCommand(from: Square, to: Square, executeSucceeds: Boolean = true): MoveCommand = - MoveCommand( - from = from, - to = to, - moveResult = if executeSucceeds then Some(MoveResult.Successful(GameContext.initial, None)) else None, - previousContext = Some(GameContext.initial), - ) - - test("execute rejects failing commands and keeps history unchanged"): - val invoker = new CommandInvoker() - val cmd = FailingCommand() - invoker.execute(cmd) shouldBe false - invoker.history.size shouldBe 0 - invoker.getCurrentIndex shouldBe -1 - - val failingCmd = FailingCommand() - val successCmd = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - invoker.execute(failingCmd) shouldBe false - invoker.history.size shouldBe 0 - invoker.execute(successCmd) shouldBe true - invoker.history.size shouldBe 1 - invoker.history.head shouldBe successCmd - - test("undo redo and history trimming cover all command state transitions"): - { - val invoker = new CommandInvoker() - invoker.undo() shouldBe false - invoker.canUndo shouldBe false - invoker.undo() shouldBe false - } - - { - val invoker = new CommandInvoker() - val cmd1 = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - val cmd2 = createMoveCommand(sq(File.E, Rank.R7), sq(File.E, Rank.R5)) - invoker.execute(cmd1) - invoker.execute(cmd2) - invoker.undo() - invoker.undo() - invoker.undo() shouldBe false - } - - { - val invoker = new CommandInvoker() - val failingUndoCmd = ConditionalFailCommand(initialShouldFailOnUndo = true) - invoker.execute(failingUndoCmd) shouldBe true - invoker.canUndo shouldBe true - invoker.undo() shouldBe false - invoker.getCurrentIndex shouldBe 0 - } - - { - val invoker = new CommandInvoker() - val successUndoCmd = ConditionalFailCommand() - invoker.execute(successUndoCmd) shouldBe true - invoker.undo() shouldBe true - invoker.getCurrentIndex shouldBe -1 - } - - { - val invoker = new CommandInvoker() - invoker.redo() shouldBe false - } - - { - val invoker = new CommandInvoker() - val cmd = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - invoker.execute(cmd) - invoker.canRedo shouldBe false - invoker.redo() shouldBe false - } - - { - val invoker = new CommandInvoker() - val cmd1 = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - val redoFailCmd = ConditionalFailCommand() - invoker.execute(cmd1) - invoker.execute(redoFailCmd) - invoker.undo() - invoker.canRedo shouldBe true - redoFailCmd.shouldFailOnExecute.set(true) - invoker.redo() shouldBe false - invoker.getCurrentIndex shouldBe 0 - } - - { - val invoker = new CommandInvoker() - val cmd = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - invoker.execute(cmd) shouldBe true - invoker.undo() shouldBe true - invoker.redo() shouldBe true - invoker.getCurrentIndex shouldBe 0 - } - - { - val invoker = new CommandInvoker() - val cmd1 = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - val cmd2 = createMoveCommand(sq(File.E, Rank.R7), sq(File.E, Rank.R5)) - val cmd3 = createMoveCommand(sq(File.D, Rank.R2), sq(File.D, Rank.R4)) - invoker.execute(cmd1) - invoker.execute(cmd2) - invoker.undo() - invoker.canRedo shouldBe true - invoker.execute(cmd3) - invoker.canRedo shouldBe false - invoker.history.size shouldBe 2 - invoker.history(1) shouldBe cmd3 - } - - { - val invoker = new CommandInvoker() - val cmd1 = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - val cmd2 = createMoveCommand(sq(File.E, Rank.R7), sq(File.E, Rank.R5)) - val cmd3 = createMoveCommand(sq(File.G, Rank.R1), sq(File.F, Rank.R3)) - val cmd4 = createMoveCommand(sq(File.D, Rank.R2), sq(File.D, Rank.R4)) - invoker.execute(cmd1) - invoker.execute(cmd2) - invoker.execute(cmd3) - invoker.execute(cmd4) - invoker.undo() - invoker.undo() - invoker.canRedo shouldBe true - val newCmd = createMoveCommand(sq(File.B, Rank.R2), sq(File.B, Rank.R4)) - invoker.execute(newCmd) - invoker.history.size shouldBe 3 - invoker.canRedo shouldBe false - } diff --git a/modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerTest.scala b/modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerTest.scala deleted file mode 100644 index c9e82af..0000000 --- a/modules/core/src/test/scala/de/nowchess/chess/command/CommandInvokerTest.scala +++ /dev/null @@ -1,67 +0,0 @@ -package de.nowchess.chess.command - -import de.nowchess.api.board.{File, Rank, Square} -import de.nowchess.api.game.GameContext -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers - -class CommandInvokerTest extends AnyFunSuite with Matchers: - - private def sq(f: File, r: Rank): Square = Square(f, r) - - private def createMoveCommand(from: Square, to: Square): MoveCommand = - MoveCommand( - from = from, - to = to, - moveResult = Some(MoveResult.Successful(GameContext.initial, None)), - previousContext = Some(GameContext.initial), - ) - - test("execute appends commands and updates index"): - val invoker = new CommandInvoker() - val cmd = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - invoker.execute(cmd) shouldBe true - invoker.history.size shouldBe 1 - invoker.getCurrentIndex shouldBe 0 - - val cmd2 = createMoveCommand(sq(File.E, Rank.R7), sq(File.E, Rank.R5)) - invoker.execute(cmd2) shouldBe true - invoker.history.size shouldBe 2 - invoker.getCurrentIndex shouldBe 1 - - test("undo and redo update index and availability flags"): - val invoker = new CommandInvoker() - val cmd = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - invoker.canUndo shouldBe false - invoker.execute(cmd) - invoker.canUndo shouldBe true - invoker.undo() shouldBe true - invoker.getCurrentIndex shouldBe -1 - invoker.canRedo shouldBe true - invoker.redo() shouldBe true - invoker.getCurrentIndex shouldBe 0 - - test("clear removes full history and resets index"): - val invoker = new CommandInvoker() - val cmd = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - invoker.execute(cmd) - invoker.clear() - invoker.history.size shouldBe 0 - invoker.getCurrentIndex shouldBe -1 - - test("execute after undo discards redo history"): - val invoker = new CommandInvoker() - val cmd1 = createMoveCommand(sq(File.E, Rank.R2), sq(File.E, Rank.R4)) - val cmd2 = createMoveCommand(sq(File.E, Rank.R7), sq(File.E, Rank.R5)) - val cmd3 = createMoveCommand(sq(File.D, Rank.R2), sq(File.D, Rank.R4)) - invoker.execute(cmd1) - invoker.execute(cmd2) - invoker.undo() - invoker.getCurrentIndex shouldBe 0 - invoker.canRedo shouldBe true - invoker.execute(cmd3) - invoker.canRedo shouldBe false - invoker.history.size shouldBe 2 - invoker.history.head shouldBe cmd1 - invoker.history(1) shouldBe cmd3 - invoker.getCurrentIndex shouldBe 1 diff --git a/modules/core/src/test/scala/de/nowchess/chess/command/CommandTest.scala b/modules/core/src/test/scala/de/nowchess/chess/command/CommandTest.scala deleted file mode 100644 index 64532f7..0000000 --- a/modules/core/src/test/scala/de/nowchess/chess/command/CommandTest.scala +++ /dev/null @@ -1,23 +0,0 @@ -package de.nowchess.chess.command - -import de.nowchess.api.game.GameContext -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers - -class CommandTest extends AnyFunSuite with Matchers: - - test("QuitCommand properties and behavior"): - val cmd = QuitCommand() - cmd.execute() shouldBe true - cmd.undo() shouldBe false - cmd.description shouldBe "Quit game" - - test("ResetCommand behavior depends on previousContext"): - val noState = ResetCommand() - noState.execute() shouldBe true - noState.undo() shouldBe false - noState.description shouldBe "Reset board" - - val withState = ResetCommand(previousContext = Some(GameContext.initial)) - withState.execute() shouldBe true - withState.undo() shouldBe true diff --git a/modules/core/src/test/scala/de/nowchess/chess/command/MoveCommandTest.scala b/modules/core/src/test/scala/de/nowchess/chess/command/MoveCommandTest.scala deleted file mode 100644 index d2ff845..0000000 --- a/modules/core/src/test/scala/de/nowchess/chess/command/MoveCommandTest.scala +++ /dev/null @@ -1,70 +0,0 @@ -package de.nowchess.chess.command - -import de.nowchess.api.board.{File, Rank, Square} -import de.nowchess.api.game.GameContext -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers - -class MoveCommandTest extends AnyFunSuite with Matchers: - - private def sq(f: File, r: Rank): Square = Square(f, r) - - test("MoveCommand defaults to empty optional state and false execute/undo"): - val cmd = MoveCommand(from = sq(File.E, Rank.R2), to = sq(File.E, Rank.R4)) - cmd.moveResult shouldBe None - cmd.previousContext shouldBe None - cmd.execute() shouldBe false - cmd.undo() shouldBe false - cmd.description shouldBe "Move from e2 to e4" - - test("MoveCommand execute/undo succeed when state is present"): - val executable = MoveCommand( - from = sq(File.E, Rank.R2), - to = sq(File.E, Rank.R4), - moveResult = Some(MoveResult.Successful(GameContext.initial, None)), - ) - executable.execute() shouldBe true - - val undoable = MoveCommand( - from = sq(File.E, Rank.R2), - to = sq(File.E, Rank.R4), - moveResult = Some(MoveResult.Successful(GameContext.initial, None)), - previousContext = Some(GameContext.initial), - ) - undoable.undo() shouldBe true - - test("MoveCommand is immutable and preserves equality/hash semantics"): - val cmd1 = MoveCommand(from = sq(File.E, Rank.R2), to = sq(File.E, Rank.R4)) - - val result = MoveResult.Successful(GameContext.initial, None) - val cmd2 = cmd1.copy( - moveResult = Some(result), - previousContext = Some(GameContext.initial), - ) - - cmd1.moveResult shouldBe None - cmd1.previousContext shouldBe None - - cmd2.moveResult shouldBe Some(result) - cmd2.previousContext shouldBe Some(GameContext.initial) - - val eq1 = MoveCommand( - from = sq(File.E, Rank.R2), - to = sq(File.E, Rank.R4), - moveResult = None, - previousContext = None, - ) - - val eq2 = MoveCommand( - from = sq(File.E, Rank.R2), - to = sq(File.E, Rank.R4), - moveResult = None, - previousContext = None, - ) - - eq1 shouldBe eq2 - eq1.hashCode shouldBe eq2.hashCode - - val hash1 = eq1.hashCode - val hash2 = eq1.hashCode - hash1 shouldBe hash2 diff --git a/modules/core/src/test/scala/de/nowchess/chess/engine/GameEngineIntegrationTest.scala b/modules/core/src/test/scala/de/nowchess/chess/engine/GameEngineIntegrationTest.scala index 41de750..f1edee5 100644 --- a/modules/core/src/test/scala/de/nowchess/chess/engine/GameEngineIntegrationTest.scala +++ b/modules/core/src/test/scala/de/nowchess/chess/engine/GameEngineIntegrationTest.scala @@ -3,7 +3,7 @@ package de.nowchess.chess.engine import de.nowchess.api.board.{Board, Color, File, PieceType, Rank, Square} import de.nowchess.api.game.GameContext import de.nowchess.api.move.{Move, MoveType, PromotionPiece} -import de.nowchess.chess.observer.{GameEvent, InvalidMoveEvent, InvalidMoveReason, MoveRedoneEvent, Observer} +import de.nowchess.chess.observer.{GameEvent, InvalidMoveEvent, InvalidMoveReason, Observer} import de.nowchess.api.error.GameError import de.nowchess.api.io.GameContextImport import de.nowchess.api.rules.RuleSet @@ -21,15 +21,6 @@ class GameEngineIntegrationTest extends AnyFunSuite with Matchers: engine.subscribe((event: GameEvent) => events += event) events - test("accessors expose redo availability and command history"): - val engine = new GameEngine(ruleSet = DefaultRules) - - engine.canRedo shouldBe false - engine.commandHistory shouldBe empty - - engine.processUserInput("e2e4") - engine.commandHistory.nonEmpty shouldBe true - test("processUserInput handles undo redo empty and malformed commands"): val engine = new GameEngine(ruleSet = DefaultRules) val events = captureEvents(engine) @@ -72,26 +63,11 @@ class GameEngineIntegrationTest extends AnyFunSuite with Matchers: engine.loadPosition(target) engine.context shouldBe target - engine.commandHistory shouldBe empty events.lastOption.exists { case _: de.nowchess.chess.observer.BoardResetEvent => true case _ => false } shouldBe true - test("redo event includes captured piece description when replaying a capture"): - val engine = new GameEngine(ruleSet = DefaultRules) - val events = captureEvents(engine) - - EngineTestHelpers.loadFen(engine, "4k3/8/8/8/8/8/4K3/R6r w - - 0 1") - events.clear() - - engine.processUserInput("a1h1") - engine.processUserInput("undo") - engine.processUserInput("redo") - - val redo = events.collectFirst { case e: MoveRedoneEvent => e } - redo.flatMap(_.capturedPiece) shouldBe Some("Black Rook") - test("loadGame replay handles promotion moves when pending promotion exists"): val promotionMove = Move(sq("e2"), sq("e8"), MoveType.Promotion(PromotionPiece.Queen)) diff --git a/modules/core/src/test/scala/de/nowchess/chess/registry/GameRegistryImplTest.scala b/modules/core/src/test/scala/de/nowchess/chess/registry/GameRegistryImplTest.scala index 53d1362..606a373 100644 --- a/modules/core/src/test/scala/de/nowchess/chess/registry/GameRegistryImplTest.scala +++ b/modules/core/src/test/scala/de/nowchess/chess/registry/GameRegistryImplTest.scala @@ -1,12 +1,22 @@ package de.nowchess.chess.registry +import de.nowchess.api.game.GameContext import de.nowchess.api.player.{PlayerId, PlayerInfo} +import de.nowchess.chess.client.{CombinedExportResponse, StoreServiceClient} +import de.nowchess.chess.grpc.IoGrpcClientWrapper +import de.nowchess.io.fen.FenExporter +import de.nowchess.io.pgn.PgnExporter import de.nowchess.rules.sets.DefaultRules import de.nowchess.chess.engine.GameEngine +import io.quarkus.test.InjectMock import io.quarkus.test.junit.QuarkusTest import jakarta.inject.Inject -import org.junit.jupiter.api.{DisplayName, Test} +import org.eclipse.microprofile.rest.client.inject.RestClient +import org.junit.jupiter.api.{BeforeEach, DisplayName, Test} import org.junit.jupiter.api.Assertions.* +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock import scala.compiletime.uninitialized @@ -18,6 +28,25 @@ class GameRegistryImplTest: @Inject var registry: GameRegistry = uninitialized + @InjectMock + var ioWrapper: IoGrpcClientWrapper = uninitialized + + @InjectMock + @RestClient + var storeClient: StoreServiceClient = uninitialized + + @BeforeEach + def setupMocks(): Unit = + when(ioWrapper.exportCombined(any())).thenAnswer((inv: InvocationOnMock) => + val ctx = inv.getArgument[GameContext](0) + CombinedExportResponse(FenExporter.exportGameContext(ctx), PgnExporter.exportGameContext(ctx)), + ) + when(ioWrapper.importPgn(any[String]())).thenAnswer((inv: InvocationOnMock) => + de.nowchess.io.pgn.PgnParser + .importGameContext(inv.getArgument[String](0)) + .getOrElse(GameContext.initial), + ) + @Test @DisplayName("store saves entry") def testStore(): Unit = diff --git a/modules/core/src/test/scala/de/nowchess/chess/resource/GameResourceIntegrationTest.scala b/modules/core/src/test/scala/de/nowchess/chess/resource/GameResourceIntegrationTest.scala index 889006d..cb56c06 100644 --- a/modules/core/src/test/scala/de/nowchess/chess/resource/GameResourceIntegrationTest.scala +++ b/modules/core/src/test/scala/de/nowchess/chess/resource/GameResourceIntegrationTest.scala @@ -7,7 +7,7 @@ import de.nowchess.chess.client.CombinedExportResponse import de.nowchess.chess.exception.BadRequestException import de.nowchess.chess.grpc.{IoGrpcClientWrapper, RuleSetGrpcAdapter} import de.nowchess.io.fen.FenExporter -import de.nowchess.io.pgn.PgnParser +import de.nowchess.io.pgn.{PgnExporter, PgnParser} import de.nowchess.rules.sets.DefaultRules import io.quarkus.test.InjectMock import io.quarkus.test.junit.QuarkusTest @@ -37,17 +37,19 @@ class GameResourceIntegrationTest: @BeforeEach def setupMocks(): Unit = when(ioWrapper.importFen(any[String]())).thenReturn(GameContext.initial) - when(ioWrapper.importPgn(any[String]())).thenReturn( - PgnParser.importGameContext("1. e4 c5").toOption.get, + when(ioWrapper.importPgn(any[String]())).thenAnswer((inv: InvocationOnMock) => + PgnParser.importGameContext(inv.getArgument[String](0)).getOrElse(GameContext.initial), ) when(ioWrapper.exportCombined(any())).thenAnswer((inv: InvocationOnMock) => val ctx = inv.getArgument[GameContext](0) - CombinedExportResponse(FenExporter.exportGameContext(ctx), ""), + CombinedExportResponse(FenExporter.exportGameContext(ctx), PgnExporter.exportGameContext(ctx)), ) when(ioWrapper.exportFen(any())).thenAnswer((inv: InvocationOnMock) => FenExporter.exportGameContext(inv.getArgument[GameContext](0)), ) - when(ioWrapper.exportPgn(any())).thenReturn("") + when(ioWrapper.exportPgn(any())).thenAnswer((inv: InvocationOnMock) => + PgnExporter.exportGameContext(inv.getArgument[GameContext](0)), + ) when(ruleAdapter.legalMoves(any())(any())).thenAnswer((inv: InvocationOnMock) => DefaultRules.legalMoves(inv.getArgument[GameContext](0))(inv.getArgument[Square](1)), diff --git a/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala b/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala index 5cdcd34..c567020 100644 --- a/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala +++ b/modules/io/src/main/scala/de/nowchess/io/grpc/IoGrpcService.scala @@ -39,10 +39,22 @@ class IoGrpcService extends IoServiceGrpc.IoServiceImplBase: ) override def exportFen(req: ProtoGameContext, resp: StreamObserver[ProtoStringResult]): Unit = - respond(resp, ProtoStringResult.newBuilder().setValue(FenExporter.exportGameContext(IoProtoMapper.fromProtoGameContext(req))).build()) + respond( + resp, + ProtoStringResult + .newBuilder() + .setValue(FenExporter.exportGameContext(IoProtoMapper.fromProtoGameContext(req))) + .build(), + ) override def exportPgn(req: ProtoGameContext, resp: StreamObserver[ProtoStringResult]): Unit = - respond(resp, ProtoStringResult.newBuilder().setValue(PgnExporter.exportGameContext(IoProtoMapper.fromProtoGameContext(req))).build()) + respond( + resp, + ProtoStringResult + .newBuilder() + .setValue(PgnExporter.exportGameContext(IoProtoMapper.fromProtoGameContext(req))) + .build(), + ) private def respond[T](obs: StreamObserver[T], value: T): Unit = obs.onNext(value) diff --git a/modules/io/src/main/scala/de/nowchess/io/grpc/IoProtoMapper.scala b/modules/io/src/main/scala/de/nowchess/io/grpc/IoProtoMapper.scala index 7bcd9d5..4495e74 100644 --- a/modules/io/src/main/scala/de/nowchess/io/grpc/IoProtoMapper.scala +++ b/modules/io/src/main/scala/de/nowchess/io/grpc/IoProtoMapper.scala @@ -1,7 +1,7 @@ package de.nowchess.io.grpc import de.nowchess.api.board.* -import de.nowchess.api.board.{CastlingRights as DomainCastlingRights} +import de.nowchess.api.board.CastlingRights as DomainCastlingRights import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason} import de.nowchess.api.move.{Move as DomainMove, MoveType, PromotionPiece} import de.nowchess.io.proto.* @@ -58,7 +58,12 @@ object IoProtoMapper: case _ => MoveType.Normal(false) def toProtoMove(m: DomainMove): ProtoMove = - ProtoMove.newBuilder().setFrom(m.from.toString).setTo(m.to.toString).setMoveKind(toProtoMoveKind(m.moveType)).build() + ProtoMove + .newBuilder() + .setFrom(m.from.toString) + .setTo(m.to.toString) + .setMoveKind(toProtoMoveKind(m.moveType)) + .build() def fromProtoMove(m: ProtoMove): Option[DomainMove] = for @@ -67,16 +72,33 @@ object IoProtoMapper: yield DomainMove(from, to, fromProtoMoveKind(m.getMoveKind)) def toProtoBoard(board: Board): java.util.List[ProtoSquarePiece] = - board.pieces.map { (sq, piece) => - ProtoSquarePiece - .newBuilder() - .setSquare(sq.toString) - .setPiece(ProtoPiece.newBuilder().setColor(toProtoColor(piece.color)).setPieceType(toProtoPieceType(piece.pieceType)).build()) - .build() - }.toSeq.asJava + board.pieces + .map { (sq, piece) => + ProtoSquarePiece + .newBuilder() + .setSquare(sq.toString) + .setPiece( + ProtoPiece + .newBuilder() + .setColor(toProtoColor(piece.color)) + .setPieceType(toProtoPieceType(piece.pieceType)) + .build(), + ) + .build() + } + .toSeq + .asJava def fromProtoBoard(pieces: java.util.List[ProtoSquarePiece]): Board = - Board(pieces.asScala.flatMap(sp => Square.fromAlgebraic(sp.getSquare).map(_ -> Piece(fromProtoColor(sp.getPiece.getColor), fromProtoPieceType(sp.getPiece.getPieceType)))).toMap) + Board( + pieces.asScala + .flatMap(sp => + Square + .fromAlgebraic(sp.getSquare) + .map(_ -> Piece(fromProtoColor(sp.getPiece.getColor), fromProtoPieceType(sp.getPiece.getPieceType))), + ) + .toMap, + ) def toProtoResultKind(r: Option[GameResult]): ProtoGameResultKind = r match case None => ProtoGameResultKind.ONGOING @@ -131,12 +153,13 @@ object IoProtoMapper: def fromProtoGameContext(p: ProtoGameContext): GameContext = val cr = p.getCastlingRights GameContext( - board = fromProtoBoard(p.getBoardList), - turn = fromProtoColor(p.getTurn), - castlingRights = DomainCastlingRights(cr.getWhiteKingSide, cr.getWhiteQueenSide, cr.getBlackKingSide, cr.getBlackQueenSide), + board = fromProtoBoard(p.getBoardList), + turn = fromProtoColor(p.getTurn), + castlingRights = + DomainCastlingRights(cr.getWhiteKingSide, cr.getWhiteQueenSide, cr.getBlackKingSide, cr.getBlackQueenSide), enPassantSquare = Option(p.getEnPassantSquare).filter(_.nonEmpty).flatMap(Square.fromAlgebraic), halfMoveClock = p.getHalfMoveClock, - moves = p.getMovesList.asScala.flatMap(fromProtoMove).toList, - result = fromProtoResultKind(p.getResult), - initialBoard = fromProtoBoard(p.getInitialBoardList), + moves = p.getMovesList.asScala.flatMap(fromProtoMove).toList, + result = fromProtoResultKind(p.getResult), + initialBoard = fromProtoBoard(p.getInitialBoardList), ) diff --git a/modules/rule/build.gradle.kts b/modules/rule/build.gradle.kts index 4a6adea..ce82b80 100644 --- a/modules/rule/build.gradle.kts +++ b/modules/rule/build.gradle.kts @@ -116,7 +116,7 @@ tasks.jar { duplicatesStrategy = DuplicatesStrategy.EXCLUDE } -tasks.withType(org.gradle.api.tasks.scala.ScalaCompile::class).configureEach { +tasks.withType(ScalaCompile::class).configureEach { if (name == "compileScoverageScala") { source = source.asFileTree.matching { exclude("**/grpc/*.scala") diff --git a/modules/rule/src/main/scala/de/nowchess/rules/grpc/ProtoMapper.scala b/modules/rule/src/main/scala/de/nowchess/rules/grpc/ProtoMapper.scala index 4864afb..30cc60d 100644 --- a/modules/rule/src/main/scala/de/nowchess/rules/grpc/ProtoMapper.scala +++ b/modules/rule/src/main/scala/de/nowchess/rules/grpc/ProtoMapper.scala @@ -2,7 +2,7 @@ package de.nowchess.rules.grpc import de.nowchess.api.board.{CastlingRights as DomainCastlingRights, *} import de.nowchess.api.game.{DrawReason, GameContext, GameResult, WinReason} -import de.nowchess.api.move.{MoveType, PromotionPiece, Move as DomainMove} +import de.nowchess.api.move.{Move as DomainMove, MoveType, PromotionPiece} import de.nowchess.rules.proto.* import scala.jdk.CollectionConverters.* @@ -57,7 +57,12 @@ object ProtoMapper: case _ => MoveType.Normal(false) def toProtoMove(m: DomainMove): ProtoMove = - ProtoMove.newBuilder().setFrom(m.from.toString).setTo(m.to.toString).setMoveKind(toProtoMoveKind(m.moveType)).build() + ProtoMove + .newBuilder() + .setFrom(m.from.toString) + .setTo(m.to.toString) + .setMoveKind(toProtoMoveKind(m.moveType)) + .build() def fromProtoMove(m: ProtoMove): Option[DomainMove] = for @@ -66,16 +71,33 @@ object ProtoMapper: yield DomainMove(from, to, fromProtoMoveKind(m.getMoveKind)) def toProtoBoard(board: Board): java.util.List[ProtoSquarePiece] = - board.pieces.map { (sq, piece) => - ProtoSquarePiece - .newBuilder() - .setSquare(sq.toString) - .setPiece(ProtoPiece.newBuilder().setColor(toProtoColor(piece.color)).setPieceType(toProtoPieceType(piece.pieceType)).build()) - .build() - }.toSeq.asJava + board.pieces + .map { (sq, piece) => + ProtoSquarePiece + .newBuilder() + .setSquare(sq.toString) + .setPiece( + ProtoPiece + .newBuilder() + .setColor(toProtoColor(piece.color)) + .setPieceType(toProtoPieceType(piece.pieceType)) + .build(), + ) + .build() + } + .toSeq + .asJava def fromProtoBoard(pieces: java.util.List[ProtoSquarePiece]): Board = - Board(pieces.asScala.flatMap(sp => Square.fromAlgebraic(sp.getSquare).map(_ -> Piece(fromProtoColor(sp.getPiece.getColor), fromProtoPieceType(sp.getPiece.getPieceType)))).toMap) + Board( + pieces.asScala + .flatMap(sp => + Square + .fromAlgebraic(sp.getSquare) + .map(_ -> Piece(fromProtoColor(sp.getPiece.getColor), fromProtoPieceType(sp.getPiece.getPieceType))), + ) + .toMap, + ) def toProtoResultKind(r: Option[GameResult]): ProtoGameResultKind = r match case None => ProtoGameResultKind.ONGOING @@ -130,12 +152,13 @@ object ProtoMapper: def fromProtoGameContext(p: ProtoGameContext): GameContext = val cr = p.getCastlingRights GameContext( - board = fromProtoBoard(p.getBoardList), - turn = fromProtoColor(p.getTurn), - castlingRights = DomainCastlingRights(cr.getWhiteKingSide, cr.getWhiteQueenSide, cr.getBlackKingSide, cr.getBlackQueenSide), + board = fromProtoBoard(p.getBoardList), + turn = fromProtoColor(p.getTurn), + castlingRights = + DomainCastlingRights(cr.getWhiteKingSide, cr.getWhiteQueenSide, cr.getBlackKingSide, cr.getBlackQueenSide), enPassantSquare = Option(p.getEnPassantSquare).filter(_.nonEmpty).flatMap(Square.fromAlgebraic), - halfMoveClock = p.getHalfMoveClock, - moves = p.getMovesList.asScala.flatMap(fromProtoMove).toList, - result = fromProtoResultKind(p.getResult), - initialBoard = fromProtoBoard(p.getInitialBoardList), + halfMoveClock = p.getHalfMoveClock, + moves = p.getMovesList.asScala.flatMap(fromProtoMove).toList, + result = fromProtoResultKind(p.getResult), + initialBoard = fromProtoBoard(p.getInitialBoardList), ) diff --git a/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala b/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala index c37e35a..44771b5 100644 --- a/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala +++ b/modules/rule/src/main/scala/de/nowchess/rules/grpc/RuleGrpcService.scala @@ -12,15 +12,22 @@ import io.quarkus.grpc.GrpcService class RuleGrpcService extends RuleServiceGrpc.RuleServiceImplBase: private def parseSquare(s: String): Square = - Square.fromAlgebraic(s).getOrElse( - throw Status.INVALID_ARGUMENT.withDescription(s"Invalid square: $s").asRuntimeException(), - ) + Square + .fromAlgebraic(s) + .getOrElse( + throw Status.INVALID_ARGUMENT.withDescription(s"Invalid square: $s").asRuntimeException(), + ) override def candidateMoves(req: ProtoSquareRequest, resp: StreamObserver[ProtoMoveList]): Unit = val ctx = ProtoMapper.fromProtoGameContext(req.getContext) val sq = parseSquare(req.getSquare) val moves = DefaultRules.candidateMoves(ctx)(sq) - resp.onNext(ProtoMoveList.newBuilder().addAllMoves(moves.map(ProtoMapper.toProtoMove).asInstanceOf[java.util.List[ProtoMove]]).build()) + resp.onNext( + ProtoMoveList + .newBuilder() + .addAllMoves(moves.map(ProtoMapper.toProtoMove).asInstanceOf[java.util.List[ProtoMove]]) + .build(), + ) resp.onCompleted() override def legalMoves(req: ProtoSquareRequest, resp: StreamObserver[ProtoMoveList]): Unit = @@ -52,10 +59,12 @@ class RuleGrpcService extends RuleServiceGrpc.RuleServiceImplBase: respond(resp, boolResult(DefaultRules.isThreefoldRepetition(ProtoMapper.fromProtoGameContext(req)))) override def applyMove(req: ProtoMoveRequest, resp: StreamObserver[ProtoGameContext]): Unit = - val ctx = ProtoMapper.fromProtoGameContext(req.getContext) - val move = ProtoMapper.fromProtoMove(req.getMove).getOrElse( - throw Status.INVALID_ARGUMENT.withDescription("Invalid move").asRuntimeException(), - ) + val ctx = ProtoMapper.fromProtoGameContext(req.getContext) + val move = ProtoMapper + .fromProtoMove(req.getMove) + .getOrElse( + throw Status.INVALID_ARGUMENT.withDescription("Invalid move").asRuntimeException(), + ) respond(resp, ProtoMapper.toProtoGameContext(DefaultRules.applyMove(ctx)(move))) override def postMoveStatus(req: ProtoGameContext, resp: StreamObserver[ProtoPostMoveStatus]): Unit = diff --git a/modules/store/src/main/resources/application.yml b/modules/store/src/main/resources/application.yml new file mode 100644 index 0000000..103bdf3 --- /dev/null +++ b/modules/store/src/main/resources/application.yml @@ -0,0 +1,32 @@ +quarkus: + application.name: nowchess-store + http.port: 8085 + config.yaml.enabled: true + datasource: + db-kind: postgresql + username: ${DB_USER:nowchess} + password: ${DB_PASSWORD:nowchess} + jdbc.url: ${DB_URL:jdbc:postgresql://localhost:5432/nowchess} + hibernate-orm: + database.generation: update + +nowchess: + redis: + host: ${REDIS_HOST:localhost} + port: ${REDIS_PORT:6379} + prefix: ${REDIS_PREFIX:nowchess} + +"%test": + quarkus: + datasource: + db-kind: h2 + username: sa + password: "" + jdbc.url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE + hibernate-orm: + database.generation: drop-and-create + nowchess: + redis: + host: localhost + port: 6379 + prefix: test-store diff --git a/modules/store/src/main/scala/de/nowchess/store/config/RedissonProducer.scala b/modules/store/src/main/scala/de/nowchess/store/config/RedissonProducer.scala index e2b81ae..321fb6e 100644 --- a/modules/store/src/main/scala/de/nowchess/store/config/RedissonProducer.scala +++ b/modules/store/src/main/scala/de/nowchess/store/config/RedissonProducer.scala @@ -18,7 +18,8 @@ class RedissonProducer: @ApplicationScoped def redissonClient(): RedissonClient = val config = new Config() - config.useSingleServer() + config + .useSingleServer() .setAddress(s"redis://${redisConfig.host}:${redisConfig.port}") .setConnectionMinimumIdleSize(1) .setConnectTimeout(500) diff --git a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackEventDto.scala b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackEventDto.scala index 47dda46..2eef25b 100644 --- a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackEventDto.scala +++ b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackEventDto.scala @@ -1,24 +1,24 @@ package de.nowchess.store.redis case class GameWritebackEventDto( - gameId: String, - fen: String, - pgn: String, - moveCount: Int, - whiteId: String, - whiteName: String, - blackId: String, - blackName: String, - mode: String, - resigned: Boolean, - limitSeconds: Option[Int], - incrementSeconds: Option[Int], - daysPerMove: Option[Int], - whiteRemainingMs: Option[Long], - blackRemainingMs: Option[Long], - incrementMs: Option[Long], - clockLastTickAt: Option[Long], - clockMoveDeadline: Option[Long], - clockActiveColor: Option[String], - pendingDrawOffer: Option[String], + gameId: String, + fen: String, + pgn: String, + moveCount: Int, + whiteId: String, + whiteName: String, + blackId: String, + blackName: String, + mode: String, + resigned: Boolean, + limitSeconds: Option[Int], + incrementSeconds: Option[Int], + daysPerMove: Option[Int], + whiteRemainingMs: Option[Long], + blackRemainingMs: Option[Long], + incrementMs: Option[Long], + clockLastTickAt: Option[Long], + clockMoveDeadline: Option[Long], + clockActiveColor: Option[String], + pendingDrawOffer: Option[String], ) diff --git a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala index 5dd395b..8568112 100644 --- a/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala +++ b/modules/store/src/main/scala/de/nowchess/store/redis/GameWritebackStreamListener.scala @@ -14,17 +14,18 @@ import scala.util.Try class GameWritebackStreamListener: @Inject // scalafix:off DisableSyntax.var - var redisson: RedissonClient = uninitialized - @Inject var objectMapper: ObjectMapper = uninitialized + var redisson: RedissonClient = uninitialized + @Inject var objectMapper: ObjectMapper = uninitialized @Inject var writebackService: GameWritebackService = uninitialized // scalafix:on @PostConstruct def startListening(): Unit = val topic = redisson.getTopic("game-writeback") - topic.addListener(classOf[String], new MessageListener[String]: - def onMessage(channel: CharSequence, json: String): Unit = - Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])) - .toOption - .foreach(writebackService.writeBack) + topic.addListener( + classOf[String], + new MessageListener[String]: + def onMessage(channel: CharSequence, json: String): Unit = + Try(objectMapper.readValue(json, classOf[GameWritebackEventDto])).toOption + .foreach(writebackService.writeBack), ) diff --git a/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala b/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala index 20932b0..7fa1f06 100644 --- a/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala +++ b/modules/store/src/main/scala/de/nowchess/store/resource/StoreGameResource.scala @@ -19,5 +19,6 @@ class StoreGameResource: @Path("/{gameId}") @Produces(Array(MediaType.APPLICATION_JSON)) def getGame(@PathParam("gameId") gameId: String): Response = - repository.findByGameId(gameId) + repository + .findByGameId(gameId) .fold(Response.status(404).build())(r => Response.ok(r).build()) diff --git a/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala b/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala index 741dd6b..6e944e2 100644 --- a/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala +++ b/modules/ws/src/main/scala/de/nowchess/ws/resource/GameWebSocketResource.scala @@ -30,10 +30,12 @@ class GameWebSocketResource: @OnOpen def onOpen(connection: WebSocketConnection): Unit = val gameId = connection.pathParam("gameId") - val topic = redisson.getTopic(s2cTopic(gameId)) - val listenerId = topic.addListener(classOf[String], new MessageListener[String]: - def onMessage(channel: CharSequence, msg: String): Unit = - connection.sendText(msg).subscribe().`with`(_ => (), _ => ()) + val topic = redisson.getTopic(s2cTopic(gameId)) + val listenerId = topic.addListener( + classOf[String], + new MessageListener[String]: + def onMessage(channel: CharSequence, msg: String): Unit = + connection.sendText(msg).subscribe().`with`(_ => (), _ => ()), ) listenerIds.put(connection.id(), (gameId, listenerId)) val connectedMsg = s"""{"type":"CONNECTED","gameId":"$gameId"}"""