From 259b3bbb24c0f23326269b93f4b3c84012f727cd Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Mon, 15 Jun 2026 21:58:05 +0200 Subject: [PATCH] feat(analytics): add Spark batch analytics module New standalone modules:analytics submodule with two Spark jobs: - OpeningBookJob: reads game_records.pgn, extracts first N plies using pure Catalyst SQL expressions (no UDFs), aggregates win/draw/loss rates per opening sequence, writes Parquet + CSV top-1000 summary. - PlayerStatsJob: unions each game into a player-centric view, aggregates total_games/wins/losses/draws/avg_move_count/win_rate per player_id, writes Parquet. Module uses Scala 3 calling spark-sql_2.13 via JVM binary compatibility (DataFrame API only; no spark.implicits._ / typed Datasets). Spark is compileOnly; the fat jar bundles only scala3-library + postgresql driver. Submit via spark-submit; see build.gradle.kts header for invocation. Co-Authored-By: Claude Sonnet 4.6 --- build.gradle.kts | 2 + modules/analytics/build.gradle.kts | 86 ++++++++++++++++ .../nowchess/analytics/OpeningBookJob.scala | 97 +++++++++++++++++++ .../nowchess/analytics/PlayerStatsJob.scala | 85 ++++++++++++++++ settings.gradle.kts | 1 + 5 files changed, 271 insertions(+) create mode 100644 modules/analytics/build.gradle.kts create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala diff --git a/build.gradle.kts b/build.gradle.kts index 0a7e09f..7a522f4 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -53,6 +53,8 @@ val coverageExclusions = listOf( "**/core/src/main/scala/de/nowchess/chess/resource/GameWebSocketResource.scala", // Coordinator infrastructure — gRPC, microservice orchestration "**/coordinator/src/main/scala/**", + // Analytics module — standalone Spark batch jobs; coverage not applicable (no Quarkus, no scoverage plugin) + "modules/analytics/**", ) // Converts a Sonar-style glob to a 
scoverage regex (matched against full source path).
diff --git a/modules/analytics/build.gradle.kts b/modules/analytics/build.gradle.kts
new file mode 100644
index 0000000..89e037c
--- /dev/null
+++ b/modules/analytics/build.gradle.kts
@@ -0,0 +1,86 @@
+// Standalone Spark batch-analytics module.
+//
+// Spark 3.5.x ships only Scala 2.12/2.13 artifacts; Scala 3 code can consume
+// them via JVM binary compatibility so long as we avoid macro-expanded APIs
+// (spark.implicits._, typed Dataset[T]). We use the untyped DataFrame API
+// exclusively, which is safe to call from Scala 3.
+//
+// Spark is declared compileOnly — the cluster provides it at runtime via
+// spark-submit. Only the PostgreSQL driver and the Scala 3 runtime are
+// bundled into the fat jar produced by the "jar" task.
+//
+// Build the submission jar:
+//   ./gradlew :modules:analytics:jar
+//
+// Run a job:
+//   spark-submit \
+//     --class de.nowchess.analytics.OpeningBookJob \
+//     modules/analytics/build/libs/analytics-<version>.jar \
+//     [outputDir] [maxPlies]
+//
+// Environment variables consumed:
+//   NOWCHESS_JDBC_URL  (default: jdbc:postgresql://localhost:5432/nowchess)
+//   NOWCHESS_DB_USER   (default: nowchess)
+//   NOWCHESS_DB_PASS   (default: nowchess)
+
+plugins {
+    id("scala")
+    application
+}
+
+group = "de.nowchess"
+version = "1.0-SNAPSHOT"
+
+// The root project publishes its version catalogue as an untyped extra property,
+// so the cast to a String-to-String map cannot be checked by the compiler.
+@Suppress("UNCHECKED_CAST")
+val versions = rootProject.extra["VERSIONS"] as Map<String, String>
+
+repositories {
+    mavenCentral()
+}
+
+scala {
+    scalaVersion = versions["SCALA3"]!!
+}
+
+val sparkVersion = "3.5.4"
+
+dependencies {
+    compileOnly("org.scala-lang:scala3-compiler_3") {
+        version { strictly(versions["SCALA3"]!!) }
+    }
+    implementation("org.scala-lang:scala3-library_3") {
+        version { strictly(versions["SCALA3"]!!) }
+    }
+    implementation("org.scala-lang:scala-library") {
+        version { strictly(versions["SCALA_LIBRARY"]!!) }
+    }
+
+    // Spark is provided by the cluster — compile-only, not bundled.
+    compileOnly("org.apache.spark:spark-sql_2.13:$sparkVersion") {
+        exclude(group = "org.slf4j", module = "slf4j-log4j12")
+    }
+    compileOnly("org.apache.spark:spark-core_2.13:$sparkVersion") {
+        exclude(group = "org.slf4j", module = "slf4j-log4j12")
+    }
+
+    // PostgreSQL JDBC driver bundled so it is available on executor classpath.
+    implementation("org.postgresql:postgresql:42.7.4")
+}
+
+application {
+    mainClass.set("de.nowchess.analytics.OpeningBookJob")
+}
+
+// Fat jar: includes runtimeClasspath (our code + pg driver + scala3-library)
+// but NOT compileOnly Spark jars.
+tasks.jar {
+    manifest {
+        attributes["Main-Class"] = "de.nowchess.analytics.OpeningBookJob"
+    }
+    from(configurations.runtimeClasspath.get().map { if (it.isDirectory) it else zipTree(it) })
+    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
+
+tasks.withType<ScalaCompile> {
+    scalaCompileOptions.additionalParameters = listOf("-encoding", "UTF-8")
+}
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala
new file mode 100644
index 0000000..87b228e
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala
@@ -0,0 +1,97 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Reads completed games from the game_records table and produces an opening-book statistics table: for each unique
+  * opening (first N plies), it reports total games played and win/draw/loss rates from each side.
+  *
+  * Output is written as Parquet to `outputDir/opening_book` and a human-readable CSV summary (top-1000 openings by
+  * popularity) to `outputDir/opening_book_top1000`.
+  *
+  * PGN parsing is done entirely with Spark SQL string functions — no UDFs — so the Catalyst optimizer can push
+  * predicates and the job scales to any cluster size.
+  */
+object OpeningBookJob:
+
+  /** Entry point for `spark-submit`.
+    *
+    * Connection settings come from the environment variables NOWCHESS_JDBC_URL, NOWCHESS_DB_USER and
+    * NOWCHESS_DB_PASS, each with a local-development default. Positional arguments are `[outputDir] [maxPlies]`.
+    *
+    * @throws IllegalArgumentException
+    *   if `maxPlies` is supplied but is not a positive integer
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl   = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser    = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass    = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = args.headOption.getOrElse("/tmp/nowchess-opening-book")
+    // Parse before the session is created so a bad argument fails fast with a readable message.
+    val maxPlies = args.lift(1).fold(10)(parseMaxPlies)
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Opening Book Generator")
+      .getOrCreate()
+
+    // Stop the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir, maxPlies)
+    finally spark.stop()
+
+  /** Reads `game_records` over JDBC, aggregates results per opening and writes both outputs.
+    *
+    * @param spark
+    *   session used for the read and both writes; it is not stopped here
+    * @param jdbcUrl
+    *   JDBC URL of the database holding `game_records`
+    * @param dbUser
+    *   database user
+    * @param dbPass
+    *   database password
+    * @param outputDir
+    *   directory under which `opening_book` (Parquet) and `opening_book_top1000` (CSV) are written
+    * @param maxPlies
+    *   number of leading plies that identify an opening; must be positive
+    * @throws IllegalArgumentException
+    *   if `maxPlies` is not positive
+    */
+  def run(
+      spark: SparkSession,
+      jdbcUrl: String,
+      dbUser: String,
+      dbPass: String,
+      outputDir: String,
+      maxPlies: Int,
+  ): Unit =
+    require(maxPlies > 0, s"maxPlies must be positive, got $maxPlies")
+
+    val games = spark.read
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "game_records")
+      .option("user", dbUser)
+      .option("password", dbPass)
+      .option("driver", "org.postgresql.Driver")
+      .option("fetchsize", "10000")
+      .load()
+      .select("pgn", "result")
+      .filter(F.col("result").isNotNull.and(F.col("pgn").isNotNull))
+
+    val withOpening = games
+      .withColumn("opening", extractOpening(F.col("pgn"), maxPlies))
+      .filter(F.col("opening").isNotNull.and(F.length(F.col("opening")) > 0))
+
+    // Two outputs are derived from this aggregate. Caching it avoids re-reading the table over JDBC and
+    // re-running the PGN parsing for the second write, and makes both outputs reflect the same read.
+    // The cache is taken before sorting so that `orderBy` + `limit` below is still planned as a top-N.
+    val aggregated = withOpening
+      .groupBy("opening")
+      .agg(
+        F.count("*").as("total"),
+        countIf(F.col("result") === "white").as("white_wins"),
+        countIf(F.col("result") === "black").as("black_wins"),
+        countIf(F.col("result") === "draw").as("draws"),
+      )
+      .withColumn("white_win_rate", shareOfTotal("white_wins"))
+      .withColumn("black_win_rate", shareOfTotal("black_wins"))
+      .withColumn("draw_rate", shareOfTotal("draws"))
+      .cache()
+
+    try
+      val ranked = aggregated.orderBy(F.desc("total"))
+
+      ranked.write
+        .mode("overwrite")
+        .parquet(s"$outputDir/opening_book")
+
+      ranked
+        .limit(1000)
+        .write
+        .mode("overwrite")
+        .option("header", "true")
+        .csv(s"$outputDir/opening_book_top1000")
+    finally aggregated.unpersist()
+
+  /** Parses the `maxPlies` command-line argument, rejecting anything that is not a positive integer. */
+  private def parseMaxPlies(raw: String): Int =
+    raw.toIntOption
+      .filter(_ > 0)
+      .getOrElse(throw IllegalArgumentException(s"maxPlies must be a positive integer, got '$raw'"))
+
+  /** Aggregate column counting the rows for which `condition` holds. */
+  private def countIf(condition: org.apache.spark.sql.Column): org.apache.spark.sql.Column =
+    F.sum(F.when(condition, 1).otherwise(0))
+
+  /** `countColumn / total`, rounded to three decimals. */
+  private def shareOfTotal(countColumn: String): org.apache.spark.sql.Column =
+    F.round(F.col(countColumn) / F.col("total").cast("double"), 3)
+
+  /** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
+    *
+    * PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
+    *
+    * Steps:
+    *   1. Split on the first blank line (LF or CRLF) and take the moves section (index 1); a PGN without a
+    *      blank line is treated as moves only.
+    *   1. Strip the terminal result token (*, 1-0, 0-1, 1/2-1/2).
+    *   1. Strip move numbers (e.g., "1. ", "12. ").
+    *   1. Strip check/checkmate suffixes (+ #) for position-independent lookup.
+    *   1. Tokenize on whitespace, take the first `maxPlies` tokens, rejoin with spaces.
+    */
+  private def extractOpening(pgnCol: org.apache.spark.sql.Column, maxPlies: Int): org.apache.spark.sql.Column =
+    val moveSection   = F.coalesce(F.split(pgnCol, "\\r?\\n\\r?\\n").getItem(1), pgnCol)
+    val noResult      = F.regexp_replace(moveSection, "(1-0|0-1|1/2-1/2|\\*)\\s*$", "")
+    val noMoveNumbers = F.regexp_replace(noResult, "\\d+\\.+\\s*", " ")
+    val noAnnotations = F.regexp_replace(noMoveNumbers, "[+#]", "")
+    val moveArray     = F.split(F.trim(noAnnotations), "\\s+")
+    F.array_join(F.slice(moveArray, 1, maxPlies), " ")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala
new file mode 100644
index 0000000..e1a2e52
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala
@@ -0,0 +1,85 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Aggregates per-player statistics from completed games.
+  *
+  * Each game contributes one row per player (as white and as black), so the dataset is first unioned into a
+  * player-centric view before grouping.
Output columns: player_id, total_games, wins, losses, draws, games_as_white,
+  * games_as_black, avg_move_count, win_rate
+  *
+  * Output is written as Parquet to `outputDir/player_stats`.
+  */
+object PlayerStatsJob:
+
+  /** Entry point for `spark-submit`.
+    *
+    * Connection settings come from the environment variables NOWCHESS_JDBC_URL, NOWCHESS_DB_USER and
+    * NOWCHESS_DB_PASS, each with a local-development default. The only positional argument is `[outputDir]`.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl   = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser    = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass    = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = args.headOption.getOrElse("/tmp/nowchess-player-stats")
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Player Stats")
+      .getOrCreate()
+
+    // Stop the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Reads `game_records` over JDBC, aggregates one row per player and writes the result as Parquet.
+    *
+    * @param spark
+    *   session used for the read and the write; it is not stopped here
+    * @param jdbcUrl
+    *   JDBC URL of the database holding `game_records`
+    * @param dbUser
+    *   database user
+    * @param dbPass
+    *   database password
+    * @param outputDir
+    *   directory under which `player_stats` is written
+    */
+  def run(
+      spark: SparkSession,
+      jdbcUrl: String,
+      dbUser: String,
+      dbPass: String,
+      outputDir: String,
+  ): Unit =
+    val games = spark.read
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "game_records")
+      .option("user", dbUser)
+      .option("password", dbPass)
+      .option("driver", "org.postgresql.Driver")
+      .option("fetchsize", "10000")
+      .load()
+      .select("white_id", "black_id", "result", "move_count")
+      .filter(F.col("result").isNotNull)
+
+    // Flatten each game into two rows: one per player, tagged with their side.
+    val asWhite = games.select(
+      F.col("white_id").as("player_id"),
+      F.col("result"),
+      F.col("move_count"),
+      F.lit("white").as("color"),
+    )
+    val asBlack = games.select(
+      F.col("black_id").as("player_id"),
+      F.col("result"),
+      F.col("move_count"),
+      F.lit("black").as("color"),
+    )
+
+    // If white_id/black_id are nullable (not visible from this file), rows without an id would otherwise
+    // be grouped together into a single null-keyed pseudo-player, so they are dropped here.
+    val playerGames = asWhite
+      .union(asBlack)
+      .filter(F.col("player_id").isNotNull)
+
+    // `color` is always "white" or "black", so a player lost exactly when the game had a winner
+    // and that winner is the other side.
+    val hasWinner = F.col("result").isin("white", "black")
+    val wonGame   = F.col("color") === F.col("result")
+    val lostGame  = hasWinner && F.col("color") =!= F.col("result")
+
+    val stats = playerGames
+      .groupBy("player_id")
+      .agg(
+        F.count("*").as("total_games"),
+        countIf(wonGame).as("wins"),
+        countIf(lostGame).as("losses"),
+        countIf(F.col("result") === "draw").as("draws"),
+        countIf(F.col("color") === "white").as("games_as_white"),
+        countIf(F.col("color") === "black").as("games_as_black"),
+        F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
+      )
+      .withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.desc("total_games"))
+
+    stats.write
+      .mode("overwrite")
+      .parquet(s"$outputDir/player_stats")
+
+  /** Aggregate column counting the rows for which `condition` holds. */
+  private def countIf(condition: org.apache.spark.sql.Column): org.apache.spark.sql.Column =
+    F.sum(F.when(condition, 1).otherwise(0))
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7a018a5..051943a 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -27,4 +27,5 @@ include(
     "modules:store",
     "modules:coordinator",
     "modules:tournament",
+    "modules:analytics",
 )
\ No newline at end of file