feat(analytics): add accuracy and blunder analysis job for Lichess data
Build & Test (NowChessSystems) TeamCity build finished
Build & Test (NowChessSystems) TeamCity build finished
This commit is contained in:
@@ -0,0 +1,191 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.expressions.Window
|
||||
import org.apache.spark.sql.functions as F
|
||||
|
||||
/** Per-move accuracy & blunder analysis mined from Lichess `[%eval ...]` move annotations.
|
||||
*
|
||||
* Unlike the flat single-`groupBy` summaries (opening rates, colour advantage), this job reconstructs the *quality of
|
||||
* every move* from the engine evaluations Lichess embeds in the movetext (`{ [%eval 0.24] }`, mate scores `[%eval
|
||||
* #-3]`) and turns them into the same accuracy signals lichess.com surfaces: average centipawn loss (ACPL), and counts
|
||||
* of inaccuracies / mistakes / blunders.
|
||||
*
|
||||
* Pipeline (all Spark SQL string/array functions + window funcs — no UDFs, Catalyst-friendly):
|
||||
* 1. Keep only games carrying `[%eval` comments.
|
||||
* 2. `regexp_extract_all` pulls every eval in ply order; mate scores collapse to ±10 pawns, normal evals are clamped
|
||||
* to ±10 so a single huge swing cannot dominate the mean. All evals are White-POV pawns.
|
||||
* 3. `posexplode` → one row per ply; a per-game window `lag` gives the eval *before* the move.
|
||||
* 4. Centipawn loss for the side that moved = how much the eval moved against them (white wants it up, black down),
|
||||
* floored at 0 and scaled to centipawns.
|
||||
* 5. Roll up to (game, side): ACPL + inaccuracy(≥50cp) / mistake(≥100cp) / blunder(≥200cp) counts, tagged with that
|
||||
* side's Elo and whether they won.
|
||||
*
|
||||
* Outputs (Parquet + CSV + JDBC):
|
||||
* - `accuracy_by_rating` — ACPL, avg blunders/mistakes/inaccuracies per game and win-rate, per Elo band. Shows how
|
||||
* move quality scales with rating.
|
||||
* - `blunder_outcome` — win-rate bucketed by number of blunders in the game. Quantifies "one blunder costs you the
|
||||
* game".
|
||||
*
|
||||
* Requires the eval-annotated Lichess dump (`NOWCHESS_PGN_PATH` → an evals dump); JDBC games carry no per-move evals.
|
||||
*/
|
||||
object AccuracyBlunderJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-accuracy"
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Accuracy & Blunders")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
spark.stop()
|
||||
|
||||
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
|
||||
val games = GameSource
|
||||
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
|
||||
.select("pgn", "result", "white_elo", "black_elo")
|
||||
.filter(F.col("result").isNotNull.and(F.col("pgn").contains("[%eval")))
|
||||
.withColumn("game_id", F.monotonically_increasing_id())
|
||||
|
||||
// White-POV pawn evals in ply order; mate → ±10, normal evals clamped to ±10.
|
||||
val evalStrs = F.expr("""regexp_extract_all(pgn, '\\[%eval ([^\\]]+)\\]', 1)""")
|
||||
val evalCps = F.expr(
|
||||
"transform(eval_strs, x -> CASE " +
|
||||
"WHEN x LIKE '#-%' THEN -10.0 " +
|
||||
"WHEN x LIKE '#%' THEN 10.0 " +
|
||||
"ELSE greatest(-10.0, least(10.0, cast(x as double))) END)",
|
||||
)
|
||||
|
||||
val withEvals = games
|
||||
.withColumn("eval_strs", evalStrs)
|
||||
.withColumn("eval_cp", evalCps)
|
||||
.filter(F.size(F.col("eval_cp")) >= 2)
|
||||
|
||||
val plies = withEvals.select(
|
||||
F.col("game_id"),
|
||||
F.col("result"),
|
||||
F.col("white_elo"),
|
||||
F.col("black_elo"),
|
||||
F.posexplode(F.col("eval_cp")).as(Seq("ply", "eval_after")),
|
||||
)
|
||||
|
||||
val byGame = Window.partitionBy("game_id").orderBy("ply")
|
||||
val mover = F.when(F.col("ply") % 2 === 0, "white").otherwise("black")
|
||||
val evalBefore = F.coalesce(F.lag("eval_after", 1).over(byGame), F.lit(0.15))
|
||||
val cpl = F.greatest(
|
||||
F.lit(0.0),
|
||||
F.when(F.col("mover") === "white", evalBefore - F.col("eval_after"))
|
||||
.otherwise(F.col("eval_after") - evalBefore),
|
||||
) * 100
|
||||
|
||||
val moves = plies
|
||||
.withColumn("mover", mover)
|
||||
.withColumn("cpl", cpl)
|
||||
|
||||
val perSide = moves
|
||||
.groupBy("game_id", "mover", "result", "white_elo", "black_elo")
|
||||
.agg(
|
||||
F.round(F.avg("cpl"), 1).as("acpl"),
|
||||
F.sum(F.when(F.col("cpl") >= 200, 1).otherwise(0)).as("blunders"),
|
||||
F.sum(F.when(F.col("cpl") >= 100 && F.col("cpl") < 200, 1).otherwise(0)).as("mistakes"),
|
||||
F.sum(F.when(F.col("cpl") >= 50 && F.col("cpl") < 100, 1).otherwise(0)).as("inaccuracies"),
|
||||
)
|
||||
.withColumn(
|
||||
"self_elo",
|
||||
F.when(F.col("mover") === "white", F.col("white_elo")).otherwise(F.col("black_elo")),
|
||||
)
|
||||
.withColumn("won", F.when(F.col("mover") === F.col("result"), 1).otherwise(0))
|
||||
|
||||
writeAccuracyByRating(perSide, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
writeBlunderOutcome(perSide, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
|
||||
private def writeAccuracyByRating(
|
||||
perSide: org.apache.spark.sql.DataFrame,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
): Unit =
|
||||
val elo = F.col("self_elo")
|
||||
val band = F
|
||||
.when(elo < 1200, "<1200")
|
||||
.when(elo < 1500, "1200–1499")
|
||||
.when(elo < 1800, "1500–1799")
|
||||
.when(elo < 2100, "1800–2099")
|
||||
.otherwise("2100+")
|
||||
val bandOrder = F
|
||||
.when(elo < 1200, 1)
|
||||
.when(elo < 1500, 2)
|
||||
.when(elo < 1800, 3)
|
||||
.when(elo < 2100, 4)
|
||||
.otherwise(5)
|
||||
|
||||
val stats = perSide
|
||||
.filter(elo.isNotNull)
|
||||
.withColumn("band", band)
|
||||
.withColumn("band_order", bandOrder)
|
||||
.groupBy("band", "band_order")
|
||||
.agg(
|
||||
F.count("*").as("player_games"),
|
||||
F.round(F.avg("acpl"), 1).as("avg_acpl"),
|
||||
F.round(F.avg("blunders"), 2).as("avg_blunders"),
|
||||
F.round(F.avg("mistakes"), 2).as("avg_mistakes"),
|
||||
F.round(F.avg("inaccuracies"), 2).as("avg_inaccuracies"),
|
||||
F.round(F.avg("won"), 3).as("win_rate"),
|
||||
)
|
||||
.orderBy(F.asc("band_order"))
|
||||
.drop("band_order")
|
||||
|
||||
write(stats, outputDir, "accuracy_by_rating", jdbcUrl, dbUser, dbPass, "analytics_accuracy_by_rating")
|
||||
|
||||
private def writeBlunderOutcome(
|
||||
perSide: org.apache.spark.sql.DataFrame,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
): Unit =
|
||||
val b = F.col("blunders")
|
||||
val bucket = F.when(b === 0, "0").when(b === 1, "1").when(b === 2, "2").otherwise("3+")
|
||||
val order = F.when(b === 0, 0).when(b === 1, 1).when(b === 2, 2).otherwise(3)
|
||||
|
||||
val stats = perSide
|
||||
.withColumn("blunder_bucket", bucket)
|
||||
.withColumn("bucket_order", order)
|
||||
.groupBy("blunder_bucket", "bucket_order")
|
||||
.agg(
|
||||
F.count("*").as("player_games"),
|
||||
F.round(F.avg("won"), 3).as("win_rate"),
|
||||
F.round(F.avg("acpl"), 1).as("avg_acpl"),
|
||||
)
|
||||
.orderBy(F.asc("bucket_order"))
|
||||
.drop("bucket_order")
|
||||
|
||||
write(stats, outputDir, "blunder_outcome", jdbcUrl, dbUser, dbPass, "analytics_blunder_outcome")
|
||||
|
||||
private def write(
|
||||
df: org.apache.spark.sql.DataFrame,
|
||||
outputDir: String,
|
||||
name: String,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
table: String,
|
||||
): Unit =
|
||||
df.write.mode("overwrite").parquet(s"$outputDir/$name")
|
||||
df.write.mode("overwrite").option("header", "true").csv(s"$outputDir/${name}_csv")
|
||||
if !GameSource.isPgnMode then
|
||||
df.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", table)
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
@@ -0,0 +1,199 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.expressions.Window
|
||||
import org.apache.spark.sql.functions as F
|
||||
|
||||
/** Time-management & clock-pressure analysis mined from Lichess `[%clk ...]` move annotations.
|
||||
*
|
||||
* Lichess records each player's remaining clock after every move (`{ [%clk 0:02:31] }`). This job reconstructs
|
||||
* per-move thinking time and remaining-time from those stamps to answer questions the existing time-control summary
|
||||
* cannot: how long do players actually think, how often do they fall into time scrambles (<10 s left), how often do
|
||||
* they flag (lose on time), and does burning the clock correlate with winning?
|
||||
*
|
||||
* Pipeline (Spark SQL string/array funcs + window funcs — no UDFs):
|
||||
* 1. `regexp_extract_all` pulls every `h:mm:ss` clock in ply order, converted to seconds.
|
||||
* 2. `posexplode` → one row per ply; even plies are White's clock, odd plies Black's.
|
||||
* 3. A per-(game,side) window `lag` gives the same side's previous clock; the difference is that move's thinking time.
|
||||
* Remaining clock <10 s marks a time-scramble move.
|
||||
* 4. Roll up to (game, side): avg move time, scramble fraction, min clock, Elo, win flag, and whether the side lost on
|
||||
* time (`Termination "Time forfeit"`).
|
||||
*
|
||||
* Outputs (Parquet + CSV + JDBC):
|
||||
* - `clock_by_rating` — avg move time, scramble fraction, flag-loss rate and win-rate per Elo band.
|
||||
* - `scramble_outcome` — win-rate bucketed by how much of the game was played in time-scramble. Quantifies the cost of
|
||||
* time trouble.
|
||||
*
|
||||
* Requires a clock-annotated Lichess dump (`NOWCHESS_PGN_PATH`).
|
||||
*/
|
||||
object ClockPressureJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-clock-pressure"
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Clock Pressure")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
spark.stop()
|
||||
|
||||
def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
|
||||
val games = GameSource
|
||||
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
|
||||
.select("pgn", "result", "white_elo", "black_elo", "termination")
|
||||
.filter(F.col("result").isNotNull.and(F.col("pgn").contains("[%clk")))
|
||||
.withColumn("game_id", F.monotonically_increasing_id())
|
||||
|
||||
val clkStrs = F.expr("""regexp_extract_all(pgn, '\\[%clk ([^\\]]+)\\]', 1)""")
|
||||
// "h:mm:ss" → seconds.
|
||||
val clkSecs = F.expr(
|
||||
"transform(clk_strs, x -> " +
|
||||
"cast(split(x, ':')[0] as double) * 3600 + " +
|
||||
"cast(split(x, ':')[1] as double) * 60 + " +
|
||||
"cast(split(x, ':')[2] as double))",
|
||||
)
|
||||
|
||||
val withClk = games
|
||||
.withColumn("clk_strs", clkStrs)
|
||||
.withColumn("clk_sec", clkSecs)
|
||||
.filter(F.size(F.col("clk_sec")) >= 4)
|
||||
|
||||
val plies = withClk.select(
|
||||
F.col("game_id"),
|
||||
F.col("result"),
|
||||
F.col("white_elo"),
|
||||
F.col("black_elo"),
|
||||
F.col("termination"),
|
||||
F.posexplode(F.col("clk_sec")).as(Seq("ply", "clk_after")),
|
||||
)
|
||||
|
||||
val mover = F.when(F.col("ply") % 2 === 0, "white").otherwise("black")
|
||||
val bySide = Window.partitionBy("game_id", "mover").orderBy("ply")
|
||||
val moveTime = F.lag("clk_after", 1).over(bySide) - F.col("clk_after")
|
||||
|
||||
val moves = plies
|
||||
.withColumn("mover", mover)
|
||||
.withColumn("move_time", moveTime)
|
||||
|
||||
val perSide = moves
|
||||
.groupBy("game_id", "mover", "result", "white_elo", "black_elo", "termination")
|
||||
.agg(
|
||||
F.round(F.avg("move_time"), 1).as("avg_move_time"),
|
||||
F.count("*").as("moves"),
|
||||
F.round(F.min("clk_after"), 1).as("min_clk"),
|
||||
F.sum(F.when(F.col("clk_after") < 10, 1).otherwise(0)).as("scramble_moves"),
|
||||
)
|
||||
.withColumn("scramble_fraction", F.round(F.col("scramble_moves") / F.col("moves"), 3))
|
||||
.withColumn(
|
||||
"self_elo",
|
||||
F.when(F.col("mover") === "white", F.col("white_elo")).otherwise(F.col("black_elo")),
|
||||
)
|
||||
.withColumn("won", F.when(F.col("mover") === F.col("result"), 1).otherwise(0))
|
||||
.withColumn(
|
||||
"flag_loss",
|
||||
F.when(
|
||||
F.coalesce(F.col("termination"), F.lit("")).contains("Time forfeit") && F.col("won") === 0,
|
||||
1,
|
||||
).otherwise(0),
|
||||
)
|
||||
|
||||
writeClockByRating(perSide, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
writeScrambleOutcome(perSide, jdbcUrl, dbUser, dbPass, outputDir)
|
||||
|
||||
private def writeClockByRating(
|
||||
perSide: org.apache.spark.sql.DataFrame,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
): Unit =
|
||||
val elo = F.col("self_elo")
|
||||
val band = F
|
||||
.when(elo < 1200, "<1200")
|
||||
.when(elo < 1500, "1200–1499")
|
||||
.when(elo < 1800, "1500–1799")
|
||||
.when(elo < 2100, "1800–2099")
|
||||
.otherwise("2100+")
|
||||
val bandOrder = F
|
||||
.when(elo < 1200, 1)
|
||||
.when(elo < 1500, 2)
|
||||
.when(elo < 1800, 3)
|
||||
.when(elo < 2100, 4)
|
||||
.otherwise(5)
|
||||
|
||||
val stats = perSide
|
||||
.filter(elo.isNotNull)
|
||||
.withColumn("band", band)
|
||||
.withColumn("band_order", bandOrder)
|
||||
.groupBy("band", "band_order")
|
||||
.agg(
|
||||
F.count("*").as("player_games"),
|
||||
F.round(F.avg("avg_move_time"), 1).as("avg_move_time_s"),
|
||||
F.round(F.avg("scramble_fraction"), 3).as("avg_scramble_fraction"),
|
||||
F.round(F.avg("flag_loss"), 3).as("flag_loss_rate"),
|
||||
F.round(F.avg("won"), 3).as("win_rate"),
|
||||
)
|
||||
.orderBy(F.asc("band_order"))
|
||||
.drop("band_order")
|
||||
|
||||
write(stats, outputDir, "clock_by_rating", jdbcUrl, dbUser, dbPass, "analytics_clock_by_rating")
|
||||
|
||||
private def writeScrambleOutcome(
|
||||
perSide: org.apache.spark.sql.DataFrame,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
): Unit =
|
||||
val sf = F.col("scramble_fraction")
|
||||
val bucket = F
|
||||
.when(sf === 0, "none")
|
||||
.when(sf < 0.05, "<5%")
|
||||
.when(sf < 0.20, "5–20%")
|
||||
.otherwise(">20%")
|
||||
val order = F
|
||||
.when(sf === 0, 0)
|
||||
.when(sf < 0.05, 1)
|
||||
.when(sf < 0.20, 2)
|
||||
.otherwise(3)
|
||||
|
||||
val stats = perSide
|
||||
.withColumn("scramble_bucket", bucket)
|
||||
.withColumn("bucket_order", order)
|
||||
.groupBy("scramble_bucket", "bucket_order")
|
||||
.agg(
|
||||
F.count("*").as("player_games"),
|
||||
F.round(F.avg("won"), 3).as("win_rate"),
|
||||
F.round(F.avg("flag_loss"), 3).as("flag_loss_rate"),
|
||||
)
|
||||
.orderBy(F.asc("bucket_order"))
|
||||
.drop("bucket_order")
|
||||
|
||||
write(stats, outputDir, "scramble_outcome", jdbcUrl, dbUser, dbPass, "analytics_scramble_outcome")
|
||||
|
||||
private def write(
|
||||
df: org.apache.spark.sql.DataFrame,
|
||||
outputDir: String,
|
||||
name: String,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
table: String,
|
||||
): Unit =
|
||||
df.write.mode("overwrite").parquet(s"$outputDir/$name")
|
||||
df.write.mode("overwrite").option("header", "true").csv(s"$outputDir/${name}_csv")
|
||||
if !GameSource.isPgnMode then
|
||||
df.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", table)
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
@@ -0,0 +1,154 @@
|
||||
package de.nowchess.analytics
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.expressions.Window
|
||||
import org.apache.spark.sql.functions as F
|
||||
|
||||
/** Smurf / sandbagging anomaly detection via population z-scores.
|
||||
*
|
||||
* Smurfs (strong players on fresh accounts) and sandbaggers leave a statistical signature: a win-rate, an upset-rate
|
||||
* (beating higher-rated opponents) and a self-Elo climb that sit far above the population norm. This job builds those
|
||||
* three features per player, standardises each against the whole player base, and flags the players whose combined
|
||||
* deviation is extreme.
|
||||
*
|
||||
* Features per player (from each game's own/opponent Elo):
|
||||
* - win_rate — fraction of decisive results won
|
||||
* - upset_rate — wins vs higher-rated opponents / games vs higher-rated opponents
|
||||
* - elo_climb — max self-Elo − min self-Elo across their games (rapid rating gain)
|
||||
*
|
||||
* Standardisation uses a single unbounded window (`Window.partitionBy()`), i.e. mean/stddev over every qualifying
|
||||
* player, so z = (x − μ) / σ. The composite anomaly score sums the three z-scores. No UDFs — pure SQL aggregates +
|
||||
* window functions, so Catalyst plans the whole job.
|
||||
*
|
||||
* Outputs (Parquet + CSV + JDBC):
|
||||
* - `anomaly_scores` — every qualifying player with features, z-scores and composite, ranked most-anomalous first.
|
||||
* - `flagged_smurfs` — the suspicious subset (high composite, or the classic high-winrate / few-games / steep-climb
|
||||
* profile).
|
||||
*
|
||||
* Meaningful only when Elo is present (Lichess dump); requires `minGames` (arg 1, default 15) to avoid small-sample
|
||||
* noise.
|
||||
*/
|
||||
object SmurfAnomalyJob:
|
||||
|
||||
def main(args: Array[String]): Unit =
|
||||
val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
|
||||
val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
|
||||
val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
|
||||
val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-smurf-anomaly"
|
||||
val minGames = if args.length > 1 then args(1).toInt else 15
|
||||
|
||||
val spark = SparkSession
|
||||
.builder()
|
||||
.appName("NowChess Smurf Anomaly Detection")
|
||||
.getOrCreate()
|
||||
|
||||
run(spark, jdbcUrl, dbUser, dbPass, outputDir, minGames)
|
||||
spark.stop()
|
||||
|
||||
def run(
|
||||
spark: SparkSession,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
outputDir: String,
|
||||
minGames: Int,
|
||||
): Unit =
|
||||
val games = GameSource
|
||||
.loadExtended(spark, jdbcUrl, dbUser, dbPass)
|
||||
.select("white_id", "black_id", "result", "white_elo", "black_elo")
|
||||
.filter(F.col("result").isNotNull)
|
||||
|
||||
val asWhite = games.select(
|
||||
F.col("white_id").as("player_id"),
|
||||
F.col("white_elo").as("self_elo"),
|
||||
F.col("black_elo").as("opp_elo"),
|
||||
F.when(F.col("result") === "white", 1).otherwise(0).as("won"),
|
||||
)
|
||||
val asBlack = games.select(
|
||||
F.col("black_id").as("player_id"),
|
||||
F.col("black_elo").as("self_elo"),
|
||||
F.col("white_elo").as("opp_elo"),
|
||||
F.when(F.col("result") === "black", 1).otherwise(0).as("won"),
|
||||
)
|
||||
|
||||
val playerGames = asWhite
|
||||
.union(asBlack)
|
||||
.filter(F.col("self_elo").isNotNull.and(F.col("opp_elo").isNotNull))
|
||||
|
||||
val higher = F.col("opp_elo") > F.col("self_elo")
|
||||
|
||||
val features = playerGames
|
||||
.groupBy("player_id")
|
||||
.agg(
|
||||
F.count("*").as("total_games"),
|
||||
F.round(F.avg("won"), 3).as("win_rate"),
|
||||
F.round(F.avg("self_elo"), 0).as("avg_self_elo"),
|
||||
(F.max("self_elo") - F.min("self_elo")).as("elo_climb"),
|
||||
F.sum(F.when(higher, 1).otherwise(0)).as("vs_higher"),
|
||||
F.sum(F.when(higher && F.col("won") === 1, 1).otherwise(0)).as("upsets"),
|
||||
)
|
||||
.filter(F.col("total_games") >= minGames)
|
||||
.withColumn("upset_rate", F.round(F.col("upsets") / F.greatest(F.col("vs_higher"), F.lit(1)), 3))
|
||||
|
||||
val all = Window.partitionBy()
|
||||
def z(col: String): org.apache.spark.sql.Column =
|
||||
val mean = F.avg(col).over(all)
|
||||
val std = F.stddev(col).over(all)
|
||||
F.round((F.col(col) - mean) / F.when(std === 0 || std.isNull, F.lit(1.0)).otherwise(std), 2)
|
||||
|
||||
val scored = features
|
||||
.withColumn("z_win_rate", z("win_rate"))
|
||||
.withColumn("z_upset_rate", z("upset_rate"))
|
||||
.withColumn("z_elo_climb", z("elo_climb"))
|
||||
.withColumn(
|
||||
"anomaly_score",
|
||||
F.round(F.col("z_win_rate") + F.col("z_upset_rate") + F.col("z_elo_climb"), 2),
|
||||
)
|
||||
.withColumn(
|
||||
"flagged",
|
||||
(F.col("anomaly_score") >= 4.0)
|
||||
.or(F.col("win_rate") >= 0.8 && F.col("total_games") < 50 && F.col("elo_climb") >= 300),
|
||||
)
|
||||
|
||||
val ordered = scored
|
||||
.select(
|
||||
"player_id",
|
||||
"total_games",
|
||||
"win_rate",
|
||||
"avg_self_elo",
|
||||
"elo_climb",
|
||||
"upset_rate",
|
||||
"z_win_rate",
|
||||
"z_upset_rate",
|
||||
"z_elo_climb",
|
||||
"anomaly_score",
|
||||
"flagged",
|
||||
)
|
||||
.orderBy(F.desc("anomaly_score"))
|
||||
|
||||
write(ordered, outputDir, "anomaly_scores", jdbcUrl, dbUser, dbPass, "analytics_smurf_anomaly")
|
||||
|
||||
val flagged = ordered.filter(F.col("flagged") === true)
|
||||
write(flagged, outputDir, "flagged_smurfs", jdbcUrl, dbUser, dbPass, "analytics_flagged_smurfs")
|
||||
|
||||
private def write(
|
||||
df: org.apache.spark.sql.DataFrame,
|
||||
outputDir: String,
|
||||
name: String,
|
||||
jdbcUrl: String,
|
||||
dbUser: String,
|
||||
dbPass: String,
|
||||
table: String,
|
||||
): Unit =
|
||||
df.write.mode("overwrite").parquet(s"$outputDir/$name")
|
||||
df.write.mode("overwrite").option("header", "true").csv(s"$outputDir/${name}_csv")
|
||||
if !GameSource.isPgnMode then
|
||||
df.write
|
||||
.mode("overwrite")
|
||||
.format("jdbc")
|
||||
.option("url", jdbcUrl)
|
||||
.option("dbtable", table)
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPass)
|
||||
.option("driver", "org.postgresql.Driver")
|
||||
.save()
|
||||
Reference in New Issue
Block a user