feat(analytics): add Spark batch analytics module

New standalone modules:analytics submodule with two Spark jobs:

- OpeningBookJob: reads game_records.pgn, extracts first N plies using
  pure Catalyst SQL expressions (no UDFs), aggregates win/draw/loss rates
  per opening sequence, writes Parquet + CSV top-1000 summary.

- PlayerStatsJob: unions each game into a player-centric view, aggregates
  total_games/wins/losses/draws/avg_move_count/win_rate per player_id,
  writes Parquet.

Module uses Scala 3 calling spark-sql_2.13 via JVM binary compatibility
(DataFrame API only; no spark.implicits._ / typed Datasets). Spark is
compileOnly; the fat jar bundles only scala3-library + postgresql driver.
Submit via spark-submit; see build.gradle.kts header for invocation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Janis Eccarius
2026-06-15 21:58:05 +02:00
parent 0a5a216032
commit 259b3bbb24
5 changed files with 271 additions and 0 deletions
@@ -0,0 +1,97 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Reads completed games from the game_records table and produces an opening-book statistics table: for each unique
* opening (first N plies), it reports total games played and win/draw/loss rates from each side.
*
* Output is written as Parquet to `outputDir/opening_book` and a human-readable CSV summary (top-1000 openings by
* popularity) to `outputDir/opening_book_top1000`.
*
* PGN parsing is done entirely with Spark SQL string functions — no UDFs — so the Catalyst optimizer can push
* predicates and the job scales to any cluster size.
*/
object OpeningBookJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-opening-book"
val maxPlies = if args.length > 1 then args(1).toInt else 10
val spark = SparkSession
.builder()
.appName("NowChess Opening Book Generator")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir, maxPlies)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
maxPlies: Int,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("pgn", "result")
.filter(F.col("result").isNotNull.and(F.col("pgn").isNotNull))
val openingCol = extractOpening(F.col("pgn"), maxPlies)
val withOpening = games
.withColumn("opening", openingCol)
.filter(F.col("opening").isNotNull.and(F.length(F.col("opening")) > 0))
val stats = withOpening
.groupBy("opening")
.agg(
F.count("*").as("total"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total").cast("double"), 3))
.withColumn("black_win_rate", F.round(F.col("black_wins") / F.col("total").cast("double"), 3))
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total").cast("double"), 3))
.orderBy(F.desc("total"))
stats.write
.mode("overwrite")
.parquet(s"$outputDir/opening_book")
stats
.limit(1000)
.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/opening_book_top1000")
/** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
*
* PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
*
* Steps:
* 1. Split on double-newline; take the moves section (index 1). 2. Strip the terminal result token (*, 1-0, 0-1,
* 1/2-1/2). 3. Strip move numbers (e.g., "1. ", "12. "). 4. Strip check/checkmate suffixes (+ #) for
* position-independent lookup. 5. Tokenize on whitespace, take first maxPlies tokens, rejoin with spaces.
*/
private def extractOpening(pgnCol: org.apache.spark.sql.Column, maxPlies: Int): org.apache.spark.sql.Column =
val moveSection = F.coalesce(F.split(pgnCol, "\n\n").getItem(1), pgnCol)
val noResult = F.regexp_replace(moveSection, "(1-0|0-1|1/2-1/2|\\*)\\s*$", "")
val noMoveNumbers = F.regexp_replace(noResult, "\\d+\\.+\\s*", " ")
val noAnnotations = F.regexp_replace(noMoveNumbers, "[+#]", "")
val moveArray = F.split(F.trim(noAnnotations), "\\s+")
F.array_join(F.slice(moveArray, 1, maxPlies), " ")
@@ -0,0 +1,85 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
/** Aggregates per-player statistics from completed games.
*
* Each game contributes one row per player (as white and as black), so the dataset is first unioned into a
* player-centric view before grouping. Output columns: player_id, total_games, wins, losses, draws, games_as_white,
* games_as_black, avg_move_count, win_rate
*
* Output is written as Parquet to `outputDir/player_stats`.
*/
object PlayerStatsJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-stats"
val spark = SparkSession
.builder()
.appName("NowChess Player Stats")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(
spark: SparkSession,
jdbcUrl: String,
dbUser: String,
dbPass: String,
outputDir: String,
): Unit =
val games = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "game_records")
.option("user", dbUser)
.option("password", dbPass)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "10000")
.load()
.select("white_id", "black_id", "result", "move_count")
.filter(F.col("result").isNotNull)
// Flatten each game into two rows: one per player, tagged with their side.
val asWhite = games.select(
F.col("white_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit("white").as("color"),
)
val asBlack = games.select(
F.col("black_id").as("player_id"),
F.col("result"),
F.col("move_count"),
F.lit("black").as("color"),
)
val playerGames = asWhite.union(asBlack)
val wonGame = F.col("color") === F.col("result")
val lostGame = (F.col("color") === "white" && F.col("result") === "black")
.or(F.col("color") === "black" && F.col("result") === "white")
val stats = playerGames
.groupBy("player_id")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(wonGame, 1).otherwise(0)).as("wins"),
F.sum(F.when(lostGame, 1).otherwise(0)).as("losses"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
F.sum(F.when(F.col("color") === "white", 1).otherwise(0)).as("games_as_white"),
F.sum(F.when(F.col("color") === "black", 1).otherwise(0)).as("games_as_black"),
F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"),
)
.withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.desc("total_games"))
stats.write
.mode("overwrite")
.parquet(s"$outputDir/player_stats")