From 8e17c14dff740cd115011dfbf17de35083b8fe46 Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Sun, 21 Jun 2026 15:00:58 +0200 Subject: [PATCH] feat(analytics): add 7 new Spark analytics jobs and extend GameSource Adds GameLengthJob, ColorAdvantageJob, EloDistributionJob, TimeControlJob, DailyActivityJob, RatingMismatchJob, and TerminationStatsJob bringing total batch pipelines to 11 (+ 1 streaming). Extends GameSource with loadExtended() / fromLichessPgnExtended() extracting WhiteElo, BlackElo, TimeControl, UTCDate, UTCTime, Termination, ECO from PGN headers; JDBC path returns nulls for extended columns, keeping all existing jobs unaffected. PlayerStatsJob gains a CSV output alongside the existing Parquet write so the analytics webview can display player statistics without pyarrow. Co-Authored-By: Claude Sonnet 4.6 --- .../analytics/ColorAdvantageJob.scala | 62 +++++++++++++ .../nowchess/analytics/DailyActivityJob.scala | 79 ++++++++++++++++ .../analytics/EloDistributionJob.scala | 48 ++++++++++ .../de/nowchess/analytics/GameLengthJob.scala | 91 +++++++++++++++++++ .../de/nowchess/analytics/GameSource.scala | 56 ++++++++++++ .../nowchess/analytics/PlayerStatsJob.scala | 5 + .../analytics/RatingMismatchJob.scala | 65 +++++++++++++ .../analytics/TerminationStatsJob.scala | 44 +++++++++ .../nowchess/analytics/TimeControlJob.scala | 58 ++++++++++++ 9 files changed, 508 insertions(+) create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala create 
mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala

diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala
new file mode 100644
index 0000000..4c26044
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala
@@ -0,0 +1,82 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+
+import scala.jdk.CollectionConverters.*
+
+/** Batch job: overall win, loss and draw counts plus the win rate for each colour. */
+object ColorAdvantageJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-color-advantage"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Color Advantage")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Counts the games per result in a single pass and writes one row per colour to
+    * `<outputDir>/color_advantage` (CSV with header). Games with a null result are ignored.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource
+      .load(spark, jdbcUrl, dbUser, dbPass)
+      .select("result")
+      .filter(F.col("result").isNotNull)
+
+    // One aggregation instead of four count() actions, each of which would re-read the game source.
+    def tally(outcome: String) = F.count(F.when(F.col("result") === outcome, 1))
+    val counts = games
+      .agg(
+        F.count("*").as("total_games"),
+        tally("white").as("white_wins"),
+        tally("black").as("black_wins"),
+        tally("draw").as("draws"),
+      )
+      .head()
+
+    val totalGames = counts.getLong(0)
+    val whiteWins = counts.getLong(1)
+    val blackWins = counts.getLong(2)
+    val draws = counts.getLong(3)
+
+    val schema = StructType(
+      Seq(
+        StructField("color", DataTypes.StringType, false),
+        StructField("total_games", DataTypes.LongType, false),
+        StructField("wins", DataTypes.LongType, false),
+        StructField("losses", DataTypes.LongType, false),
+        StructField("draws", DataTypes.LongType, false),
+      ),
+    )
+
+    // Seen from either colour, the opponent's wins are its own losses.
+    val rows = List(
+      Row("white", totalGames, whiteWins, blackWins, draws),
+      Row("black", totalGames, blackWins, whiteWins, draws),
+    )
+
+    val stats = spark
+      .createDataFrame(rows.asJava, schema)
+      .withColumn("win_rate", F.round(F.col("wins") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.asc("color"))
+
+    stats.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/color_advantage")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala
new file mode 100644
index 0000000..f10094c
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala
@@ -0,0 +1,91 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Batch job: game counts and white win rate per hour of day and per day of week. */
+object DailyActivityJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-daily-activity"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Daily Activity")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Writes `<outputDir>/hourly_activity` and `<outputDir>/weekly_activity` (CSV with header). Rows whose hour or
+    * date cannot be derived are left out of the respective report instead of being grouped under a wrong label.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource
+      .loadExtended(spark, jdbcUrl, dbUser, dbPass)
+      .select("result", "utc_date", "utc_time")
+      .filter(F.col("utc_time").isNotNull.and(F.col("utc_date").isNotNull))
+
+    // The hour is the two leading digits of `utc_time`; the date is parsed as yyyy.MM.dd. Other values are
+    // expected to yield null - NOTE(review): with ANSI mode enabled the cast/parse may raise instead; confirm.
+    val hourOfDay = F.regexp_extract(F.col("utc_time"), "^(\\d{2})", 1).cast("int")
+    val dow = F.dayofweek(F.to_date(F.col("utc_date"), "yyyy.MM.dd"))
+
+    val tagged = games
+      .withColumn("hour_of_day", hourOfDay)
+      .withColumn("dow", dow)
+
+    val hourly = tagged
+      .filter(F.col("hour_of_day").isNotNull)
+      .groupBy("hour_of_day")
+      .agg(
+        F.count("*").as("total_games"),
+        F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
+        F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
+        F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
+      )
+      .withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.asc("hour_of_day"))
+      .select("hour_of_day", "total_games", "white_wins", "black_wins", "draws", "white_win_rate")
+
+    hourly.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/hourly_activity")
+
+    // dayofweek numbers the days 1 (Sunday) to 7 (Saturday). Saturday is matched explicitly so that a null
+    // `dow` (unparseable date) can never be reported as a Saturday.
+    val dayName = F
+      .when(F.col("dow") === 1, "Sunday")
+      .when(F.col("dow") === 2, "Monday")
+      .when(F.col("dow") === 3, "Tuesday")
+      .when(F.col("dow") === 4, "Wednesday")
+      .when(F.col("dow") === 5, "Thursday")
+      .when(F.col("dow") === 6, "Friday")
+      .when(F.col("dow") === 7, "Saturday")
+
+    val weekly = tagged
+      .filter(F.col("dow").isNotNull)
+      .withColumn("day_of_week", dayName)
+      .groupBy("day_of_week", "dow")
+      .agg(
+        F.count("*").as("total_games"),
+        F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
+        F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
+        F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
+      )
+      .withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.asc("dow"))
+      .select("day_of_week", "total_games", "white_wins", "black_wins", "draws", "white_win_rate")
+
+    weekly.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/weekly_activity")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala
new file mode 100644
index 0000000..fb6ba48
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala
@@ -0,0 +1,57 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Batch job: histogram of player ratings in 200-point Elo buckets. */
+object EloDistributionJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-elo-distribution"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Elo Distribution")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Writes `<outputDir>/elo_distribution` (CSV with header): the number of rated player appearances per
+    * 200-point Elo bucket, with every rating from 2800 upwards collected in a single `2800+` bucket.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource.loadExtended(spark, jdbcUrl, dbUser, dbPass)
+
+    // Each game contributes up to two ratings. Nulls are dropped per side after the union so that a missing
+    // white rating does not also discard the black rating of the same game.
+    val whiteElo = games.select(F.col("white_elo").as("elo"))
+    val blackElo = games.select(F.col("black_elo").as("elo"))
+    val allElo = whiteElo.union(blackElo).filter(F.col("elo").isNotNull)
+
+    // Lower bound of the 200-point bucket the rating falls into, e.g. 1575 -> 1400.
+    val bucketMin = (F.floor(F.col("elo") / 200) * 200).cast("int")
+    val bucketLabel = F.when(
+      F.col("elo") >= 2800,
+      F.lit("2800+"),
+    ).otherwise(F.concat(bucketMin.cast("string"), F.lit("-"), (bucketMin + 199).cast("string")))
+
+    val distribution = allElo
+      .withColumn("elo_bucket", bucketLabel)
+      .withColumn("bucket_order", F.when(F.col("elo") >= 2800, 2800).otherwise(bucketMin))
+      .groupBy("elo_bucket", "bucket_order")
+      .agg(F.count("*").as("player_count"))
+      .orderBy(F.asc("bucket_order"))
+      .select("elo_bucket", "player_count")
+
+    distribution.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/elo_distribution")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala
b/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala
new file mode 100644
index 0000000..0600754
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala
@@ -0,0 +1,100 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Batch job: distribution of game lengths and how the result relates to the length. */
+object GameLengthJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-game-length"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Game Length")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Writes two CSV reports (with header) below `outputDir`: `game_length_distribution`, the result counts and
+    * rates per move-count bucket, and `game_length_by_result`, the average, minimum and maximum move count per
+    * result. Games with a null result or move count are ignored.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource
+      .load(spark, jdbcUrl, dbUser, dbPass)
+      .select("result", "move_count")
+      .filter(F.col("result").isNotNull.and(F.col("move_count").isNotNull))
+
+    // `bucket` and `bucketOrder` share the same thresholds; the order column exists only to sort the labels.
+    val moves = F.col("move_count")
+    val bucket = F
+      .when(moves <= 10, "1-10")
+      .when(moves <= 20, "11-20")
+      .when(moves <= 30, "21-30")
+      .when(moves <= 40, "31-40")
+      .when(moves <= 60, "41-60")
+      .when(moves <= 100, "61-100")
+      .otherwise("101+")
+    val bucketOrder = F
+      .when(moves <= 10, 1)
+      .when(moves <= 20, 2)
+      .when(moves <= 30, 3)
+      .when(moves <= 40, 4)
+      .when(moves <= 60, 5)
+      .when(moves <= 100, 6)
+      .otherwise(7)
+
+    val tagged = games
+      .withColumn("move_bucket", bucket)
+      .withColumn("bucket_order", bucketOrder)
+
+    val distribution = tagged
+      .groupBy("move_bucket", "bucket_order")
+      .agg(
+        F.count("*").as("total_games"),
+        F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
+        F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
+        F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
+      )
+      .withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
+      .withColumn("black_win_rate", F.round(F.col("black_wins") / F.col("total_games").cast("double"), 3))
+      .withColumn("draw_rate", F.round(F.col("draws") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.asc("bucket_order"))
+      .select(
+        "move_bucket",
+        "total_games",
+        "white_wins",
+        "black_wins",
+        "draws",
+        "white_win_rate",
+        "black_win_rate",
+        "draw_rate",
+      )
+
+    distribution.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/game_length_distribution")
+
+    val byResult = games
+      .groupBy("result")
+      .agg(
+        F.round(F.avg("move_count"), 1).as("avg_move_count"),
+        F.min("move_count").as("min_moves"),
+        F.max("move_count").as("max_moves"),
+      )
+      .orderBy(F.asc("result"))
+
+    byResult.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/game_length_by_result")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala
index acdf783..aa21271 100644
--- a/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala
@@ -33,6 +33,23 @@ object GameSource:
       case Some(path) => fromLichessPgn(spark, path)
       case None => fromJdbc(spark, jdbcUrl, dbUser, dbPass)
 
+  /** Like `load`, but with the extra columns `white_elo`, `black_elo`, `time_control`, `utc_date`, `utc_time`,
+    * `termination` and `eco`. They are parsed from the PGN headers when the PGN path environment variable is
+    * set; on the JDBC path they are added as typed null columns.
+    */
+  def loadExtended(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String): DataFrame =
+    sys.env.get(PgnPathEnv) match
+      case Some(path) => fromLichessPgnExtended(spark, path)
+      case None =>
+        fromJdbc(spark, jdbcUrl, dbUser, dbPass)
+          .withColumn("white_elo", F.lit(null).cast("int"))
+          .withColumn("black_elo", F.lit(null).cast("int"))
+          .withColumn("time_control", F.lit(null).cast("string"))
+          .withColumn("utc_date", F.lit(null).cast("string"))
+          .withColumn("utc_time", F.lit(null).cast("string"))
+          .withColumn("termination", F.lit(null).cast("string"))
+          .withColumn("eco", F.lit(null).cast("string"))
+
   def fromJdbc(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String): DataFrame =
     spark.read
       .format("jdbc")
@@ -89,6 +106,57 @@ object GameSource:
       )
       .filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
 
