fix(analytics): write decompressed PGN to shared PVC path for executor access
Build & Test (NowChessSystems) TeamCity build finished

SparkFiles.get() on the driver returns a driver-local path. When this was
passed to spark.read.text() the executor tried to open that path on its own
filesystem (separate pod), silently reading 0 rows.

Fix: download and decompress the Lichess PGN to NOWCHESS_PGN_CACHE_DIR
(default /tmp) which must be a filesystem shared between driver and executor
pods. In the k8s deployment this is the spark-analytics-output PVC mounted
at /spark-output, so set NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache.

Also caches the decompressed file across runs — skips download if already
present.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Janis Eccarius
2026-06-21 16:30:38 +02:00
parent 71cb2cc56c
commit a268a9acb7
@@ -145,31 +145,42 @@ object GameSource:
)
.filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
/** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, which
* distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` is redistributed.
/** Turns an http(s)/ftp URL into a path readable by all executors.
*
* Downloads the file once on the driver, decompresses `.zst` if needed, then writes the result to
* `NOWCHESS_PGN_CACHE_DIR` (default `/tmp`). That directory must be on a filesystem shared between the driver pod
* and all executor pods — in the k8s deployment this is the `spark-analytics-output` PVC mounted at
* `/spark-output`, so set `NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache`.
*
* Skips download if the destination file already exists (cache-friendly for repeated runs).
* Non-URL paths are returned unchanged.
*/
private def resolvePath(spark: SparkSession, path: String): String =
if !path.matches("^(https?|ftp)://.*") then path
else
spark.sparkContext.addFile(path)
val local = SparkFiles.get(baseName(path))
if !local.endsWith(".zst") then "file://" + local
else distribute(spark, decompressZstd(local))
val cacheDir = sys.env.getOrElse("NOWCHESS_PGN_CACHE_DIR", "/tmp")
val destName = baseName(path).stripSuffix(".zst")
val destPath = s"$cacheDir/$destName"
if !java.io.File(destPath).exists() then
spark.sparkContext.addFile(path)
val downloaded = SparkFiles.get(baseName(path))
if downloaded.endsWith(".zst") then decompressZstd(downloaded, destPath)
else
java.io.File(destPath).getParentFile.mkdirs()
java.nio.file.Files.copy(
java.nio.file.Paths.get(downloaded),
java.io.File(destPath).toPath,
java.nio.file.StandardCopyOption.REPLACE_EXISTING,
)
"file://" + destPath
private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1)
private def distribute(spark: SparkSession, localPath: String): String =
spark.sparkContext.addFile("file://" + localPath)
"file://" + SparkFiles.get(baseName(localPath))
/** Decompresses a `.zst` file to a temp `.pgn` using zstd-jni (bundled with Spark at runtime). */
private def decompressZstd(srcPath: String): String =
val out = java.io.File.createTempFile("lichess-", ".pgn")
out.deleteOnExit()
/** Decompresses a `.zst` file to `destPath` using zstd-jni (bundled with Spark at runtime). */
private def decompressZstd(srcPath: String, destPath: String): Unit =
java.io.File(destPath).getParentFile.mkdirs()
val in = com.github.luben.zstd.ZstdInputStream(
java.io.BufferedInputStream(java.io.FileInputStream(srcPath)),
)
try java.nio.file.Files.copy(in, out.toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
try java.nio.file.Files.copy(in, java.io.File(destPath).toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
finally in.close()
out.getAbsolutePath