Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3718f7669a | |||
| 0e0ea4c989 | |||
| 95215b6a42 | |||
| e1d80b9331 | |||
| 259b3bbb24 |
@@ -0,0 +1,130 @@
|
||||
name: Build & Push Analytics Image
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
- 'modules/analytics/**'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
check-actor:
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
allowed: ${{ steps.check.outputs.allowed }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 1
|
||||
|
||||
- id: check
|
||||
run: |
|
||||
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
|
||||
echo "Triggered manually — allowing build"
|
||||
echo "allowed=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
COMMIT_AUTHOR=$(git log -1 --format='%an')
|
||||
COMMIT_SHA=$(git log -1 --format='%H')
|
||||
COMMIT_MSG=$(git log -1 --format='%s')
|
||||
echo "Commit: ${COMMIT_SHA}"
|
||||
echo "Author: ${COMMIT_AUTHOR}"
|
||||
echo "Message: ${COMMIT_MSG}"
|
||||
if [[ "$COMMIT_AUTHOR" == "TeamCity" ]]; then
|
||||
echo "Author is TeamCity — allowing build"
|
||||
echo "allowed=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "Author is not TeamCity — skipping build"
|
||||
echo "allowed=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
fi
|
||||
|
||||
build-and-push:
|
||||
needs: check-actor
|
||||
if: needs.check-actor.outputs.allowed == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
packages: write
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Read version from versions.env
|
||||
id: version
|
||||
run: |
|
||||
source modules/analytics/versions.env
|
||||
echo "version=${MAJOR}.${MINOR}.${PATCH}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Check if image exists in GHCR
|
||||
id: image-check
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
PACKAGE="now-chess-systems%2Fanalytics"
|
||||
VERSION="${{ steps.version.outputs.version }}"
|
||||
EXISTING_TAGS=$(gh api "orgs/now-chess/packages/container/${PACKAGE}/versions" \
|
||||
--jq '.[].metadata.container.tags[]' 2>/dev/null || echo "")
|
||||
echo "Existing tags: $(echo "${EXISTING_TAGS}" | tr '\n' ' ' | xargs)"
|
||||
if echo "${EXISTING_TAGS}" | grep -qx "${VERSION}"; then
|
||||
echo "Image ${VERSION} already exists — skipping build"
|
||||
echo "exists=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "Image ${VERSION} not found — will build"
|
||||
echo "exists=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Set up JDK 17
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
uses: actions/setup-java@v4
|
||||
with:
|
||||
java-version: '17'
|
||||
distribution: 'temurin'
|
||||
|
||||
- name: Cache Gradle packages
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: |
|
||||
~/.gradle/caches
|
||||
~/.gradle/wrapper
|
||||
key: gradle-${{ runner.os }}-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
|
||||
restore-keys: gradle-${{ runner.os }}-
|
||||
|
||||
- name: Build fat jar
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
run: ./gradlew :modules:analytics:jar --no-daemon
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Log in to GitHub Container Registry
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Extract metadata
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ghcr.io/now-chess/now-chess-systems/analytics
|
||||
tags: |
|
||||
type=raw,value=${{ steps.version.outputs.version }}
|
||||
type=raw,value=latest
|
||||
|
||||
- name: Build and push
|
||||
if: steps.image-check.outputs.exists == 'false'
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: modules/analytics
|
||||
file: modules/analytics/src/main/docker/Dockerfile
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
@@ -53,6 +53,8 @@ val coverageExclusions = listOf(
|
||||
"**/core/src/main/scala/de/nowchess/chess/resource/GameWebSocketResource.scala",
|
||||
// Coordinator infrastructure — gRPC, microservice orchestration
|
||||
"**/coordinator/src/main/scala/**",
|
||||
// Analytics module — standalone Spark batch jobs; coverage not applicable (no Quarkus, no scoverage plugin)
|
||||
"modules/analytics/**",
|
||||
)
|
||||
|
||||
// Converts a Sonar-style glob to a scoverage regex (matched against full source path).
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
// Standalone Spark batch-analytics module.
|
||||
//
|
||||
// Spark 3.5.x ships only Scala 2.12/2.13 artifacts; Scala 3 code can consume
|
||||
// them via JVM binary compatibility so long as we avoid macro-expanded APIs
|
||||
// (spark.implicits._, typed Dataset[T]). We use the untyped DataFrame API
|
||||
// exclusively, which is safe to call from Scala 3.
|
||||
//
|
||||
// Spark is declared compileOnly — the cluster provides it at runtime via
|
||||
// spark-submit. Only the PostgreSQL driver and the Scala 3 runtime are
|
||||
// bundled into the fat jar produced by the "jar" task.
|
||||
//
|
||||
// Build the submission jar:
|
||||
// ./gradlew :modules:analytics:jar
|
||||
//
|
||||
// Run a job:
|
||||
// spark-submit \
|
||||
// --class de.nowchess.analytics.OpeningBookJob \
|
||||
// modules/analytics/build/libs/analytics-<version>.jar \
|
||||
// [outputDir] [maxPlies]
|
||||
//
|
||||
// Environment variables consumed:
|
||||
// NOWCHESS_JDBC_URL (default: jdbc:postgresql://localhost:5432/nowchess)
|
||||
// NOWCHESS_DB_USER (default: nowchess)
|
||||
// NOWCHESS_DB_PASS (default: nowchess)
|
||||
|
||||
plugins {
|
||||
id("scala")
|
||||
application
|
||||
}
|
||||
|
||||
group = "de.nowchess"
|
||||
version = "1.0-SNAPSHOT"
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val versions = rootProject.extra["VERSIONS"] as Map<String, String>
|
||||
|
||||
repositories {
|
||||
mavenCentral()
|
||||
}
|
||||
|
||||
scala {
|
||||
scalaVersion = versions["SCALA3"]!!
|
||||
}
|
||||
|
||||
val sparkVersion = "3.5.4"
|
||||
|
||||
dependencies {
|
||||
compileOnly("org.scala-lang:scala3-compiler_3") {
|
||||
version { strictly(versions["SCALA3"]!!) }
|
||||
}
|
||||
implementation("org.scala-lang:scala3-library_3") {
|
||||
version { strictly(versions["SCALA3"]!!) }
|
||||
}
|
||||
implementation("org.scala-lang:scala-library") {
|
||||
version { strictly(versions["SCALA_LIBRARY"]!!) }
|
||||
}
|
||||
|
||||
// Spark is provided by the cluster — compile-only, not bundled.
|
||||
compileOnly("org.apache.spark:spark-sql_2.13:$sparkVersion") {
|
||||
exclude(group = "org.slf4j", module = "slf4j-log4j12")
|
||||
}
|
||||
compileOnly("org.apache.spark:spark-core_2.13:$sparkVersion") {
|
||||
exclude(group = "org.slf4j", module = "slf4j-log4j12")
|
||||
}
|
||||
compileOnly("org.apache.spark:spark-mllib_2.13:$sparkVersion") {
|
||||
exclude(group = "org.slf4j", module = "slf4j-log4j12")
|
||||
}
|
||||
compileOnly("org.apache.spark:spark-graphx_2.13:$sparkVersion") {
|
||||
exclude(group = "org.slf4j", module = "slf4j-log4j12")
|
||||
}
|
||||
|
||||
// PostgreSQL JDBC driver bundled so it is available on executor classpath.
|
||||
implementation("org.postgresql:postgresql:42.7.4")
|
||||
}
|
||||
|
||||
application {
|
||||
mainClass.set("de.nowchess.analytics.OpeningBookJob")
|
||||
}
|
||||
|
||||
// Fat jar: includes runtimeClasspath (our code + pg driver + scala3-library)
|
||||
// but NOT compileOnly Spark jars.
|
||||
// archiveVersion is cleared so the output is always "analytics.jar" — stable
|
||||
// name required by the Dockerfile COPY instruction.
|
||||
tasks.jar {
|
||||
archiveBaseName.set("analytics")
|
||||
archiveVersion.set("")
|
||||
manifest {
|
||||
attributes["Main-Class"] = "de.nowchess.analytics.OpeningBookJob"
|
||||
}
|
||||
from(configurations.runtimeClasspath.get().map { if (it.isDirectory) it else zipTree(it) })
|
||||
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
|
||||
}
|
||||
|
||||
tasks.withType<ScalaCompile> {
|
||||
scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8")
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
FROM apache/spark:3.5.4-scala2.13-java17-ubuntu
|
||||
|
||||
USER root
|
||||
|
||||
# analytics.jar = fat jar containing app code + PostgreSQL JDBC driver + Scala 3 runtime.
|
||||
# Spark itself is provided by the base image at /opt/spark — it is NOT included in the jar.
|
||||
COPY build/libs/analytics.jar /app/analytics.jar
|
||||
|
||||
USER spark
|
||||
@@ -0,0 +1,138 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.functions as F
|
||||
import org.apache.spark.sql.streaming.Trigger
|
||||
|
||||
/** Demonstrates Spark Structured Streaming on NowChess game-over events.
|
||||
*
|
||||
* Spark concepts shown:
|
||||
* - Continuous micro-batch processing (`readStream`)
|
||||
* - Watermarking for late-data tolerance (events up to 45 s late are accepted)
|
||||
* - Tumbling window aggregations — fixed 1-minute buckets, zero overlap
|
||||
* - Sliding window aggregations — 5-minute window, 1-minute slide
|
||||
* - Append vs Update output modes and when each is valid
|
||||
* - Exactly-once fault tolerance via checkpointing
|
||||
* - Multiple concurrent streaming queries on the same session
|
||||
*
|
||||
* Production wiring: NowChess already publishes game-over events to a Redis Stream (`nowchess:game-over`, see
|
||||
* GameRedisPublisher). Swap the `rate` source below for one of the production sources shown in the comment block.
|
||||
*/
|
||||
object LiveDashboardJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-live-dashboard"
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Live Dashboard")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, outputDir)
|
||||
|
||||
def run(spark: SparkSession, outputDir: String): Unit =
|
||||
|
||||
// ── Production sources (replace rate source with one of these) ─────────
|
||||
//
|
||||
// Kafka (via a Redis → Kafka bridge):
|
||||
// spark.readStream
|
||||
// .format("kafka")
|
||||
// .option("kafka.bootstrap.servers", sys.env("KAFKA_BROKERS"))
|
||||
// .option("subscribe", "nowchess.game-over")
|
||||
// .load()
|
||||
// .select(F.from_json(F.col("value").cast("string"), gameOverSchema).as("e"))
|
||||
// .select("e.*")
|
||||
//
|
||||
// spark-redis (com.redislabs:spark-redis:3.1.0):
|
||||
// spark.readStream
|
||||
// .format("redis")
|
||||
// .option("stream.keys", "nowchess:game-over")
|
||||
// .schema(gameOverSchema)
|
||||
// .load()
|
||||
// ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
// Simulated stream: 10 game-over events / second.
|
||||
// `rate` source emits (timestamp: Timestamp, value: Long) — Spark built-in, no deps.
|
||||
val rawStream = spark.readStream
|
||||
.format("rate")
|
||||
.option("rowsPerSecond", "10")
|
||||
.load()
|
||||
|
||||
// Derive game-outcome columns from the monotonic counter.
|
||||
// In production these come directly from the event payload.
|
||||
val events = rawStream
|
||||
.withColumn(
|
||||
"result",
|
||||
F.when(F.col("value") % 3 === 0L, "white")
|
||||
.when(F.col("value") % 3 === 1L, "black")
|
||||
.otherwise("draw"),
|
||||
)
|
||||
.withColumn(
|
||||
"termination",
|
||||
F.when(F.col("value") % 4 === 0L, "checkmate")
|
||||
.when(F.col("value") % 4 === 1L, "resignation")
|
||||
.when(F.col("value") % 4 === 2L, "timeout")
|
||||
.otherwise("agreement"),
|
||||
)
|
||||
// Watermark: accept events up to 45 seconds late.
|
||||
// Spark will not emit a window result until the watermark passes its end time.
|
||||
.withWatermark("timestamp", "45 seconds")
|
||||
|
||||
// ── Query 1: tumbling 1-minute windows ────────────────────────────────
|
||||
// Each window is a non-overlapping 60-second bucket.
|
||||
// outputMode("append") only emits a window after the watermark seals it —
|
||||
// guarantees that late arrivals were already counted before output.
|
||||
val gamesByWindow = events
|
||||
.groupBy(F.window(F.col("timestamp"), "1 minute"), F.col("result"))
|
||||
.agg(F.count("*").as("games"))
|
||||
.select(
|
||||
F.col("window.start").as("window_start"),
|
||||
F.col("window.end").as("window_end"),
|
||||
F.col("result"),
|
||||
F.col("games"),
|
||||
)
|
||||
|
||||
// ── Query 2: sliding 5-minute / 1-minute windows ──────────────────────
|
||||
// Each window covers 5 minutes of data, and a new window opens every minute.
|
||||
// outputMode("update") emits a row whenever an existing window changes —
|
||||
// gives a live rolling view of termination patterns.
|
||||
val terminationTrend = events
|
||||
.groupBy(F.window(F.col("timestamp"), "5 minutes", "1 minute"))
|
||||
.agg(
|
||||
F.count("*").as("total"),
|
||||
F.sum(F.when(F.col("termination") === "checkmate", 1).otherwise(0)).as("checkmates"),
|
||||
F.sum(F.when(F.col("termination") === "resignation", 1).otherwise(0)).as("resignations"),
|
||||
F.sum(F.when(F.col("termination") === "timeout", 1).otherwise(0)).as("timeouts"),
|
||||
)
|
||||
.withColumn(
|
||||
"checkmate_pct",
|
||||
F.round(F.col("checkmates") / F.col("total").cast("double") * 100, 1),
|
||||
)
|
||||
.select(
|
||||
F.col("window.start").as("window_start"),
|
||||
F.col("total"),
|
||||
F.col("checkmate_pct"),
|
||||
F.col("resignations"),
|
||||
F.col("timeouts"),
|
||||
)
|
||||
|
||||
// Write sealed windows to Parquet — safe to query with any SQL engine.
|
||||
gamesByWindow.writeStream
|
||||
.outputMode("append")
|
||||
.format("parquet")
|
||||
.option("path", s"$outputDir/game_counts_by_window")
|
||||
.option("checkpointLocation", s"$outputDir/_checkpoints/game_counts")
|
||||
.trigger(Trigger.ProcessingTime("30 seconds"))
|
||||
.start()
|
||||
|
||||
// Print live rolling stats to the console every 10 seconds.
|
||||
terminationTrend.writeStream
|
||||
.outputMode("update")
|
||||
.format("console")
|
||||
.option("truncate", "false")
|
||||
.option("numRows", "10")
|
||||
.trigger(Trigger.ProcessingTime("10 seconds"))
|
||||
.start()
|
||||
|
||||
// Block until any query fails or the process is killed.
|
||||
spark.streams.awaitAnyTermination()
|
||||
@@ -0,0 +1,107 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.functions as F
|
||||
|
||||
/** Reads completed games from the game_records table and produces an opening-book statistics table: for each unique
|
||||
* opening (first N plies), it reports total games played and win/draw/loss rates from each side.
|
||||
*
|
||||
* Output is written as Parquet to `outputDir/opening_book` and a human-readable CSV summary (top-1000 openings by
|
||||
* popularity) to `outputDir/opening_book_top1000`.
|
||||
*
|
||||
* PGN parsing is done entirely with Spark SQL string functions — no UDFs — so the Catalyst optimizer can push
|
||||
* predicates and the job scales to any cluster size.
|
||||
*/
|
||||
object OpeningBookJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-opening-book"
|
||||
val maxPlies = if args.length > 1 then args(1).toInt else 10
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Opening Book Generator")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir, maxPlies)
|
||||
spark.stop()
|
||||
|
||||
def run(
|
||||
spark: SparkSession,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
maxPlies: Int,
|
||||
): Unit =
|
||||
val games = spark.read
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "game_records")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.option("fetchsize", "10000")
|
||||
.load()
|
||||
.select("pgn", "result")
|
||||
.filter(F.col("result").isNotNull.and(F.col("pgn").isNotNull))
|
||||
|
||||
val openingCol = extractOpening(F.col("pgn"), maxPlies)
|
||||
|
||||
val withOpening = games
|
||||
.withColumn("opening", openingCol)
|
||||
.filter(F.col("opening").isNotNull.and(F.length(F.col("opening")) > 0))
|
||||
|
||||
val stats = withOpening
|
||||
.groupBy("opening")
|
||||
.agg(
|
||||
F.count("*").as("total"),
|
||||
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
|
||||
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
|
||||
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
|
||||
)
|
||||
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total").cast("double"), 3))
|
||||
.withColumn("black_win_rate", F.round(F.col("black_wins") / F.col("total").cast("double"), 3))
|
||||
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total").cast("double"), 3))
|
||||
.orderBy(F.desc("total"))
|
||||
|
||||
stats.write
|
||||
.mode("overwrite")
|
||||
.parquet(s"$outputDir/opening_book")
|
||||
|
||||
val top1000 = stats.limit(1000)
|
||||
|
||||
top1000.write
|
||||
.mode("overwrite")
|
||||
.option("header", "true")
|
||||
.csv(s"$outputDir/opening_book_top1000")
|
||||
|
||||
top1000.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "analytics_opening_stats")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
|
||||
/** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
|
||||
*
|
||||
* PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
|
||||
*
|
||||
* Steps:
|
||||
* 1. Split on double-newline; take the moves section (index 1). 2. Strip the terminal result token (*, 1-0, 0-1,
|
||||
* 1/2-1/2). 3. Strip move numbers (e.g., "1. ", "12. "). 4. Strip check/checkmate suffixes (+ #) for
|
||||
* position-independent lookup. 5. Tokenize on whitespace, take first maxPlies tokens, rejoin with spaces.
|
||||
*/
|
||||
private def extractOpening(pgnCol: org.apache.spark.sql.Column, maxPlies: Int): org.apache.spark.sql.Column =
|
||||
val moveSection = F.coalesce(F.split(pgnCol, "\n\n").getItem(1), pgnCol)
|
||||
val noResult = F.regexp_replace(moveSection, "(1-0|0-1|1/2-1/2|\\*)\\s*$", "")
|
||||
val noMoveNumbers = F.regexp_replace(noResult, "\\d+\\.+\\s*", " ")
|
||||
val noAnnotations = F.regexp_replace(noMoveNumbers, "[+#]", "")
|
||||
val moveArray = F.split(F.trim(noAnnotations), "\\s+")
|
||||
F.array_join(F.slice(moveArray, 1, maxPlies), " ")
|
||||
@@ -0,0 +1,174 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.ml.Pipeline
|
||||
import org.apache.spark.ml.clustering.KMeans
|
||||
import org.apache.spark.ml.evaluation.ClusteringEvaluator
|
||||
import org.apache.spark.ml.feature.StandardScaler
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.functions as F
|
||||
|
||||
/** Clusters NowChess players into skill tiers using K-Means via MLlib.
|
||||
*
|
||||
* Spark / MLlib concepts shown:
|
||||
* - Feature engineering from raw relational data (JDBC → DataFrame)
|
||||
* - VectorAssembler — combine scalar columns into a dense feature vector
|
||||
* - StandardScaler — zero-mean / unit-variance normalisation so that total_games (can be 1000+) does not dominate
|
||||
* win_rate (0–1)
|
||||
* - KMeans clustering — unsupervised partitioning into k skill tiers
|
||||
* - Pipeline — compose transformers + estimator into a single reusable object
|
||||
* - ClusteringEvaluator — silhouette score to assess cluster quality
|
||||
*
|
||||
* Features per player (all derived from game_records): total_games — how active the player is win_rate — overall
|
||||
* strength avg_move_count — game-length preference (tactical vs positional) games_as_white_ratio — colour bias
|
||||
*
|
||||
* Output: Parquet: player_id + cluster (0..k-1) + feature values CSV: per-cluster archetype averages (interpret what
|
||||
* each tier means)
|
||||
*/
|
||||
object PlayerClusteringJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-clusters"
|
||||
val k = if args.length > 1 then args(1).toInt else 4
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Player Clustering")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir, k)
|
||||
spark.stop()
|
||||
|
||||
def run(
|
||||
spark: SparkSession,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
k: Int,
|
||||
): Unit =
|
||||
val games = spark.read
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "game_records")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.option("fetchsize", "10000")
|
||||
.load()
|
||||
.select("white_id", "black_id", "result", "move_count")
|
||||
.filter(F.col("result").isNotNull)
|
||||
|
||||
val playerStats = buildPlayerStats(games)
|
||||
.filter(F.col("total_games") >= 5)
|
||||
|
||||
val featureCols = Array("total_games", "win_rate", "avg_move_count", "games_as_white_ratio")
|
||||
|
||||
val assembler = new VectorAssembler()
|
||||
.setInputCols(featureCols)
|
||||
.setOutputCol("raw_features")
|
||||
.setHandleInvalid("skip")
|
||||
|
||||
val scaler = new StandardScaler()
|
||||
.setInputCol("raw_features")
|
||||
.setOutputCol("features")
|
||||
.setWithStd(true)
|
||||
.setWithMean(true)
|
||||
|
||||
val kmeans = new KMeans()
|
||||
.setK(k)
|
||||
.setSeed(42L)
|
||||
.setFeaturesCol("features")
|
||||
.setPredictionCol("cluster")
|
||||
|
||||
val pipeline = new Pipeline().setStages(Array(assembler, scaler, kmeans))
|
||||
|
||||
val model = pipeline.fit(playerStats)
|
||||
val predictions = model.transform(playerStats)
|
||||
|
||||
val silhouette = new ClusteringEvaluator()
|
||||
.setFeaturesCol("features")
|
||||
.setPredictionCol("cluster")
|
||||
.evaluate(predictions)
|
||||
|
||||
println(s"[Clustering] k=$k silhouette=$silhouette")
|
||||
|
||||
// Average feature values per cluster reveal what each tier represents.
|
||||
// Example interpretation for k=4:
|
||||
// Cluster 0: high total_games + high win_rate → experienced strong players
|
||||
// Cluster 1: low total_games + low win_rate → beginners / casual
|
||||
// Cluster 2: high total_games + mid win_rate → active intermediate
|
||||
// Cluster 3: low total_games + high win_rate → strong but infrequent
|
||||
val archetypes = predictions
|
||||
.groupBy("cluster")
|
||||
.agg(
|
||||
F.count("*").as("player_count"),
|
||||
F.round(F.avg("total_games"), 1).as("avg_total_games"),
|
||||
F.round(F.avg("win_rate"), 3).as("avg_win_rate"),
|
||||
F.round(F.avg("avg_move_count"), 1).as("avg_move_count"),
|
||||
F.round(F.avg("games_as_white_ratio"), 3).as("avg_white_ratio"),
|
||||
)
|
||||
.orderBy("cluster")
|
||||
|
||||
archetypes.show(20, false)
|
||||
|
||||
val clustersDf = predictions.select("player_id", "total_games", "win_rate", "avg_move_count", "cluster")
|
||||
|
||||
clustersDf.write
|
||||
.mode("overwrite")
|
||||
.parquet(s"$outputDir/player_clusters")
|
||||
|
||||
archetypes.write
|
||||
.mode("overwrite")
|
||||
.option("header", "true")
|
||||
.csv(s"$outputDir/cluster_archetypes")
|
||||
|
||||
clustersDf.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "analytics_player_clusters")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
|
||||
archetypes.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "analytics_cluster_archetypes")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
|
||||
private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
|
||||
val asWhite = games.select(
|
||||
F.col("white_id").as("player_id"),
|
||||
F.col("result"),
|
||||
F.col("move_count"),
|
||||
F.lit(1).as("is_white"),
|
||||
)
|
||||
val asBlack = games.select(
|
||||
F.col("black_id").as("player_id"),
|
||||
F.col("result"),
|
||||
F.col("move_count"),
|
||||
F.lit(0).as("is_white"),
|
||||
)
|
||||
|
||||
val won = (F.col("is_white") === 1 && F.col("result") === "white")
|
||||
.or(F.col("is_white") === 0 && F.col("result") === "black")
|
||||
|
||||
asWhite
|
||||
.union(asBlack)
|
||||
.groupBy("player_id")
|
||||
.agg(
|
||||
F.count("*").as("total_games"),
|
||||
F.round(F.sum(F.when(won, 1.0).otherwise(0.0)) / F.count("*"), 3).as("win_rate"),
|
||||
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
|
||||
F.round(F.avg(F.col("is_white").cast("double")), 3).as("games_as_white_ratio"),
|
||||
)
|
||||
@@ -0,0 +1,161 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.graphx.Edge
|
||||
import org.apache.spark.graphx.Graph
|
||||
import org.apache.spark.graphx.VertexId
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.functions as F
|
||||
import org.apache.spark.sql.types.DataType
|
||||
import org.apache.spark.sql.types.DoubleType
|
||||
import org.apache.spark.sql.types.LongType
|
||||
import org.apache.spark.sql.types.StringType
|
||||
import org.apache.spark.sql.types.StructField
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
/** Models the NowChess player network as a directed graph and runs GraphX analytics.
|
||||
*
|
||||
* Spark / GraphX concepts shown:
|
||||
* - Building a Graph from RDDs derived from a JDBC DataFrame
|
||||
* - PageRank — measures a player's "influence"; high score = many games against other high-ranked players (analogous
|
||||
* to web link authority)
|
||||
* - Connected Components — finds isolated player communities; players who have never played anyone from another
|
||||
* component cannot be linked
|
||||
* - Converting GraphX results back to DataFrames for SQL-style joins and output
|
||||
*
|
||||
* Graph model: Vertices: one per unique player (vertex ID = hashCode of player UUID string) Edges: one per completed
|
||||
* game (white → black), attributed with result
|
||||
*
|
||||
* Note: hashCode gives a 32-bit → 64-bit vertex ID; collision probability is negligible for typical player counts. For
|
||||
* millions of players, replace with MLlib StringIndexer to generate collision-free Long IDs.
|
||||
*/
|
||||
object PlayerGraphJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-graph"
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Player Graph Analytics")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
spark.stop()
|
||||
|
||||
def run(
|
||||
spark: SparkSession,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
): Unit =
|
||||
val gamesRdd: RDD[Row] = spark.read
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "game_records")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.option("fetchsize", "10000")
|
||||
.load()
|
||||
.select("white_id", "black_id", "result")
|
||||
.filter(F.col("result").isNotNull)
|
||||
.rdd
|
||||
|
||||
val toVid: String => VertexId = s => s.hashCode.toLong
|
||||
|
||||
// Each row contributes two vertex entries (white and black player).
|
||||
val vertices: RDD[(VertexId, String)] = gamesRdd
|
||||
.flatMap { row =>
|
||||
Seq(
|
||||
(toVid(row.getString(0)), row.getString(0)),
|
||||
(toVid(row.getString(1)), row.getString(1)),
|
||||
)
|
||||
}
|
||||
.distinct()
|
||||
|
||||
// Directed edge white → black, labelled with the game result.
|
||||
val edges: RDD[Edge[String]] = gamesRdd.map { row =>
|
||||
Edge(toVid(row.getString(0)), toVid(row.getString(1)), row.getString(2))
|
||||
}
|
||||
|
||||
val graph: Graph[String, String] = Graph(vertices, edges)
|
||||
|
||||
println(s"[Graph] vertices=${graph.numVertices} edges=${graph.numEdges}")
|
||||
|
||||
// ── PageRank ────────────────────────────────────────────────────────────
|
||||
// Convergence tolerance 0.01 — lower = more iterations = more accurate.
|
||||
// Returns Graph[Double, Double]; vertex attribute = PageRank score.
|
||||
val pageRanks: RDD[(VertexId, Double)] = graph.pageRank(0.01).vertices
|
||||
|
||||
// ── Connected Components ────────────────────────────────────────────────
|
||||
// Returns Graph[VertexId, ED]; vertex attribute = minimum vertex ID in
|
||||
// the component (serves as a stable component label).
|
||||
val components: RDD[(VertexId, VertexId)] = graph.connectedComponents().vertices
|
||||
|
||||
// Convert each RDD result to a DataFrame so we can join with SQL semantics.
|
||||
val vertexDf = rddToFrame(spark, vertices, "player_id", StringType)
|
||||
val pageRankDf = rddToFrame(spark, pageRanks, "page_rank", DoubleType)
|
||||
val componentDf = rddToFrame(spark, components, "component_id", LongType)
|
||||
|
||||
val result = vertexDf
|
||||
.join(pageRankDf, "vertex_id")
|
||||
.join(componentDf, "vertex_id")
|
||||
.drop("vertex_id")
|
||||
.withColumn("page_rank", F.round(F.col("page_rank"), 4))
|
||||
.orderBy(F.desc("page_rank"))
|
||||
|
||||
println("[Graph] Top 20 players by PageRank:")
|
||||
result.show(20, false)
|
||||
|
||||
result.write
|
||||
.mode("overwrite")
|
||||
.parquet(s"$outputDir/player_graph")
|
||||
|
||||
result.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "analytics_player_graph")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
|
||||
// How many players belong to each connected component?
|
||||
// A large dominant component + many singletons is the expected shape.
|
||||
val componentSizes = result
|
||||
.groupBy("component_id")
|
||||
.agg(F.count("*").as("player_count"))
|
||||
.orderBy(F.desc("player_count"))
|
||||
|
||||
println("[Graph] Connected component sizes:")
|
||||
componentSizes.show(10, false)
|
||||
|
||||
componentSizes.write
|
||||
.mode("overwrite")
|
||||
.option("header", "true")
|
||||
.csv(s"$outputDir/component_sizes")
|
||||
|
||||
// Build a two-column DataFrame (vertex_id: Long, valueCol: valueType) from an RDD.
|
||||
// Used to bridge GraphX RDD results into the DataFrame API without implicits.
|
||||
private def rddToFrame[T](
|
||||
spark: SparkSession,
|
||||
rdd: RDD[(VertexId, T)],
|
||||
valueCol: String,
|
||||
valueType: DataType,
|
||||
): org.apache.spark.sql.DataFrame =
|
||||
val schema = StructType(
|
||||
List(
|
||||
StructField("vertex_id", LongType, nullable = false),
|
||||
StructField(valueCol, valueType, nullable = false),
|
||||
),
|
||||
)
|
||||
spark.createDataFrame(
|
||||
rdd.map { case (vid, v) => Row.fromSeq(Seq[Any](vid, v)) },
|
||||
schema,
|
||||
)
|
||||
@@ -0,0 +1,95 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.functions as F
|
||||
|
||||
/** Aggregates per-player statistics from completed games.
|
||||
*
|
||||
* Each game contributes one row per player (as white and as black), so the dataset is first unioned into a
|
||||
* player-centric view before grouping. Output columns: player_id, total_games, wins, losses, draws, games_as_white,
|
||||
* games_as_black, avg_move_count, win_rate
|
||||
*
|
||||
* Output is written as Parquet to `outputDir/player_stats`.
|
||||
*/
|
||||
object PlayerStatsJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-stats"
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Player Stats")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
spark.stop()
|
||||
|
||||
def run(
|
||||
spark: SparkSession,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
): Unit =
|
||||
val games = spark.read
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "game_records")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.option("fetchsize", "10000")
|
||||
.load()
|
||||
.select("white_id", "black_id", "result", "move_count")
|
||||
.filter(F.col("result").isNotNull)
|
||||
|
||||
// Flatten each game into two rows: one per player, tagged with their side.
|
||||
val asWhite = games.select(
|
||||
F.col("white_id").as("player_id"),
|
||||
F.col("result"),
|
||||
F.col("move_count"),
|
||||
F.lit("white").as("color"),
|
||||
)
|
||||
val asBlack = games.select(
|
||||
F.col("black_id").as("player_id"),
|
||||
F.col("result"),
|
||||
F.col("move_count"),
|
||||
F.lit("black").as("color"),
|
||||
)
|
||||
|
||||
val playerGames = asWhite.union(asBlack)
|
||||
|
||||
val wonGame = F.col("color") === F.col("result")
|
||||
val lostGame = (F.col("color") === "white" && F.col("result") === "black")
|
||||
.or(F.col("color") === "black" && F.col("result") === "white")
|
||||
|
||||
val stats = playerGames
|
||||
.groupBy("player_id")
|
||||
.agg(
|
||||
F.count("*").as("total_games"),
|
||||
F.sum(F.when(wonGame, 1).otherwise(0)).as("wins"),
|
||||
F.sum(F.when(lostGame, 1).otherwise(0)).as("losses"),
|
||||
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
|
||||
F.sum(F.when(F.col("color") === "white", 1).otherwise(0)).as("games_as_white"),
|
||||
F.sum(F.when(F.col("color") === "black", 1).otherwise(0)).as("games_as_black"),
|
||||
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
|
||||
)
|
||||
.withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
|
||||
.orderBy(F.desc("total_games"))
|
||||
|
||||
stats.write
|
||||
.mode("overwrite")
|
||||
.parquet(s"$outputDir/player_stats")
|
||||
|
||||
stats.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", "analytics_player_stats")
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
@@ -0,0 +1,3 @@
|
||||
MAJOR=0
|
||||
MINOR=1
|
||||
PATCH=0
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package de.nowchess.bot.resource
|
||||
|
||||
case class JoinTournamentRequest(
|
||||
tournamentId: String,
|
||||
difficulty: String,
|
||||
serverUrl: Option[String],
|
||||
)
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package de.nowchess.bot.resource
|
||||
|
||||
case class JoinTournamentResponse(
|
||||
botId: String,
|
||||
difficulty: String,
|
||||
status: String,
|
||||
)
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
package de.nowchess.bot.resource
|
||||
|
||||
import de.nowchess.bot.service.TournamentBotGamePlayer
|
||||
import jakarta.annotation.security.RolesAllowed
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.*
|
||||
import jakarta.ws.rs.core.{MediaType, Response}
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@Path("/api/bots/official")
|
||||
@ApplicationScoped
|
||||
@RolesAllowed(Array("**"))
|
||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
||||
@Consumes(Array(MediaType.APPLICATION_JSON))
|
||||
class TournamentJoinResource:
|
||||
|
||||
private val log = Logger.getLogger(classOf[TournamentJoinResource])
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var player: TournamentBotGamePlayer = uninitialized
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
@POST
|
||||
@Path("/join-tournament")
|
||||
def joinTournament(req: JoinTournamentRequest): Response =
|
||||
val serverUrl = req.serverUrl.filter(_.nonEmpty).getOrElse(player.defaultServerUrl)
|
||||
val difficulty = if req.difficulty.nonEmpty then req.difficulty else "medium"
|
||||
log.infof(
|
||||
"Official bot join requested — tournament=%s difficulty=%s server=%s",
|
||||
req.tournamentId,
|
||||
difficulty,
|
||||
serverUrl,
|
||||
)
|
||||
player.joinTournament(req.tournamentId, difficulty, serverUrl) match
|
||||
case Right(botId) =>
|
||||
val resp = JoinTournamentResponse(botId, difficulty, "joining")
|
||||
Response.ok(resp).build()
|
||||
case Left(err) =>
|
||||
Response
|
||||
.status(Response.Status.BAD_GATEWAY)
|
||||
.entity(s"""{"error":"$err"}""")
|
||||
.build()
|
||||
+70
-46
@@ -38,6 +38,9 @@ class TournamentBotGamePlayer:
|
||||
@volatile private var running = true
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
val defaultServerUrl: String =
|
||||
System.getenv().asScala.getOrElse("TOURNAMENT_SERVER_URL", "http://localhost:8089")
|
||||
|
||||
@PostConstruct
|
||||
def initialize(): Unit =
|
||||
config match
|
||||
@@ -45,9 +48,42 @@ class TournamentBotGamePlayer:
|
||||
log.info("Tournament bot disabled — set TOURNAMENT_ID and TOURNAMENT_BOT_TOKEN to enable")
|
||||
case Some(cfg) =>
|
||||
log.infof("Tournament bot enabled — server=%s tournament=%s bot=%s", cfg.serverUrl, cfg.tournamentId, cfg.botId)
|
||||
val thread = new Thread(() => connect(cfg), s"TournamentBot-${cfg.tournamentId}")
|
||||
thread.setDaemon(true)
|
||||
thread.start()
|
||||
startAsync(cfg)
|
||||
|
||||
def joinTournament(tournamentId: String, difficulty: String, serverUrl: String): Either[String, String] =
|
||||
registerBot(serverUrl, difficulty) match
|
||||
case None => Left("Failed to register bot with tournament server")
|
||||
case Some((botId, token)) =>
|
||||
val cfg = TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty)
|
||||
if join(cfg) then
|
||||
startAsync(cfg)
|
||||
Right(botId)
|
||||
else Left("Failed to join tournament")
|
||||
|
||||
private def startAsync(cfg: TournamentBotConfig): Unit =
|
||||
val thread = new Thread(() => streamLoop(cfg), s"TournamentBot-${cfg.tournamentId}")
|
||||
thread.setDaemon(true)
|
||||
thread.start()
|
||||
|
||||
private def registerBot(serverUrl: String, difficulty: String): Option[(String, String)] =
|
||||
Try {
|
||||
val name = s"NowChess ${difficulty.capitalize}"
|
||||
val body = s"""{"name":"$name","isBot":true}"""
|
||||
val response = client
|
||||
.target(serverUrl)
|
||||
.path("api")
|
||||
.path("auth")
|
||||
.path("register")
|
||||
.request(MediaType.APPLICATION_JSON)
|
||||
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
|
||||
if response.getStatus == 201 then
|
||||
val node = objectMapper.readTree(response.readEntity(classOf[String]))
|
||||
val id = node.path("id").asText()
|
||||
val token = node.path("token").asText()
|
||||
response.close()
|
||||
if id.nonEmpty && token.nonEmpty then Some((id, token)) else None
|
||||
else { log.warnf("Bot registration returned status %d", response.getStatus); response.close(); None }
|
||||
}.getOrElse(None)
|
||||
|
||||
@PreDestroy
|
||||
def cleanup(): Unit =
|
||||
@@ -56,12 +92,11 @@ class TournamentBotGamePlayer:
|
||||
Try(client.close())
|
||||
log.info("Tournament bot stopped")
|
||||
|
||||
private def connect(cfg: TournamentBotConfig): Unit =
|
||||
if join(cfg) then
|
||||
while running do
|
||||
Try(streamEvents(cfg)) match
|
||||
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
|
||||
case Success(_) => sleep(2000)
|
||||
private def streamLoop(cfg: TournamentBotConfig): Unit =
|
||||
while running do
|
||||
Try(streamEvents(cfg)) match
|
||||
case Failure(ex) => log.warnf(ex, "Tournament event stream dropped — reconnecting"); sleep(5000)
|
||||
case Success(_) => sleep(2000)
|
||||
|
||||
private def join(cfg: TournamentBotConfig): Boolean =
|
||||
Try {
|
||||
@@ -86,41 +121,23 @@ class TournamentBotGamePlayer:
|
||||
log.infof("Listening to tournament %s event stream", cfg.tournamentId)
|
||||
forEachLine(response.readEntity(classOf[InputStream])): line =>
|
||||
parse(line).foreach: node =>
|
||||
if node.path("type").asText() == "gameStart" then onGameStart(cfg, node.path("gameId").asText())
|
||||
if node.path("type").asText() == "gameStart" then
|
||||
onGameStart(cfg, node.path("gameId").asText(), node.path("color").asText())
|
||||
|
||||
private def onGameStart(cfg: TournamentBotConfig, gameId: String): Unit =
|
||||
if gameId.nonEmpty && activeGames.add(gameId) then
|
||||
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId) })
|
||||
private def onGameStart(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
|
||||
if gameId.nonEmpty && color.nonEmpty && activeGames.add(gameId) then
|
||||
workers.submit(new Runnable { def run(): Unit = playGame(cfg, gameId, color) })
|
||||
()
|
||||
|
||||
private def playGame(cfg: TournamentBotConfig, gameId: String): Unit =
|
||||
private def playGame(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
|
||||
Try {
|
||||
colorFor(cfg, gameId) match
|
||||
case None =>
|
||||
log.debugf("Game %s is not ours — ignoring", gameId)
|
||||
activeGames.remove(gameId)
|
||||
case Some(color) =>
|
||||
log.infof("Playing game %s as %s", gameId, color)
|
||||
val stream = openGameStream(cfg, gameId)
|
||||
maybeMoveFromCurrentState(cfg, gameId, color)
|
||||
stream.foreach(consumeGameStream(cfg, gameId, color, _))
|
||||
activeGames.remove(gameId)
|
||||
log.infof("Playing game %s as %s", gameId, color)
|
||||
openGameStream(cfg, gameId).foreach(consumeGameStream(cfg, gameId, color, _))
|
||||
activeGames.remove(gameId)
|
||||
} match
|
||||
case Failure(ex) => log.errorf(ex, "Game %s crashed", gameId); activeGames.remove(gameId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def colorFor(cfg: TournamentBotConfig, gameId: String): Option[String] =
|
||||
fetchGame(cfg, gameId).flatMap: game =>
|
||||
val white = game.path("white").path("id").asText()
|
||||
val black = game.path("black").path("id").asText()
|
||||
if white == cfg.botId then Some("white")
|
||||
else if black == cfg.botId then Some("black")
|
||||
else None
|
||||
|
||||
private def maybeMoveFromCurrentState(cfg: TournamentBotConfig, gameId: String, color: String): Unit =
|
||||
fetchGame(cfg, gameId).foreach: game =>
|
||||
maybeMove(cfg, gameId, color, game.path("turn").asText(), game.path("status").asText(), game.path("fen").asText())
|
||||
|
||||
private def consumeGameStream(cfg: TournamentBotConfig, gameId: String, color: String, stream: InputStream): Unit =
|
||||
val reader = new BufferedReader(new InputStreamReader(stream))
|
||||
// scalafix:off DisableSyntax.var
|
||||
@@ -134,10 +151,25 @@ class TournamentBotGamePlayer:
|
||||
.foreach { line =>
|
||||
parse(line).foreach: node =>
|
||||
node.path("type").asText() match
|
||||
case "gameState" =>
|
||||
maybeMove(
|
||||
cfg,
|
||||
gameId,
|
||||
color,
|
||||
node.path("turn").asText(),
|
||||
node.path("status").asText(),
|
||||
node.path("fen").asText(),
|
||||
)
|
||||
case "move" =>
|
||||
maybeMove(cfg, gameId, color, node.path("turn").asText(), "ongoing", node.path("fen").asText())
|
||||
case "gameEnd" => log.infof("Game %s ended — status=%s", gameId, node.path("status").asText()); done = true
|
||||
case _ => ()
|
||||
case "gameEnd" =>
|
||||
log.infof(
|
||||
"Game %s ended — status=%s winner=%s",
|
||||
gameId,
|
||||
node.path("status").asText(),
|
||||
node.path("winner").asText(),
|
||||
); done = true
|
||||
case _ => ()
|
||||
}
|
||||
|
||||
private def maybeMove(
|
||||
@@ -169,14 +201,6 @@ class TournamentBotGamePlayer:
|
||||
case Failure(ex) => log.errorf(ex, "Error submitting move %s in game %s", uci, gameId)
|
||||
case Success(_) => ()
|
||||
|
||||
private def fetchGame(cfg: TournamentBotConfig, gameId: String): Option[JsonNode] =
|
||||
Try {
|
||||
val response = target(cfg).path("game").path(gameId).request(MediaType.APPLICATION_JSON).get()
|
||||
val node = if response.getStatus == 200 then Some(response.readEntity(classOf[JsonNode])) else None
|
||||
response.close()
|
||||
node
|
||||
}.getOrElse(None)
|
||||
|
||||
private def openGameStream(cfg: TournamentBotConfig, gameId: String): Option[InputStream] =
|
||||
Try {
|
||||
val response = authed(cfg, target(cfg).path("game").path(gameId).path("stream"))
|
||||
|
||||
+3
@@ -31,6 +31,9 @@ import io.quarkus.runtime.annotations.RegisterForReflection
|
||||
classOf[CoreCreateGameRequest],
|
||||
classOf[CoreGameResponse],
|
||||
classOf[GameWritebackEventDto],
|
||||
classOf[ExternalTournamentServer],
|
||||
classOf[RegisterServerRequest],
|
||||
classOf[ExternalTournamentServerList],
|
||||
),
|
||||
)
|
||||
class NativeReflectionConfig
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
package de.nowchess.tournament.dto
|
||||
|
||||
case class ExternalTournamentServer(id: String, label: String, url: String)
|
||||
case class RegisterServerRequest(label: String, url: String)
|
||||
case class ExternalTournamentServerList(servers: List[ExternalTournamentServer])
|
||||
+152
-21
@@ -1,17 +1,24 @@
|
||||
package de.nowchess.tournament.resource
|
||||
|
||||
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
|
||||
import de.nowchess.tournament.dto.*
|
||||
import de.nowchess.tournament.error.TournamentError
|
||||
import de.nowchess.tournament.service.{TournamentService, TournamentStreamManager}
|
||||
import de.nowchess.tournament.service.{
|
||||
ExternalTournamentClient,
|
||||
TournamentServerRegistry,
|
||||
TournamentService,
|
||||
TournamentStreamManager,
|
||||
}
|
||||
import io.smallrye.mutiny.Multi
|
||||
import jakarta.annotation.security.{PermitAll, RolesAllowed}
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.*
|
||||
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response}
|
||||
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response, StreamingOutput}
|
||||
import org.eclipse.microprofile.jwt.JsonWebToken
|
||||
import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
|
||||
@Path("/api/tournament")
|
||||
@ApplicationScoped
|
||||
@@ -22,21 +29,48 @@ class TournamentResource:
|
||||
private val log = Logger.getLogger(classOf[TournamentResource])
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var tournamentService: TournamentService = uninitialized
|
||||
@Inject var streamManager: TournamentStreamManager = uninitialized
|
||||
@Inject var jwt: JsonWebToken = uninitialized
|
||||
@Inject var tournamentService: TournamentService = uninitialized
|
||||
@Inject var streamManager: TournamentStreamManager = uninitialized
|
||||
@Inject var jwt: JsonWebToken = uninitialized
|
||||
@Inject var registry: TournamentServerRegistry = uninitialized
|
||||
@Inject var externalClient: ExternalTournamentClient = uninitialized
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
@Context var headers: HttpHeaders = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
@GET
|
||||
@PermitAll
|
||||
def list(): Response =
|
||||
val (created, started, finished) = tournamentService.list()
|
||||
val dto = TournamentListDto(
|
||||
created = created.map(t => tournamentService.toDto(t)),
|
||||
started = started.map(t => tournamentService.toDto(t)),
|
||||
finished = finished.map(t => tournamentService.toDto(t)),
|
||||
)
|
||||
Response.ok(dto).build()
|
||||
val internalCreated = created.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
|
||||
val internalStarted = started.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
|
||||
val internalFinished = finished.map(t => objectMapper.valueToTree[JsonNode](tournamentService.toDto(t)))
|
||||
|
||||
val (extCreated, extStarted, extFinished) = registry
|
||||
.serverUrls()
|
||||
.foldLeft(
|
||||
(List.empty[JsonNode], List.empty[JsonNode], List.empty[JsonNode]),
|
||||
) { case ((ac, as, af), url) =>
|
||||
externalClient.fetchList(url).fold((ac, as, af)) { node =>
|
||||
val c = node.path("created").elements().asScala.toList
|
||||
val s = node.path("started").elements().asScala.toList
|
||||
val f = node.path("finished").elements().asScala.toList
|
||||
(c ++ s ++ f).foreach(t => registry.bindTournament(t.path("id").asText(), url))
|
||||
(ac ++ c, as ++ s, af ++ f)
|
||||
}
|
||||
}
|
||||
|
||||
val merged = objectMapper.createObjectNode()
|
||||
val createdArr = objectMapper.createArrayNode()
|
||||
val startedArr = objectMapper.createArrayNode()
|
||||
val finishedArr = objectMapper.createArrayNode()
|
||||
(internalCreated ++ extCreated).foreach(createdArr.add)
|
||||
(internalStarted ++ extStarted).foreach(startedArr.add)
|
||||
(internalFinished ++ extFinished).foreach(finishedArr.add)
|
||||
merged.set("created", createdArr)
|
||||
merged.set("started", startedArr)
|
||||
merged.set("finished", finishedArr)
|
||||
Response.ok(merged).build()
|
||||
|
||||
@POST
|
||||
@RolesAllowed(Array("**"))
|
||||
@@ -58,10 +92,13 @@ class TournamentResource:
|
||||
@PermitAll
|
||||
def get(@PathParam("id") id: String): Response =
|
||||
tournamentService.get(id) match
|
||||
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
|
||||
case Some(t) =>
|
||||
val standings = tournamentService.getStandings(id)
|
||||
Response.ok(tournamentService.toDto(t, standings)).build()
|
||||
case None =>
|
||||
resolveServer(id)
|
||||
.flatMap(url => externalClient.fetch(url, id).map(node => Response.ok(node).build()))
|
||||
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
|
||||
|
||||
@DELETE
|
||||
@Path("/{id}")
|
||||
@@ -78,8 +115,18 @@ class TournamentResource:
|
||||
def start(@PathParam("id") id: String): Response =
|
||||
val userId = Option(jwt.getSubject).getOrElse("")
|
||||
tournamentService.start(id, userId) match
|
||||
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
|
||||
case Left(error) => errorResponse(error)
|
||||
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
|
||||
case Left(error) =>
|
||||
error match
|
||||
case TournamentError.NotFound(_) =>
|
||||
val auth = Option(headers.getHeaderString("Authorization"))
|
||||
resolveServer(id)
|
||||
.map { url =>
|
||||
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/start", auth)
|
||||
Response.status(status).entity(body).build()
|
||||
}
|
||||
.getOrElse(errorResponse(error))
|
||||
case _ => errorResponse(error)
|
||||
|
||||
@POST
|
||||
@Path("/{id}/join")
|
||||
@@ -92,8 +139,18 @@ class TournamentResource:
|
||||
val botId = Option(jwt.getSubject).getOrElse("")
|
||||
val botName = Option(jwt.getClaim[AnyRef]("name")).map(_.toString).getOrElse(botId)
|
||||
tournamentService.join(id, botId, botName) match
|
||||
case Right(_) => Response.ok(OkDto()).build()
|
||||
case Left(error) => errorResponse(error)
|
||||
case Right(_) => Response.ok(OkDto()).build()
|
||||
case Left(error) =>
|
||||
error match
|
||||
case TournamentError.NotFound(_) =>
|
||||
val auth = Option(headers.getHeaderString("Authorization"))
|
||||
resolveServer(id)
|
||||
.map { url =>
|
||||
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/join", auth)
|
||||
Response.status(status).entity(body).build()
|
||||
}
|
||||
.getOrElse(errorResponse(error))
|
||||
case _ => errorResponse(error)
|
||||
|
||||
@POST
|
||||
@Path("/{id}/withdraw")
|
||||
@@ -105,8 +162,18 @@ class TournamentResource:
|
||||
else
|
||||
val botId = Option(jwt.getSubject).getOrElse("")
|
||||
tournamentService.withdraw(id, botId) match
|
||||
case Right(_) => Response.ok(OkDto()).build()
|
||||
case Left(error) => errorResponse(error)
|
||||
case Right(_) => Response.ok(OkDto()).build()
|
||||
case Left(error) =>
|
||||
error match
|
||||
case TournamentError.NotFound(_) =>
|
||||
val auth = Option(headers.getHeaderString("Authorization"))
|
||||
resolveServer(id)
|
||||
.map { url =>
|
||||
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/withdraw", auth)
|
||||
Response.status(status).entity(body).build()
|
||||
}
|
||||
.getOrElse(errorResponse(error))
|
||||
case _ => errorResponse(error)
|
||||
|
||||
@GET
|
||||
@Path("/{id}/results")
|
||||
@@ -133,20 +200,23 @@ class TournamentResource:
|
||||
@PermitAll
|
||||
def roundPairings(@PathParam("id") id: String, @PathParam("round") round: Int): Response =
|
||||
tournamentService.get(id) match
|
||||
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
|
||||
case Some(_) =>
|
||||
val pairings = tournamentService.getPairings(id, round)
|
||||
Response.ok(RoundPairingsDto(round, pairings)).build()
|
||||
case None =>
|
||||
resolveServer(id)
|
||||
.flatMap(url => externalClient.fetchPairings(url, id, round).map(node => Response.ok(node).build()))
|
||||
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
|
||||
|
||||
@GET
|
||||
@Path("/{id}/export/games")
|
||||
@PermitAll
|
||||
@Produces(Array(MediaType.APPLICATION_JSON, MediaType.WILDCARD, "application/x-ndjson", "application/x-chess-pgn"))
|
||||
def exportGames(@PathParam("id") id: String, @Context headers: HttpHeaders): Response =
|
||||
def exportGames(@PathParam("id") id: String, @Context reqHeaders: HttpHeaders): Response =
|
||||
tournamentService.get(id) match
|
||||
case None => Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build()
|
||||
case Some(_) =>
|
||||
val acceptHeader = Option(headers.getHeaderString("Accept")).getOrElse("")
|
||||
val acceptHeader = Option(reqHeaders.getHeaderString("Accept")).getOrElse("")
|
||||
val pairings = tournamentService.getAllPairings(id)
|
||||
if acceptHeader.contains("application/x-ndjson") then
|
||||
val ndjson = pairings
|
||||
@@ -176,6 +246,67 @@ class TournamentResource:
|
||||
emitter.onTermination(() => streamManager.unregister(id, botId, emitter))
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/{id}/game/{gameId}")
|
||||
@PermitAll
|
||||
def getGame(@PathParam("id") id: String, @PathParam("gameId") gameId: String): Response =
|
||||
resolveServer(id)
|
||||
.flatMap(url => externalClient.fetch(url, s"$id/game/$gameId").map(node => Response.ok(node).build()))
|
||||
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
|
||||
|
||||
@GET
|
||||
@Path("/{id}/game/{gameId}/stream")
|
||||
@PermitAll
|
||||
@Produces(Array("application/x-ndjson"))
|
||||
def streamGame(@PathParam("id") id: String, @PathParam("gameId") gameId: String): Response =
|
||||
val auth = Option(headers.getHeaderString("Authorization"))
|
||||
resolveServer(id)
|
||||
.flatMap(url => externalClient.proxyGetStream(url, s"api/tournament/$id/game/$gameId/stream", auth))
|
||||
.map { stream =>
|
||||
Response
|
||||
.ok(new StreamingOutput {
|
||||
def write(output: java.io.OutputStream): Unit =
|
||||
val buf = new Array[Byte](4096)
|
||||
// scalafix:off DisableSyntax.var
|
||||
var n = stream.read(buf)
|
||||
while n >= 0 do
|
||||
output.write(buf, 0, n)
|
||||
output.flush()
|
||||
n = stream.read(buf)
|
||||
// scalafix:on
|
||||
})
|
||||
.`type`("application/x-ndjson")
|
||||
.build()
|
||||
}
|
||||
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
|
||||
|
||||
@POST
|
||||
@Path("/{id}/game/{gameId}/move/{uci}")
|
||||
@RolesAllowed(Array("**"))
|
||||
def makeMove(
|
||||
@PathParam("id") id: String,
|
||||
@PathParam("gameId") gameId: String,
|
||||
@PathParam("uci") uci: String,
|
||||
): Response =
|
||||
val auth = Option(headers.getHeaderString("Authorization"))
|
||||
resolveServer(id)
|
||||
.map { url =>
|
||||
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/game/$gameId/move/$uci", auth)
|
||||
Response.status(status).entity(body).build()
|
||||
}
|
||||
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
|
||||
|
||||
private def resolveServer(tournamentId: String): Option[String] =
|
||||
registry.findServerUrl(tournamentId).orElse {
|
||||
registry
|
||||
.serverUrls()
|
||||
.find(url => externalClient.fetch(url, tournamentId).isDefined)
|
||||
.map { url =>
|
||||
registry.bindTournament(tournamentId, url)
|
||||
url
|
||||
}
|
||||
}
|
||||
|
||||
private def errorResponse(error: TournamentError): Response =
|
||||
val status = error match
|
||||
case TournamentError.NotFound(_) => Response.Status.NOT_FOUND
|
||||
|
||||
+35
@@ -0,0 +1,35 @@
|
||||
package de.nowchess.tournament.resource
|
||||
|
||||
import de.nowchess.tournament.dto.{ErrorDto, RegisterServerRequest}
|
||||
import de.nowchess.tournament.service.TournamentServerRegistry
|
||||
import jakarta.annotation.security.RolesAllowed
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.*
|
||||
import jakarta.ws.rs.core.{MediaType, Response}
|
||||
import scala.compiletime.uninitialized
|
||||
|
||||
@Path("/api/tournament/servers")
|
||||
@ApplicationScoped
|
||||
@RolesAllowed(Array("**"))
|
||||
@Produces(Array(MediaType.APPLICATION_JSON))
|
||||
@Consumes(Array(MediaType.APPLICATION_JSON))
|
||||
class TournamentServerResource:
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var registry: TournamentServerRegistry = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
@GET
|
||||
def list(): Response =
|
||||
Response.ok(registry.list()).build()
|
||||
|
||||
@POST
|
||||
def register(req: RegisterServerRequest): Response =
|
||||
Response.status(201).entity(registry.register(req.label, req.url)).build()
|
||||
|
||||
@DELETE
|
||||
@Path("/{id}")
|
||||
def remove(@PathParam("id") id: String): Response =
|
||||
if registry.remove(id) then Response.noContent().build()
|
||||
else Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Server $id not found")).build()
|
||||
+80
@@ -0,0 +1,80 @@
|
||||
package de.nowchess.tournament.service
|
||||
|
||||
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.ws.rs.client.{Client, ClientBuilder, Entity}
|
||||
import jakarta.ws.rs.core.MediaType
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.util.Try
|
||||
|
||||
@ApplicationScoped
|
||||
class ExternalTournamentClient:
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject var objectMapper: ObjectMapper = uninitialized
|
||||
// scalafix:on
|
||||
|
||||
private def buildClient(): Client = ClientBuilder.newClient()
|
||||
|
||||
def fetchList(serverUrl: String): Option[JsonNode] =
|
||||
Try {
|
||||
val client = buildClient()
|
||||
val response = client.target(s"$serverUrl/api/tournament").request(MediaType.APPLICATION_JSON).get()
|
||||
try
|
||||
if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
|
||||
else None
|
||||
finally
|
||||
response.close()
|
||||
client.close()
|
||||
}.getOrElse(None)
|
||||
|
||||
def fetch(serverUrl: String, id: String): Option[JsonNode] =
|
||||
Try {
|
||||
val client = buildClient()
|
||||
val response = client.target(s"$serverUrl/api/tournament/$id").request(MediaType.APPLICATION_JSON).get()
|
||||
try
|
||||
if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
|
||||
else None
|
||||
finally
|
||||
response.close()
|
||||
client.close()
|
||||
}.getOrElse(None)
|
||||
|
||||
def fetchPairings(serverUrl: String, id: String, round: Int): Option[JsonNode] =
|
||||
Try {
|
||||
val client = buildClient()
|
||||
val response =
|
||||
client.target(s"$serverUrl/api/tournament/$id/round/$round").request(MediaType.APPLICATION_JSON).get()
|
||||
try
|
||||
if response.getStatus == 200 then Some(objectMapper.readTree(response.readEntity(classOf[String])))
|
||||
else None
|
||||
finally
|
||||
response.close()
|
||||
client.close()
|
||||
}.getOrElse(None)
|
||||
|
||||
def proxyPost(serverUrl: String, path: String, authHeader: Option[String]): (Int, String) =
|
||||
Try {
|
||||
val client = buildClient()
|
||||
val builder = client.target(s"$serverUrl/$path").request(MediaType.APPLICATION_JSON)
|
||||
val withAuth = authHeader.fold(builder)(h => builder.header("Authorization", h))
|
||||
val response = withAuth.post(Entity.json(""))
|
||||
try (response.getStatus, response.readEntity(classOf[String]))
|
||||
finally
|
||||
response.close()
|
||||
client.close()
|
||||
}.getOrElse((502, """{"error":"External server unreachable"}"""))
|
||||
|
||||
def proxyGetStream(serverUrl: String, path: String, authHeader: Option[String]): Option[java.io.InputStream] =
|
||||
Try {
|
||||
val client = buildClient()
|
||||
val builder = client.target(s"$serverUrl/$path").request("application/x-ndjson")
|
||||
val withAuth = authHeader.fold(builder)(h => builder.header("Authorization", h))
|
||||
val response = withAuth.get()
|
||||
if response.getStatus == 200 then Some(response.readEntity(classOf[java.io.InputStream]))
|
||||
else
|
||||
response.close()
|
||||
client.close()
|
||||
None
|
||||
}.getOrElse(None)
|
||||
+34
@@ -0,0 +1,34 @@
|
||||
package de.nowchess.tournament.service
|
||||
|
||||
import de.nowchess.tournament.dto.ExternalTournamentServer
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.jdk.CollectionConverters.*
|
||||
|
||||
@ApplicationScoped
|
||||
class TournamentServerRegistry:
|
||||
|
||||
private val servers = new ConcurrentHashMap[String, ExternalTournamentServer]()
|
||||
private val tournaments = new ConcurrentHashMap[String, String]()
|
||||
|
||||
def register(label: String, url: String): ExternalTournamentServer =
|
||||
val id = UUID.randomUUID().toString
|
||||
val server = ExternalTournamentServer(id, label, url.stripSuffix("/"))
|
||||
servers.put(id, server)
|
||||
server
|
||||
|
||||
def list(): List[ExternalTournamentServer] =
|
||||
servers.values().asScala.toList
|
||||
|
||||
def remove(id: String): Boolean =
|
||||
Option(servers.remove(id)).isDefined
|
||||
|
||||
def serverUrls(): List[String] =
|
||||
servers.values().asScala.map(_.url).toList
|
||||
|
||||
def bindTournament(tournamentId: String, serverUrl: String): Unit =
|
||||
tournaments.put(tournamentId, serverUrl)
|
||||
|
||||
def findServerUrl(tournamentId: String): Option[String] =
|
||||
Option(tournaments.get(tournamentId))
|
||||
@@ -27,4 +27,5 @@ include(
|
||||
"modules:store",
|
||||
"modules:coordinator",
|
||||
"modules:tournament",
|
||||
"modules:analytics",
|
||||
)
|
||||
Reference in New Issue
Block a user