+  /** Reads a Lichess PGN dump into one row per game, with the extended header columns. The file is split on the
+    * literal `[Event ` marker, which is therefore re-attached when building the `pgn` column. Header tags are
+    * extracted with regular expressions; the extended tags become null when missing or empty, and the two Elo
+    * tags are cast to int. Rows with an empty white or black player name are dropped.
+    */
+  private def fromLichessPgnExtended(spark: SparkSession, path: String): DataFrame =
+    val resolved = resolvePath(spark, path)
+    val record = F.col("value")
+
+    val resultTag = F.regexp_extract(record, "Result \"([^\"]*)\"", 1)
+    val result = F
+      .when(resultTag === "1-0", "white")
+      .when(resultTag === "0-1", "black")
+      .when(resultTag === "1/2-1/2", "draw")
+      .otherwise(F.lit(null).cast("string"))
+
+    // The move text is the segment after the first blank line. Comments, result tokens and move numbers are
+    // stripped so that the remaining whitespace-separated tokens can be counted as plies.
+    val moveText = F.coalesce(F.split(record, "\n\n").getItem(1), F.lit(""))
+    val noComment = F.regexp_replace(moveText, "\\{[^}]*\\}", "")
+    val noResult = F.regexp_replace(noComment, "(1-0|0-1|1/2-1/2|\\*)", "")
+    val noNumbers = F.regexp_replace(noResult, "\\d+\\.+", " ")
+    val plies = F.size(F.filter(F.split(F.trim(noNumbers), "\\s+"), tok => F.length(tok) > 0))
+
+    // regexp_extract returns an empty string when the pattern does not match; map that to null.
+    def nullable(extracted: org.apache.spark.sql.Column): org.apache.spark.sql.Column =
+      F.when(F.length(extracted) > 0, extracted).otherwise(F.lit(null).cast("string"))
+
+    val whiteElo = nullable(F.regexp_extract(record, "WhiteElo \"([^\"]*)\"", 1)).cast("int")
+    val blackElo = nullable(F.regexp_extract(record, "BlackElo \"([^\"]*)\"", 1)).cast("int")
+
+    spark.read
+      .option("lineSep", "[Event ")
+      .text(resolved)
+      .filter(F.length(F.trim(record)) > 0)
+      .select(
+        F.regexp_extract(record, "White \"([^\"]*)\"", 1).as("white_id"),
+        F.regexp_extract(record, "Black \"([^\"]*)\"", 1).as("black_id"),
+        result.as("result"),
+        plies.as("move_count"),
+        F.concat(F.lit("[Event "), record).as("pgn"),
+        whiteElo.as("white_elo"),
+        blackElo.as("black_elo"),
+        nullable(F.regexp_extract(record, "TimeControl \"([^\"]*)\"", 1)).as("time_control"),
+        nullable(F.regexp_extract(record, "UTCDate \"([^\"]*)\"", 1)).as("utc_date"),
+        nullable(F.regexp_extract(record, "UTCTime \"([^\"]*)\"", 1)).as("utc_time"),
+        nullable(F.regexp_extract(record, "Termination \"([^\"]*)\"", 1)).as("termination"),
+        nullable(F.regexp_extract(record, "ECO \"([^\"]*)\"", 1)).as("eco"),
+      )
+      .filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
+
   /** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, which
     * distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` is redistributed.
     * Non-URL paths are returned unchanged.
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala
index 8b0418c..c250e65 100644
--- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala
@@ -77,6 +77,11 @@ object PlayerStatsJob:
       .mode("overwrite")
       .parquet(s"$outputDir/player_stats")
 
+    stats.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/player_stats_csv")
+
     if !GameSource.isPgnMode then
       stats.write
         .mode("overwrite")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala
new file mode 100644
index 0000000..b5c68e7
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala
@@ -0,0 +1,73 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Batch job: results grouped by the rating gap between the two players. */
+object RatingMismatchJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-rating-mismatch"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Rating Mismatch")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Writes `<outputDir>/rating_mismatch` (CSV with header): result counts and white win rate per bracket of
+    * `white_elo - black_elo`. Games missing either rating are ignored.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource
+      .loadExtended(spark, jdbcUrl, dbUser, dbPass)
+      .select("result", "white_elo", "black_elo")
+      .filter(F.col("white_elo").isNotNull.and(F.col("black_elo").isNotNull))
+
+    // A positive difference means white is the higher-rated player. `bracket` and `bracketOrder` share the
+    // same thresholds; the order column exists only to sort the labels.
+    val eloDiff = F.col("white_elo") - F.col("black_elo")
+    val bracket = F
+      .when(eloDiff < -200, "Black +200")
+      .when(eloDiff < -100, "Black +100–200")
+      .when(eloDiff < -50, "Black +50–100")
+      .when(eloDiff <= 50, "Even (±50)")
+      .when(eloDiff <= 100, "White +50–100")
+      .when(eloDiff <= 200, "White +100–200")
+      .otherwise("White +200")
+    val bracketOrder = F
+      .when(eloDiff < -200, 1)
+      .when(eloDiff < -100, 2)
+      .when(eloDiff < -50, 3)
+      .when(eloDiff <= 50, 4)
+      .when(eloDiff <= 100, 5)
+      .when(eloDiff <= 200, 6)
+      .otherwise(7)
+
+    val stats = games
+      .withColumn("bracket", bracket)
+      .withColumn("bracket_order", bracketOrder)
+      .groupBy("bracket", "bracket_order")
+      .agg(
+        F.count("*").as("total_games"),
+        F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
+        F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
+        F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
+      )
+      .withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.asc("bracket_order"))
+      .select("bracket", "total_games", "white_wins", "black_wins", "draws", "white_win_rate")
+
+    stats.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/rating_mismatch")
diff --git
a/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala
new file mode 100644
index 0000000..e280b6c
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala
@@ -0,0 +1,52 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Batch job: result counts and draw rate per PGN `Termination` value. */
+object TerminationStatsJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-termination-stats"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Termination Stats")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Writes `<outputDir>/termination_stats` (CSV with header), most frequent termination first. Games with a
+    * null or empty termination are ignored.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource
+      .loadExtended(spark, jdbcUrl, dbUser, dbPass)
+      .select("result", "termination")
+      .filter(F.col("termination").isNotNull.and(F.col("termination") =!= ""))
+
+    val stats = games
+      .groupBy("termination")
+      .agg(
+        F.count("*").as("total_games"),
+        F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
+        F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
+        F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
+      )
+      .withColumn("draw_rate", F.round(F.col("draws") / F.col("total_games").cast("double"), 3))
+      .withColumnRenamed("termination", "termination_type")
+      .orderBy(F.desc("total_games"))
+      .select("termination_type", "total_games", "white_wins", "black_wins", "draws", "draw_rate")
+
+    stats.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/termination_stats")
diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala
new file mode 100644
index 0000000..df3b3da
--- /dev/null
+++ b/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala
@@ -0,0 +1,77 @@
+package de.nowchess.analytics
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions as F
+
+/** Batch job: result counts and rates per time-control category. */
+object TimeControlJob:
+
+  /** Reads connection settings from the `NOWCHESS_*` environment variables (falling back to local defaults) and
+    * the output directory from the optional first argument, then runs the job.
+    */
+  def main(args: Array[String]): Unit =
+    val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess")
+    val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess")
+    val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess")
+    val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-time-control"
+
+    val spark = SparkSession
+      .builder()
+      .appName("NowChess Time Control")
+      .getOrCreate()
+
+    // Release the session even when the job throws.
+    try run(spark, jdbcUrl, dbUser, dbPass, outputDir)
+    finally spark.stop()
+
+  /** Writes `<outputDir>/time_control_stats` (CSV with header), largest category first. Games are classified by
+    * estimated duration (base seconds plus 40 seconds per increment second), which is how Lichess defines its
+    * speed categories. Games without a time control, or with one that cannot be parsed, are ignored.
+    */
+  def run(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String, outputDir: String): Unit =
+    val games = GameSource
+      .loadExtended(spark, jdbcUrl, dbUser, dbPass)
+      .select("result", "time_control")
+      .filter(
+        F.col("time_control").isNotNull
+          .and(F.col("time_control") =!= "")
+          .and(F.col("time_control") =!= "-"),
+      )
+
+    // The value is read as "<base>+<increment>" in seconds, optionally prefixed with "<moves>/". An empty
+    // capture is mapped to null (base) or 0 (increment) before casting so that no empty string is ever cast.
+    val baseText = F.regexp_extract(F.col("time_control"), "^(?:\\d+/)?(\\d+)", 1)
+    val incrementText = F.regexp_extract(F.col("time_control"), "\\+(\\d+)$", 1)
+    val baseSeconds = F.when(F.length(baseText) > 0, baseText.cast("int"))
+    val incrementSeconds = F.when(F.length(incrementText) > 0, incrementText.cast("int")).otherwise(0)
+    val estimatedSeconds = baseSeconds + incrementSeconds * 40
+
+    // A null duration would fail every comparison and land in the `otherwise` branch, so unparseable values
+    // are filtered out below instead of being counted as "Correspondence".
+    val category = F
+      .when(estimatedSeconds < 30, "UltraBullet")
+      .when(estimatedSeconds < 180, "Bullet")
+      .when(estimatedSeconds < 480, "Blitz")
+      .when(estimatedSeconds < 1500, "Rapid")
+      .when(estimatedSeconds < 86400, "Classical")
+      .otherwise("Correspondence")
+
+    val stats = games
+      .filter(estimatedSeconds.isNotNull)
+      .withColumn("category", category)
+      .groupBy("category")
+      .agg(
+        F.count("*").as("total_games"),
+        F.sum(F.when(F.col("result") === "white", 1).otherwise(0)).as("white_wins"),
+        F.sum(F.when(F.col("result") === "black", 1).otherwise(0)).as("black_wins"),
+        F.sum(F.when(F.col("result") === "draw", 1).otherwise(0)).as("draws"),
+      )
+      .withColumn("white_win_rate", F.round(F.col("white_wins") / F.col("total_games").cast("double"), 3))
+      .withColumn("draw_rate", F.round(F.col("draws") / F.col("total_games").cast("double"), 3))
+      .orderBy(F.desc("total_games"))
+      .select("category", "total_games", "white_wins", "black_wins", "draws", "white_win_rate", "draw_rate")
+
+    stats.write
+      .mode("overwrite")
+      .option("header", "true")
+      .csv(s"$outputDir/time_control_stats")