Compare commits

..

5 Commits

Author SHA1 Message Date
Janis Eccarius 3718f7669a feat(tournament): add external server registry and official-bot bridge
Build & Test (NowChessSystems) TeamCity build finished
TournamentServerRegistry holds multiple external tournament servers in
memory; ExternalTournamentClient proxies all read/write/stream calls to
them. TournamentResource fans out list() across registered servers and
routes per-tournament calls to the owning server via a tournamentId cache.
TournamentServerResource exposes GET/POST/DELETE /api/tournament/servers.

Official-bot bridge: TournamentBotGamePlayer gains a runtime joinTournament
API (registers fresh bot identity, starts background stream loop).
TournamentJoinResource exposes POST /api/bots/official/join-tournament so
the UI can add official bots without a server restart.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 08:40:14 +02:00
Janis Eccarius 0e0ea4c989 feat(analytics): add PostgreSQL JDBC write-back to all four batch jobs
Each batch job now writes its results to a Postgres table in addition to
the existing Parquet/CSV output. OpeningBookJob → analytics_opening_stats,
PlayerStatsJob → analytics_player_stats, PlayerClusteringJob →
analytics_player_clusters + analytics_cluster_archetypes, PlayerGraphJob
→ analytics_player_graph. MLlib Vector columns are excluded from the JDBC
write by reusing the already-selected scalar DataFrame in
PlayerClusteringJob.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 22:35:30 +02:00
Janis Eccarius 95215b6a42 feat(analytics): add Dockerfile, CI workflow, and stable jar name for K8s deployment
- Pin jar output to analytics.jar (no version suffix) so Dockerfile COPY is stable
- Add Dockerfile based on apache/spark:3.5.4-scala2.13-java17-ubuntu
- Add versions.env (0.1.0) matching GitOps overlay image tag
- Add analytics-image.yml CI workflow following native-image.yml conventions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 22:30:31 +02:00
Janis Eccarius e1d80b9331 feat(analytics): add Structured Streaming, MLlib clustering, GraphX jobs
Three new Spark jobs demonstrating complementary Spark pillars:

LiveDashboardJob (Structured Streaming):
- Simulates NowChess game-over event stream via rate source
- Watermarking (45 s late-data tolerance)
- Tumbling 1-min windows → append-mode Parquet output
- Sliding 5-min/1-min windows → update-mode console output
- Checkpointing for exactly-once fault tolerance
- Production wiring comments show Kafka / spark-redis swap-in

PlayerClusteringJob (MLlib):
- Derives 4 player features from game_records via JDBC
- VectorAssembler + StandardScaler + KMeans inside a Pipeline
- ClusteringEvaluator (silhouette score) to measure quality
- Per-cluster archetype averages show what each tier represents

PlayerGraphJob (GraphX):
- Builds directed player graph (vertices=players, edges=games)
- PageRank — identifies most influential/active players
- ConnectedComponents — finds isolated player communities
- Bridges GraphX RDD results back to DataFrames via explicit schema
  (avoids spark.implicits._ which breaks Scala 3 → Spark 2.13 interop)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 22:15:24 +02:00
Janis Eccarius 259b3bbb24 feat(analytics): add Spark batch analytics module
New standalone modules:analytics submodule with two Spark jobs:

- OpeningBookJob: reads game_records.pgn, extracts first N plies using
  pure Catalyst SQL expressions (no UDFs), aggregates win/draw/loss rates
  per opening sequence, writes Parquet + CSV top-1000 summary.

- PlayerStatsJob: unions each game into a player-centric view, aggregates
  total_games/wins/losses/draws/avg_move_count/win_rate per player_id,
  writes Parquet.

