Compare commits

12 Commits

Author SHA1 Message Date
TeamCity 9e800ecb59 ci: bump version with Build-124 2026-06-16 19:41:52 +00:00
Janis 39f1657e1d feat(analytics): add Spark batch analytics module (#70)
Build & Test (NowChessSystems) TeamCity build finished
Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com>
Reviewed-on: #70
2026-06-16 20:38:14 +02:00
Janis Eccarius 2fd85dbadb chore(analytics): merge remote main — keep analytics module additions
Build & Test (NowChessSystems) TeamCity build failed
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 20:10:56 +02:00
Janis Eccarius 46af1154de fix(analytics): upgrade Spark to 4.0.3 — 3.5.x has no official Docker image
apache/spark:3.5.4-scala2.13-java17-ubuntu does not exist on Docker Hub.
Oldest available scala2.13 image is 4.0.3. Bump compileOnly deps and
Dockerfile base image to match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 20:08:29 +02:00
TeamCity 85cb9b2e7a ci: bump version with Build-123 2026-06-15 20:52:53 +00:00
Janis Eccarius 0e0ea4c989 feat(analytics): add PostgreSQL JDBC write-back to all four batch jobs
Each batch job now writes its results to a Postgres table in addition to
the existing Parquet/CSV output. OpeningBookJob → analytics_opening_stats,
PlayerStatsJob → analytics_player_stats, PlayerClusteringJob →
analytics_player_clusters + analytics_cluster_archetypes, PlayerGraphJob
→ analytics_player_graph. MLlib Vector columns are excluded from the JDBC
write by reusing the already-selected scalar DataFrame in
PlayerClusteringJob.
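A minimal sketch of how the connection settings for such a write-back could be resolved. The environment variable names and their defaults come from the build.gradle.kts header in this change; the `JdbcSettings` holder, the `jdbcSettings` helper, and the save mode in the trailing comment are assumptions for illustration, not the module's actual code.

```scala
import java.util.Properties

// Hypothetical holder for the connection settings; not part of this change.
final case class JdbcSettings(url: String, props: Properties)

// Resolves settings from an environment map, falling back to the defaults
// documented in the build file header (NOWCHESS_JDBC_URL, NOWCHESS_DB_USER,
// NOWCHESS_DB_PASS).
def jdbcSettings(env: Map[String, String]): JdbcSettings =
  val props = new Properties()
  props.setProperty("user", env.getOrElse("NOWCHESS_DB_USER", "nowchess"))
  props.setProperty("password", env.getOrElse("NOWCHESS_DB_PASS", "nowchess"))
  props.setProperty("driver", "org.postgresql.Driver")
  JdbcSettings(
    env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess"),
    props,
  )

// With Spark on the classpath, a DataFrame `stats` holding only scalar columns
// could then be written roughly like this (the "overwrite" mode is an assumption):
//   val s = jdbcSettings(sys.env)
//   stats.write.mode("overwrite").jdbc(s.url, "analytics_player_stats", s.props)
```

Taking the environment as a plain `Map` keeps the helper testable without touching the real process environment.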

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 22:35:30 +02:00
Janis Eccarius 95215b6a42 feat(analytics): add Dockerfile, CI workflow, and stable jar name for K8s deployment
- Pin jar output to analytics.jar (no version suffix) so Dockerfile COPY is stable
- Add Dockerfile based on apache/spark:3.5.4-scala2.13-java17-ubuntu
- Add versions.env (0.1.0) matching GitOps overlay image tag
- Add analytics-image.yml CI workflow following native-image.yml conventions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 22:30:31 +02:00
Janis b1c9e962e7 fix(version): Wrong version file
Build & Test (NowChessSystems) TeamCity build finished
2026-06-15 22:26:27 +02:00
Janis Eccarius e1d80b9331 feat(analytics): add Structured Streaming, MLlib clustering, GraphX jobs
Three new Spark jobs demonstrating complementary Spark pillars:

LiveDashboardJob (Structured Streaming):
- Simulates NowChess game-over event stream via rate source
- Watermarking (45 s late-data tolerance)
- Tumbling 1-min windows → append-mode Parquet output
- Sliding 5-min/1-min windows → update-mode console output
- Checkpointing for exactly-once fault tolerance
- Production wiring comments show Kafka / spark-redis swap-in
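The window and watermark settings listed above can be illustrated without Spark. The following plain-Scala sketch only reproduces the arithmetic of assigning an event to windows aligned to the epoch; the function names are hypothetical and this is not how LiveDashboardJob is implemented (the job presumably delegates to Spark's built-in `window` function).

```scala
// Start (epoch seconds) of the tumbling window that contains the event.
def tumblingWindowStart(eventEpochSec: Long, windowSec: Long = 60L): Long =
  Math.floorDiv(eventEpochSec, windowSec) * windowSec

// Starts of every sliding window [start, start + windowSec) that contains the
// event, for windows that begin on multiples of slideSec.
def slidingWindowStarts(
    eventEpochSec: Long,
    windowSec: Long = 300L,
    slideSec: Long = 60L,
): List[Long] =
  val lastStart = Math.floorDiv(eventEpochSec, slideSec) * slideSec
  Iterator
    .iterate(lastStart)(_ - slideSec)
    .takeWhile(start => start + windowSec > eventEpochSec)
    .toList
    .reverse

// Watermark: the latest event time seen so far minus the late-data tolerance.
def watermark(maxEventTimeSeenSec: Long, toleranceSec: Long = 45L): Long =
  maxEventTimeSeenSec - toleranceSec
```

With the defaults, one event falls into exactly one tumbling window but into five overlapping sliding windows, which is why the sliding aggregation produces several updated rows per event.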

PlayerClusteringJob (MLlib):
- Derives 4 player features from game_records via JDBC
- VectorAssembler + StandardScaler + KMeans inside a Pipeline
- ClusteringEvaluator (silhouette score) to measure quality
- Per-cluster archetype averages show what each tier represents
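To make the "archetype averages" idea concrete, here is a small Spark-free sketch: points are assigned to their nearest centroid (the K-means assignment step) and each cluster's features are averaged. The types and function names are hypothetical, and the sketch skips the scaling and training that the MLlib pipeline performs.

```scala
type Features = Vector[Double]

def squaredDistance(a: Features, b: Features): Double =
  a.zip(b).map((x, y) => (x - y) * (x - y)).sum

// K-means assignment step: index of the closest centroid.
def nearestCentroid(point: Features, centroids: Vector[Features]): Int =
  centroids.indices.minBy(i => squaredDistance(point, centroids(i)))

// Per-cluster mean of every feature, i.e. the "archetype" of that cluster.
def archetypes(points: Seq[Features], centroids: Vector[Features]): Map[Int, Features] =
  points
    .groupBy(p => nearestCentroid(p, centroids))
    .view
    .mapValues { members =>
      members.transpose.map(column => column.sum / members.size).toVector
    }
    .toMap
```

Reading the averaged features per cluster is what lets a numeric cluster id be interpreted as a player tier.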

PlayerGraphJob (GraphX):
- Builds directed player graph (vertices=players, edges=games)
- PageRank — identifies most influential/active players
- ConnectedComponents — finds isolated player communities
- Bridges GraphX RDD results back to DataFrames via explicit schema
  (avoids spark.implicits._, which breaks Scala 3 → Spark (Scala 2.13) interop)
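GraphX's ConnectedComponents labels each vertex with the lowest vertex id in its component. The plain-Scala sketch below reproduces that labelling on an in-memory edge list by repeatedly propagating the minimum label to neighbours; it is an illustration under that assumption, with hypothetical names, and not the code of PlayerGraphJob.

```scala
import scala.annotation.tailrec

// Labels every vertex with the smallest vertex id in its component,
// treating edges (games between two players) as undirected.
def connectedComponents(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] =
  val neighbours: Map[Long, Set[Long]] =
    edges
      .flatMap((a, b) => Seq(a -> b, b -> a))
      .groupMap(_._1)(_._2)
      .view
      .mapValues(_.toSet)
      .toMap

  // Include edge endpoints so every neighbour lookup has a label.
  val allVertices = vertices ++ neighbours.keySet

  @tailrec
  def propagate(labels: Map[Long, Long]): Map[Long, Long] =
    val next = labels.map { (v, label) =>
      val candidates = neighbours.getOrElse(v, Set.empty[Long]).map(labels) + label
      v -> candidates.min
    }
    if next == labels then labels else propagate(next)

  propagate(allVertices.map(v => v -> v).toMap)
```

Players who end up with the same label have a chain of games connecting them; a label that appears only once marks a player with no recorded opponents.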

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 22:15:24 +02:00
Janis Eccarius 259b3bbb24 feat(analytics): add Spark batch analytics module
New standalone modules:analytics submodule with two Spark jobs:

- OpeningBookJob: reads game_records.pgn, extracts first N plies using
  pure Catalyst SQL expressions (no UDFs), aggregates win/draw/loss rates
  per opening sequence, writes Parquet + CSV top-1000 summary.

- PlayerStatsJob: unions each game into a player-centric view, aggregates
  total_games/wins/losses/draws/avg_move_count/win_rate per player_id,
  writes Parquet.
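The "player-centric view" used by PlayerStatsJob can be sketched with ordinary collections: every game becomes two rows, one per player, and the rows are then grouped by player. The `Game` record, the result encoding, and the definition of win rate as wins divided by total games are assumptions for illustration; the real game_records schema is not shown in this change.

```scala
// Hypothetical, simplified game record. result is "1-0", "0-1" or "1/2-1/2".
final case class Game(whiteId: String, blackId: String, result: String, moveCount: Int)

final case class PlayerStats(
    totalGames: Int,
    wins: Int,
    losses: Int,
    draws: Int,
    avgMoveCount: Double,
    winRate: Double,
)

def playerStats(games: Seq[Game]): Map[String, PlayerStats] =
  // Union step: each game yields one row for white and one row for black.
  val rows = games.flatMap { g =>
    val (white, black) = g.result match
      case "1-0" => ("win", "loss")
      case "0-1" => ("loss", "win")
      case _     => ("draw", "draw")
    Seq((g.whiteId, white, g.moveCount), (g.blackId, black, g.moveCount))
  }
  // Aggregation step: one PlayerStats per player id.
  rows.groupBy(_._1).view.mapValues { rs =>
    val total = rs.size
    val wins = rs.count(_._2 == "win")
    PlayerStats(
      total,
      wins,
      rs.count(_._2 == "loss"),
      rs.count(_._2 == "draw"),
      rs.map(_._3).sum.toDouble / total,
      wins.toDouble / total,
    )
  }.toMap
```

In the DataFrame API the same shape would typically be a union of two selects followed by a groupBy on the player id.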

Module uses Scala 3 calling spark-sql_2.13 via JVM binary compatibility
(DataFrame API only; no spark.implicits._ / typed Datasets). Spark is
compileOnly; the fat jar bundles only scala3-library + postgresql driver.
Submit via spark-submit; see build.gradle.kts header for invocation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 21:58:05 +02:00
Janis 0bdf72bddc feat(analysis): scaffold chess analysis microservice (NCS-71) NCI-10 (#69)
Build & Test (NowChessSystems) TeamCity build finished
NCS-95 NCS-96 NCS-97 NCI-10

---------

Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com>
Reviewed-on: #69
2026-06-15 21:40:24 +02:00
TeamCity 0a5a216032 ci: bump version with Build-121 2026-06-10 09:57:45 +00:00
47 changed files with 2108 additions and 69 deletions
+130
@@ -0,0 +1,130 @@
name: Build & Push Analytics Image
on:
push:
branches:
- main
paths:
- 'modules/analytics/**'
workflow_dispatch:
jobs:
check-actor:
runs-on: ubuntu-latest
outputs:
allowed: ${{ steps.check.outputs.allowed }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 1
- id: check
run: |
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
echo "Triggered manually — allowing build"
echo "allowed=true" >> "$GITHUB_OUTPUT"
else
COMMIT_AUTHOR=$(git log -1 --format='%an')
COMMIT_SHA=$(git log -1 --format='%H')
COMMIT_MSG=$(git log -1 --format='%s')
echo "Commit: ${COMMIT_SHA}"
echo "Author: ${COMMIT_AUTHOR}"
echo "Message: ${COMMIT_MSG}"
if [[ "$COMMIT_AUTHOR" == "TeamCity" ]]; then
echo "Author is TeamCity — allowing build"
echo "allowed=true" >> "$GITHUB_OUTPUT"
else
echo "Author is not TeamCity — skipping build"
echo "allowed=false" >> "$GITHUB_OUTPUT"
fi
fi
build-and-push:
needs: check-actor
if: needs.check-actor.outputs.allowed == 'true'
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Read version from versions.env
id: version
run: |
source modules/analytics/versions.env
echo "version=${MAJOR}.${MINOR}.${PATCH}" >> "$GITHUB_OUTPUT"
- name: Check if image exists in GHCR
id: image-check
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
PACKAGE="now-chess-systems%2Fanalytics"
VERSION="${{ steps.version.outputs.version }}"
EXISTING_TAGS=$(gh api "orgs/now-chess/packages/container/${PACKAGE}/versions" \
--jq '.[].metadata.container.tags[]' 2>/dev/null || echo "")
echo "Existing tags: $(echo "${EXISTING_TAGS}" | tr '\n' ' ' | xargs)"
if echo "${EXISTING_TAGS}" | grep -qx "${VERSION}"; then
echo "Image ${VERSION} already exists — skipping build"
echo "exists=true" >> "$GITHUB_OUTPUT"
else
echo "Image ${VERSION} not found — will build"
echo "exists=false" >> "$GITHUB_OUTPUT"
fi
- name: Set up JDK 17
if: steps.image-check.outputs.exists == 'false'
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
- name: Cache Gradle packages
if: steps.image-check.outputs.exists == 'false'
uses: actions/cache@v4
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: gradle-${{ runner.os }}-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: gradle-${{ runner.os }}-
- name: Build fat jar
if: steps.image-check.outputs.exists == 'false'
run: ./gradlew :modules:analytics:jar --no-daemon
- name: Set up Docker Buildx
if: steps.image-check.outputs.exists == 'false'
uses: docker/setup-buildx-action@v3
- name: Log in to GitHub Container Registry
if: steps.image-check.outputs.exists == 'false'
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
if: steps.image-check.outputs.exists == 'false'
id: meta
uses: docker/metadata-action@v5
with:
images: ghcr.io/now-chess/now-chess-systems/analytics
tags: |
type=raw,value=${{ steps.version.outputs.version }}
type=raw,value=latest
- name: Build and push
if: steps.image-check.outputs.exists == 'false'
uses: docker/build-push-action@v6
with:
context: modules/analytics
file: modules/analytics/src/main/docker/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
+1
@@ -49,6 +49,7 @@ jobs:
matrix:
module:
- account
- analysis
- bot-platform
- coordinator
- core
+2
@@ -53,6 +53,8 @@ val coverageExclusions = listOf(
"**/core/src/main/scala/de/nowchess/chess/resource/GameWebSocketResource.scala",
// Coordinator infrastructure — gRPC, microservice orchestration
"**/coordinator/src/main/scala/**",
// Analytics module — standalone Spark batch jobs; coverage not applicable (no Quarkus, no scoverage plugin)
"modules/analytics/**",
)
// Converts a Sonar-style glob to a scoverage regex (matched against full source path).
+16
@@ -0,0 +1,16 @@
# Changelog — analysis
## 0.1.0 (NCS-71)
- Initial scaffold: chess analysis microservice
- REST endpoint `POST /api/analysis/position` wrapping chess-api.com
- REST endpoint `GET /api/analysis/health`
## (2026-06-15)
### Features
* **analysis:** scaffold chess analysis microservice (NCS-71) NCI-10 ([#69](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/69)) ([0bdf72b](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0bdf72bddcd3e4cb7f731504c064633f76eade94))
### Bug Fixes
* **version:** Wrong version file ([b1c9e96](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/b1c9e962e74f35f6b1e2a074e1d3acbe2e2fb5e9))
+111
@@ -0,0 +1,111 @@
plugins {
id("scala")
id("org.scoverage") version "8.1"
id("io.quarkus")
}
group = "de.nowchess"
version = "1.0-SNAPSHOT"
@Suppress("UNCHECKED_CAST")
val versions = rootProject.extra["VERSIONS"] as Map<String, String>
@Suppress("UNCHECKED_CAST")
val scoverageExcluded = rootProject.extra["SCOVERAGE_EXCLUDED"] as List<String>
repositories {
mavenCentral()
}
scala {
scalaVersion = versions["SCALA3"]!!
}
scoverage {
scoverageVersion.set(versions["SCOVERAGE"]!!)
excludedFiles.set(scoverageExcluded)
}
tasks.withType<ScalaCompile> {
scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8")
}
val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project
dependencies {
compileOnly("org.scala-lang:scala3-compiler_3") {
version {
strictly(versions["SCALA3"]!!)
}
}
implementation("org.scala-lang:scala3-library_3") {
version {
strictly(versions["SCALA3"]!!)
}
}
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation("io.quarkus:quarkus-rest")
implementation("io.quarkus:quarkus-rest-client")
implementation("io.quarkus:quarkus-rest-client-jackson")
implementation("io.quarkus:quarkus-rest-jackson")
implementation("io.quarkus:quarkus-config-yaml")
implementation("io.quarkus:quarkus-smallrye-fault-tolerance")
implementation("io.quarkus:quarkus-smallrye-health")
implementation("io.quarkus:quarkus-logging-json")
implementation("io.quarkus:quarkus-micrometer")
implementation("io.quarkus:quarkus-micrometer-registry-prometheus")
implementation("io.quarkus:quarkus-opentelemetry")
implementation("io.quarkus:quarkus-arc")
implementation("com.fasterxml.jackson.module:jackson-module-scala_3:${versions["JACKSON_SCALA"]!!}")
testImplementation(platform("org.junit:junit-bom:5.13.4"))
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.scalatest:scalatest_3:${versions["SCALATEST"]!!}")
testImplementation("co.helmethair:scalatest-junit-runner:${versions["SCALATEST_JUNIT"]!!}")
testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.rest-assured:rest-assured")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
}
configurations.matching { !it.name.startsWith("scoverage") }.configureEach {
resolutionStrategy.force("org.scala-lang:scala-library:${versions["SCALA_LIBRARY"]!!}")
}
configurations.scoverage {
resolutionStrategy.eachDependency {
if (requested.group == "org.scoverage" && requested.name.startsWith("scalac-scoverage-plugin_")) {
useTarget("${requested.group}:scalac-scoverage-plugin_2.13.16:2.3.0")
}
}
}
tasks.withType<JavaCompile> {
options.encoding = "UTF-8"
options.compilerArgs.add("-parameters")
}
tasks.withType<Jar>().configureEach {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
tasks.test {
useJUnitPlatform {
includeEngines("scalatest", "junit-jupiter")
testLogging {
events("passed", "skipped", "failed")
}
}
finalizedBy(tasks.reportScoverage)
}
tasks.reportScoverage {
dependsOn(tasks.test)
}
tasks.jar {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
@@ -0,0 +1,29 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode.
#
# Before building the container image run:
#
# ./gradlew :modules:analysis:build -Dquarkus.native.enabled=true
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.native -t quarkus/backcore .
#
# Then run the container using:
#
# docker run -i --rm -p 8087:8087 quarkus/backcore
#
# The `registry.access.redhat.com/ubi9/ubi-minimal:9.7` base image is based on UBI 9.
# To use UBI 8, switch to `quay.io/ubi8/ubi-minimal:8.10`.
###
FROM registry.access.redhat.com/ubi9/ubi-minimal:9.7
WORKDIR /work/
RUN chown 1001 /work \
&& chmod "g+rwX" /work \
&& chown 1001:root /work
COPY --chown=1001:root --chmod=0755 modules/analysis/build/*-runner /work/application
EXPOSE 8087
USER 1001
ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"]
@@ -0,0 +1,40 @@
quarkus:
http:
port: 8087
application:
name: nowchess-analysis
config:
yaml:
enabled: true
nowchess:
analysis:
chess-api:
base-url: ${CHESS_API_URL:https://chess-api.com/v1}
timeout-ms: ${CHESS_API_TIMEOUT_MS:5000}
"%dev":
quarkus:
rest-client:
chess-api:
url: https://chess-api.com/v1
connect-timeout: 5000
read-timeout: 5000
"%deployed":
quarkus:
log:
console:
json: true
otel:
traces:
sampler: parentbased_traceidratio
sampler-arg: 0.1
exporter:
otlp:
endpoint: ${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}
rest-client:
chess-api:
url: ${CHESS_API_URL:https://chess-api.com/v1}
connect-timeout: ${CHESS_API_CONNECT_TIMEOUT_MS:5000}
read-timeout: ${CHESS_API_TIMEOUT_MS:5000}
@@ -0,0 +1,18 @@
package de.nowchess.analysis.client
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient
/** MicroProfile REST client for chess-api.com v1.
*
* Base URL is resolved from `quarkus.rest-client.chess-api.url` in application.yml.
*/
@Path("/")
@RegisterRestClient(configKey = "chess-api")
trait ChessApiClient:
@POST
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
def analyse(body: ChessApiRequestDto): ChessApiResponseDto
@@ -0,0 +1,4 @@
package de.nowchess.analysis.client
/** Request body sent to chess-api.com v1 `/` endpoint. */
case class ChessApiRequestDto(fen: String, depth: Int)
@@ -0,0 +1,23 @@
package de.nowchess.analysis.client
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
/** Response from chess-api.com v1 analysis endpoint.
*
* The API returns a JSON object. Fields not listed here are ignored.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
case class ChessApiResponseDto(
/** Best move in UCI format (e.g. "e2e4"). */
move: Option[String] = None,
/** Centipawn evaluation (from white's perspective). */
centipawns: Option[Double] = None,
/** Mate-in-N (positive = white wins, negative = black wins). */
mate: Option[Int] = None,
/** Principal variation: space-separated UCI moves. */
pv: Option[String] = None,
/** Actual depth searched. */
depth: Option[Int] = None,
/** Text description of the position/move quality. */
text: Option[String] = None,
)
@@ -0,0 +1,17 @@
package de.nowchess.analysis.config
import com.fasterxml.jackson.core.Version
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.quarkus.jackson.ObjectMapperCustomizer
import jakarta.inject.Singleton
@Singleton
class JacksonConfig extends ObjectMapperCustomizer:
def customize(mapper: ObjectMapper): Unit =
mapper.registerModule(new DefaultScalaModule() {
override def version(): Version =
// scalafix:off DisableSyntax.null
new Version(2, 21, 1, null, "com.fasterxml.jackson.module", "jackson-module-scala")
// scalafix:on DisableSyntax.null
})
@@ -0,0 +1,18 @@
package de.nowchess.analysis.config
import de.nowchess.analysis.client.{ChessApiRequestDto, ChessApiResponseDto}
import de.nowchess.analysis.dto.{AnalysisRequestDto, AnalysisResponseDto}
import de.nowchess.analysis.error.AnalysisErrorDto
import io.quarkus.runtime.annotations.RegisterForReflection
@RegisterForReflection(
targets = Array(
classOf[AnalysisRequestDto],
classOf[AnalysisResponseDto],
classOf[ChessApiRequestDto],
classOf[ChessApiResponseDto],
classOf[AnalysisErrorDto],
),
registerFullHierarchy = true,
)
class NativeReflectionConfig
@@ -0,0 +1,10 @@
package de.nowchess.analysis.dto
/** Request body for the analysis endpoint.
*
* @param fen
* FEN string representing the position to analyse.
* @param depth
* Engine search depth (1-99). Defaults to 12 when absent.
*/
case class AnalysisRequestDto(fen: String, depth: Option[Int] = None)
@@ -0,0 +1,25 @@
package de.nowchess.analysis.dto
/** Response from the analysis endpoint.
*
* @param fen
* The analysed FEN.
* @param depth
* The search depth used.
* @param bestMove
* Best move in UCI notation (e.g. "e2e4"), or None if not available.
* @param evaluation
* Centipawn evaluation from white's perspective, or None.
* @param mate
* Mate-in-N value (positive = white wins, negative = black wins), or None.
* @param continuationMoves
* Principal variation as list of UCI moves.
*/
case class AnalysisResponseDto(
fen: String,
depth: Int,
bestMove: Option[String],
evaluation: Option[Double],
mate: Option[Int],
continuationMoves: List[String],
)
@@ -0,0 +1,3 @@
package de.nowchess.analysis.error
case class AnalysisErrorDto(code: String, message: String)
@@ -0,0 +1,8 @@
package de.nowchess.analysis.error
sealed class AnalysisException(val status: Int, val code: String, message: String) extends RuntimeException(message)
class InvalidFenException(fen: String) extends AnalysisException(400, "INVALID_FEN", s"Invalid FEN string: $fen")
class AnalysisUpstreamException(cause: Throwable)
extends AnalysisException(502, "UPSTREAM_ERROR", s"Chess API unavailable: ${cause.getMessage}")
@@ -0,0 +1,13 @@
package de.nowchess.analysis.error
import jakarta.ws.rs.core.{MediaType, Response}
import jakarta.ws.rs.ext.{ExceptionMapper, Provider}
@Provider
class AnalysisExceptionMapper extends ExceptionMapper[AnalysisException]:
def toResponse(ex: AnalysisException): Response =
Response
.status(ex.status)
.entity(AnalysisErrorDto(ex.code, ex.getMessage))
.`type`(MediaType.APPLICATION_JSON)
.build()
@@ -0,0 +1,33 @@
package de.nowchess.analysis.resource
import de.nowchess.analysis.dto.{AnalysisRequestDto, AnalysisResponseDto}
import de.nowchess.analysis.service.AnalysisService
import jakarta.annotation.security.PermitAll
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{MediaType, Response}
import scala.compiletime.uninitialized
@Path("/api/analysis")
@ApplicationScoped
class AnalysisResource:
// scalafix:off DisableSyntax.var
@Inject
var analysisService: AnalysisService = uninitialized
// scalafix:on DisableSyntax.var
/** Analyse a chess position.
*
* Accepts a FEN string and optional depth, proxies to chess-api.com, and returns structured analysis data.
*/
@POST
@Path("/position")
@PermitAll
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
def analysePosition(body: AnalysisRequestDto): Response =
val result = analysisService.analyse(body)
Response.ok(result).build()
@@ -0,0 +1,68 @@
package de.nowchess.analysis.service
import de.nowchess.analysis.client.{ChessApiClient, ChessApiRequestDto}
import de.nowchess.analysis.dto.{AnalysisRequestDto, AnalysisResponseDto}
import de.nowchess.analysis.error.{AnalysisUpstreamException, InvalidFenException}
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.eclipse.microprofile.rest.client.inject.RestClient
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
@ApplicationScoped
class AnalysisService:
private val log = Logger.getLogger(classOf[AnalysisService])
private val DefaultDepth = 12
private val MinDepth = 1
private val MaxDepth = 99
// scalafix:off DisableSyntax.var
@Inject
@RestClient
var chessApiClient: ChessApiClient = uninitialized
// scalafix:on DisableSyntax.var
// scalafix:off DisableSyntax.throw
def analyse(request: AnalysisRequestDto): AnalysisResponseDto =
val fen = request.fen.trim
if fen.isEmpty then throw InvalidFenException(fen)
validateFen(fen)
val depth = request.depth
.map(d => d.max(MinDepth).min(MaxDepth))
.getOrElse(DefaultDepth)
log.debugf("Analysing FEN '%s' at depth %d", fen, depth)
val apiResponse =
try chessApiClient.analyse(ChessApiRequestDto(fen, depth))
catch
case ex: Exception =>
log.warnf(ex, "Chess API call failed for FEN '%s'", fen)
throw AnalysisUpstreamException(ex)
val continuationMoves = apiResponse.pv
.map(_.split(" ").toList.filter(_.nonEmpty))
.getOrElse(List.empty)
AnalysisResponseDto(
fen = fen,
depth = apiResponse.depth.getOrElse(depth),
bestMove = apiResponse.move,
evaluation = apiResponse.centipawns,
mate = apiResponse.mate,
continuationMoves = continuationMoves,
)
// scalafix:on DisableSyntax.throw
/** Rudimentary FEN structure validation — checks the board part has 8 ranks. */
// scalafix:off DisableSyntax.throw
private def validateFen(fen: String): Unit =
val parts = fen.split(" ")
if parts.length < 1 then throw InvalidFenException(fen)
val ranks = parts(0).split("/")
if ranks.length != 8 then throw InvalidFenException(fen)
// scalafix:on DisableSyntax.throw
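The validation above only counts ranks. As a hedged sketch of a slightly stricter check, the function below also verifies that each rank uses only piece letters and digits 1 to 8 and describes exactly eight squares. `boardFieldLooksValid` is a hypothetical helper, not part of this change, and it still ignores the remaining FEN fields (side to move, castling rights, and so on).

```scala
// True when the board field has 8 ranks and every rank describes exactly
// 8 squares using only piece letters and the digits 1 to 8.
def boardFieldLooksValid(fen: String): Boolean =
  val board = fen.trim.split(" ").headOption.getOrElse("")
  val ranks = board.split("/", -1)
  ranks.length == 8 && ranks.forall { rank =>
    val allowed = rank.forall(c => "pnbrqkPNBRQK".contains(c) || ('1' to '8').contains(c))
    val squares = rank.map(c => if c.isDigit then c.asDigit else 1).sum
    allowed && squares == 8
  }
```

A check like this would reject inputs such as a board with a seven-square rank before any upstream call is made.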
@@ -0,0 +1,4 @@
quarkus:
rest-client:
chess-api:
url: http://localhost:9999
@@ -0,0 +1,106 @@
package de.nowchess.analysis.resource
import de.nowchess.analysis.dto.{AnalysisRequestDto, AnalysisResponseDto}
import de.nowchess.analysis.error.{AnalysisUpstreamException, InvalidFenException}
import de.nowchess.analysis.service.AnalysisService
import io.quarkus.test.InjectMock
import io.quarkus.test.junit.QuarkusTest
import io.restassured.RestAssured
import io.restassured.http.ContentType
import org.hamcrest.Matchers.*
import org.junit.jupiter.api.{DisplayName, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import scala.compiletime.uninitialized
// scalafix:off
@QuarkusTest
@DisplayName("AnalysisResource")
class AnalysisResourceTest:
@InjectMock
var analysisService: AnalysisService = uninitialized
private val validFen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
private def givenJson() = RestAssured.`given`().contentType(ContentType.JSON)
@Test
@DisplayName("POST /api/analysis/position returns 200 with analysis data")
def testAnalysePositionOk(): Unit =
when(analysisService.analyse(any()))
.thenReturn(
AnalysisResponseDto(
fen = validFen,
depth = 12,
bestMove = Some("e2e4"),
evaluation = Some(0.3),
mate = None,
continuationMoves = List("e2e4", "e7e5"),
),
)
givenJson()
.body(s"""{"fen": "$validFen"}""")
.when()
.post("/api/analysis/position")
.`then`()
.statusCode(200)
.body("fen", equalTo(validFen))
.body("depth", equalTo(12))
.body("bestMove", equalTo("e2e4"))
.body("evaluation", equalTo(0.3f))
.body("continuationMoves", hasItems("e2e4", "e7e5"))
@Test
@DisplayName("POST /api/analysis/position returns 400 for invalid FEN")
def testAnalysePositionInvalidFen(): Unit =
when(analysisService.analyse(any()))
.thenThrow(new InvalidFenException("bad-fen"))
givenJson()
.body("""{"fen": "bad-fen"}""")
.when()
.post("/api/analysis/position")
.`then`()
.statusCode(400)
.body("code", equalTo("INVALID_FEN"))
@Test
@DisplayName("POST /api/analysis/position returns 502 on upstream failure")
def testAnalysePositionUpstreamError(): Unit =
when(analysisService.analyse(any()))
.thenThrow(new AnalysisUpstreamException(new RuntimeException("timeout")))
givenJson()
.body(s"""{"fen": "$validFen"}""")
.when()
.post("/api/analysis/position")
.`then`()
.statusCode(502)
.body("code", equalTo("UPSTREAM_ERROR"))
@Test
@DisplayName("POST /api/analysis/position accepts custom depth")
def testAnalysePositionCustomDepth(): Unit =
when(analysisService.analyse(any()))
.thenReturn(
AnalysisResponseDto(
fen = validFen,
depth = 20,
bestMove = Some("d2d4"),
evaluation = Some(0.15),
mate = None,
continuationMoves = List.empty,
),
)
givenJson()
.body(s"""{"fen": "$validFen", "depth": 20}""")
.when()
.post("/api/analysis/position")
.`then`()
.statusCode(200)
.body("depth", equalTo(20))
// scalafix:on
@@ -0,0 +1,139 @@
package de.nowchess.analysis.service
import de.nowchess.analysis.client.{ChessApiClient, ChessApiRequestDto, ChessApiResponseDto}
import de.nowchess.analysis.dto.AnalysisRequestDto
import de.nowchess.analysis.error.{AnalysisUpstreamException, InvalidFenException}
import io.quarkus.test.InjectMock
import io.quarkus.test.junit.QuarkusTest
import jakarta.inject.Inject
import org.eclipse.microprofile.rest.client.inject.RestClient
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.{DisplayName, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{verify, when}
import scala.compiletime.uninitialized
// scalafix:off
@QuarkusTest
@DisplayName("AnalysisService")
class AnalysisServiceTest:
@Inject
var service: AnalysisService = uninitialized
@InjectMock
@RestClient
var chessApiClient: ChessApiClient = uninitialized
private val validFen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
@Test
@DisplayName("analyse returns response with best move from chess-api.com")
def testAnalyseReturnsBestMove(): Unit =
when(chessApiClient.analyse(any()))
.thenReturn(
ChessApiResponseDto(
move = Some("e2e4"),
centipawns = Some(0.3),
mate = None,
pv = Some("e2e4 e7e5 g1f3"),
depth = Some(12),
),
)
val response = service.analyse(AnalysisRequestDto(validFen, Some(12)))
assertEquals(validFen, response.fen)
assertEquals(12, response.depth)
assertEquals(Some("e2e4"), response.bestMove)
assertEquals(Some(0.3), response.evaluation)
assertEquals(None, response.mate)
assertEquals(List("e2e4", "e7e5", "g1f3"), response.continuationMoves)
@Test
@DisplayName("analyse uses default depth 12 when not specified")
def testAnalyseUsesDefaultDepth(): Unit =
when(chessApiClient.analyse(any()))
.thenReturn(ChessApiResponseDto(move = Some("d2d4"), depth = Some(12)))
val response = service.analyse(AnalysisRequestDto(validFen))
verify(chessApiClient).analyse(ChessApiRequestDto(validFen, 12))
assertEquals(12, response.depth)
@Test
@DisplayName("analyse clamps depth to [1, 99]")
def testAnalyseClampsDepth(): Unit =
when(chessApiClient.analyse(any()))
.thenReturn(ChessApiResponseDto(move = Some("e2e4"), depth = Some(99)))
service.analyse(AnalysisRequestDto(validFen, Some(200)))
verify(chessApiClient).analyse(ChessApiRequestDto(validFen, 99))
@Test
@DisplayName("analyse clamps depth minimum to 1")
def testAnalyseClampsDepthMin(): Unit =
when(chessApiClient.analyse(any()))
.thenReturn(ChessApiResponseDto(move = Some("e2e4"), depth = Some(1)))
service.analyse(AnalysisRequestDto(validFen, Some(0)))
verify(chessApiClient).analyse(ChessApiRequestDto(validFen, 1))
@Test
@DisplayName("analyse handles empty pv gracefully")
def testAnalyseEmptyPv(): Unit =
when(chessApiClient.analyse(any()))
.thenReturn(ChessApiResponseDto(move = Some("e2e4"), pv = None, depth = Some(5)))
val response = service.analyse(AnalysisRequestDto(validFen, Some(5)))
assertEquals(List.empty, response.continuationMoves)
@Test
@DisplayName("analyse throws InvalidFenException for empty FEN")
def testAnalyseThrowsOnEmptyFen(): Unit =
assertThrows(
classOf[InvalidFenException],
() => service.analyse(AnalysisRequestDto("")),
)
@Test
@DisplayName("analyse throws InvalidFenException for malformed FEN")
def testAnalyseThrowsOnMalformedFen(): Unit =
assertThrows(
classOf[InvalidFenException],
() => service.analyse(AnalysisRequestDto("not/a/valid/fen")),
)
@Test
@DisplayName("analyse wraps chess-api.com exception in AnalysisUpstreamException")
def testAnalyseWrapsUpstreamException(): Unit =
when(chessApiClient.analyse(any()))
.thenThrow(new RuntimeException("connection refused"))
assertThrows(
classOf[AnalysisUpstreamException],
() => service.analyse(AnalysisRequestDto(validFen)),
)
@Test
@DisplayName("analyse returns mate value from chess-api.com response")
def testAnalyseReturnsMate(): Unit =
when(chessApiClient.analyse(any()))
.thenReturn(
ChessApiResponseDto(
move = Some("d1h5"),
centipawns = None,
mate = Some(3),
depth = Some(10),
),
)
val response = service.analyse(AnalysisRequestDto(validFen, Some(10)))
assertEquals(Some(3), response.mate)
assertEquals(None, response.evaluation)
// scalafix:on
+3
@@ -0,0 +1,3 @@
MAJOR=0
MINOR=2
PATCH=0
+12
@@ -0,0 +1,12 @@
## (2026-06-16)
### Features
* **analytics:** add Dockerfile, CI workflow, and stable jar name for K8s deployment ([95215b6](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/95215b6a420fd526df1aa395f9b087556c8ad03b))
* **analytics:** add PostgreSQL JDBC write-back to all four batch jobs ([0e0ea4c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0e0ea4c9893c6efed52e633e55d05ab3ed004502))
* **analytics:** add Spark batch analytics module ([259b3bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/259b3bbb24c0f23326269b93f4b3c84012f727cd))
* **analytics:** add Structured Streaming, MLlib clustering, GraphX jobs ([e1d80b9](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/e1d80b9331666feea191b1fd08aa762f3581c918))
### Bug Fixes
* **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c))
+96
@@ -0,0 +1,96 @@
// Standalone Spark batch-analytics module.
//
// Spark 4.0.x (see sparkVersion below) ships only Scala 2.13 artifacts; Scala 3 code can consume
// them via JVM binary compatibility so long as we avoid macro-expanded APIs
// (spark.implicits._, typed Dataset[T]). We use the untyped DataFrame API
// exclusively, which is safe to call from Scala 3.
//
// Spark is declared compileOnly — the cluster provides it at runtime via
// spark-submit. Only the PostgreSQL driver and the Scala 3 runtime are
// bundled into the fat jar produced by the "jar" task.
//
// Build the submission jar:
// ./gradlew :modules:analytics:jar
//
// Run a job:
// spark-submit \
// --class de.nowchess.analytics.OpeningBookJob \
// modules/analytics/build/libs/analytics.jar \
// [outputDir] [maxPlies]
//
// Environment variables consumed:
// NOWCHESS_JDBC_URL (default: jdbc:postgresql://localhost:5432/nowchess)
// NOWCHESS_DB_USER (default: nowchess)
// NOWCHESS_DB_PASS (default: nowchess)
plugins {
id("scala")
application
}
group = "de.nowchess"
version = "1.0-SNAPSHOT"
@Suppress("UNCHECKED_CAST")
val versions = rootProject.extra["VERSIONS"] as Map<String, String>
repositories {
mavenCentral()
}
scala {
scalaVersion = versions["SCALA3"]!!
}
val sparkVersion = "4.0.3"
dependencies {
compileOnly("org.scala-lang:scala3-compiler_3") {
version { strictly(versions["SCALA3"]!!) }
}
implementation("org.scala-lang:scala3-library_3") {
version { strictly(versions["SCALA3"]!!) }
}
implementation("org.scala-lang:scala-library") {
version { strictly(versions["SCALA_LIBRARY"]!!) }
}
// Spark is provided by the cluster — compile-only, not bundled.
compileOnly("org.apache.spark:spark-sql_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
compileOnly("org.apache.spark:spark-core_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
compileOnly("org.apache.spark:spark-mllib_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
compileOnly("org.apache.spark:spark-graphx_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
// PostgreSQL JDBC driver bundled so it is available on executor classpath.
implementation("org.postgresql:postgresql:42.7.4")
}
application {
mainClass.set("de.nowchess.analytics.OpeningBookJob")
}
// Fat jar: includes runtimeClasspath (our code + pg driver + scala3-library)
// but NOT compileOnly Spark jars.
// archiveVersion is cleared so the output is always "analytics.jar" — stable
// name required by the Dockerfile COPY instruction.
tasks.jar {
archiveBaseName.set("analytics")
archiveVersion.set("")
manifest {
attributes["Main-Class"] = "de.nowchess.analytics.OpeningBookJob"
}
from(configurations.runtimeClasspath.get().map { if (it.isDirectory) it else zipTree(it) })
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
tasks.withType<ScalaCompile> {
scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8")
}
@@ -0,0 +1,9 @@
FROM apache/spark:4.0.3-scala2.13-java17-ubuntu
USER root
# analytics.jar = fat jar containing app code + PostgreSQL JDBC driver + Scala 3 runtime.
# Spark itself is provided by the base image at /opt/spark — it is NOT included in the jar.
COPY build/libs/analytics.jar /app/analytics.jar
USER spark
@@ -0,0 +1,138 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
import org.apache.spark.sql.streaming.Trigger
/** Demonstrates Spark Structured Streaming on NowChess game-over events.
*
* Spark concepts shown:
* - Continuous micro-batch processing (`readStream`)
* - Watermarking for late-data tolerance (events up to 45 s late are accepted)
* - Tumbling window aggregations — fixed 1-minute buckets, zero overlap
* - Sliding window aggregations — 5-minute window, 1-minute slide
* - Append vs Update output modes and when each is valid
* - Exactly-once fault tolerance via checkpointing (applies to the Parquet file sink; the console sink is not fault-tolerant)
* - Multiple concurrent streaming queries on the same session
*
* Production wiring: NowChess already publishes game-over events to a Redis Stream (`nowchess:game-over`, see
* GameRedisPublisher). Swap the `rate` source below for one of the production sources shown in the comment block.
*/
object LiveDashboardJob:
def main(args: Array[String]): Unit =
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-live-dashboard"
val spark = SparkSession
.builder()
.appName("NowChess Live Dashboard")
.getOrCreate()
run(spark, outputDir)
def run(spark: SparkSession, outputDir: String): Unit =
// ── Production sources (replace rate source with one of these) ─────────
//
// Kafka (via a Redis → Kafka bridge):
// spark.readStream
// .format("kafka")
// .option("kafka.bootstrap.servers", sys.env("KAFKA_BROKERS"))
// .option("subscribe", "nowchess.game-over")
// .load()
// .select(F.from_json(F.col("value").cast("string"), gameOverSchema).as("e"))
// .select("e.*")
//
// spark-redis (com.redislabs:spark-redis:3.1.0):
// spark.readStream
// .format("redis")
// .option("stream.keys", "nowchess:game-over")
// .schema(gameOverSchema)
// .load()
// ─────────────────────────────────────────────────────────────────────
// Simulated stream: 10 game-over events / second.
// `rate` source emits (timestamp: Timestamp, value: Long) — Spark built-in, no deps.
val rawStream = spark.readStream
.format("rate")
.option("rowsPerSecond", "10")
.load()
// Derive game-outcome columns from the monotonic counter.
// In production these come directly from the event payload.
val events = rawStream
.withColumn(
"result",
F.when(F.col("value") % 3 === 0L, "white")
.when(F.col("value") % 3 === 1L, "black")
.otherwise("draw"),
)
.withColumn(
"termination",
F.when(F.col("value") % 4 === 0L, "checkmate")
.when(F.col("value") % 4 === 1L, "resignation")
.when(F.col("value") % 4 === 2L, "timeout")
.otherwise("agreement"),
)
// Watermark: accept events up to 45 seconds late.
// In append mode, Spark emits a window result only after the watermark passes its end time;
// update mode emits intermediate results on every trigger.
.withWatermark("timestamp", "45 seconds")
// ── Query 1: tumbling 1-minute windows ────────────────────────────────
// Each window is a non-overlapping 60-second bucket.
// outputMode("append") only emits a window after the watermark seals it —
// guarantees that late arrivals were already counted before output.
val gamesByWindow = events
.groupBy(F.window(F.col("timestamp"), "1 minute"), F.col("result"))
.agg(F.count("*").as("games"))
.select(
F.col("window.start").as("window_start"),
F.col("window.end").as("window_end"),
F.col("result"),
F.col("games"),
)
// ── Query 2: sliding 5-minute / 1-minute windows ──────────────────────
// Each window covers 5 minutes of data, and a new window opens every minute.
// outputMode("update") emits a row whenever an existing window changes —
// gives a live rolling view of termination patterns.
val terminationTrend = events
.groupBy(F.window(F.col("timestamp"), "5 minutes", "1 minute"))
.agg(
F.count("*").as("total"),
F.sum(F.when(F.col("termination") === "checkmate", 1).otherwise(0)).as("checkmates"),
F.sum(F.when(F.col("termination") === "resignation", 1).otherwise(0)).as("resignations"),
F.sum(F.when(F.col("termination") === "timeout", 1).otherwise(0)).as("timeouts"),
)
.withColumn(
"checkmate_pct",
F.round(F.col("checkmates") / F.col("total").cast("double") * 100, 1),
)
.select(
F.col("window.start").as("window_start"),
F.col("total"),
F.col("checkmate_pct"),
F.col("resignations"),
F.col("timeouts"),
)
// Write sealed windows to Parquet — safe to query with any SQL engine.
gamesByWindow.writeStream
.outputMode("append")
.format("parquet")
.option("path", s"$outputDir/game_counts_by_window")
.option("checkpointLocation", s"$outputDir/_checkpoints/game_counts")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
// Print live rolling stats to the console every 10 seconds.
terminationTrend.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.option("numRows", "10")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
// Block until any query fails or the process is killed.
spark.streams.awaitAnyTermination()
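// Illustrative sketch, not part of this commit: how a single event timestamp maps to
// the tumbling and sliding windows used above, in plain Scala with no Spark dependency.
// Assumptions: `WindowSketch` and `windowsFor` are hypothetical names, timestamps are
// epoch seconds, and windows are aligned to the epoch (the default alignment of
// F.window when no start offset is given).
object WindowSketch:
  /** Returns every [start, end) window of length `size`, sliding by `slide`, that contains `ts`. */
  def windowsFor(ts: Long, size: Long, slide: Long): Seq[(Long, Long)] =
    // Latest window start that is <= ts.
    val lastStart = ts - Math.floorMod(ts, slide)
    // Walk backwards one slide at a time while the window still covers ts.
    val starts = (lastStart to (ts - size + 1)).by(-slide)
    starts.map(start => (start, start + size)).reverse

  def main(args: Array[String]): Unit =
    // Tumbling 1-minute window: slide == size, so the event lands in exactly one window.
    println(windowsFor(1000L, 60L, 60L))
    // Sliding 5-minute window with a 1-minute slide: the same event lands in five windows.
    println(windowsFor(1000L, 300L, 60L))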
@@ -0,0 +1,107 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Reads completed games from the game_records table and produces an opening-book statistics table: for each unique
* opening (first N plies), it reports total games played and win/draw/loss rates from each side.
*
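* Output is written as Parquet to `outputDir/opening_book` and a human-readable CSV summary (top-1000 openings by
* popularity) to `outputDir/opening_book_top1000`. The top-1000 rows are also written to the
* `analytics_opening_stats` Postgres table.
*
* PGN parsing is done entirely with Spark SQL string functions (no UDFs), so the Catalyst optimizer can push
* predicates and optimize the expressions end to end. The JDBC read itself runs in a single partition unless
* partitioning options (partitionColumn, lowerBound, upperBound, numPartitions) are set.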
*/
object OpeningBookJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-opening-book"
val maxPlies = if args.length > 1 then args(1).toInt else 10
val spark = SparkSession
.builder()
.appName("NowChess Opening Book Generator")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir, maxPlies)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
maxPlies: Int,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("pgn", "result")
.filter(F.col("result").isNotNull.and(F.col("pgn").isNotNull))
val openingCol = extractOpening(F.col("pgn"), maxPlies)
val withOpening = games
.withColumn("opening", openingCol)
.filter(F.col("opening").isNotNull.and(F.length(F.col("opening")) > 0))
val stats = withOpening
.groupBy("opening")
.agg(
F.count("*").as("total"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total").cast("double"), 3))
.withColumn("black_win_rate", F.round(F.col("black_wins") / F.col("total").cast("double"), 3))
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total").cast("double"), 3))
.orderBy(F.desc("total"))
stats.write
.mode("overwrite")
.parquet(s"$outputDir/opening_book")
val top1000 = stats.limit(1000)
top1000.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/opening_book_top1000")
top1000.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_opening_stats")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
/** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
*
* PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
*
* Steps:
* 1. Split on double-newline; take the moves section (index 1).
* 2. Strip the terminal result token (*, 1-0, 0-1, 1/2-1/2).
* 3. Strip move numbers (e.g., "1. ", "12. ").
* 4. Strip check/checkmate suffixes (+ #) for position-independent lookup.
* 5. Tokenize on whitespace, take first maxPlies tokens, rejoin with spaces.
*/
private def extractOpening(pgnCol: org.apache.spark.sql.Column, maxPlies: Int): org.apache.spark.sql.Column =
val moveSection = F.coalesce(F.split(pgnCol, "\n\n").getItem(1), pgnCol)
val noResult = F.regexp_replace(moveSection, "(1-0|0-1|1/2-1/2|\\*)\\s*$", "")
val noMoveNumbers = F.regexp_replace(noResult, "\\d+\\.+\\s*", " ")
val noAnnotations = F.regexp_replace(noMoveNumbers, "[+#]", "")
val moveArray = F.split(F.trim(noAnnotations), "\\s+")
F.array_join(F.slice(moveArray, 1, maxPlies), " ")
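// Illustrative sketch, not part of this commit: the same five steps as extractOpening,
// applied to a plain String so the regexes can be checked without a cluster.
// Assumptions: `OpeningSketch` and `opening` are hypothetical names; java.util.regex
// is used here, and Spark's regexp_replace follows Java regex syntax as well.
object OpeningSketch:
  def opening(pgn: String, maxPlies: Int): String =
    val parts       = pgn.split("\n\n")
    val moveSection = if parts.length > 1 then parts(1) else pgn               // step 1
    val noResult    = moveSection.replaceAll("(1-0|0-1|1/2-1/2|\\*)\\s*$", "") // step 2
    val noNumbers   = noResult.replaceAll("\\d+\\.+\\s*", " ")                 // step 3
    val noSuffixes  = noNumbers.replaceAll("[+#]", "")                         // step 4
    noSuffixes.trim.split("\\s+").take(maxPlies).mkString(" ")                 // step 5

  def main(args: Array[String]): Unit =
    val pgn = "[Event \"?\"]\n[White \"?\"]\n\n1. e4 e5 2. Nf3 Nc6 3. Bb5+ a6 *"
    println(opening(pgn, 4)) // prints: e4 e5 Nf3 Nc6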
@@ -0,0 +1,174 @@
package de.nowchess.analytics
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Clusters NowChess players into skill tiers using K-Means via MLlib.
*
* Spark / MLlib concepts shown:
* - Feature engineering from raw relational data (JDBC → DataFrame)
* - VectorAssembler — combine scalar columns into a dense feature vector
* - StandardScaler — zero-mean / unit-variance normalisation so that total_games (can be 1000+) does not dominate
* win_rate (0–1)
* - KMeans clustering — unsupervised partitioning into k skill tiers
* - Pipeline — compose transformers + estimator into a single reusable object
* - ClusteringEvaluator — silhouette score to assess cluster quality
*
* Features per player (all derived from game_records):
* - total_games: how active the player is
* - win_rate: overall strength
* - avg_move_count: game-length preference (tactical vs positional)
* - games_as_white_ratio: colour bias
*
* Output:
* - Parquet: player_id + cluster (0..k-1) + feature values
* - CSV: per-cluster archetype averages (interpret what each tier means)
* - Postgres: analytics_player_clusters and analytics_cluster_archetypes (same rows as the Parquet and CSV outputs)
*/
object PlayerClusteringJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-clusters"
val k = if args.length > 1 then args(1).toInt else 4
val spark = SparkSession
.builder()
.appName("NowChess Player Clustering")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir, k)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
k: Int,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result", "move_count")
.filter(F.col("result").isNotNull)
val playerStats = buildPlayerStats(games)
.filter(F.col("total_games") >= 5)
val featureCols = Array("total_games", "win_rate", "avg_move_count", "games_as_white_ratio")
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("raw_features")
.setHandleInvalid("skip")
val scaler = new StandardScaler()
.setInputCol("raw_features")
.setOutputCol("features")
.setWithStd(true)
.setWithMean(true)
val kmeans = new KMeans()
.setK(k)
.setSeed(42L)
.setFeaturesCol("features")
.setPredictionCol("cluster")
val pipeline = new Pipeline().setStages(Array(assembler, scaler, kmeans))
val model = pipeline.fit(playerStats)
val predictions = model.transform(playerStats)
val silhouette = new ClusteringEvaluator()
.setFeaturesCol("features")
.setPredictionCol("cluster")
.evaluate(predictions)
println(s"[Clustering] k=$k silhouette=$silhouette")
// Average feature values per cluster reveal what each tier represents.
// Example interpretation for k=4:
// Cluster 0: high total_games + high win_rate → experienced strong players
// Cluster 1: low total_games + low win_rate → beginners / casual
// Cluster 2: high total_games + mid win_rate → active intermediate
// Cluster 3: low total_games + high win_rate → strong but infrequent
val archetypes = predictions
.groupBy("cluster")
.agg(
F.count("*").as("player_count"),
F.round(F.avg("total_games"), 1).as("avg_total_games"),
F.round(F.avg("win_rate"), 3).as("avg_win_rate"),
F.round(F.avg("avg_move_count"), 1).as("avg_move_count"),
F.round(F.avg("games_as_white_ratio"), 3).as("avg_white_ratio"),
)
.orderBy("cluster")
archetypes.show(20, false)
val clustersDf = predictions.select("player_id", "total_games", "win_rate", "avg_move_count", "cluster")
clustersDf.write
.mode("overwrite")
.parquet(s"$outputDir/player_clusters")
archetypes.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/cluster_archetypes")
clustersDf.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_player_clusters")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
archetypes.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_cluster_archetypes")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
val asWhite = games.select(
F.col("white_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit(1).as("is_white"),
)
val asBlack = games.select(
F.col("black_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit(0).as("is_white"),
)
val won = (F.col("is_white") === 1 && F.col("result") === "white")
.or(F.col("is_white") === 0 && F.col("result") === "black")
asWhite
.union(asBlack)
.groupBy("player_id")
.agg(
F.count("*").as("total_games"),
F.round(F.sum(F.when(won, 1.0).otherwise(0.0)) / F.count("*"), 3).as("win_rate"),
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
F.round(F.avg(F.col("is_white").cast("double")), 3).as("games_as_white_ratio"),
)
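// Illustrative sketch, not part of this commit: what standardisation with
// withMean = true and withStd = true does to one feature column, in plain Scala.
// Assumptions: `ScalingSketch` and `standardize` are hypothetical names. The sample
// standard deviation (n - 1 denominator) is used, which is what Spark's StandardScaler
// documents; a zero-variance column is mapped to 0.0.
object ScalingSketch:
  def standardize(xs: Seq[Double]): Seq[Double] =
    require(xs.size > 1, "need at least two values to estimate a standard deviation")
    val mean     = xs.sum / xs.size
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / (xs.size - 1)
    val std      = math.sqrt(variance)
    if std == 0.0 then xs.map(_ => 0.0) else xs.map(x => (x - mean) / std)

  def main(args: Array[String]): Unit =
    // Raw scales differ by orders of magnitude; after scaling both columns
    // have mean 0 and unit variance, so neither dominates the K-Means distance.
    val totalGames = Seq(5.0, 50.0, 1000.0)
    val winRate    = Seq(0.2, 0.5, 0.8)
    println(standardize(totalGames))
    println(standardize(winRate))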
@@ -0,0 +1,161 @@
package de.nowchess.analytics
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
/** Models the NowChess player network as a directed graph and runs GraphX analytics.
*
* Spark / GraphX concepts shown:
* - Building a Graph from RDDs derived from a JDBC DataFrame
* - PageRank — measures a player's "influence"; high score = many games against other high-ranked players (analogous
* to web link authority). Edges point white → black, so rank flows to the black player of each game.
* - Connected Components — finds isolated player communities; players who have never played anyone from another
* component cannot be linked
* - Converting GraphX results back to DataFrames for SQL-style joins and output
*
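* Graph model:
* - Vertices: one per unique player (vertex ID = hashCode of player UUID string)
* - Edges: one per completed game (white → black), attributed with result
*
* Note: hashCode yields a 32-bit value that is widened to a 64-bit vertex ID, so the ID space is still only 2^32.
* Assuming uniformly distributed hash codes, the birthday bound puts the chance of at least one collision at about 1%
* for 10,000 players and roughly 40% for 65,000. A collision merges two players into one vertex. For larger player
* bases, replace hashCode with MLlib StringIndexer to generate collision-free IDs.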
*/
object PlayerGraphJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-graph"
val spark = SparkSession
.builder()
.appName("NowChess Player Graph Analytics")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
): Unit =
val gamesRdd: RDD[Row] = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result")
.filter(F.col("result").isNotNull)
.rdd
val toVid: String => VertexId = s => s.hashCode.toLong
// Each row contributes two vertex entries (white and black player).
val vertices: RDD[(VertexId, String)] = gamesRdd
.flatMap { row =>
Seq(
(toVid(row.getString(0)), row.getString(0)),
(toVid(row.getString(1)), row.getString(1)),
)
}
.distinct()
// Directed edge white → black, labelled with the game result.
val edges: RDD[Edge[String]] = gamesRdd.map { row =>
Edge(toVid(row.getString(0)), toVid(row.getString(1)), row.getString(2))
}
val graph: Graph[String, String] = Graph(vertices, edges)
println(s"[Graph] vertices=${graph.numVertices} edges=${graph.numEdges}")
// ── PageRank ────────────────────────────────────────────────────────────
// Convergence tolerance 0.01 — lower = more iterations = more accurate.
// Returns Graph[Double, Double]; vertex attribute = PageRank score.
val pageRanks: RDD[(VertexId, Double)] = graph.pageRank(0.01).vertices
// ── Connected Components ────────────────────────────────────────────────
// Returns Graph[VertexId, ED]; vertex attribute = minimum vertex ID in
// the component (serves as a stable component label).
val components: RDD[(VertexId, VertexId)] = graph.connectedComponents().vertices
// Convert each RDD result to a DataFrame so we can join with SQL semantics.
val vertexDf = rddToFrame(spark, vertices, "player_id", StringType)
val pageRankDf = rddToFrame(spark, pageRanks, "page_rank", DoubleType)
val componentDf = rddToFrame(spark, components, "component_id", LongType)
val result = vertexDf
.join(pageRankDf, "vertex_id")
.join(componentDf, "vertex_id")
.drop("vertex_id")
.withColumn("page_rank", F.round(F.col("page_rank"), 4))
.orderBy(F.desc("page_rank"))
println("[Graph] Top 20 players by PageRank:")
result.show(20, false)
result.write
.mode("overwrite")
.parquet(s"$outputDir/player_graph")
result.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_player_graph")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
// How many players belong to each connected component?
// A large dominant component + many singletons is the expected shape.
val componentSizes = result
.groupBy("component_id")
.agg(F.count("*").as("player_count"))
.orderBy(F.desc("player_count"))
println("[Graph] Connected component sizes:")
componentSizes.show(10, false)
componentSizes.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/component_sizes")
// Build a two-column DataFrame (vertex_id: Long, valueCol: valueType) from an RDD.
// Used to bridge GraphX RDD results into the DataFrame API without implicits.
private def rddToFrame[T](
spark: SparkSession,
rdd: RDD[(VertexId, T)],
valueCol: String,
valueType: DataType,
): org.apache.spark.sql.DataFrame =
val schema = StructType(
List(
StructField("vertex_id", LongType, nullable = false),
StructField(valueCol, valueType, nullable = false),
),
)
spark.createDataFrame(
rdd.map { case (vid, v) => Row.fromSeq(Seq[Any](vid, v)) },
schema,
)
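// Illustrative sketch, not part of this commit: a birthday-bound estimate of how likely
// two distinct players end up with the same hashCode-derived vertex ID.
// Assumptions: `VertexIdCollisionSketch` and `collisionProbability` are hypothetical
// names, and hash codes are treated as uniformly distributed over 2^hashBits values.
object VertexIdCollisionSketch:
  /** P(at least one collision) ≈ 1 - exp(-n² / 2N) for n players and N = 2^hashBits IDs. */
  def collisionProbability(players: Long, hashBits: Int = 32): Double =
    val space = math.pow(2.0, hashBits.toDouble)
    val n     = players.toDouble
    1.0 - math.exp(-(n * n) / (2.0 * space))

  def main(args: Array[String]): Unit =
    Seq(1000L, 10000L, 65536L, 1000000L).foreach { n =>
      val p = collisionProbability(n)
      println(f"players=$n%d p(collision)=$p%.4f")
    }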
@@ -0,0 +1,95 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Aggregates per-player statistics from completed games.
*
* Each game contributes one row per player (as white and as black), so the dataset is first unioned into a
* player-centric view before grouping. Output columns: player_id, total_games, wins, losses, draws, games_as_white,
* games_as_black, avg_move_count, win_rate
*
* Output is written as Parquet to `outputDir/player_stats` and to the `analytics_player_stats` Postgres table.
*/
object PlayerStatsJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-stats"
val spark = SparkSession
.builder()
.appName("NowChess Player Stats")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result", "move_count")
.filter(F.col("result").isNotNull)
// Flatten each game into two rows: one per player, tagged with their side.
val asWhite = games.select(
F.col("white_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit("white").as("color"),
)
val asBlack = games.select(
F.col("black_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit("black").as("color"),
)
val playerGames = asWhite.union(asBlack)
val wonGame = F.col("color") === F.col("result")
val lostGame = (F.col("color") === "white" && F.col("result") === "black")
.or(F.col("color") === "black" && F.col("result") === "white")
val stats = playerGames
.groupBy("player_id")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(wonGame, 1).otherwise(0)).as("wins"),
F.sum(F.when(lostGame, 1).otherwise(0)).as("losses"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
F.sum(F.when(F.col("color") === "white", 1).otherwise(0)).as("games_as_white"),
F.sum(F.when(F.col("color") === "black", 1).otherwise(0)).as("games_as_black"),
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
)
.withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.desc("total_games"))
stats.write
.mode("overwrite")
.parquet(s"$outputDir/player_stats")
stats.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_player_stats")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
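// Illustrative sketch, not part of this commit: the wonGame / lostGame / draw logic
// above, expressed for a single (color, result) pair in plain Scala.
// Assumptions: `OutcomeSketch`, `Outcome` and `outcome` are hypothetical names, and
// `result` is one of "white", "black" or "draw" as in the aggregation above. Any other
// value yields None, matching a row that counts toward total_games only.
object OutcomeSketch:
  enum Outcome:
    case Win, Loss, Draw

  def outcome(color: String, result: String): Option[Outcome] =
    (color, result) match
      case (_, "draw")                             => Some(Outcome.Draw)
      case (c, r) if c == r                        => Some(Outcome.Win)
      case ("white", "black") | ("black", "white") => Some(Outcome.Loss)
      case _                                       => None

  def main(args: Array[String]): Unit =
    // One game won by white contributes a Win row for white and a Loss row for black.
    println(outcome("white", "white"))
    println(outcome("black", "white"))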
@@ -0,0 +1,3 @@
MAJOR=0
MINOR=2
PATCH=0
@@ -254,3 +254,30 @@
### Reverts
* Revert "refactor: update metrics paths formatting in application.yml for clarity" ([3870566](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/38705663498d5f47c40dafe2f26198589ede8656))
## (2026-06-16)
### Features
* add initialization metrics for various services ([d438e97](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d438e97f32bdde0bfc63c1b4a8cc810cdd093166))
* add OpenTelemetry trace configuration with parentbased sampler ([3904d5a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3904d5ad8ad4930ddee65287a7bfab785a6148f5))
* **analytics:** add Spark batch analytics module ([#70](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/70)) ([39f1657](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/39f1657e1db6e84889af338c43be8cb5c03c3ec3))
* **config:** update application.yml for PostgreSQL and remove staging/production configurations ([2404e61](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/2404e6164c3b50ffccbea5238d636060d6abe4d6))
* **config:** update application.yml for staging and production environments ([6113432](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/6113432a14c476a3a0dfc0d449e17d023697f2ba))
* configure logging and add OpenTelemetry support ([#49](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/49)) ([d57c488](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/d57c4886612d1d92da0e1b79209fc83e6ef537a1))
* **docker:** add .dockerignore and .gitignore files for build exclusions ([c987d8e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c987d8e258c0e6c4cfbdaa8381c64c410d7a2b83))
* **docker:** add Dockerfiles for building Quarkus application in native and JVM modes ([3f2d2bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/3f2d2bb4c97fa8cddba66e1da4427c54236dfeed))
* **docker:** add Dockerfiles for Quarkus application in JVM and native modes ([34b9933](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/34b993304670cf2aa62cd2f6460cee7b9864b08e))
* **events:** migrate game-creation and bot flows to Redis Streams NCS-89 ([#62](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/62)) ([a24924c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a24924c23057db3d700a75dbc4333557789cd991))
* NCS-78 Add Traceability to the Applications ([#46](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/46)) ([649566e](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/649566eb3fcf38f91c8896a739f74ea318af312d))
* NCS-78 Add Traceability to the Applications ([#47](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/47)) ([87dfc6c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/87dfc6c2bcce7f7d58fc641bd8d468a2e584c108))
* NCS-82 add Swiss-system tournament module ([#55](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/55)) ([c5661de](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c5661de4a0ebf4b33211f5a391840dcf744656b7))
* **official-bots:** consume GameOver stream for bot cleanup ([#67](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/67)) ([db9d153](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/db9d1533912f4b41c4d1ca80ccffdde5d23d6ff6))
* true-microservices ([#40](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/40)) ([5909242](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/590924254e8a2754de661a57a03e43f89ceb6299))
### Bug Fixes
* **official-bots:** NCS-70-auto-register official bots with account service ([#59](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/59)) ([7117a93](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/7117a93376272094d0b1a6abf2121254ce396684))
### Reverts
* Revert "refactor: update metrics paths formatting in application.yml for clarity" ([3870566](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/38705663498d5f47c40dafe2f26198589ede8656))
@@ -0,0 +1,7 @@
package de.nowchess.bot.resource
case class JoinTournamentRequest(
tournamentId: String,
difficulty: String,
serverUrl: Option[String],
)
@@ -0,0 +1,7 @@
package de.nowchess.bot.resource
case class JoinTournamentResponse(
botId: String,
difficulty: String,
status: String,
)
@@ -0,0 +1,44 @@
package de.nowchess.bot.resource
import de.nowchess.bot.service.TournamentBotGamePlayer
import jakarta.annotation.security.RolesAllowed
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{MediaType, Response}
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
@Path("/api/bots/official")
@ApplicationScoped
@RolesAllowed(Array("**"))
@Produces(Array(MediaType.APPLICATION_JSON))
@Consumes(Array(MediaType.APPLICATION_JSON))
class TournamentJoinResource:
private val log = Logger.getLogger(classOf[TournamentJoinResource])
// scalafix:off DisableSyntax.var
@Inject var player: TournamentBotGamePlayer = uninitialized
// scalafix:on DisableSyntax.var
@POST
@Path("/join-tournament")
def joinTournament(req: JoinTournamentRequest): Response =
val serverUrl = req.serverUrl.filter(_.nonEmpty).getOrElse(player.defaultServerUrl)
val difficulty = if req.difficulty.nonEmpty then req.difficulty else "medium"
log.infof(
"Official bot join requested — tournament=%s difficulty=%s server=%s",
req.tournamentId,
difficulty,
serverUrl,
)
player.joinTournament(req.tournamentId, difficulty, serverUrl) match
case Right(botId) =>
val resp = JoinTournamentResponse(botId, difficulty, "joining")
Response.ok(resp).build()
case Left(err) =>
Response
.status(Response.Status.BAD_GATEWAY)
.entity(s"""{"error":"$err"}""")
.build()
@@ -38,6 +38,9 @@ class TournamentBotGamePlayer:
@volatile private var running = true
// scalafix:on DisableSyntax.var
val defaultServerUrl: String =
System.getenv().asScala.getOrElse("TOURNAMENT_SERVER_URL", "http://localhost:8089")
@PostConstruct
def initialize(): Unit =
config match
@@ -45,10 +48,43 @@ class TournamentBotGamePlayer:
log.info("Tournament bot disabled — set TOURNAMENT_ID and TOURNAMENT_BOT_TOKEN to enable")
case Some(cfg) =>
log.infof("Tournament bot enabled — server=%s tournament=%s bot=%s", cfg.serverUrl, cfg.tournamentId, cfg.botId)
val thread = new Thread(() => connect(cfg), s"TournamentBot-${cfg.tournamentId}")
startAsync(cfg)
def joinTournament(tournamentId: String, difficulty: String, serverUrl: String): Either[String, String] =
registerBot(serverUrl, difficulty) match
case None => Left("Failed to register bot with tournament server")
case Some((botId, token)) =>
val cfg = TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty)
if join(cfg) then
startAsync(cfg)
Right(botId)
else Left("Failed to join tournament")
private def startAsync(cfg: TournamentBotConfig): Unit =
val thread = new Thread(() => streamLoop(cfg), s"TournamentBot-${cfg.tournamentId}")
thread.setDaemon(true)
thread.start()
private def registerBot(serverUrl: String, difficulty: String): Option[(String, String)] =
Try {
val name = s"NowChess ${difficulty.capitalize}"
val body = s"""{"name":"$name","isBot":true}"""
val response = client
.target(serverUrl)
.path("api")
.path("auth")
.path("register")
.request(MediaType.APPLICATION_JSON)
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
if response.getStatus == 201 then
val node = objectMapper.readTree(response.readEntity(classOf[String]))
val id = node.path("id").asText()
val token = node.path("token").asText()
response.close()
if id.nonEmpty && token.nonEmpty then Some((id, token)) else None
else { log.warnf("Bot registration returned status %d", response.getStatus); response.close(); None }
}.getOrElse(None)
@PreDestroy
def cleanup(): Unit =
running = false
@@ -56,8 +92,7 @@ class TournamentBotGamePlayer:
Try(client.close())
log.info("Tournament bot stopped")
private def connect(cfg: TournamentBotConfig): Unit =
if join(cfg) then
private def streamLoop(cfg: TournamentBotConfig): Unit =
while running do
Try(streamEvents(cfg)) match
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
@@ -86,41 +121,23 @@ class TournamentBotGamePlayer:
log.infof("Listening to tournament %s event stream", cfg.tournamentId)
forEachLine(response.readEntity(classOf[InputStream])): line =>
parse(line).foreach: node =>
if node.path("type").asText() == "gameStart" then onGameStart(cfg, node.path("gameId").asText())
if node.path("type").asText() == "gameStart" then
onGameStart(cfg, node.path("gameId").asText(), node.path("color").asText())
private def onGameStart(cfg: TournamentBotConfig, gameId: String): Unit =
if gameId.nonEmpty && activeGames.add(gameId) then
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId) })
private def onGameStart(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
if gameId.nonEmpty && color.nonEmpty && activeGames.add(gameId) then
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId, color) })
()
private def playGame(cfg: TournamentBotConfig, gameId: String): Unit =
private def playGame(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
Try {
colorFor(cfg, gameId) match
case None =>
log.debugf("Game %s is not ours — ignoring", gameId)
activeGames.remove(gameId)
case Some(color) =>
log.infof("Playing game %s as %s", gameId, color)
val stream = openGameStream(cfg, gameId)
maybeMoveFromCurrentState(cfg, gameId, color)
stream.foreach(consumeGameStream(cfg, gameId, color, _))
openGameStream(cfg, gameId).foreach(consumeGameStream(cfg, gameId, color, _))
activeGames.remove(gameId)
} match
case Failure(ex) => log.errorf(ex, "Game %s crashed", gameId); activeGames.remove(gameId)
case Success(_) => ()
private def colorFor(cfg: TournamentBotConfig, gameId: String): Option[String] =
fetchGame(cfg, gameId).flatMap: game =>
val white = game.path("white").path("id").asText()
val black = game.path("black").path("id").asText()
if white == cfg.botId then Some("white")
else if black == cfg.botId then Some("black")
else None
private def maybeMoveFromCurrentState(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
fetchGame(cfg, gameId).foreach: game =>
maybeMove(cfg, gameId, color, game.path("turn").asText(), game.path("status").asText(), game.path("fen").asText())
private def consumeGameStream(cfg: TournamentBotConfig, gameId: String, color: String, stream: InputStream): Unit =
val reader = new BufferedReader(new InputStreamReader(stream))
// scalafix:off DisableSyntax.var
@@ -134,9 +151,24 @@ class TournamentBotGamePlayer:
.foreach { line =>
parse(line).foreach: node =>
node.path("type").asText() match
case "gameState" =>
maybeMove(
cfg,
gameId,
color,
node.path("turn").asText(),
node.path("status").asText(),
node.path("fen").asText(),
)
case "move" =>
maybeMove(cfg, gameId, color, node.path("turn").asText(), "ongoing", node.path("fen").asText())
case "gameEnd" => log.infof("Game %s ended — status=%s", gameId, node.path("status").asText()); done = true
case "gameEnd" =>
log.infof(
"Game %s ended — status=%s winner=%s",
gameId,
node.path("status").asText(),
node.path("winner").asText(),
); done = true
case _ => ()
}
@@ -169,14 +201,6 @@ class TournamentBotGamePlayer:
case Failure(ex) => log.errorf(ex, "Error submitting move %s in game %s", uci, gameId)
case Success(_) => ()
private def fetchGame(cfg: TournamentBotConfig, gameId: String): Option[JsonNode] =
Try {
val response = target(cfg).path("game").path(gameId).request(MediaType.APPLICATION_JSON).get()
val node = if response.getStatus == 200 then Some(response.readEntity(classOf[JsonNode])) else None
response.close()
node
}.getOrElse(None)
private def openGameStream(cfg: TournamentBotConfig, gameId: String): Option[InputStream] =
Try {
val response = authed(cfg, target(cfg).path("game").path(gameId).path("stream"))
+1 -1
@@ -1,3 +1,3 @@
MAJOR=0
MINOR=17
MINOR=18
PATCH=0
+25
@@ -0,0 +1,25 @@
## (2026-06-10)
### Features
* NCS-121 pipeline for tournament ([#68](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/68)) ([145f467](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/145f4676483f92bfe6f2d9ca40e2cb4200982e87))
* NCS-82 add Swiss-system tournament module ([#55](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/55)) ([c5661de](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c5661de4a0ebf4b33211f5a391840dcf744656b7))
* **reflection:** add GameWritebackEventDto to native reflection configuration ([1aee39c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/1aee39c1ad286984501ac4b47da2b72d60b58a6f))
* **reflection:** add native reflection configuration for tournament classes ([65bc6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/65bc6a759937543df2d29905688bfa9e68d0c9d4))
### Bug Fixes
* **tournament:** replace scala.util.Random singleton with UUID for native image ([a50884a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a50884a11b1de500e74c18fd08d2d102d53cc3e9))
## (2026-06-16)
### Features
* **analytics:** add Spark batch analytics module ([#70](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/70)) ([39f1657](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/39f1657e1db6e84889af338c43be8cb5c03c3ec3))
* NCS-121 pipeline for tournament ([#68](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/68)) ([145f467](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/145f4676483f92bfe6f2d9ca40e2cb4200982e87))
* NCS-82 add Swiss-system tournament module ([#55](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/55)) ([c5661de](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/c5661de4a0ebf4b33211f5a391840dcf744656b7))
* **reflection:** add GameWritebackEventDto to native reflection configuration ([1aee39c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/1aee39c1ad286984501ac4b47da2b72d60b58a6f))
* **reflection:** add native reflection configuration for tournament classes ([65bc6a7](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/65bc6a759937543df2d29905688bfa9e68d0c9d4))
### Bug Fixes
* **tournament:** replace scala.util.Random singleton with UUID for native image ([a50884a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a50884a11b1de500e74c18fd08d2d102d53cc3e9))
@@ -31,6 +31,9 @@ import io.quarkus.runtime.annotations.RegisterForReflection
classOf[CoreCreateGameRequest],
classOf[CoreGameResponse],
classOf[GameWritebackEventDto],
classOf[ExternalTournamentServer],
classOf[RegisterServerRequest],
classOf[ExternalTournamentServerList],
),
)
class NativeReflectionConfig
@@ -0,0 +1,5 @@
package de.nowchess.tournament.dto
case class ExternalTournamentServer(id: String, label: String, url: String)
case class RegisterServerRequest(label: String, url: String)
case class ExternalTournamentServerList(servers: List[ExternalTournamentServer])
@@ -1,17 +1,24 @@
package de.nowchess.tournament.resource
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import de.nowchess.tournament.dto.*
import de.nowchess.tournament.error.TournamentError
import de.nowchess.tournament.service.{TournamentService, TournamentStreamManager}
import de.nowchess.tournament.service.{
ExternalTournamentClient,
TournamentServerRegistry,
TournamentService,
TournamentStreamManager,
}
import io.smallrye.mutiny.Multi
import jakarta.annotation.security.{PermitAll, RolesAllowed}
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response}
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response, StreamingOutput}
import org.eclipse.microprofile.jwt.JsonWebToken
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*
@Path("/api/tournament")
@ApplicationScoped
@@ -25,18 +32,45 @@ class TournamentResource:
@Inject var tournamentService: TournamentService = uninitialized
@Inject var streamManager: TournamentStreamManager = uninitialized
@Inject var jwt: JsonWebToken = uninitialized
@Inject var registry: TournamentServerRegistry = uninitialized
@Inject var externalClient: ExternalTournamentClient = uninitialized
@Inject var objectMapper: ObjectMapper = uninitialized
@Context var headers: HttpHeaders = uninitialized
// scalafix:on
@GET
@PermitAll
def list(): Response =
val (created, started, finished) = tournamentService.list()
val dto = TournamentListDto(
created = created.map(t => tournamentService.toDto(t)),
started = started.map(t => tournamentService.toDto(t)),
finished = finished.map(t => tournamentService.toDto(t)),
)
Response.ok(dto).build()
val internalCreated = created.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
val internalStarted = started.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
val internalFinished = finished.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
val (extCreated, extStarted, extFinished) = registry
.serverUrls()
.foldLeft(
(List.empty[JsonNode], List.empty[JsonNode], List.empty[JsonNode]),
) { case ((ac, as, af), url) =>
externalClient.fetchList(url).fold((ac, as, af)) { node =>
val c = node.path("created").elements().asScala.toList
val s = node.path("started").elements().asScala.toList
val f = node.path("finished").elements().asScala.toList
(c ++ s ++ f).foreach(t => registry.bindTournament(t.path("id").asText(), url))
(ac ++ c, as ++ s, af ++ f)
}
}
val merged = objectMapper.createObjectNode()
val createdArr = objectMapper.createArrayNode()
val startedArr = objectMapper.createArrayNode()
val finishedArr = objectMapper.createArrayNode()
(internalCreated ++ extCreated).foreach(createdArr.add)
(internalStarted ++ extStarted).foreach(startedArr.add)
(internalFinished ++ extFinished).foreach(finishedArr.add)
merged.set("created", createdArr)
merged.set("started", startedArr)
merged.set("finished", finishedArr)
Response.ok(merged).build()
@POST
@RolesAllowed(Array("**"))
@@ -58,10 +92,13 @@ class TournamentResource:
@PermitAll
def get(@PathParam("id") id: String): Response =
tournamentService.get(id) match
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
case Some(t) =>
val standings = tournamentService.getStandings(id)
Response.ok(tournamentService.toDto(t, standings)).build()
case None =>
resolveServer(id)
.flatMap(url => externalClient.fetch(url, id).map(node => Response.ok(node).build()))
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
@DELETE
@Path("/{id}")
@@ -79,7 +116,17 @@ class TournamentResource:
val userId = Option(jwt.getSubject).getOrElse("")
tournamentService.start(id, userId) match
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
case Left(error) => errorResponse(error)
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/start", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@POST
@Path("/{id}/join")
@@ -93,7 +140,17 @@ class TournamentResource:
val botName = Option(jwt.getClaim[AnyRef]("name")).map(_.toString).getOrElse(botId)
tournamentService.join(id, botId, botName) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) => errorResponse(error)
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/join", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@POST
@Path("/{id}/withdraw")
@@ -106,7 +163,17 @@ class TournamentResource:
val botId = Option(jwt.getSubject).getOrElse("")
tournamentService.withdraw(id, botId) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) => errorResponse(error)
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/withdraw", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@GET
@Path("/{id}/results")
@@ -133,20 +200,23 @@ class TournamentResource:
@PermitAll
def roundPairings(@PathParam("id") id: String, @PathParam("round") round: Int): Response =
tournamentService.get(id) match
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
case Some(_) =>
val pairings = tournamentService.getPairings(id, round)
Response.ok(RoundPairingsDto(round, pairings)).build()
case None =>
resolveServer(id)
.flatMap(url => externalClient.fetchPairings(url, id, round).map(node => Response.ok(node).build()))
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
@GET
@Path("/{id}/export/games")
@PermitAll
@Produces(Array(MediaType.APPLICATION_JSON, MediaType.WILDCARD, "application/x-ndjson", "application/x-chess-pgn"))
def exportGames(@PathParam("id") id: String, @Context headers: HttpHeaders): Response =
def exportGames(@PathParam("id") id: String, @Context reqHeaders: HttpHeaders): Response =
tournamentService.get(id) match
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
case Some(_) =>
val acceptHeader = Option(headers.getHeaderString("Accept")).getOrElse("")
val acceptHeader = Option(reqHeaders.getHeaderString("Accept")).getOrElse("")
val pairings = tournamentService.getAllPairings(id)
if acceptHeader.contains("application/x-ndjson") then
val ndjson = pairings
@@ -176,6 +246,67 @@ class TournamentResource:
emitter.onTermination(() => streamManager.unregister(id, botId, emitter))
}
  @GET
  @Path("/{id}/game/{gameId}")
  @PermitAll
  def getGame(@PathParam("id") id: String, @PathParam("gameId") gameId: String): Response =
    resolveServer(id)
      .flatMap(url => externalClient.fetch(url, s"$id/game/$gameId").map(node => Response.ok(node).build()))
      .getOrElse(Response.status(Response.Status.NOT_FOUND).build())

  @GET
  @Path("/{id}/game/{gameId}/stream")
  @PermitAll
  @Produces(Array("application/x-ndjson"))
  def streamGame(@PathParam("id") id: String, @PathParam("gameId") gameId: String): Response =
    val auth = Option(headers.getHeaderString("Authorization"))
    resolveServer(id)
      .flatMap(url => externalClient.proxyGetStream(url, s"api/tournament/$id/game/$gameId/stream", auth))
      .map { stream =>
        Response
          .ok(new StreamingOutput {
            def write(output: java.io.OutputStream): Unit =
              val buf = new Array[Byte](4096)
              // scalafix:off DisableSyntax.var
              var n = stream.read(buf)
              while n >= 0 do
                output.write(buf, 0, n)
                output.flush()
                n = stream.read(buf)
              // scalafix:on
          })
          .`type`("application/x-ndjson")
          .build()
      }
      .getOrElse(Response.status(Response.Status.NOT_FOUND).build())

  @POST
  @Path("/{id}/game/{gameId}/move/{uci}")
  @RolesAllowed(Array("**"))
  def makeMove(
      @PathParam("id") id: String,
      @PathParam("gameId") gameId: String,
      @PathParam("uci") uci: String,
  ): Response =
    val auth = Option(headers.getHeaderString("Authorization"))
    resolveServer(id)
      .map { url =>
        val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/game/$gameId/move/$uci", auth)
        Response.status(status).entity(body).build()
      }
      .getOrElse(Response.status(Response.Status.NOT_FOUND).build())

  private def resolveServer(tournamentId: String): Option[String] =
    registry.findServerUrl(tournamentId).orElse {
      registry
        .serverUrls()
        .find(url => externalClient.fetch(url, tournamentId).isDefined)
        .map { url =>
          registry.bindTournament(tournamentId, url)
          url
        }
    }
private def errorResponse(error: TournamentError): Response =
val status = error match
case TournamentError.NotFound(_) => Response.Status.NOT_FOUND
@@ -0,0 +1,35 @@
package de.nowchess.tournament.resource

import de.nowchess.tournament.dto.{ErrorDto, RegisterServerRequest}
import de.nowchess.tournament.service.TournamentServerRegistry
import jakarta.annotation.security.RolesAllowed
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{MediaType, Response}
import scala.compiletime.uninitialized

@Path("/api/tournament/servers")
@ApplicationScoped
@RolesAllowed(Array("**"))
@Produces(Array(MediaType.APPLICATION_JSON))
@Consumes(Array(MediaType.APPLICATION_JSON))
class TournamentServerResource:

  // scalafix:off DisableSyntax.var
  @Inject var registry: TournamentServerRegistry = uninitialized
  // scalafix:on

  @GET
  def list(): Response =
    Response.ok(registry.list()).build()

  @POST
  def register(req: RegisterServerRequest): Response =
    Response.status(201).entity(registry.register(req.label, req.url)).build()

  @DELETE
  @Path("/{id}")
  def remove(@PathParam("id") id: String): Response =
    if registry.remove(id) then Response.noContent().build()
    else Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Server $id not found")).build()
@@ -0,0 +1,80 @@
package de.nowchess.tournament.service

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.client.{Client, ClientBuilder, Entity}
import jakarta.ws.rs.core.MediaType
import scala.compiletime.uninitialized
import scala.util.Try

@ApplicationScoped
class ExternalTournamentClient:

  // scalafix:off DisableSyntax.var
  @Inject var objectMapper: ObjectMapper = uninitialized
  // scalafix:on

  private def buildClient(): Client = ClientBuilder.newClient()

  def fetchList(serverUrl: String): Option[JsonNode] =
    Try {
      val client = buildClient()
      val response = client.target(s"$serverUrl/api/tournament").request(MediaType.APPLICATION_JSON).get()
      try
        if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
        else None
      finally
        response.close()
        client.close()
    }.getOrElse(None)

  def fetch(serverUrl: String, id: String): Option[JsonNode] =
    Try {
      val client = buildClient()
      val response = client.target(s"$serverUrl/api/tournament/$id").request(MediaType.APPLICATION_JSON).get()
      try
        if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
        else None
      finally
        response.close()
        client.close()
    }.getOrElse(None)

  def fetchPairings(serverUrl: String, id: String, round: Int): Option[JsonNode] =
    Try {
      val client = buildClient()
      val response =
        client.target(s"$serverUrl/api/tournament/$id/round/$round").request(MediaType.APPLICATION_JSON).get()
      try
        if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
        else None
      finally
        response.close()
        client.close()
    }.getOrElse(None)

  def proxyPost(serverUrl: String, path: String, authHeader: Option[String]): (Int, String) =
    Try {
      val client = buildClient()
      val builder = client.target(s"$serverUrl/$path").request(MediaType.APPLICATION_JSON)
      val withAuth = authHeader.fold(builder)(h => builder.header("Authorization", h))
      val response = withAuth.post(Entity.json(""))
      try (response.getStatus, response.readEntity(classOf[String]))
      finally
        response.close()
        client.close()
    }.getOrElse((502, """{"error":"External server unreachable"}"""))

  def proxyGetStream(serverUrl: String, path: String, authHeader: Option[String]): Option[java.io.InputStream] =
    Try {
      val client = buildClient()
      val builder = client.target(s"$serverUrl/$path").request("application/x-ndjson")
      val withAuth = authHeader.fold(builder)(h => builder.header("Authorization", h))
      val response = withAuth.get()
      if response.getStatus == 200 then Some(response.readEntity(classOf[java.io.InputStream]))
      else
        response.close()
        client.close()
        None
    }.getOrElse(None)
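Review note on the client above: `fetchList`, `fetch`, `fetchPairings`, and `proxyPost` all repeat the same shape (open a client, issue a request, close the response and then the client in a `finally`). One way this could be condensed is with `scala.util.Using.Manager` from the standard library, which releases registered resources in reverse order of registration. The sketch below is not part of the change set; `Tracked` and `fetchSketch` are hypothetical stand-ins for the JAX-RS `Client`/`Response` pair, used only so the example runs without any dependencies.

```scala
import scala.util.Using

// Hypothetical stand-in for a JAX-RS Client or Response: it only records when it is closed.
final class Tracked(name: String, log: StringBuilder) extends AutoCloseable:
  def close(): Unit = log.append(s"close($name);")

// Hypothetical helper mirroring the fetch* shape: acquire client, acquire response,
// map status 200 to Some(body), anything else (or any exception) to None.
def fetchSketch(status: Int, log: StringBuilder): Option[String] =
  Using.Manager { use =>
    val client = use(Tracked("client", log))     // registered first, released last
    val response = use(Tracked("response", log)) // registered last, released first
    if status == 200 then Some("body") else None
  }.toOption.flatten
```

With this shape the response is still closed before the client, matching the order in the hand-written `finally` blocks, and a thrown exception still collapses to `None`.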
@@ -0,0 +1,34 @@
package de.nowchess.tournament.service

import de.nowchess.tournament.dto.ExternalTournamentServer
import jakarta.enterprise.context.ApplicationScoped
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.*

@ApplicationScoped
class TournamentServerRegistry:

  private val servers = new ConcurrentHashMap[String, ExternalTournamentServer]()
  private val tournaments = new ConcurrentHashMap[String, String]()

  def register(label: String, url: String): ExternalTournamentServer =
    val id = UUID.randomUUID().toString
    val server = ExternalTournamentServer(id, label, url.stripSuffix("/"))
    servers.put(id, server)
    server

  def list(): List[ExternalTournamentServer] =
    servers.values().asScala.toList

  def remove(id: String): Boolean =
    Option(servers.remove(id)).isDefined

  def serverUrls(): List[String] =
    servers.values().asScala.map(_.url).toList

  def bindTournament(tournamentId: String, serverUrl: String): Unit =
    tournaments.put(tournamentId, serverUrl)

  def findServerUrl(tournamentId: String): Option[String] =
    Option(tournaments.get(tournamentId))
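Review note on the registry above: together with `resolveServer` in `TournamentResource`, it implements a cache-then-probe lookup. A tournament id is first looked up in the `tournaments` map, and only on a miss is each registered server probed, with the first hit bound for later calls. The sketch below isolates that flow so it can be run on its own. `CachedResolver` and the `probe` function are hypothetical names standing in for the registry and for `externalClient.fetch(url, id).isDefined`; they do not exist in the change set.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for registry + resolveServer. `probe(url, id)` plays the role
// of a remote existence check against one server.
final class CachedResolver(servers: List[String], probe: (String, String) => Boolean):
  private val bindings = new ConcurrentHashMap[String, String]()

  def resolve(tournamentId: String): Option[String] =
    // Fast path: a binding learned earlier. Slow path: probe servers in order.
    Option(bindings.get(tournamentId)).orElse {
      servers.find(url => probe(url, tournamentId)).map { url =>
        bindings.put(tournamentId, url)
        url
      }
    }
```

Two properties of the pattern are worth keeping in mind when reading the diff: a miss costs one remote call per registered server, and, as written in the change set, `remove(id)` drops the server but leaves any tournament bindings that point at its URL in place.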
+1 -1
@@ -1,3 +1,3 @@
MAJOR=0
MINOR=1
MINOR=3
PATCH=0
+1
@@ -27,4 +27,5 @@ include(
"modules:store",
"modules:coordinator",
"modules:tournament",
"modules:analytics",
)