From 751a58b6061f7434115e229a7661894c76768bc2 Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 17 Jun 2026 10:42:42 +0200 Subject: [PATCH] feat(official-bots): park expert bot on tournament server at startup (#76) Reviewed-on: https://git.janis-eccarius.de/NowChess/NowChessSystems/pulls/76 --- modules/analytics/build.gradle.kts | 7 ++ .../de/nowchess/analytics/GameSource.scala | 119 ++++++++++++++++++ .../nowchess/analytics/OpeningBookJob.scala | 30 ++--- .../analytics/PlayerClusteringJob.scala | 48 ++++--- .../nowchess/analytics/PlayerGraphJob.scala | 30 ++--- .../nowchess/analytics/PlayerStatsJob.scala | 30 ++--- 6 files changed, 183 insertions(+), 81 deletions(-) create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala diff --git a/modules/analytics/build.gradle.kts b/modules/analytics/build.gradle.kts index ecbd1d7..f7d0d77 100644 --- a/modules/analytics/build.gradle.kts +++ b/modules/analytics/build.gradle.kts @@ -22,6 +22,9 @@ // NOWCHESS_JDBC_URL (default: jdbc:postgresql://localhost:5432/nowchess) // NOWCHESS_DB_USER (default: nowchess) // NOWCHESS_DB_PASS (default: nowchess) +// NOWCHESS_PGN_PATH (optional) — file or http(s) URL of a Lichess PGN dump (.pgn or .pgn.zst). +// When set, all batch jobs read games from the dump instead of PostgreSQL and +// skip JDBC write-back (Parquet/CSV output only). Demo data source. plugins { id("scala") @@ -71,6 +74,10 @@ dependencies { // PostgreSQL JDBC driver bundled so it is available on executor classpath. implementation("org.postgresql:postgresql:42.7.4") + + // zstd-jni: decompress Lichess .pgn.zst dumps in-process. Provided at runtime by Spark + // (it uses zstd-jni internally for shuffle/event-log compression), so compile-only here. + compileOnly("com.github.luben:zstd-jni:1.5.6-9") } application { diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala new file mode 100644 index 0000000..4bfdc08 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala @@ -0,0 +1,119 @@ +package de.nowchess.analytics + +import org.apache.spark.SparkFiles +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions as F + +/** Normalised game-record source for the batch jobs. + * + * Every batch job consumes the same five-column shape: + * - white_id, black_id : player identifiers + * - result : one of "white", "black", "draw" + * - move_count : number of plies + * - pgn : full PGN ("[Event …]…\n\n1. e4 …"), header and movetext separated by a blank line + * + * Two backends, selected by the `NOWCHESS_PGN_PATH` environment variable: + * - unset → PostgreSQL `game_records` table (production) + * - set → a Lichess PGN dump file/URL (demo). Point it at a `lichess_db_standard_rated_*.pgn[.zst]` + * to drive every batch job from real Lichess games. + * + * Lichess parsing uses only Spark SQL string functions — no UDFs — so Catalyst can push predicates, + * matching the no-UDF approach already used in OpeningBookJob. + */ +object GameSource: + + private val PgnPathEnv = "NOWCHESS_PGN_PATH" + + /** True when a Lichess PGN dump is configured; jobs use this to skip JDBC write-back. */ + def isPgnMode: Boolean = sys.env.contains(PgnPathEnv) + + def load(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String): DataFrame = + sys.env.get(PgnPathEnv) match + case Some(path) => fromLichessPgn(spark, path) + case None => fromJdbc(spark, jdbcUrl, dbUser, dbPass) + + def fromJdbc(spark: SparkSession, jdbcUrl: String, dbUser: String, dbPass: String): DataFrame = + spark.read + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "game_records") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .option("fetchsize", "10000") + .load() + .select("white_id", "black_id", "result", "move_count", "pgn") + + /** Parses a Lichess PGN dump into the normalised game shape. + * + * `path` may be: + * - an http(s)/ftp URL — fetched once via SparkContext.addFile and distributed to executors, then read + * from the local replica (no S3/PVC needed; handy for a staging demo) + * - any Hadoop-readable path (file://, hdfs://, s3a://, …) + * + * `.zst` dumps (Lichess' native format) are decompressed in-process via zstd-jni; `.gz`/`.bz2` are + * handled by Spark's text reader codecs. + * + * Records are split on the "[Event " tag that opens every game, so each row holds one complete game + * (the empty fragment before the first game is filtered out). Header tags are read with regexp_extract; + * the movetext (after the blank line) is cleaned of clock/eval comments and move numbers to count plies. + */ + def fromLichessPgn(spark: SparkSession, path: String): DataFrame = + val resolved = resolvePath(spark, path) + val record = F.col("value") + + val resultTag = F.regexp_extract(record, "Result \"([^\"]*)\"", 1) + val result = F + .when(resultTag === "1-0", "white") + .when(resultTag === "0-1", "black") + .when(resultTag === "1/2-1/2", "draw") + .otherwise(F.lit(null).cast("string")) + + val moveText = F.coalesce(F.split(record, "\n\n").getItem(1), F.lit("")) + val noComment = F.regexp_replace(moveText, "\\{[^}]*\\}", "") + val noResult = F.regexp_replace(noComment, "(1-0|0-1|1/2-1/2|\\*)", "") + val noNumbers = F.regexp_replace(noResult, "\\d+\\.+", " ") + val plies = F.size(F.filter(F.split(F.trim(noNumbers), "\\s+"), tok => F.length(tok) > 0)) + + spark.read + .option("lineSep", "[Event ") + .text(resolved) + .filter(F.length(F.trim(record)) > 0) + .select( + F.regexp_extract(record, "White \"([^\"]*)\"", 1).as("white_id"), + F.regexp_extract(record, "Black \"([^\"]*)\"", 1).as("black_id"), + result.as("result"), + plies.as("move_count"), + F.concat(F.lit("[Event "), record).as("pgn"), + ) + .filter((F.col("white_id") =!= "").and(F.col("black_id") =!= "")) + + /** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, + * which distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` + * is redistributed. Non-URL paths are returned unchanged. + */ + private def resolvePath(spark: SparkSession, path: String): String = + if !path.matches("^(https?|ftp)://.*") then path + else + spark.sparkContext.addFile(path) + val local = SparkFiles.get(baseName(path)) + if !local.endsWith(".zst") then "file://" + local + else distribute(spark, decompressZstd(local)) + + private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1) + + private def distribute(spark: SparkSession, localPath: String): String = + spark.sparkContext.addFile("file://" + localPath) + "file://" + SparkFiles.get(baseName(localPath)) + + /** Decompresses a `.zst` file to a temp `.pgn` using zstd-jni (bundled with Spark at runtime). */ + private def decompressZstd(srcPath: String): String = + val out = java.io.File.createTempFile("lichess-", ".pgn") + out.deleteOnExit() + val in = com.github.luben.zstd.ZstdInputStream( + java.io.BufferedInputStream(java.io.FileInputStream(srcPath)), + ) + try java.nio.file.Files.copy(in, out.toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING) + finally in.close() + out.getAbsolutePath diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala index fd6a101..77ec579 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala @@ -37,15 +37,8 @@ object OpeningBookJob: outputDir: String, maxPlies: Int, ): Unit = - val games = spark.read - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "game_records") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .option("fetchsize", "10000") - .load() + val games = GameSource + .load(spark, jdbcUrl, dbUser, dbPass) .select("pgn", "result") .filter(F.col("result").isNotNull.and(F.col("pgn").isNotNull)) @@ -79,15 +72,16 @@ object OpeningBookJob: .option("header", "true") .csv(s"$outputDir/opening_book_top1000") - top1000.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_opening_stats") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + if !GameSource.isPgnMode then + top1000.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_opening_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() /** Extracts the first `maxPlies` moves from a PGN column as a space-separated string. * diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala index 6081f3a..0e38e75 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala @@ -50,15 +50,8 @@ object PlayerClusteringJob: outputDir: String, k: Int, ): Unit = - val games = spark.read - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "game_records") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .option("fetchsize", "10000") - .load() + val games = GameSource + .load(spark, jdbcUrl, dbUser, dbPass) .select("white_id", "black_id", "result", "move_count") .filter(F.col("result").isNotNull) @@ -126,25 +119,26 @@ object PlayerClusteringJob: .option("header", "true") .csv(s"$outputDir/cluster_archetypes") - clustersDf.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_player_clusters") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + if !GameSource.isPgnMode then + clustersDf.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_clusters") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() - archetypes.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_cluster_archetypes") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + archetypes.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_cluster_archetypes") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = val asWhite = games.select( diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala index c037823..3b7baa5 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala @@ -53,15 +53,8 @@ object PlayerGraphJob: dbPass: String, outputDir: String, ): Unit = - val gamesRdd: RDD[Row] = spark.read - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "game_records") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .option("fetchsize", "10000") - .load() + val gamesRdd: RDD[Row] = GameSource + .load(spark, jdbcUrl, dbUser, dbPass) .select("white_id", "black_id", "result") .filter(F.col("result").isNotNull) .rdd @@ -116,15 +109,16 @@ object PlayerGraphJob: .mode("overwrite") .parquet(s"$outputDir/player_graph") - result.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_player_graph") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + if !GameSource.isPgnMode then + result.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_graph") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() // How many players belong to each connected component? // A large dominant component + many singletons is the expected shape. diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala index cc9df7a..8b0418c 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala @@ -34,15 +34,8 @@ object PlayerStatsJob: dbPass: String, outputDir: String, ): Unit = - val games = spark.read - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "game_records") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .option("fetchsize", "10000") - .load() + val games = GameSource + .load(spark, jdbcUrl, dbUser, dbPass) .select("white_id", "black_id", "result", "move_count") .filter(F.col("result").isNotNull) @@ -84,12 +77,13 @@ object PlayerStatsJob: .mode("overwrite") .parquet(s"$outputDir/player_stats") - stats.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_player_stats") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + if !GameSource.isPgnMode then + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save()