From 0e0ea4c9893c6efed52e633e55d05ab3ed004502 Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Mon, 15 Jun 2026 22:35:30 +0200 Subject: [PATCH] feat(analytics): add PostgreSQL JDBC write-back to all four batch jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each batch job now writes its results to a Postgres table in addition to the existing Parquet/CSV output. OpeningBookJob → analytics_opening_stats, PlayerStatsJob → analytics_player_stats, PlayerClusteringJob → analytics_player_clusters + analytics_cluster_archetypes, PlayerGraphJob → analytics_player_graph. MLlib Vector columns are excluded from the JDBC write by reusing the already-selected scalar DataFrame in PlayerClusteringJob. Co-Authored-By: Claude Sonnet 4.6 --- .../nowchess/analytics/OpeningBookJob.scala | 16 +++++++++--- .../analytics/PlayerClusteringJob.scala | 26 ++++++++++++++++--- .../nowchess/analytics/PlayerGraphJob.scala | 10 +++++++ .../nowchess/analytics/PlayerStatsJob.scala | 10 +++++++ 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala index 87b228e..fd6a101 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala @@ -72,13 +72,23 @@ object OpeningBookJob: .mode("overwrite") .parquet(s"$outputDir/opening_book") - stats - .limit(1000) - .write + val top1000 = stats.limit(1000) + + top1000.write .mode("overwrite") .option("header", "true") .csv(s"$outputDir/opening_book_top1000") + top1000.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_opening_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + /** Extracts the first `maxPlies` moves from a PGN column as a space-separated string. * * PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 * diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala index ada64c2..6081f3a 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala @@ -115,9 +115,9 @@ object PlayerClusteringJob: archetypes.show(20, false) - predictions - .select("player_id", "total_games", "win_rate", "avg_move_count", "cluster") - .write + val clustersDf = predictions.select("player_id", "total_games", "win_rate", "avg_move_count", "cluster") + + clustersDf.write .mode("overwrite") .parquet(s"$outputDir/player_clusters") @@ -126,6 +126,26 @@ object PlayerClusteringJob: .option("header", "true") .csv(s"$outputDir/cluster_archetypes") + clustersDf.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_clusters") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + + archetypes.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_cluster_archetypes") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = val asWhite = games.select( F.col("white_id").as("player_id"), diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala index f4b7155..c037823 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala @@ -116,6 +116,16 @@ object PlayerGraphJob: .mode("overwrite") .parquet(s"$outputDir/player_graph") + result.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_graph") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + // How many players belong to each connected component? // A large dominant component + many singletons is the expected shape. val componentSizes = result diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala index e1a2e52..cc9df7a 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala @@ -83,3 +83,13 @@ object PlayerStatsJob: stats.write .mode("overwrite") .parquet(s"$outputDir/player_stats") + + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save()