From da0e6d1ee2d391ecb6291396f82471eb51b1b25e Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Sun, 21 Jun 2026 15:35:34 +0200 Subject: [PATCH] feat(analytics): always write results to PostgreSQL regardless of input source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove isPgnMode JDBC guard from all 4 original jobs so staging (Lichess PGN mode) and production (game_records JDBC mode) both persist analytics results to the DB. Add JDBC write-back to all 7 new jobs: - GameLengthJob → analytics_game_length_distribution + analytics_game_length_by_result - ColorAdvantageJob → analytics_color_advantage - EloDistributionJob → analytics_elo_distribution - TimeControlJob → analytics_time_control_stats - DailyActivityJob → analytics_hourly_activity + analytics_weekly_activity - RatingMismatchJob → analytics_rating_mismatch - TerminationStatsJob → analytics_termination_stats Add analytics_component_sizes JDBC write to PlayerGraphJob. Co-Authored-By: Claude Sonnet 4.6 --- .../analytics/ColorAdvantageJob.scala | 10 +++++ .../nowchess/analytics/DailyActivityJob.scala | 20 ++++++++++ .../analytics/EloDistributionJob.scala | 10 +++++ .../de/nowchess/analytics/GameLengthJob.scala | 20 ++++++++++ .../nowchess/analytics/OpeningBookJob.scala | 19 +++++----- .../analytics/PlayerClusteringJob.scala | 37 +++++++++---------- .../nowchess/analytics/PlayerGraphJob.scala | 29 ++++++++++----- .../nowchess/analytics/PlayerStatsJob.scala | 19 +++++----- .../analytics/RatingMismatchJob.scala | 10 +++++ .../analytics/TerminationStatsJob.scala | 10 +++++ .../nowchess/analytics/TimeControlJob.scala | 10 +++++ 11 files changed, 145 insertions(+), 49 deletions(-) diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala index 4c26044..a3e0979 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/ColorAdvantageJob.scala @@ -60,3 +60,13 @@ object ColorAdvantageJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/color_advantage") + + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_color_advantage") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala index f10094c..6136a34 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/DailyActivityJob.scala @@ -49,6 +49,16 @@ object DailyActivityJob: .option("header", "true") .csv(s"$outputDir/hourly_activity") + hourly.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_hourly_activity") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + val dayName = F .when(F.col("dow") === 1, "Sunday") .when(F.col("dow") === 2, "Monday") @@ -77,3 +87,13 @@ object DailyActivityJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/weekly_activity") + + weekly.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_weekly_activity") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala index fb6ba48..c3dc315 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/EloDistributionJob.scala @@ -46,3 +46,13 @@ object EloDistributionJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/elo_distribution") + + distribution.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_elo_distribution") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala index 0600754..868f644 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/GameLengthJob.scala @@ -76,6 +76,16 @@ object GameLengthJob: .option("header", "true") .csv(s"$outputDir/game_length_distribution") + distribution.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_game_length_distribution") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + val byResult = games .groupBy("result") .agg( @@ -89,3 +99,13 @@ object GameLengthJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/game_length_by_result") + + byResult.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_game_length_by_result") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala index 77ec579..1b1352a 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/OpeningBookJob.scala @@ -72,16 +72,15 @@ object OpeningBookJob: .option("header", "true") .csv(s"$outputDir/opening_book_top1000") - if !GameSource.isPgnMode then - top1000.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_opening_stats") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + top1000.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_opening_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() /** Extracts the first `maxPlies` moves from a PGN column as a space-separated string. * diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala index 0e38e75..3588f61 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala @@ -119,26 +119,25 @@ object PlayerClusteringJob: .option("header", "true") .csv(s"$outputDir/cluster_archetypes") - if !GameSource.isPgnMode then - clustersDf.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_player_clusters") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + clustersDf.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_clusters") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() - archetypes.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_cluster_archetypes") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + archetypes.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_cluster_archetypes") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = val asWhite = games.select( diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala index 3b7baa5..acc9bdf 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala @@ -109,16 +109,15 @@ object PlayerGraphJob: .mode("overwrite") .parquet(s"$outputDir/player_graph") - if !GameSource.isPgnMode then - result.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_player_graph") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + result.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_graph") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() // How many players belong to each connected component? // A large dominant component + many singletons is the expected shape. @@ -135,6 +134,16 @@ object PlayerGraphJob: .option("header", "true") .csv(s"$outputDir/component_sizes") + componentSizes.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_component_sizes") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() + // Build a two-column DataFrame (vertex_id: Long, valueCol: valueType) from an RDD. // Used to bridge GraphX RDD results into the DataFrame API without implicits. private def rddToFrame[T]( diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala index c250e65..736b824 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerStatsJob.scala @@ -82,13 +82,12 @@ object PlayerStatsJob: .option("header", "true") .csv(s"$outputDir/player_stats_csv") - if !GameSource.isPgnMode then - stats.write - .mode("overwrite") - .format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "analytics_player_stats") - .option("user", dbUser) - .option("password", dbPass) - .option("driver", "org.postgresql.Driver") - .save() + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_player_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala index b5c68e7..477d42f 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/RatingMismatchJob.scala @@ -63,3 +63,13 @@ object RatingMismatchJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/rating_mismatch") + + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_rating_mismatch") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala index e280b6c..367d5d8 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/TerminationStatsJob.scala @@ -42,3 +42,13 @@ object TerminationStatsJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/termination_stats") + + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_termination_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala index df3b3da..a9a272a 100644 --- a/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/TimeControlJob.scala @@ -56,3 +56,13 @@ object TimeControlJob: .mode("overwrite") .option("header", "true") .csv(s"$outputDir/time_control_stats") + + stats.write + .mode("overwrite") + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "analytics_time_control_stats") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .save()