feat(analytics): add 7 new Spark analytics jobs and extend GameSource
Build & Test (NowChessSystems) TeamCity build finished

Adds GameLengthJob, ColorAdvantageJob, EloDistributionJob, TimeControlJob,
DailyActivityJob, RatingMismatchJob, and TerminationStatsJob bringing total
batch pipelines to 11 (+ 1 streaming).

Extends GameSource with loadExtended() / fromLichessPgnExtended() extracting
WhiteElo, BlackElo, TimeControl, UTCDate, UTCTime, Termination, ECO from PGN
headers; JDBC path returns nulls for extended columns, keeping all existing
jobs unaffected.

PlayerStatsJob gains a CSV output alongside the existing Parquet write so
the analytics webview can display player statistics without pyarrow.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Janis Eccarius
2026-06-21 15:00:58 +02:00
parent a91ba5da9a
commit 8e17c14dff
9 changed files with 508 additions and 0 deletions
@@ -0,0 +1,62 @@
package de.nowchess.analytics
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import scala.jdk.CollectionConverters.*
object ColorAdvantageJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-color-advantage"
val spark = SparkSession
.builder()
.appName("NowChess Color Advantage")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.load(spark, jdbcUrl, dbUser, dbPass)
.select("result")
.filter(F.col("result").isNotNull)
val totalGames = games.count()
val whiteWins = games.filter(F.col("result") === "white").count()
val blackWins = games.filter(F.col("result") === "black").count()
val draws = games.filter(F.col("result") === "draw").count()
val schema = StructType(
Seq(
StructField("color", DataTypes.StringType, false),
StructField("total_games", DataTypes.LongType, false),
StructField("wins", DataTypes.LongType, false),
StructField("losses", DataTypes.LongType, false),
StructField("draws", DataTypes.LongType, false),
),
)
val rows = List(
Row("white", totalGames, whiteWins, blackWins, draws),
Row("black", totalGames, blackWins, whiteWins, draws),
)
val stats = spark
.createDataFrame(rows.asJava, schema)
.withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.asc("color"))
stats.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/color_advantage")
@@ -0,0 +1,79 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
object DailyActivityJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-daily-activity"
val spark = SparkSession
.builder()
.appName("NowChess Daily Activity")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
.select("result", "utc_date", "utc_time")
.filter(F.col("utc_time").isNotNull.and(F.col("utc_date").isNotNull))
val hourOfDay = F.regexp_extract(F.col("utc_time"), "^(\\d{2})", 1).cast("int")
val dow = F.dayofweek(F.to_date(F.col("utc_date"), "yyyy.MM.dd"))
val tagged = games
.withColumn("hour_of_day", hourOfDay)
.withColumn("dow", dow)
val hourly = tagged
.groupBy("hour_of_day")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.asc("hour_of_day"))
.select("hour_of_day", "total_games", "white_wins", "black_wins", "draws", "white_win_rate")
hourly.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/hourly_activity")
val dayName = F
.when(F.col("dow") === 1, "Sunday")
.when(F.col("dow") === 2, "Monday")
.when(F.col("dow") === 3, "Tuesday")
.when(F.col("dow") === 4, "Wednesday")
.when(F.col("dow") === 5, "Thursday")
.when(F.col("dow") === 6, "Friday")
.otherwise("Saturday")
val weekly = tagged
.withColumn("day_of_week", dayName)
.withColumn("day_order", F.col("dow"))
.groupBy("day_of_week", "day_order")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.asc("day_order"))
.drop("day_order")
.select("day_of_week", "total_games", "white_wins", "black_wins", "draws", "white_win_rate")
weekly.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/weekly_activity")
@@ -0,0 +1,48 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
object EloDistributionJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-elo-distribution"
val spark = SparkSession
.builder()
.appName("NowChess Elo Distribution")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
.filter(F.col("white_elo").isNotNull)
val whiteElo = games.select(F.col("white_elo").as("elo"))
val blackElo = games.select(F.col("black_elo").as("elo"))
val allElo = whiteElo.union(blackElo).filter(F.col("elo").isNotNull)
val bucketMin = (F.floor(F.col("elo") / 200) * 200).cast("int")
val bucketLabel = F.when(
F.col("elo") >= 2800,
F.lit("2800+"),
).otherwise(F.concat(bucketMin.cast("string"), F.lit("-"), (bucketMin + 199).cast("string")))
val distribution = allElo
.withColumn("elo_bucket", bucketLabel)
.withColumn("bucket_order", F.when(F.col("elo") >= 2800, 2800).otherwise(bucketMin))
.groupBy("elo_bucket", "bucket_order")
.agg(F.count("*").as("player_count"))
.orderBy(F.asc("bucket_order"))
.select("elo_bucket", "player_count")
distribution.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/elo_distribution")
@@ -0,0 +1,91 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
object GameLengthJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-game-length"
val spark = SparkSession
.builder()
.appName("NowChess Game Length")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.load(spark, jdbcUrl, dbUser, dbPass)
.select("result", "move_count")
.filter(F.col("result").isNotNull.and(F.col("move_count").isNotNull))
val moves = F.col("move_count")
val bucket = F
.when(moves <= 10, "1-10")
.when(moves <= 20, "11-20")
.when(moves <= 30, "21-30")
.when(moves <= 40, "31-40")
.when(moves <= 60, "41-60")
.when(moves <= 100, "61-100")
.otherwise("101+")
val bucketOrder = F
.when(moves <= 10, 1)
.when(moves <= 20, 2)
.when(moves <= 30, 3)
.when(moves <= 40, 4)
.when(moves <= 60, 5)
.when(moves <= 100, 6)
.otherwise(7)
val tagged = games
.withColumn("move_bucket", bucket)
.withColumn("bucket_order", bucketOrder)
val distribution = tagged
.groupBy("move_bucket", "bucket_order")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
.withColumn("black_win_rate", F.round(F.col("black_wins") / F.col("total_games").cast("double"), 3))
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total_games").cast("double"), 3))
.orderBy(F.asc("bucket_order"))
.drop("bucket_order")
.select(
"move_bucket",
"total_games",
"white_wins",
"black_wins",
"draws",
"white_win_rate",
"black_win_rate",
"draw_rate",
)
distribution.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/game_length_distribution")
val byResult = games
.groupBy("result")
.agg(
F.round(F.avg("move_count"), 1).as("avg_move_count"),
F.min("move_count").as("min_moves"),
F.max("move_count").as("max_moves"),
)
.orderBy(F.asc("result"))
byResult.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/game_length_by_result")
@@ -33,6 +33,19 @@ object GameSource:
case Some(path) => fromLichessPgn(spark, path)
case None => fromJdbc(spark, jdbcUrl, dbUser, dbPass)
def loadExtended(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String): DataFrame =
sys.env.get(PgnPathEnv) match
case Some(path) => fromLichessPgnExtended(spark, path)
case None =>
fromJdbc(spark, jdbcUrl, dbUser, dbPass)
.withColumn("white_elo", F.lit(null).cast("int"))
.withColumn("black_elo", F.lit(null).cast("int"))
.withColumn("time_control", F.lit(null).cast("string"))
.withColumn("utc_date", F.lit(null).cast("string"))
.withColumn("utc_time", F.lit(null).cast("string"))
.withColumn("termination", F.lit(null).cast("string"))
.withColumn("eco", F.lit(null).cast("string"))
def fromJdbc(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String): DataFrame =
spark.read
.format("jdbc")
@@ -89,6 +102,49 @@ object GameSource:
)
.filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
private def fromLichessPgnExtended(spark: SparkSession, path: String): DataFrame =
val resolved = resolvePath(spark, path)
val record = F.col("value")
val resultTag = F.regexp_extract(record, "Result \"([^\"]*)\"", 1)
val result = F
.when(resultTag === "1-0", "white")
.when(resultTag === "0-1", "black")
.when(resultTag === "1/2-1/2", "draw")
.otherwise(F.lit(null).cast("string"))
val moveText = F.coalesce(F.split(record, "\n\n").getItem(1), F.lit(""))
val noComment = F.regexp_replace(moveText, "\\{[^}]*\\}", "")
val noResult = F.regexp_replace(noComment, "(1-0|0-1|1/2-1/2|\\*)", "")
val noNumbers = F.regexp_replace(noResult, "\\d+\\.+", " ")
val plies = F.size(F.filter(F.split(F.trim(noNumbers), "\\s+"), tok => F.length(tok) > 0))
def nullable(extracted: org.apache.spark.sql.Column): org.apache.spark.sql.Column =
F.when(F.length(extracted) > 0, extracted).otherwise(F.lit(null).cast("string"))
val whiteElo = nullable(F.regexp_extract(record, "WhiteElo \"([^\"]*)\"", 1)).cast("int")
val blackElo = nullable(F.regexp_extract(record, "BlackElo \"([^\"]*)\"", 1)).cast("int")
spark.read
.option("lineSep", "[Event ")
.text(resolved)
.filter(F.length(F.trim(record)) > 0)
.select(
F.regexp_extract(record, "White \"([^\"]*)\"", 1).as("white_id"),
F.regexp_extract(record, "Black \"([^\"]*)\"", 1).as("black_id"),
result.as("result"),
plies.as("move_count"),
F.concat(F.lit("[Event "), record).as("pgn"),
whiteElo.as("white_elo"),
blackElo.as("black_elo"),
nullable(F.regexp_extract(record, "TimeControl \"([^\"]*)\"", 1)).as("time_control"),
nullable(F.regexp_extract(record, "UTCDate \"([^\"]*)\"", 1)).as("utc_date"),
nullable(F.regexp_extract(record, "UTCTime \"([^\"]*)\"", 1)).as("utc_time"),
nullable(F.regexp_extract(record, "Termination \"([^\"]*)\"", 1)).as("termination"),
nullable(F.regexp_extract(record, "ECO \"([^\"]*)\"", 1)).as("eco"),
)
.filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
/** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, which
* distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` is redistributed.
* Non-URL paths are returned unchanged.
@@ -77,6 +77,11 @@ object PlayerStatsJob:
.mode("overwrite")
.parquet(s"$outputDir/player_stats")
stats.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/player_stats_csv")
if !GameSource.isPgnMode then
stats.write
.mode("overwrite")
@@ -0,0 +1,65 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
object RatingMismatchJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-rating-mismatch"
val spark = SparkSession
.builder()
.appName("NowChess Rating Mismatch")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
.select("result", "white_elo", "black_elo")
.filter(F.col("white_elo").isNotNull.and(F.col("black_elo").isNotNull))
val eloDiff = F.col("white_elo") - F.col("black_elo")
val bracket = F
.when(eloDiff < -200, "Black +200")
.when(eloDiff < -100, "Black +100200")
.when(eloDiff < -50, "Black +50100")
.when(eloDiff <= 50, "Even (±50)")
.when(eloDiff <= 100, "White +50100")
.when(eloDiff <= 200, "White +100200")
.otherwise("White +200")
val bracketOrder = F
.when(eloDiff < -200, 1)
.when(eloDiff < -100, 2)
.when(eloDiff < -50, 3)
.when(eloDiff <= 50, 4)
.when(eloDiff <= 100, 5)
.when(eloDiff <= 200, 6)
.otherwise(7)
val stats = games
.withColumn("elo_diff", eloDiff)
.withColumn("bracket", bracket)
.withColumn("bracket_order", bracketOrder)
.groupBy("bracket", "bracket_order")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
.orderBy(F.asc("bracket_order"))
.drop("bracket_order")
.select("bracket", "total_games", "white_wins", "black_wins", "draws", "white_win_rate")
stats.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/rating_mismatch")
@@ -0,0 +1,44 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
object TerminationStatsJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-termination-stats"
val spark = SparkSession
.builder()
.appName("NowChess Termination Stats")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
.select("result", "termination")
.filter(F.col("termination").isNotNull.and(F.col("termination") =!= ""))
val stats = games
.groupBy("termination")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total_games").cast("double"), 3))
.withColumnRenamed("termination", "termination_type")
.orderBy(F.desc("total_games"))
.select("termination_type", "total_games", "white_wins", "black_wins", "draws", "draw_rate")
stats.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/termination_stats")
@@ -0,0 +1,58 @@
package de.nowchess.analytics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions as F
object TimeControlJob:
def main(args: Array[String]): Unit =
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-time-control"
val spark = SparkSession
.builder()
.appName("NowChess Time Control")
.getOrCreate()
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
spark.stop()
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
val games = GameSource
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
.select("result", "time_control")
.filter(
F.col("time_control").isNotNull
.and(F.col("time_control") =!= "")
.and(F.col("time_control") =!= "-"),
)
val baseSeconds = F.regexp_extract(F.col("time_control"), "^(?:\\d+/)?(\\d+)", 1).cast("int")
val category = F
.when(baseSeconds < 30, "UltraBullet")
.when(baseSeconds < 180, "Bullet")
.when(baseSeconds < 480, "Blitz")
.when(baseSeconds < 1500, "Rapid")
.when(baseSeconds < 86400, "Classical")
.otherwise("Correspondence")
val stats = games
.withColumn("category", category)
.groupBy("category")
.agg(
F.count("*").as("total_games"),
F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
)
.withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
.withColumn("draw_rate", F.round(F.col("draws") / F.col("total_games").cast("double"), 3))
.orderBy(F.desc("total_games"))
.select("category", "total_games", "white_wins", "black_wins", "draws", "white_win_rate", "draw_rate")
stats.write
.mode("overwrite")
.option("header", "true")
.csv(s"$outputDir/time_control_stats")