From a268a9acb7ba190c76e996ccf3ea3bd00e5cec92 Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Sun, 21 Jun 2026 16:30:38 +0200 Subject: [PATCH] fix(analytics): write decompressed PGN to shared PVC path for executor access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SparkFiles.get() on the driver returns a driver-local path. When this was passed to spark.read.text() the executor tried to open that path on its own filesystem (separate pod), silently reading 0 rows. Fix: download and decompress the Lichess PGN to NOWCHESS_PGN_CACHE_DIR (default /tmp) which must be a filesystem shared between driver and executor pods. In the k8s deployment this is the spark-analytics-output PVC mounted at /spark-output, so set NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache. Also caches the decompressed file across runs — skips download if already present. Co-Authored-By: Claude Sonnet 4.6 --- .../de/nowchess/analytics/GameSource.scala | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala index aa21271..95efb75 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/GameSource.scala @@ -145,31 +145,42 @@ object GameSource: ) .filter((F.col("white_id") =!= "").and(F.col("black_id") =!= "")) - /** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, which - * distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` is redistributed. + /** Turns an http(s)/ftp URL into a path readable by all executors. + * + * Downloads the file once on the driver, decompresses `.zst` if needed, then writes the result to + * `NOWCHESS_PGN_CACHE_DIR` (default `/tmp`). That directory must be on a filesystem shared between the driver pod + * and all executor pods — in the k8s deployment this is the `spark-analytics-output` PVC mounted at + * `/spark-output`, so set `NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache`. + * + * Skips download if the destination file already exists (cache-friendly for repeated runs). * Non-URL paths are returned unchanged. */ private def resolvePath(spark: SparkSession, path: String): String = if !path.matches("^(https?|ftp)://.*") then path else - spark.sparkContext.addFile(path) - val local = SparkFiles.get(baseName(path)) - if !local.endsWith(".zst") then "file://" + local - else distribute(spark, decompressZstd(local)) + val cacheDir = sys.env.getOrElse("NOWCHESS_PGN_CACHE_DIR", "/tmp") + val destName = baseName(path).stripSuffix(".zst") + val destPath = s"$cacheDir/$destName" + if !java.io.File(destPath).exists() then + spark.sparkContext.addFile(path) + val downloaded = SparkFiles.get(baseName(path)) + if downloaded.endsWith(".zst") then decompressZstd(downloaded, destPath) + else + java.io.File(destPath).getParentFile.mkdirs() + java.nio.file.Files.copy( + java.nio.file.Paths.get(downloaded), + java.io.File(destPath).toPath, + java.nio.file.StandardCopyOption.REPLACE_EXISTING, + ) + "file://" + destPath private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1) - private def distribute(spark: SparkSession, localPath: String): String = - spark.sparkContext.addFile("file://" + localPath) - "file://" + SparkFiles.get(baseName(localPath)) - - /** Decompresses a `.zst` file to a temp `.pgn` using zstd-jni (bundled with Spark at runtime). */ - private def decompressZstd(srcPath: String): String = - val out = java.io.File.createTempFile("lichess-", ".pgn") - out.deleteOnExit() + /** Decompresses a `.zst` file to `destPath` using zstd-jni (bundled with Spark at runtime). */ + private def decompressZstd(srcPath: String, destPath: String): Unit = + java.io.File(destPath).getParentFile.mkdirs() val in = com.github.luben.zstd.ZstdInputStream( java.io.BufferedInputStream(java.io.FileInputStream(srcPath)), ) - try java.nio.file.Files.copy(in, out.toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING) + try java.nio.file.Files.copy(in, java.io.File(destPath).toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING) finally in.close() - out.getAbsolutePath