Compare commits

..

2 Commits

Author SHA1 Message Date
TeamCity 97015cb95e ci: bump version with Build-133 2026-06-21 14:51:19 +00:00
Janis Eccarius a268a9acb7 fix(analytics): write decompressed PGN to shared PVC path for executor access
Build & Test (NowChessSystems) TeamCity build finished
SparkFiles.get() on the driver returns a driver-local path. When this was
passed to spark.read.text() the executor tried to open that path on its own
filesystem (separate pod), silently reading 0 rows.

Fix: download and decompress the Lichess PGN to NOWCHESS_PGN_CACHE_DIR
(default /tmp) which must be a filesystem shared between driver and executor
pods. In the k8s deployment this is the spark-analytics-output PVC mounted
at /spark-output, so set NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache.

Also caches the decompressed file across runs — skips download if already
present.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-21 16:31:05 +02:00
3 changed files with 44 additions and 17 deletions
+16
View File
@@ -65,3 +65,19 @@
### Bug Fixes ### Bug Fixes
* **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c)) * **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c))
## (2026-06-21)
### Features
* **analytics:** add 7 new Spark analytics jobs and extend GameSource ([8e17c14](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/8e17c14dff740cd115011dfbf17de35083b8fe46))
* **analytics:** add Dockerfile, CI workflow, and stable jar name for K8s deployment ([95215b6](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/95215b6a420fd526df1aa395f9b087556c8ad03b))
* **analytics:** add PostgreSQL JDBC write-back to all four batch jobs ([0e0ea4c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0e0ea4c9893c6efed52e633e55d05ab3ed004502))
* **analytics:** add Spark batch analytics module ([259b3bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/259b3bbb24c0f23326269b93f4b3c84012f727cd))
* **analytics:** add Structured Streaming, MLlib clustering, GraphX jobs ([e1d80b9](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/e1d80b9331666feea191b1fd08aa762f3581c918))
* **analytics:** always write results to PostgreSQL regardless of input source ([da0e6d1](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/da0e6d1ee2d391ecb6291396f82471eb51b1b25e))
* **official-bots:** park expert bot on tournament server at startup ([#76](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/76)) ([751a58b](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/751a58b6061f7434115e229a7661894c76768bc2))
### Bug Fixes
* **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c))
* **analytics:** write decompressed PGN to shared PVC path for executor access ([a268a9a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a268a9acb7ba190c76e996ccf3ea3bd00e5cec92))
@@ -145,31 +145,42 @@ object GameSource:
) )
.filter((F.col("white_id") =!= "").and(F.col("black_id") =!= "")) .filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
/** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, which /** Turns an http(s)/ftp URL into a path readable by all executors.
* distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` is redistributed. *
* Downloads the file once on the driver, decompresses `.zst` if needed, then writes the result to
* `NOWCHESS_PGN_CACHE_DIR` (default `/tmp`). That directory must be on a filesystem shared between the driver pod
* and all executor pods — in the k8s deployment this is the `spark-analytics-output` PVC mounted at
* `/spark-output`, so set `NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache`.
*
* Skips download if the destination file already exists (cache-friendly for repeated runs).
* Non-URL paths are returned unchanged. * Non-URL paths are returned unchanged.
*/ */
private def resolvePath(spark: SparkSession, path: String): String = private def resolvePath(spark: SparkSession, path: String): String =
if !path.matches("^(https?|ftp)://.*") then path if !path.matches("^(https?|ftp)://.*") then path
else else
spark.sparkContext.addFile(path) val cacheDir = sys.env.getOrElse("NOWCHESS_PGN_CACHE_DIR", "/tmp")
val local = SparkFiles.get(baseName(path)) val destName = baseName(path).stripSuffix(".zst")
if !local.endsWith(".zst") then "file://" + local val destPath = s"$cacheDir/$destName"
else distribute(spark, decompressZstd(local)) if !java.io.File(destPath).exists() then
spark.sparkContext.addFile(path)
val downloaded = SparkFiles.get(baseName(path))
if downloaded.endsWith(".zst") then decompressZstd(downloaded, destPath)
else
java.io.File(destPath).getParentFile.mkdirs()
java.nio.file.Files.copy(
java.nio.file.Paths.get(downloaded),
java.io.File(destPath).toPath,
java.nio.file.StandardCopyOption.REPLACE_EXISTING,
)
"file://" + destPath
private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1) private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1)
private def distribute(spark: SparkSession, localPath: String): String = /** Decompresses a `.zst` file to `destPath` using zstd-jni (bundled with Spark at runtime). */
spark.sparkContext.addFile("file://" + localPath) private def decompressZstd(srcPath: String, destPath: String): Unit =
"file://" + SparkFiles.get(baseName(localPath)) java.io.File(destPath).getParentFile.mkdirs()
/** Decompresses a `.zst` file to a temp `.pgn` using zstd-jni (bundled with Spark at runtime). */
private def decompressZstd(srcPath: String): String =
val out = java.io.File.createTempFile("lichess-", ".pgn")
out.deleteOnExit()
val in = com.github.luben.zstd.ZstdInputStream( val in = com.github.luben.zstd.ZstdInputStream(
java.io.BufferedInputStream(java.io.FileInputStream(srcPath)), java.io.BufferedInputStream(java.io.FileInputStream(srcPath)),
) )
try java.nio.file.Files.copy(in, out.toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING) try java.nio.file.Files.copy(in, java.io.File(destPath).toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
finally in.close() finally in.close()
out.getAbsolutePath
+1 -1
View File
@@ -1,3 +1,3 @@
MAJOR=0 MAJOR=0
MINOR=6 MINOR=7
PATCH=0 PATCH=0