From c3e7b82ae806adf5713ce4d267c1155e73a40ff5 Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Wed, 24 Jun 2026 00:21:40 +0200 Subject: [PATCH] feat(analytics): add accuracy and blunder analysis job for Lichess data --- .../analytics/AccuracyBlunderJob.scala | 191 +++++++++++++++++ .../nowchess/analytics/ClockPressureJob.scala | 199 ++++++++++++++++++ .../nowchess/analytics/SmurfAnomalyJob.scala | 154 ++++++++++++++ 3 files changed, 544 insertions(+) create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/AccuracyBlunderJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/ClockPressureJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/SmurfAnomalyJob.scala diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/AccuracyBlunderJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/AccuracyBlunderJob.scala new file mode 100644 index 0000000..55d7ea2 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/AccuracyBlunderJob.scala @@ -0,0 +1,191 @@ +package de.nowchess.analytics + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions as F + +/** Per-move accuracy & blunder analysis mined from Lichess `[%eval ...]` move annotations. + * + * Unlike the flat single-`groupBy` summaries (opening rates, colour advantage), this job reconstructs the *quality of + * every move* from the engine evaluations Lichess embeds in the movetext (`{ [%eval 0.24] }`, mate scores `[%eval + * #-3]`) and turns them into the same accuracy signals lichess.com surfaces: average centipawn loss (ACPL), and counts + * of inaccuracies / mistakes / blunders. + * + * Pipeline (all Spark SQL string/array functions + window funcs — no UDFs, Catalyst-friendly): + * 1. Keep only games carrying `[%eval` comments. + * 2. `regexp_extract_all` pulls every eval in ply order; mate scores collapse to ±10 pawns, normal evals are clamped + * to ±10 so a single huge swing cannot dominate the mean. All evals are White-POV pawns. + * 3. `posexplode` → one row per ply; a per-game window `lag` gives the eval *before* the move. + * 4. Centipawn loss for the side that moved = how much the eval moved against them (white wants it up, black down), + * floored at 0 and scaled to centipawns. + * 5. Roll up to (game, side): ACPL + inaccuracy(≥50cp) / mistake(≥100cp) / blunder(≥200cp) counts, tagged with that + * side's Elo and whether they won. + * + * Outputs (Parquet + CSV + JDBC): + * - `accuracy_by_rating` — ACPL, avg blunders/mistakes/inaccuracies per game and win-rate, per Elo band. Shows how + * move quality scales with rating. + * - `blunder_outcome` — win-rate bucketed by number of blunders in the game. Quantifies "one blunder costs you the + * game". + * + * Requires the eval-annotated Lichess dump (`NOWCHESS_PGN_PATH` → an evals dump); JDBC games carry no per-move evals. + */ +object AccuracyBlunderJob: + + def main(args: Array[String]): Unit = + val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess") + val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess") + val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess") + val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-accuracy" + + val spark = SparkSession + .builder() + .appName("NowChess Accuracy & Blunders") + .getOrCreate() + + run(spark, jdbcUrl, dbUser, dbPass, outputDir) + spark.stop() + + def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit = + val games = GameSource + .loadExtended(spark, jdbcUrl, dbUser, dbPass) + .select("pgn", "result", "white_elo", "black_elo") + .filter(F.col("result").isNotNull.and(F.col("pgn").contains("[%eval"))) + .withColumn("game_id", F.monotonically_increasing_id()) + + // White-POV pawn evals in ply order; mate → ±10, normal evals clamped to ±10. + val evalStrs = F.expr("""regexp_extract_all(pgn, '\\[%eval ([^\\]]+)\\]', 1)""") + val evalCps = F.expr( + "transform(eval_strs, x -> CASE " + + "WHEN x LIKE '#-%' THEN -10.0 " + + "WHEN x LIKE '#%' THEN 10.0 " + + "ELSE greatest(-10.0, least(10.0, cast(x as double))) END)", + ) + + val withEvals = games + .withColumn("eval_strs", evalStrs) + .withColumn("eval_cp", evalCps) + .filter(F.size(F.col("eval_cp")) >= 2) + + val plies = withEvals.select( + F.col("game_id"), + F.col("result"), + F.col("white_elo"), + F.col("black_elo"), + F.posexplode(F.col("eval_cp")).as(Seq("ply", "eval_after")), + ) + + val byGame = Window.partitionBy("game_id").orderBy("ply") + val mover = F.when(F.col("ply") % 2 === 0, "white").otherwise("black") + val evalBefore = F.coalesce(F.lag("eval_after", 1).over(byGame), F.lit(0.15)) + val cpl = F.greatest( + F.lit(0.0), + F.when(F.col("mover") === "white", evalBefore - F.col("eval_after")) + .otherwise(F.col("eval_after") - evalBefore), + ) * 100 + + val moves = plies + .withColumn("mover", mover) + .withColumn("cpl", cpl) + + val perSide = moves + .groupBy("game_id", "mover", "result", "white_elo", "black_elo") + .agg( + F.round(F.avg("cpl"), 1).as("acpl"), + F.sum(F.when(F.col("cpl") >= 200, 1).otherwise(0)).as("blunders"), + F.sum(F.when(F.col("cpl") >= 100 && F.col("cpl") < 200, 1).otherwise(0)).as("mistakes"), + F.sum(F.when(F.col("cpl") >= 50 && F.col("cpl") < 100, 1).otherwise(0)).as("inaccuracies"), + ) + .withColumn( + "self_elo", + F.when(F.col("mover") === "white", F.col("white_elo")).otherwise(F.col("black_elo")), + ) + .withColumn("won", F.when(F.col("mover") === F.col("result"), 1).otherwise(0)) + + writeAccuracyByRating(perSide, jdbcUrl, dbUser, dbPass, outputDir) + writeBlunderOutcome(perSide, jdbcUrl, dbUser, dbPass, outputDir) + + private def writeAccuracyByRating( + perSide: org.apache.spark.sql.DataFrame, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + ): Unit = + val elo = F.col("self_elo") + val band = F + .when(elo < 1200, "<1200") + .when(elo < 1500, "1200–1499") + .when(elo < 1800, "1500–1799") + .when(elo < 2100, "1800–2099") + .otherwise("2100+") + val bandOrder = F + .when(elo < 1200, 1) + .when(elo < 1500, 2) + .when(elo < 1800, 3) + .when(elo < 2100, 4) + .otherwise(5) + + val stats = perSide + .filter(elo.isNotNull) + .withColumn("band", band) + .withColumn("band_order", bandOrder) + .groupBy("band", "band_order") + .agg( + F.count("*").as("player_games"), + F.round(F.avg("acpl"), 1).as("avg_acpl"), + F.round(F.avg("blunders"), 2).as("avg_blunders"), + F.round(F.avg("mistakes"), 2).as("avg_mistakes"), + F.round(F.avg("inaccuracies"), 2).as("avg_inaccuracies"), + F.round(F.avg("won"), 3).as("win_rate"), + ) + .orderBy(F.asc("band_order")) + .drop("band_order") + + write(stats, outputDir, "accuracy_by_rating", jdbcUrl, dbUser, dbPass, "analytics_accuracy_by_rating") + + private def writeBlunderOutcome( + perSide: org.apache.spark.sql.DataFrame, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + ): Unit = + val b = F.col("blunders") + val bucket = F.when(b === 0, "0").when(b === 1, "1").when(b === 2, "2").otherwise("3+") + val order = F.when(b === 0, 0).when(b === 1, 1).when(b === 2, 2).otherwise(3) + + val stats = perSide + .withColumn("blunder_bucket", bucket) + .withColumn("bucket_order", order) + .groupBy("blunder_bucket", "bucket_order") + .agg( + F.count("*").as("player_games"), + F.round(F.avg("won"), 3).as("win_rate"), + F.round(F.avg("acpl"), 1).as("avg_acpl"), + ) + .orderBy(F.asc("bucket_order")) + .drop("bucket_order") + + write(stats, outputDir, "blunder_outcome", jdbcUrl, dbUser, dbPass, "analytics_blunder_outcome") + + private def write( + df: org.apache.spark.sql.DataFrame, + outputDir: String, + name: String, + jdbcUrl: String, + dbUser: String, + dbPass: String, + table: String, + ): Unit = + df.write.mode("overwrite").parquet(s"$outputDir/$name") + df.write.mode("overwrite").option("header", "true").csv(s"$outputDir/${name}_csv") + if !GameSource.isPgnMode then + df.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", table) + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/ClockPressureJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/ClockPressureJob.scala new file mode 100644 index 0000000..af59cd5 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/ClockPressureJob.scala @@ -0,0 +1,199 @@ +package de.nowchess.analytics + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions as F + +/** Time-management & clock-pressure analysis mined from Lichess `[%clk ...]` move annotations. + * + * Lichess records each player's remaining clock after every move (`{ [%clk 0:02:31] }`). This job reconstructs + * per-move thinking time and remaining-time from those stamps to answer questions the existing time-control summary + * cannot: how long do players actually think, how often do they fall into time scrambles (<10 s left), how often do + * they flag (lose on time), and does burning the clock correlate with winning? + * + * Pipeline (Spark SQL string/array funcs + window funcs — no UDFs): + * 1. `regexp_extract_all` pulls every `h:mm:ss` clock in ply order, converted to seconds. + * 2. `posexplode` → one row per ply; even plies are White's clock, odd plies Black's. + * 3. A per-(game,side) window `lag` gives the same side's previous clock; the difference is that move's thinking time. + * Remaining clock <10 s marks a time-scramble move. + * 4. Roll up to (game, side): avg move time, scramble fraction, min clock, Elo, win flag, and whether the side lost on + * time (`Termination "Time forfeit"`). + * + * Outputs (Parquet + CSV + JDBC): + * - `clock_by_rating` — avg move time, scramble fraction, flag-loss rate and win-rate per Elo band. + * - `scramble_outcome` — win-rate bucketed by how much of the game was played in time-scramble. Quantifies the cost of + * time trouble. + * + * Requires a clock-annotated Lichess dump (`NOWCHESS_PGN_PATH`). + */ +object ClockPressureJob: + + def main(args: Array[String]): Unit = + val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess") + val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess") + val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess") + val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-clock-pressure" + + val spark = SparkSession + .builder() + .appName("NowChess Clock Pressure") + .getOrCreate() + + run(spark, jdbcUrl, dbUser, dbPass, outputDir) + spark.stop() + + def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit = + val games = GameSource + .loadExtended(spark, jdbcUrl, dbUser, dbPass) + .select("pgn", "result", "white_elo", "black_elo", "termination") + .filter(F.col("result").isNotNull.and(F.col("pgn").contains("[%clk"))) + .withColumn("game_id", F.monotonically_increasing_id()) + + val clkStrs = F.expr("""regexp_extract_all(pgn, '\\[%clk ([^\\]]+)\\]', 1)""") + // "h:mm:ss" → seconds. + val clkSecs = F.expr( + "transform(clk_strs, x -> " + + "cast(split(x, ':')[0] as double) * 3600 + " + + "cast(split(x, ':')[1] as double) * 60 + " + + "cast(split(x, ':')[2] as double))", + ) + + val withClk = games + .withColumn("clk_strs", clkStrs) + .withColumn("clk_sec", clkSecs) + .filter(F.size(F.col("clk_sec")) >= 4) + + val plies = withClk.select( + F.col("game_id"), + F.col("result"), + F.col("white_elo"), + F.col("black_elo"), + F.col("termination"), + F.posexplode(F.col("clk_sec")).as(Seq("ply", "clk_after")), + ) + + val mover = F.when(F.col("ply") % 2 === 0, "white").otherwise("black") + val bySide = Window.partitionBy("game_id", "mover").orderBy("ply") + val moveTime = F.lag("clk_after", 1).over(bySide) - F.col("clk_after") + + val moves = plies + .withColumn("mover", mover) + .withColumn("move_time", moveTime) + + val perSide = moves + .groupBy("game_id", "mover", "result", "white_elo", "black_elo", "termination") + .agg( + F.round(F.avg("move_time"), 1).as("avg_move_time"), + F.count("*").as("moves"), + F.round(F.min("clk_after"), 1).as("min_clk"), + F.sum(F.when(F.col("clk_after") < 10, 1).otherwise(0)).as("scramble_moves"), + ) + .withColumn("scramble_fraction", F.round(F.col("scramble_moves") / F.col("moves"), 3)) + .withColumn( + "self_elo", + F.when(F.col("mover") === "white", F.col("white_elo")).otherwise(F.col("black_elo")), + ) + .withColumn("won", F.when(F.col("mover") === F.col("result"), 1).otherwise(0)) + .withColumn( + "flag_loss", + F.when( + F.coalesce(F.col("termination"), F.lit("")).contains("Time forfeit") && F.col("won") === 0, + 1, + ).otherwise(0), + ) + + writeClockByRating(perSide, jdbcUrl, dbUser, dbPass, outputDir) + writeScrambleOutcome(perSide, jdbcUrl, dbUser, dbPass, outputDir) + + private def writeClockByRating( + perSide: org.apache.spark.sql.DataFrame, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + ): Unit = + val elo = F.col("self_elo") + val band = F + .when(elo < 1200, "<1200") + .when(elo < 1500, "1200–1499") + .when(elo < 1800, "1500–1799") + .when(elo < 2100, "1800–2099") + .otherwise("2100+") + val bandOrder = F + .when(elo < 1200, 1) + .when(elo < 1500, 2) + .when(elo < 1800, 3) + .when(elo < 2100, 4) + .otherwise(5) + + val stats = perSide + .filter(elo.isNotNull) + .withColumn("band", band) + .withColumn("band_order", bandOrder) + .groupBy("band", "band_order") + .agg( + F.count("*").as("player_games"), + F.round(F.avg("avg_move_time"), 1).as("avg_move_time_s"), + F.round(F.avg("scramble_fraction"), 3).as("avg_scramble_fraction"), + F.round(F.avg("flag_loss"), 3).as("flag_loss_rate"), + F.round(F.avg("won"), 3).as("win_rate"), + ) + .orderBy(F.asc("band_order")) + .drop("band_order") + + write(stats, outputDir, "clock_by_rating", jdbcUrl, dbUser, dbPass, "analytics_clock_by_rating") + + private def writeScrambleOutcome( + perSide: org.apache.spark.sql.DataFrame, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + ): Unit = + val sf = F.col("scramble_fraction") + val bucket = F + .when(sf === 0, "none") + .when(sf < 0.05, "<5%") + .when(sf < 0.20, "5–20%") + .otherwise(">20%") + val order = F + .when(sf === 0, 0) + .when(sf < 0.05, 1) + .when(sf < 0.20, 2) + .otherwise(3) + + val stats = perSide + .withColumn("scramble_bucket", bucket) + .withColumn("bucket_order", order) + .groupBy("scramble_bucket", "bucket_order") + .agg( + F.count("*").as("player_games"), + F.round(F.avg("won"), 3).as("win_rate"), + F.round(F.avg("flag_loss"), 3).as("flag_loss_rate"), + ) + .orderBy(F.asc("bucket_order")) + .drop("bucket_order") + + write(stats, outputDir, "scramble_outcome", jdbcUrl, dbUser, dbPass, "analytics_scramble_outcome") + + private def write( + df: org.apache.spark.sql.DataFrame, + outputDir: String, + name: String, + jdbcUrl: String, + dbUser: String, + dbPass: String, + table: String, + ): Unit = + df.write.mode("overwrite").parquet(s"$outputDir/$name") + df.write.mode("overwrite").option("header", "true").csv(s"$outputDir/${name}_csv") + if !GameSource.isPgnMode then + df.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", table) + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/SmurfAnomalyJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/SmurfAnomalyJob.scala new file mode 100644 index 0000000..87f5de2 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/SmurfAnomalyJob.scala @@ -0,0 +1,154 @@ +package de.nowchess.analytics + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions as F + +/** Smurf / sandbagging anomaly detection via population z-scores. + * + * Smurfs (strong players on fresh accounts) and sandbaggers leave a statistical signature: a win-rate, an upset-rate + * (beating higher-rated opponents) and a self-Elo climb that sit far above the population norm. This job builds those + * three features per player, standardises each against the whole player base, and flags the players whose combined + * deviation is extreme. + * + * Features per player (from each game's own/opponent Elo): + * - win_rate — fraction of decisive results won + * - upset_rate — wins vs higher-rated opponents / games vs higher-rated opponents + * - elo_climb — max self-Elo − min self-Elo across their games (rapid rating gain) + * + * Standardisation uses a single unbounded window (`Window.partitionBy()`), i.e. mean/stddev over every qualifying + * player, so z = (x − μ) / σ. The composite anomaly score sums the three z-scores. No UDFs — pure SQL aggregates + + * window functions, so Catalyst plans the whole job. + * + * Outputs (Parquet + CSV + JDBC): + * - `anomaly_scores` — every qualifying player with features, z-scores and composite, ranked most-anomalous first. + * - `flagged_smurfs` — the suspicious subset (high composite, or the classic high-winrate / few-games / steep-climb + * profile). + * + * Meaningful only when Elo is present (Lichess dump); requires `minGames` (arg 1, default 15) to avoid small-sample + * noise. + */ +object SmurfAnomalyJob: + + def main(args: Array[String]): Unit = + val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess") + val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess") + val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess") + val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-smurf-anomaly" + val minGames = if args.length > 1 then args(1).toInt else 15 + + val spark = SparkSession + .builder() + .appName("NowChess Smurf Anomaly Detection") + .getOrCreate() + + run(spark, jdbcUrl, dbUser, dbPass, outputDir, minGames) + spark.stop() + + def run( + spark: SparkSession, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + minGames: Int, + ): Unit = + val games = GameSource + .loadExtended(spark, jdbcUrl, dbUser, dbPass) + .select("white_id", "black_id", "result", "white_elo", "black_elo") + .filter(F.col("result").isNotNull) + + val asWhite = games.select( + F.col("white_id").as("player_id"), + F.col("white_elo").as("self_elo"), + F.col("black_elo").as("opp_elo"), + F.when(F.col("result") === "white", 1).otherwise(0).as("won"), + ) + val asBlack = games.select( + F.col("black_id").as("player_id"), + F.col("black_elo").as("self_elo"), + F.col("white_elo").as("opp_elo"), + F.when(F.col("result") === "black", 1).otherwise(0).as("won"), + ) + + val playerGames = asWhite + .union(asBlack) + .filter(F.col("self_elo").isNotNull.and(F.col("opp_elo").isNotNull)) + + val higher = F.col("opp_elo") > F.col("self_elo") + + val features = playerGames + .groupBy("player_id") + .agg( + F.count("*").as("total_games"), + F.round(F.avg("won"), 3).as("win_rate"), + F.round(F.avg("self_elo"), 0).as("avg_self_elo"), + (F.max("self_elo") - F.min("self_elo")).as("elo_climb"), + F.sum(F.when(higher, 1).otherwise(0)).as("vs_higher"), + F.sum(F.when(higher && F.col("won") === 1, 1).otherwise(0)).as("upsets"), + ) + .filter(F.col("total_games") >= minGames) + .withColumn("upset_rate", F.round(F.col("upsets") / F.greatest(F.col("vs_higher"), F.lit(1)), 3)) + + val all = Window.partitionBy() + def z(col: String): org.apache.spark.sql.Column = + val mean = F.avg(col).over(all) + val std = F.stddev(col).over(all) + F.round((F.col(col) - mean) / F.when(std === 0 || std.isNull, F.lit(1.0)).otherwise(std), 2) + + val scored = features + .withColumn("z_win_rate", z("win_rate")) + .withColumn("z_upset_rate", z("upset_rate")) + .withColumn("z_elo_climb", z("elo_climb")) + .withColumn( + "anomaly_score", + F.round(F.col("z_win_rate") + F.col("z_upset_rate") + F.col("z_elo_climb"), 2), + ) + .withColumn( + "flagged", + (F.col("anomaly_score") >= 4.0) + .or(F.col("win_rate") >= 0.8 && F.col("total_games") < 50 && F.col("elo_climb") >= 300), + ) + + val ordered = scored + .select( + "player_id", + "total_games", + "win_rate", + "avg_self_elo", + "elo_climb", + "upset_rate", + "z_win_rate", + "z_upset_rate", + "z_elo_climb", + "anomaly_score", + "flagged", + ) + .orderBy(F.desc("anomaly_score")) + + write(ordered, outputDir, "anomaly_scores", jdbcUrl, dbUser, dbPass, "analytics_smurf_anomaly") + + val flagged = ordered.filter(F.col("flagged") === true) + write(flagged, outputDir, "flagged_smurfs", jdbcUrl, dbUser, dbPass, "analytics_flagged_smurfs") + + private def write( + df: org.apache.spark.sql.DataFrame, + outputDir: String, + name: String, + jdbcUrl: String, + dbUser: String, + dbPass: String, + table: String, + ): Unit = + df.write.mode("overwrite").parquet(s"$outputDir/$name") + df.write.mode("overwrite").option("header", "true").csv(s"$outputDir/${name}_csv") + if !GameSource.isPgnMode then + df.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", table) + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save()