Module uses Scala 3 calling spark-sql_2.13 via JVM binary compatibility
(DataFrame API only; no spark.implicits._ / typed Datasets). Spark is
compileOnly; the fat jar bundles only scala3-library + postgresql driver.
Submit via spark-submit; see build.gradle.kts header for invocation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 21:58:05 +02:00
21 changed files with 1353 additions and 67 deletions
+130
View File
@@ -0,0 +1,130 @@
name: Build & Push Analytics Image
on:
push:
branches:
- main
paths:
- 'modules/analytics/**'
workflow_dispatch:
jobs:
check-actor:
runs-on: ubuntu-latest
outputs:
allowed: ${{ steps.check.outputs.allowed }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 1
- id: check
run: |
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
echo "Triggered manually — allowing build"
echo "allowed=true" >> "$GITHUB_OUTPUT"
else
COMMIT_AUTHOR=$(git log -1 --format='%an')
COMMIT_SHA=$(git log -1 --format='%H')
COMMIT_MSG=$(git log -1 --format='%s')
echo "Commit: ${COMMIT_SHA}"
echo "Author: ${COMMIT_AUTHOR}"
echo "Message: ${COMMIT_MSG}"
if [[ "$COMMIT_AUTHOR" == "TeamCity" ]]; then
echo "Author is TeamCity — allowing build"
echo "allowed=true" >> "$GITHUB_OUTPUT"
else
echo "Author is not TeamCity — skipping build"
echo "allowed=false" >> "$GITHUB_OUTPUT"
fi
fi
build-and-push:
needs: check-actor
if: needs.check-actor.outputs.allowed == 'true'
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Read version from versions.env
id: version
run: |
source modules/analytics/versions.env
echo "version=${MAJOR}.${MINOR}.${PATCH}" >> "$GITHUB_OUTPUT"
- name: Check if image exists in GHCR
id: image-check
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
PACKAGE="now-chess-systems%2Fanalytics"
VERSION="${{ steps.version.outputs.version }}"
EXISTING_TAGS=$(gh api "orgs/now-chess/packages/container/${PACKAGE}/versions" \
--jq '.[].metadata.container.tags[]' 2>/dev/null || echo "")
echo "Existing tags: $(echo "${EXISTING_TAGS}" | tr '\n' ' ' | xargs)"
if echo "${EXISTING_TAGS}" | grep -qx "${VERSION}"; then
echo "Image ${VERSION} already exists — skipping build"
echo "exists=true" >> "$GITHUB_OUTPUT"
else
echo "Image ${VERSION} not found — will build"
echo "exists=false" >> "$GITHUB_OUTPUT"
fi
- name: Set up JDK 17
if: steps.image-check.outputs.exists == 'false'
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
- name: Cache Gradle packages
if: steps.image-check.outputs.exists == 'false'
uses: actions/cache@v4
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: gradle-${{ runner.os }}-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: gradle-${{ runner.os }}-
- name: Build fat jar
if: steps.image-check.outputs.exists == 'false'
run: ./gradlew :modules:analytics:jar --no-daemon
- name: Set up Docker Buildx
if: steps.image-check.outputs.exists == 'false'
uses: docker/setup-buildx-action@v3
- name: Log in to GitHub Container Registry
if: steps.image-check.outputs.exists == 'false'
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
if: steps.image-check.outputs.exists == 'false'
id: meta
uses: docker/metadata-action@v5
with:
images: ghcr.io/now-chess/now-chess-systems/analytics
tags: |
type=raw,value=${{ steps.version.outputs.version }}
type=raw,value=latest
- name: Build and push
if: steps.image-check.outputs.exists == 'false'
uses: docker/build-push-action@v6
with:
context: modules/analytics
file: modules/analytics/src/main/docker/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
+2
View File
@@ -53,6 +53,8 @@ val coverageExclusions = listOf(
"**/core/src/main/scala/de/nowchess/chess/resource/GameWebSocketResource.scala",
// Coordinator infrastructure — gRPC, microservice orchestration
"**/coordinator/src/main/scala/**",
// Analytics module — standalone Spark batch jobs; coverage not applicable (no Quarkus, no scoverage plugin)
"modules/analytics/**",
)
// Converts a Sonar-style glob to a scoverage regex (matched against full source path).
+96
View File
@@ -0,0 +1,96 @@
// Standalone Spark batch-analytics module.
//
// Spark 3.5.x ships only Scala 2.12/2.13 artifacts; Scala 3 code can consume
// them via JVM binary compatibility so long as we avoid macro-expanded APIs
// (spark.implicits._, typed Dataset[T]). We use the untyped DataFrame API
// exclusively, which is safe to call from Scala 3.
//
// Spark is declared compileOnly — the cluster provides it at runtime via
// spark-submit. Only the PostgreSQL driver and the Scala 3 runtime are
// bundled into the fat jar produced by the "jar" task.
//
// Build the submission jar:
// ./gradlew :modules:analytics:jar
//
// Run a job:
// spark-submit \
// --class de.nowchess.analytics.OpeningBookJob \
// modules/analytics/build/libs/analytics-<version>.jar \
// [outputDir] [maxPlies]
//
// Environment variables consumed:
// NOWCHESS_JDBC_URL (default: jdbc:postgresql://localhost:5432/nowchess)
// NOWCHESS_DB_USER (default: nowchess)
// NOWCHESS_DB_PASS (default: nowchess)
plugins {
id("scala")
application
}
group = "de.nowchess"
version = "1.0-SNAPSHOT"
@Suppress("UNCHECKED_CAST")
val versions = rootProject.extra["VERSIONS"] as Map<String, String>
repositories {
mavenCentral()
}
scala {
scalaVersion = versions["SCALA3"]!!
}
val sparkVersion = "3.5.4"
dependencies {
compileOnly("org.scala-lang:scala3-compiler_3") {
version { strictly(versions["SCALA3"]!!) }
}
implementation("org.scala-lang:scala3-library_3") {
version { strictly(versions["SCALA3"]!!) }
}
implementation("org.scala-lang:scala-library") {
version { strictly(versions["SCALA_LIBRARY"]!!) }
}
// Spark is provided by the cluster — compile-only, not bundled.
compileOnly("org.apache.spark:spark-sql_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
compileOnly("org.apache.spark:spark-core_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
compileOnly("org.apache.spark:spark-mllib_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
compileOnly("org.apache.spark:spark-graphx_2.13:$sparkVersion") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
// PostgreSQL JDBC driver bundled so it is available on executor classpath.
implementation("org.postgresql:postgresql:42.7.4")
}
application {
mainClass.set("de.nowchess.analytics.OpeningBookJob")
}
// Fat jar: includes runtimeClasspath (our code + pg driver + scala3-library)
// but NOT compileOnly Spark jars.
// archiveVersion is cleared so the output is always "analytics.jar" — stable
// name required by the Dockerfile COPY instruction.
tasks.jar {
archiveBaseName.set("analytics")
archiveVersion.set("")
manifest {
attributes["Main-Class"] = "de.nowchess.analytics.OpeningBookJob"
}
from(configurations.runtimeClasspath.get().map { if (it.isDirectory) it else zipTree(it) })
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
tasks.withType<ScalaCompile> {
scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8")
}
@@ -0,0 +1,9 @@
FROM apache/spark:3.5.4-scala2.13-java17-ubuntu
USER root
# analytics.jar = fat jar containing app code + PostgreSQL JDBC driver + Scala 3 runtime.
# Spark itself is provided by the base image at /opt/spark — it is NOT included in the jar.
COPY build/libs/analytics.jar /app/analytics.jar
USER spark
@@ -0,0 +1,138 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
import org.apache.spark.sql.streaming.Trigger
/** Demonstrates Spark Structured Streaming on NowChess game-over events.
*
* Spark concepts shown:
* - Continuous micro-batch processing (`readStream`)
* - Watermarking for late-data tolerance (events up to 45 s late are accepted)
* - Tumbling window aggregations — fixed 1-minute buckets, zero overlap
* - Sliding window aggregations — 5-minute window, 1-minute slide
* - Append vs Update output modes and when each is valid
* - Exactly-once fault tolerance via checkpointing
* - Multiple concurrent streaming queries on the same session
*
* Production wiring: NowChess already publishes game-over events to a Redis Stream (`nowchess:game-over`, see
* GameRedisPublisher). Swap the `rate` source below for one of the production sources shown in the comment block.
*/
object LiveDashboardJob:
def main(args: Array[String]): Unit =
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-live-dashboard"
val spark = SparkSession
.builder()
.appName("NowChess Live Dashboard")
.getOrCreate()
run(spark, outputDir)
def run(spark: SparkSession, outputDir: String): Unit =
// ── Production sources (replace rate source with one of these) ─────────
//
// Kafka (via a Redis → Kafka bridge):
// spark.readStream
// .format("kafka")
// .option("kafka.bootstrap.servers", sys.env("KAFKA_BROKERS"))
// .option("subscribe", "nowchess.game-over")
// .load()
// .select(F.from_json(F.col("value").cast("string"), gameOverSchema).as("e"))
// .select("e.*")
//
// spark-redis (com.redislabs:spark-redis:3.1.0):
// spark.readStream
// .format("redis")
// .option("stream.keys", "nowchess:game-over")
// .schema(gameOverSchema)
// .load()
// ─────────────────────────────────────────────────────────────────────
// Simulated stream: 10 game-over events / second.
// `rate` source emits (timestamp: Timestamp, value: Long) — Spark built-in, no deps.
val rawStream = spark.readStream
.format("rate")
.option("rowsPerSecond", "10")
.load()
// Derive game-outcome columns from the monotonic counter.
// In production these come directly from the event payload.
val events = rawStream
.withColumn(
"result",
F.when(F.col("value") % 3 === 0L, "white")
.when(F.col("value") % 3 === 1L, "black")
.otherwise("draw"),
)
.withColumn(
"termination",
F.when(F.col("value") % 4 === 0L, "checkmate")
.when(F.col("value") % 4 === 1L, "resignation")
.when(F.col("value") % 4 === 2L, "timeout")
.otherwise("agreement"),
)
// Watermark: accept events up to 45 seconds late.
// Spark will not emit a window result until the watermark passes its end time.
.withWatermark("timestamp", "45 seconds")
// ── Query 1: tumbling 1-minute windows ────────────────────────────────
// Each window is a non-overlapping 60-second bucket.
// outputMode("append") only emits a window after the watermark seals it —
// guarantees that late arrivals were already counted before output.
val gamesByWindow = events
.groupBy(F.window(F.col("timestamp"), "1 minute"), F.col("result"))
.agg(F.count("*").as("games"))
.select(
F.col("window.start").as("window_start"),
F.col("window.end").as("window_end"),
F.col("result"),
F.col("games"),
)
// ── Query 2: sliding 5-minute / 1-minute windows ──────────────────────
// Each window covers 5 minutes of data, and a new window opens every minute.
// outputMode("update") emits a row whenever an existing window changes —
// gives a live rolling view of termination patterns.
val terminationTrend = events
.groupBy(F.window(F.col("timestamp"), "5 minutes", "1 minute"))
.agg(
F.count("*").as("total"),
F.sum(F.when(F.col("termination") === "checkmate", 1).otherwise(0)).as("checkmates"),
F.sum(F.when(F.col("termination") === "resignation", 1).otherwise(0)).as("resignations"),
F.sum(F.when(F.col("termination") === "timeout", 1).otherwise(0)).as("timeouts"),
)
.withColumn(
"checkmate_pct",
F.round(F.col("checkmates") / F.col("total").cast("double") * 100, 1),
)
.select(
F.col("window.start").as("window_start"),
F.col("total"),
F.col("checkmate_pct"),
F.col("resignations"),
F.col("timeouts"),
)
// Write sealed windows to Parquet — safe to query with any SQL engine.
gamesByWindow.writeStream
.outputMode("append")
.format("parquet")
.option("path", s"$outputDir/game_counts_by_window")
.option("checkpointLocation", s"$outputDir/_checkpoints/game_counts")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
// Print live rolling stats to the console every 10 seconds.
terminationTrend.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.option("numRows", "10")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
// Block until any query fails or the process is killed.
spark.streams.awaitAnyTermination()
@@ -0,0 +1,107 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Reads completed games from the game_records table and produces an opening-book statistics table: for each unique
* opening (first N plies), it reports total games played and win/draw/loss rates from each side.
*
* Output is written as Parquet to `outputDir/opening_book` and a human-readable CSV summary (top-1000 openings by
* popularity) to `outputDir/opening_book_top1000`.
*
* PGN parsing is done entirely with Spark SQL string functions — no UDFs — so the Catalyst optimizer can push
* predicates and the job scales to any cluster size.
*/
object OpeningBookJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-opening-book"
val maxPlies = if args.length > 1 then args(1).toInt else 10
val spark = SparkSession
.builder()
.appName("NowChess Opening Book Generator")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir, maxPlies)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
maxPlies: Int,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("pgn", "result")
.filter(F.col("result").isNotNull.and(F.col("pgn").isNotNull))
val openingCol = extractOpening(F.col("pgn"), maxPlies)
val withOpening = games
.withColumn("opening", openingCol)
.filter(F.col("opening").isNotNull.and(F.length(F.col("opening")) > 0))
val stats = withOpening
.groupBy("opening")
.agg(
F.count("*").as("total"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total").cast("double"), 3))
.withColumn("black_win_rate", F.round(F.col("black_wins") / F.col("total").cast("double"), 3))
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total").cast("double"), 3))
.orderBy(F.desc("total"))
stats.write
.mode("overwrite")
.parquet(s"$outputDir/opening_book")
val top1000 = stats.limit(1000)
top1000.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/opening_book_top1000")
top1000.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_opening_stats")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
/** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
*
* PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
*
* Steps:
* 1. Split on double-newline; take the moves section (index 1). 2. Strip the terminal result token (*, 1-0, 0-1,
* 1/2-1/2). 3. Strip move numbers (e.g., "1. ", "12. "). 4. Strip check/checkmate suffixes (+ #) for
* position-independent lookup. 5. Tokenize on whitespace, take first maxPlies tokens, rejoin with spaces.
*/
private def extractOpening(pgnCol: org.apache.spark.sql.Column, maxPlies: Int): org.apache.spark.sql.Column =
val moveSection = F.coalesce(F.split(pgnCol, "\n\n").getItem(1), pgnCol)
val noResult = F.regexp_replace(moveSection, "(1-0|0-1|1/2-1/2|\\*)\\s*$", "")
val noMoveNumbers = F.regexp_replace(noResult, "\\d+\\.+\\s*", " ")
val noAnnotations = F.regexp_replace(noMoveNumbers, "[+#]", "")
val moveArray = F.split(F.trim(noAnnotations), "\\s+")
F.array_join(F.slice(moveArray, 1, maxPlies), " ")
@@ -0,0 +1,174 @@
package de.nowchess.analytics
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Clusters NowChess players into skill tiers using K-Means via MLlib.
*
* Spark / MLlib concepts shown:
* - Feature engineering from raw relational data (JDBC → DataFrame)
* - VectorAssembler — combine scalar columns into a dense feature vector
* - StandardScaler — zero-mean / unit-variance normalisation so that total_games (can be 1000+) does not dominate
* win_rate (01)
* - KMeans clustering — unsupervised partitioning into k skill tiers
* - Pipeline — compose transformers + estimator into a single reusable object
* - ClusteringEvaluator — silhouette score to assess cluster quality
*
* Features per player (all derived from game_records): total_games — how active the player is win_rate — overall
* strength avg_move_count — game-length preference (tactical vs positional) games_as_white_ratio — colour bias
*
* Output: Parquet: player_id + cluster (0..k-1) + feature values CSV: per-cluster archetype averages (interpret what
* each tier means)
*/
object PlayerClusteringJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-clusters"
val k = if args.length > 1 then args(1).toInt else 4
val spark = SparkSession
.builder()
.appName("NowChess Player Clustering")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir, k)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
k: Int,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result", "move_count")
.filter(F.col("result").isNotNull)
val playerStats = buildPlayerStats(games)
.filter(F.col("total_games") >= 5)
val featureCols = Array("total_games", "win_rate", "avg_move_count", "games_as_white_ratio")
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("raw_features")
.setHandleInvalid("skip")
val scaler = new StandardScaler()
.setInputCol("raw_features")
.setOutputCol("features")
.setWithStd(true)
.setWithMean(true)
val kmeans = new KMeans()
.setK(k)
.setSeed(42L)
.setFeaturesCol("features")
.setPredictionCol("cluster")
val pipeline = new Pipeline().setStages(Array(assembler, scaler, kmeans))
val model = pipeline.fit(playerStats)
val predictions = model.transform(playerStats)
val silhouette = new ClusteringEvaluator()
.setFeaturesCol("features")
.setPredictionCol("cluster")
.evaluate(predictions)
println(s"[Clustering] k=$k silhouette=$silhouette")
// Average feature values per cluster reveal what each tier represents.
// Example interpretation for k=4:
// Cluster 0: high total_games + high win_rate → experienced strong players
// Cluster 1: low total_games + low win_rate → beginners / casual
// Cluster 2: high total_games + mid win_rate → active intermediate
// Cluster 3: low total_games + high win_rate → strong but infrequent
val archetypes = predictions
.groupBy("cluster")
.agg(
F.count("*").as("player_count"),
F.round(F.avg("total_games"), 1).as("avg_total_games"),
F.round(F.avg("win_rate"), 3).as("avg_win_rate"),
F.round(F.avg("avg_move_count"), 1).as("avg_move_count"),
F.round(F.avg("games_as_white_ratio"), 3).as("avg_white_ratio"),
)
.orderBy("cluster")
archetypes.show(20, false)
val clustersDf = predictions.select("player_id", "total_games", "win_rate", "avg_move_count", "cluster")
clustersDf.write
.mode("overwrite")
.parquet(s"$outputDir/player_clusters")
archetypes.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/cluster_archetypes")
clustersDf.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_player_clusters")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
archetypes.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_cluster_archetypes")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
val asWhite = games.select(
F.col("white_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit(1).as("is_white"),
)
val asBlack = games.select(
F.col("black_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit(0).as("is_white"),
)
val won = (F.col("is_white") === 1 && F.col("result") === "white")
.or(F.col("is_white") === 0 && F.col("result") === "black")
asWhite
.union(asBlack)
.groupBy("player_id")
.agg(
F.count("*").as("total_games"),
F.round(F.sum(F.when(won, 1.0).otherwise(0.0)) / F.count("*"), 3).as("win_rate"),
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
F.round(F.avg(F.col("is_white").cast("double")), 3).as("games_as_white_ratio"),
)
@@ -0,0 +1,161 @@
package de.nowchess.analytics
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
/** Models the NowChess player network as a directed graph and runs GraphX analytics.
*
* Spark / GraphX concepts shown:
* - Building a Graph from RDDs derived from a JDBC DataFrame
* - PageRank — measures a player's "influence"; high score = many games against other high-ranked players (analogous
* to web link authority)
* - Connected Components — finds isolated player communities; players who have never played anyone from another
* component cannot be linked
* - Converting GraphX results back to DataFrames for SQL-style joins and output
*
* Graph model: Vertices: one per unique player (vertex ID = hashCode of player UUID string) Edges: one per completed
* game (white → black), attributed with result
*
* Note: hashCode gives a 32-bit → 64-bit vertex ID; collision probability is negligible for typical player counts. For
* millions of players, replace with MLlib StringIndexer to generate collision-free Long IDs.
*/
object PlayerGraphJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-graph"
val spark = SparkSession
.builder()
.appName("NowChess Player Graph Analytics")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
): Unit =
val gamesRdd: RDD[Row] = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result")
.filter(F.col("result").isNotNull)
.rdd
val toVid: String => VertexId = s => s.hashCode.toLong
// Each row contributes two vertex entries (white and black player).
val vertices: RDD[(VertexId, String)] = gamesRdd
.flatMap { row =>
Seq(
(toVid(row.getString(0)), row.getString(0)),
(toVid(row.getString(1)), row.getString(1)),
)
}
.distinct()
// Directed edge white → black, labelled with the game result.
val edges: RDD[Edge[String]] = gamesRdd.map { row =>
Edge(toVid(row.getString(0)), toVid(row.getString(1)), row.getString(2))
}
val graph: Graph[String, String] = Graph(vertices, edges)
println(s"[Graph] vertices=${graph.numVertices} edges=${graph.numEdges}")
// ── PageRank ────────────────────────────────────────────────────────────
// Convergence tolerance 0.01 — lower = more iterations = more accurate.
// Returns Graph[Double, Double]; vertex attribute = PageRank score.
val pageRanks: RDD[(VertexId, Double)] = graph.pageRank(0.01).vertices
// ── Connected Components ────────────────────────────────────────────────
// Returns Graph[VertexId, ED]; vertex attribute = minimum vertex ID in
// the component (serves as a stable component label).
val components: RDD[(VertexId, VertexId)] = graph.connectedComponents().vertices
// Convert each RDD result to a DataFrame so we can join with SQL semantics.
val vertexDf = rddToFrame(spark, vertices, "player_id", StringType)
val pageRankDf = rddToFrame(spark, pageRanks, "page_rank", DoubleType)
val componentDf = rddToFrame(spark, components, "component_id", LongType)
val result = vertexDf
.join(pageRankDf, "vertex_id")
.join(componentDf, "vertex_id")
.drop("vertex_id")
.withColumn("page_rank", F.round(F.col("page_rank"), 4))
.orderBy(F.desc("page_rank"))
println("[Graph] Top 20 players by PageRank:")
result.show(20, false)
result.write
.mode("overwrite")
.parquet(s"$outputDir/player_graph")
result.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_player_graph")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
// How many players belong to each connected component?
// A large dominant component + many singletons is the expected shape.
val componentSizes = result
.groupBy("component_id")
.agg(F.count("*").as("player_count"))
.orderBy(F.desc("player_count"))
println("[Graph] Connected component sizes:")
componentSizes.show(10, false)
componentSizes.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/component_sizes")
// Build a two-column DataFrame (vertex_id: Long, valueCol: valueType) from an RDD.
// Used to bridge GraphX RDD results into the DataFrame API without implicits.
private def rddToFrame[T](
spark: SparkSession,
rdd: RDD[(VertexId, T)],
valueCol: String,
valueType: DataType,
): org.apache.spark.sql.DataFrame =
val schema = StructType(
List(
StructField("vertex_id", LongType, nullable = false),
StructField(valueCol, valueType, nullable = false),
),
)
spark.createDataFrame(
rdd.map { case (vid, v) => Row.fromSeq(Seq[Any](vid, v)) },
schema,
)
@@ -0,0 +1,95 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Aggregates per-player statistics from completed games.
*
* Each game contributes one row per player (as white and as black), so the dataset is first unioned into a
* player-centric view before grouping. Output columns: player_id, total_games, wins, losses, draws, games_as_white,
* games_as_black, avg_move_count, win_rate
*
* Output is written as Parquet to `outputDir/player_stats`.
*/
object PlayerStatsJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-stats"
val spark = SparkSession
.builder()
.appName("NowChess Player Stats")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result", "move_count")
.filter(F.col("result").isNotNull)
// Flatten each game into two rows: one per player, tagged with their side.
val asWhite = games.select(
F.col("white_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit("white").as("color"),
)
val asBlack = games.select(
F.col("black_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit("black").as("color"),
)
val playerGames = asWhite.union(asBlack)
val wonGame = F.col("color") === F.col("result")
val lostGame = (F.col("color") === "white" && F.col("result") === "black")
.or(F.col("color") === "black" && F.col("result") === "white")
val stats = playerGames
.groupBy("player_id")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(wonGame, 1).otherwise(0)).as("wins"),
F.sum(F.when(lostGame, 1).otherwise(0)).as("losses"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
F.sum(F.when(F.col("color") === "white", 1).otherwise(0)).as("games_as_white"),
F.sum(F.when(F.col("color") === "black", 1).otherwise(0)).as("games_as_black"),
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
)
.withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.desc("total_games"))
stats.write
.mode("overwrite")
.parquet(s"$outputDir/player_stats")
stats.write
.mode("overwrite")
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "analytics_player_stats")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.save()
+3
View File
@@ -0,0 +1,3 @@
MAJOR=0
MINOR=1
PATCH=0
@@ -0,0 +1,7 @@
package de.nowchess.bot.resource
case class JoinTournamentRequest(
tournamentId: String,
difficulty: String,
serverUrl: Option[String],
)
@@ -0,0 +1,7 @@
package de.nowchess.bot.resource
case class JoinTournamentResponse(
botId: String,
difficulty: String,
status: String,
)
@@ -0,0 +1,44 @@
package de.nowchess.bot.resource
import de.nowchess.bot.service.TournamentBotGamePlayer
import jakarta.annotation.security.RolesAllowed
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{MediaType, Response}
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
@Path("/api/bots/official")
@ApplicationScoped
@RolesAllowed(Array("**"))
@Produces(Array(MediaType.APPLICATION_JSON))
@Consumes(Array(MediaType.APPLICATION_JSON))
class TournamentJoinResource:
private val log = Logger.getLogger(classOf[TournamentJoinResource])
// scalafix:off DisableSyntax.var
@Inject var player: TournamentBotGamePlayer = uninitialized
// scalafix:on DisableSyntax.var
@POST
@Path("/join-tournament")
def joinTournament(req: JoinTournamentRequest): Response =
val serverUrl = req.serverUrl.filter(_.nonEmpty).getOrElse(player.defaultServerUrl)
val difficulty = if req.difficulty.nonEmpty then req.difficulty else "medium"
log.infof(
"Official bot join requested — tournament=%s difficulty=%s server=%s",
req.tournamentId,
difficulty,
serverUrl,
)
player.joinTournament(req.tournamentId, difficulty, serverUrl) match
case Right(botId) =>
val resp = JoinTournamentResponse(botId, difficulty, "joining")
Response.ok(resp).build()
case Left(err) =>
Response
.status(Response.Status.BAD_GATEWAY)
.entity(s"""{"error":"$err"}""")
.build()
@@ -38,6 +38,9 @@ class TournamentBotGamePlayer:
@volatile private var running = true
// scalafix:on DisableSyntax.var
val defaultServerUrl: String =
System.getenv().asScala.getOrElse("TOURNAMENT_SERVER_URL", "http://localhost:8089")
@PostConstruct
def initialize(): Unit =
config match
@@ -45,9 +48,42 @@ class TournamentBotGamePlayer:
log.info("Tournament bot disabled — set TOURNAMENT_ID and TOURNAMENT_BOT_TOKEN to enable")
case Some(cfg) =>
log.infof("Tournament bot enabled — server=%s tournament=%s bot=%s", cfg.serverUrl, cfg.tournamentId, cfg.botId)
val thread = new Thread(() => connect(cfg), s"TournamentBot-${cfg.tournamentId}")
thread.setDaemon(true)
thread.start()
startAsync(cfg)
def joinTournament(tournamentId: String, difficulty: String, serverUrl: String): Either[String, String] =
registerBot(serverUrl, difficulty) match
case None => Left("Failed to register bot with tournament server")
case Some((botId, token)) =>
val cfg = TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty)
if join(cfg) then
startAsync(cfg)
Right(botId)
else Left("Failed to join tournament")
private def startAsync(cfg: TournamentBotConfig): Unit =
val thread = new Thread(() => streamLoop(cfg), s"TournamentBot-${cfg.tournamentId}")
thread.setDaemon(true)
thread.start()
private def registerBot(serverUrl: String, difficulty: String): Option[(String, String)] =
Try {
val name = s"NowChess ${difficulty.capitalize}"
val body = s"""{"name":"$name","isBot":true}"""
val response = client
.target(serverUrl)
.path("api")
.path("auth")
.path("register")
.request(MediaType.APPLICATION_JSON)
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
if response.getStatus == 201 then
val node = objectMapper.readTree(response.readEntity(classOf[String]))
val id = node.path("id").asText()
val token = node.path("token").asText()
response.close()
if id.nonEmpty && token.nonEmpty then Some((id, token)) else None
else { log.warnf("Bot registration returned status %d", response.getStatus); response.close(); None }
}.getOrElse(None)
@PreDestroy
def cleanup(): Unit =
@@ -56,12 +92,11 @@ class TournamentBotGamePlayer:
Try(client.close())
log.info("Tournament bot stopped")
private def connect(cfg: TournamentBotConfig): Unit =
if join(cfg) then
while running do
Try(streamEvents(cfg)) match
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
case Success(_) => sleep(2000)
private def streamLoop(cfg: TournamentBotConfig): Unit =
while running do
Try(streamEvents(cfg)) match
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
case Success(_) => sleep(2000)
private def join(cfg: TournamentBotConfig): Boolean =
Try {
@@ -86,41 +121,23 @@ class TournamentBotGamePlayer:
log.infof("Listening to tournament %s event stream", cfg.tournamentId)
forEachLine(response.readEntity(classOf[InputStream])): line =>
parse(line).foreach: node =>
if node.path("type").asText() == "gameStart" then onGameStart(cfg, node.path("gameId").asText())
if node.path("type").asText() == "gameStart" then
onGameStart(cfg, node.path("gameId").asText(), node.path("color").asText())
private def onGameStart(cfg: TournamentBotConfig, gameId: String): Unit =
if gameId.nonEmpty && activeGames.add(gameId) then
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId) })
private def onGameStart(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
if gameId.nonEmpty && color.nonEmpty && activeGames.add(gameId) then
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId, color) })
()
private def playGame(cfg: TournamentBotConfig, gameId: String): Unit =
private def playGame(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
Try {
colorFor(cfg, gameId) match
case None =>
log.debugf("Game %s is not ours — ignoring", gameId)
activeGames.remove(gameId)
case Some(color) =>
log.infof("Playing game %s as %s", gameId, color)
val stream = openGameStream(cfg, gameId)
maybeMoveFromCurrentState(cfg, gameId, color)
stream.foreach(consumeGameStream(cfg, gameId, color, _))
activeGames.remove(gameId)
log.infof("Playing game %s as %s", gameId, color)
openGameStream(cfg, gameId).foreach(consumeGameStream(cfg, gameId, color, _))
activeGames.remove(gameId)
} match
case Failure(ex) => log.errorf(ex, "Game %s crashed", gameId); activeGames.remove(gameId)
case Success(_) => ()
private def colorFor(cfg: TournamentBotConfig, gameId: String): Option[String] =
fetchGame(cfg, gameId).flatMap: game =>
val white = game.path("white").path("id").asText()
val black = game.path("black").path("id").asText()
if white == cfg.botId then Some("white")
else if black == cfg.botId then Some("black")
else None
private def maybeMoveFromCurrentState(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
fetchGame(cfg, gameId).foreach: game =>
maybeMove(cfg, gameId, color, game.path("turn").asText(), game.path("status").asText(), game.path("fen").asText())
private def consumeGameStream(cfg: TournamentBotConfig, gameId: String, color: String, stream: InputStream): Unit =
val reader = new BufferedReader(new InputStreamReader(stream))
// scalafix:off DisableSyntax.var
@@ -134,10 +151,25 @@ class TournamentBotGamePlayer:
.foreach { line =>
parse(line).foreach: node =>
node.path("type").asText() match
case "gameState" =>
maybeMove(
cfg,
gameId,
color,
node.path("turn").asText(),
node.path("status").asText(),
node.path("fen").asText(),
)
case "move" =>
maybeMove(cfg, gameId, color, node.path("turn").asText(), "ongoing", node.path("fen").asText())
case "gameEnd" => log.infof("Game %s ended — status=%s", gameId, node.path("status").asText()); done = true
case _ => ()
case "gameEnd" =>
log.infof(
"Game %s ended — status=%s winner=%s",
gameId,
node.path("status").asText(),
node.path("winner").asText(),
); done = true
case _ => ()
}
private def maybeMove(
@@ -169,14 +201,6 @@ class TournamentBotGamePlayer:
case Failure(ex) => log.errorf(ex, "Error submitting move %s in game %s", uci, gameId)
case Success(_) => ()
private def fetchGame(cfg: TournamentBotConfig, gameId: String): Option[JsonNode] =
Try {
val response = target(cfg).path("game").path(gameId).request(MediaType.APPLICATION_JSON).get()
val node = if response.getStatus == 200 then Some(response.readEntity(classOf[JsonNode])) else None
response.close()
node
}.getOrElse(None)
private def openGameStream(cfg: TournamentBotConfig, gameId: String): Option[InputStream] =
Try {
val response = authed(cfg, target(cfg).path("game").path(gameId).path("stream"))
@@ -31,6 +31,9 @@ import io.quarkus.runtime.annotations.RegisterForReflection
classOf[CoreCreateGameRequest],
classOf[CoreGameResponse],
classOf[GameWritebackEventDto],
classOf[ExternalTournamentServer],
classOf[RegisterServerRequest],
classOf[ExternalTournamentServerList],
),
)
class NativeReflectionConfig
@@ -0,0 +1,5 @@
package de.nowchess.tournament.dto
case class ExternalTournamentServer(id: String, label: String, url: String)
case class RegisterServerRequest(label: String, url: String)
case class ExternalTournamentServerList(servers: List[ExternalTournamentServer])
@@ -1,17 +1,24 @@
package de.nowchess.tournament.resource
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import de.nowchess.tournament.dto.*
import de.nowchess.tournament.error.TournamentError
import de.nowchess.tournament.service.{TournamentService, TournamentStreamManager}
import de.nowchess.tournament.service.{
ExternalTournamentClient,
TournamentServerRegistry,
TournamentService,
TournamentStreamManager,
}
import io.smallrye.mutiny.Multi
import jakarta.annotation.security.{PermitAll, RolesAllowed}
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response}
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response, StreamingOutput}
import org.eclipse.microprofile.jwt.JsonWebToken
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*
@Path("/api/tournament")
@ApplicationScoped
@@ -22,21 +29,48 @@ class TournamentResource:
private val log = Logger.getLogger(classOf[TournamentResource])
// scalafix:off DisableSyntax.var
@Inject var tournamentService: TournamentService = uninitialized
@Inject var streamManager: TournamentStreamManager = uninitialized
@Inject var jwt: JsonWebToken = uninitialized
@Inject var tournamentService: TournamentService = uninitialized
@Inject var streamManager: TournamentStreamManager = uninitialized
@Inject var jwt: JsonWebToken = uninitialized
@Inject var registry: TournamentServerRegistry = uninitialized
@Inject var externalClient: ExternalTournamentClient = uninitialized
@Inject var objectMapper: ObjectMapper = uninitialized
@Context var headers: HttpHeaders = uninitialized
// scalafix:on
@GET
@PermitAll
def list(): Response =
val (created, started, finished) = tournamentService.list()
val dto = TournamentListDto(
created = created.map(t => tournamentService.toDto(t)),
started = started.map(t => tournamentService.toDto(t)),
finished = finished.map(t => tournamentService.toDto(t)),
)
Response.ok(dto).build()
val internalCreated = created.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
val internalStarted = started.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
val internalFinished = finished.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
val (extCreated, extStarted, extFinished) = registry
.serverUrls()
.foldLeft(
(List.empty[JsonNode], List.empty[JsonNode], List.empty[JsonNode]),
) { case ((ac, as, af), url) =>
externalClient.fetchList(url).fold((ac, as, af)) { node =>
val c = node.path("created").elements().asScala.toList
val s = node.path("started").elements().asScala.toList
val f = node.path("finished").elements().asScala.toList
(c ++ s ++ f).foreach(t => registry.bindTournament(t.path("id").asText(), url))
(ac ++ c, as ++ s, af ++ f)
}
}
val merged = objectMapper.createObjectNode()
val createdArr = objectMapper.createArrayNode()
val startedArr = objectMapper.createArrayNode()
val finishedArr = objectMapper.createArrayNode()
(internalCreated ++ extCreated).foreach(createdArr.add)
(internalStarted ++ extStarted).foreach(startedArr.add)
(internalFinished ++ extFinished).foreach(finishedArr.add)
merged.set("created", createdArr)
merged.set("started", startedArr)
merged.set("finished", finishedArr)
Response.ok(merged).build()
@POST
@RolesAllowed(Array("**"))
@@ -58,10 +92,13 @@ class TournamentResource:
@PermitAll
def get(@PathParam("id") id: String): Response =
tournamentService.get(id) match
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
case Some(t) =>
val standings = tournamentService.getStandings(id)
Response.ok(tournamentService.toDto(t, standings)).build()
case None =>
resolveServer(id)
.flatMap(url => externalClient.fetch(url, id).map(node => Response.ok(node).build()))
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
@DELETE
@Path("/{id}")
@@ -78,8 +115,18 @@ class TournamentResource:
def start(@PathParam("id") id: String): Response =
val userId = Option(jwt.getSubject).getOrElse("")
tournamentService.start(id, userId) match
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
case Left(error) => errorResponse(error)
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/start", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@POST
@Path("/{id}/join")
@@ -92,8 +139,18 @@ class TournamentResource:
val botId = Option(jwt.getSubject).getOrElse("")
val botName = Option(jwt.getClaim[AnyRef]("name")).map(_.toString).getOrElse(botId)
tournamentService.join(id, botId, botName) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) => errorResponse(error)
case Right(_) => Response.ok(OkDto()).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/join", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@POST
@Path("/{id}/withdraw")
@@ -105,8 +162,18 @@ class TournamentResource:
else
val botId = Option(jwt.getSubject).getOrElse("")
tournamentService.withdraw(id, botId) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) => errorResponse(error)
case Right(_) => Response.ok(OkDto()).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/withdraw", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@GET
@Path("/{id}/results")
@@ -133,20 +200,23 @@ class TournamentResource:
@PermitAll
def roundPairings(@PathParam("id") id: String, @PathParam("round") round: Int): Response =
tournamentService.get(id) match
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
case Some(_) =>
val pairings = tournamentService.getPairings(id, round)
Response.ok(RoundPairingsDto(round, pairings)).build()
case None =>
resolveServer(id)
.flatMap(url => externalClient.fetchPairings(url, id, round).map(node => Response.ok(node).build()))
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
@GET
@Path("/{id}/export/games")
@PermitAll
@Produces(Array(MediaType.APPLICATION_JSON, MediaType.WILDCARD, "application/x-ndjson", "application/x-chess-pgn"))
def exportGames(@PathParam("id") id: String, @Context headers: HttpHeaders): Response =
def exportGames(@PathParam("id") id: String, @Context reqHeaders: HttpHeaders): Response =
tournamentService.get(id) match
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
case Some(_) =>
val acceptHeader = Option(headers.getHeaderString("Accept")).getOrElse("")
val acceptHeader = Option(reqHeaders.getHeaderString("Accept")).getOrElse("")
val pairings = tournamentService.getAllPairings(id)
if acceptHeader.contains("application/x-ndjson") then
val ndjson = pairings
@@ -176,6 +246,67 @@ class TournamentResource:
emitter.onTermination(() => streamManager.unregister(id, botId, emitter))
}
@GET
@Path("/{id}/game/{gameId}")
@PermitAll
def getGame(@PathParam("id") id: String, @PathParam("gameId") gameId: String): Response =
resolveServer(id)
.flatMap(url => externalClient.fetch(url, s"$id/game/$gameId").map(node => Response.ok(node).build()))
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
@GET
@Path("/{id}/game/{gameId}/stream")
@PermitAll
@Produces(Array("application/x-ndjson"))
def streamGame(@PathParam("id") id: String, @PathParam("gameId") gameId: String): Response =
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.flatMap(url => externalClient.proxyGetStream(url, s"api/tournament/$id/game/$gameId/stream", auth))
.map { stream =>
Response
.ok(new StreamingOutput {
def write(output: java.io.OutputStream): Unit =
val buf = new Array[Byte](4096)
// scalafix:off DisableSyntax.var
var n = stream.read(buf)
while n >= 0 do
output.write(buf, 0, n)
output.flush()
n = stream.read(buf)
// scalafix:on
})
.`type`("application/x-ndjson")
.build()
}
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
@POST
@Path("/{id}/game/{gameId}/move/{uci}")
@RolesAllowed(Array("**"))
def makeMove(
@PathParam("id") id: String,
@PathParam("gameId") gameId: String,
@PathParam("uci") uci: String,
): Response =
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/game/$gameId/move/$uci", auth)
Response.status(status).entity(body).build()
}
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
private def resolveServer(tournamentId: String): Option[String] =
registry.findServerUrl(tournamentId).orElse {
registry
.serverUrls()
.find(url => externalClient.fetch(url, tournamentId).isDefined)
.map { url =>
registry.bindTournament(tournamentId, url)
url
}
}
private def errorResponse(error: TournamentError): Response =
val status = error match
case TournamentError.NotFound(_) => Response.Status.NOT_FOUND
@@ -0,0 +1,35 @@
package de.nowchess.tournament.resource
import de.nowchess.tournament.dto.{ErrorDto, RegisterServerRequest}
import de.nowchess.tournament.service.TournamentServerRegistry
import jakarta.annotation.security.RolesAllowed
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{MediaType, Response}
import scala.compiletime.uninitialized
@Path("/api/tournament/servers")
@ApplicationScoped
@RolesAllowed(Array("**"))
@Produces(Array(MediaType.APPLICATION_JSON))
@Consumes(Array(MediaType.APPLICATION_JSON))
class TournamentServerResource:
// scalafix:off DisableSyntax.var
@Inject var registry: TournamentServerRegistry = uninitialized
// scalafix:on
@GET
def list(): Response =
Response.ok(registry.list()).build()
@POST
def register(req: RegisterServerRequest): Response =
Response.status(201).entity(registry.register(req.label, req.url)).build()
@DELETE
@Path("/{id}")
def remove(@PathParam("id") id: String): Response =
if registry.remove(id) then Response.noContent().build()
else Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Server $id not found")).build()
@@ -0,0 +1,80 @@
package de.nowchess.tournament.service
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.client.{Client, ClientBuilder, Entity}
import jakarta.ws.rs.core.MediaType
import scala.compiletime.uninitialized
import scala.util.Try
@ApplicationScoped
class ExternalTournamentClient:
// scalafix:off DisableSyntax.var
@Inject var objectMapper: ObjectMapper = uninitialized
// scalafix:on
private def buildClient(): Client = ClientBuilder.newClient()
def fetchList(serverUrl: String): Option[JsonNode] =
Try {
val client = buildClient()
val response = client.target(s"$serverUrl/api/tournament").request(MediaType.APPLICATION_JSON).get()
try
if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
else None
finally
response.close()
client.close()
}.getOrElse(None)
def fetch(serverUrl: String, id: String): Option[JsonNode] =
Try {
val client = buildClient()
val response = client.target(s"$serverUrl/api/tournament/$id").request(MediaType.APPLICATION_JSON).get()
try
if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
else None
finally
response.close()
client.close()
}.getOrElse(None)
def fetchPairings(serverUrl: String, id: String, round: Int): Option[JsonNode] =
Try {
val client = buildClient()
val response =
client.target(s"$serverUrl/api/tournament/$id/round/$round").request(MediaType.APPLICATION_JSON).get()
try
if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
else None
finally
response.close()
client.close()
}.getOrElse(None)
def proxyPost(serverUrl: String, path: String, authHeader: Option[String]): (Int, String) =
Try {
val client = buildClient()
val builder = client.target(s"$serverUrl/$path").request(MediaType.APPLICATION_JSON)
val withAuth = authHeader.fold(builder)(h => builder.header("Authorization", h))
val response = withAuth.post(Entity.json(""))
try (response.getStatus, response.readEntity(classOf[String]))
finally
response.close()
client.close()
}.getOrElse((502, """{"error":"External server unreachable"}"""))
def proxyGetStream(serverUrl: String, path: String, authHeader: Option[String]): Option[java.io.InputStream] =
Try {
val client = buildClient()
val builder = client.target(s"$serverUrl/$path").request("application/x-ndjson")
val withAuth = authHeader.fold(builder)(h => builder.header("Authorization", h))
val response = withAuth.get()
if response.getStatus == 200 then Some(response.readEntity(classOf[java.io.InputStream]))
else
response.close()
client.close()
None
}.getOrElse(None)
@@ -0,0 +1,34 @@
package de.nowchess.tournament.service
import de.nowchess.tournament.dto.ExternalTournamentServer
import jakarta.enterprise.context.ApplicationScoped
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.*
@ApplicationScoped
class TournamentServerRegistry:
private val servers = new ConcurrentHashMap[String, ExternalTournamentServer]()
private val tournaments = new ConcurrentHashMap[String, String]()
def register(label: String, url: String): ExternalTournamentServer =
val id = UUID.randomUUID().toString
val server = ExternalTournamentServer(id, label, url.stripSuffix("/"))
servers.put(id, server)
server
def list(): List[ExternalTournamentServer] =
servers.values().asScala.toList
def remove(id: String): Boolean =
Option(servers.remove(id)).isDefined
def serverUrls(): List[String] =
servers.values().asScala.map(_.url).toList
def bindTournament(tournamentId: String, serverUrl: String): Unit =
tournaments.put(tournamentId, serverUrl)
def findServerUrl(tournamentId: String): Option[String] =
Option(tournaments.get(tournamentId))
+1
View File
@@ -27,4 +27,5 @@ include(
"modules:store",
"modules:coordinator",
"modules:tournament",
"modules:analytics",
)