Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| | 97015cb95e | | |
| | a268a9acb7 | | |
@@ -65,3 +65,19 @@
|
|||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
|
||||||
* **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c))
|
* **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c))
|
||||||
|
## (2026-06-21)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* **analytics:** add 7 new Spark analytics jobs and extend GameSource ([8e17c14](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/8e17c14dff740cd115011dfbf17de35083b8fe46))
|
||||||
|
* **analytics:** add Dockerfile, CI workflow, and stable jar name for K8s deployment ([95215b6](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/95215b6a420fd526df1aa395f9b087556c8ad03b))
|
||||||
|
* **analytics:** add PostgreSQL JDBC write-back to all four batch jobs ([0e0ea4c](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/0e0ea4c9893c6efed52e633e55d05ab3ed004502))
|
||||||
|
* **analytics:** add Spark batch analytics module ([259b3bb](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/259b3bbb24c0f23326269b93f4b3c84012f727cd))
|
||||||
|
* **analytics:** add Structured Streaming, MLlib clustering, GraphX jobs ([e1d80b9](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/e1d80b9331666feea191b1fd08aa762f3581c918))
|
||||||
|
* **analytics:** always write results to PostgreSQL regardless of input source ([da0e6d1](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/da0e6d1ee2d391ecb6291396f82471eb51b1b25e))
|
||||||
|
* **official-bots:** park expert bot on tournament server at startup ([#76](https://git.janis-eccarius.de/NowChess/NowChessSystems/issues/76)) ([751a58b](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/751a58b6061f7434115e229a7661894c76768bc2))
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
* **analytics:** upgrade Spark to 4.0.3 — 3.5.x has no official Docker image ([46af115](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/46af1154de34a8596cb6cb28c6fad7aba90f597c))
|
||||||
|
* **analytics:** write decompressed PGN to shared PVC path for executor access ([a268a9a](https://git.janis-eccarius.de/NowChess/NowChessSystems/commit/a268a9acb7ba190c76e996ccf3ea3bd00e5cec92))
|
||||||
|
|||||||
@@ -145,31 +145,42 @@ object GameSource:
|
|||||||
)
|
)
|
||||||
.filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
|
.filter((F.col("white_id") =!= "").and(F.col("black_id") =!= ""))
|
||||||
|
|
||||||
/** Turns an http(s)/ftp URL into a cluster-local path by fetching it once with SparkContext.addFile, which
|
/** Turns an http(s)/ftp URL into a path readable by all executors.
|
||||||
* distributes the file to every executor. `.zst` is decompressed in-process and the plain `.pgn` is redistributed.
|
*
|
||||||
|
* Downloads the file once on the driver, decompresses `.zst` if needed, then writes the result to
|
||||||
|
* `NOWCHESS_PGN_CACHE_DIR` (default `/tmp`). That directory must be on a filesystem shared between the driver pod
|
||||||
|
* and all executor pods — in the k8s deployment this is the `spark-analytics-output` PVC mounted at
|
||||||
|
* `/spark-output`, so set `NOWCHESS_PGN_CACHE_DIR=/spark-output/.pgn-cache`.
|
||||||
|
*
|
||||||
|
* Skips download if the destination file already exists (cache-friendly for repeated runs).
|
||||||
* Non-URL paths are returned unchanged.
|
* Non-URL paths are returned unchanged.
|
||||||
*/
|
*/
|
||||||
private def resolvePath(spark: SparkSession, path: String): String =
|
private def resolvePath(spark: SparkSession, path: String): String =
|
||||||
if !path.matches("^(https?|ftp)://.*") then path
|
if !path.matches("^(https?|ftp)://.*") then path
|
||||||
else
|
else
|
||||||
spark.sparkContext.addFile(path)
|
val cacheDir = sys.env.getOrElse("NOWCHESS_PGN_CACHE_DIR", "/tmp")
|
||||||
val local = SparkFiles.get(baseName(path))
|
val destName = baseName(path).stripSuffix(".zst")
|
||||||
if !local.endsWith(".zst") then "file://" + local
|
val destPath = s"$cacheDir/$destName"
|
||||||
else distribute(spark, decompressZstd(local))
|
if !java.io.File(destPath).exists() then
|
||||||
|
spark.sparkContext.addFile(path)
|
||||||
|
val downloaded = SparkFiles.get(baseName(path))
|
||||||
|
if downloaded.endsWith(".zst") then decompressZstd(downloaded, destPath)
|
||||||
|
else
|
||||||
|
java.io.File(destPath).getParentFile.mkdirs()
|
||||||
|
java.nio.file.Files.copy(
|
||||||
|
java.nio.file.Paths.get(downloaded),
|
||||||
|
java.io.File(destPath).toPath,
|
||||||
|
java.nio.file.StandardCopyOption.REPLACE_EXISTING,
|
||||||
|
)
|
||||||
|
"file://" + destPath
|
||||||
|
|
||||||
private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1)
|
private def baseName(path: String): String = path.substring(path.lastIndexOf('/') + 1)
|
||||||
|
|
||||||
private def distribute(spark: SparkSession, localPath: String): String =
|
/** Decompresses a `.zst` file to `destPath` using zstd-jni (bundled with Spark at runtime). */
|
||||||
spark.sparkContext.addFile("file://" + localPath)
|
private def decompressZstd(srcPath: String, destPath: String): Unit =
|
||||||
"file://" + SparkFiles.get(baseName(localPath))
|
java.io.File(destPath).getParentFile.mkdirs()
|
||||||
|
|
||||||
/** Decompresses a `.zst` file to a temp `.pgn` using zstd-jni (bundled with Spark at runtime). */
|
|
||||||
private def decompressZstd(srcPath: String): String =
|
|
||||||
val out = java.io.File.createTempFile("lichess-", ".pgn")
|
|
||||||
out.deleteOnExit()
|
|
||||||
val in = com.github.luben.zstd.ZstdInputStream(
|
val in = com.github.luben.zstd.ZstdInputStream(
|
||||||
java.io.BufferedInputStream(java.io.FileInputStream(srcPath)),
|
java.io.BufferedInputStream(java.io.FileInputStream(srcPath)),
|
||||||
)
|
)
|
||||||
try java.nio.file.Files.copy(in, out.toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
|
try java.nio.file.Files.copy(in, java.io.File(destPath).toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
|
||||||
finally in.close()
|
finally in.close()
|
||||||
out.getAbsolutePath
|
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
MAJOR=0
|
MAJOR=0
|
||||||
MINOR=6
|
MINOR=7
|
||||||
PATCH=0
|
PATCH=0
|
||||||
|
|||||||
Reference in New Issue
Block a user