feat(analytics): add PostgreSQL JDBC write-back to all four batch jobs
Each batch job now writes its results to a Postgres table in addition to the existing Parquet/CSV output:

- OpeningBookJob → analytics_opening_stats
- PlayerStatsJob → analytics_player_stats
- PlayerClusteringJob → analytics_player_clusters + analytics_cluster_archetypes
- PlayerGraphJob → analytics_player_graph

MLlib Vector columns are excluded from the JDBC write by reusing the already-selected scalar DataFrame in PlayerClusteringJob.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -72,13 +72,23 @@ object OpeningBookJob:
|
|||||||
.mode("overwrite")
|
.mode("overwrite")
|
||||||
.parquet(s"$outputDir/opening_book")
|
.parquet(s"$outputDir/opening_book")
|
||||||
|
|
||||||
stats
|
val top1000 = stats.limit(1000)
|
||||||
.limit(1000)
|
|
||||||
.write
|
top1000.write
|
||||||
.mode("overwrite")
|
.mode("overwrite")
|
||||||
.option("header", "true")
|
.option("header", "true")
|
||||||
.csv(s"$outputDir/opening_book_top1000")
|
.csv(s"$outputDir/opening_book_top1000")
|
||||||
|
|
||||||
|
top1000.write
|
||||||
|
.mode("overwrite")
|
||||||
|
.format("jdbc")
|
||||||
|
.option("url", jdbcUrl)
|
||||||
|
.option("dbtable", "analytics_opening_stats")
|
||||||
|
.option("user", dbUser)
|
||||||
|
.option("password", dbPass)
|
||||||
|
.option("driver", "org.postgresql.Driver")
|
||||||
|
.save()
|
||||||
|
|
||||||
/** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
|
/** Extracts the first `maxPlies` moves from a PGN column as a space-separated string.
|
||||||
*
|
*
|
||||||
* PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
|
* PGN format produced by PgnExporter: [Event "?"]\n[White "?"]\n...\n\n1. e4 e5 2. Nf3 Nc6 *
|
||||||
|
|||||||
@@ -115,9 +115,9 @@ object PlayerClusteringJob:
|
|||||||
|
|
||||||
archetypes.show(20, false)
|
archetypes.show(20, false)
|
||||||
|
|
||||||
predictions
|
val clustersDf = predictions.select("player_id", "total_games", "win_rate", "avg_move_count", "cluster")
|
||||||
.select("player_id", "total_games", "win_rate", "avg_move_count", "cluster")
|
|
||||||
.write
|
clustersDf.write
|
||||||
.mode("overwrite")
|
.mode("overwrite")
|
||||||
.parquet(s"$outputDir/player_clusters")
|
.parquet(s"$outputDir/player_clusters")
|
||||||
|
|
||||||
@@ -126,6 +126,26 @@ object PlayerClusteringJob:
|
|||||||
.option("header", "true")
|
.option("header", "true")
|
||||||
.csv(s"$outputDir/cluster_archetypes")
|
.csv(s"$outputDir/cluster_archetypes")
|
||||||
|
|
||||||
|
clustersDf.write
|
||||||
|
.mode("overwrite")
|
||||||
|
.format("jdbc")
|
||||||
|
.option("url", jdbcUrl)
|
||||||
|
.option("dbtable", "analytics_player_clusters")
|
||||||
|
.option("user", dbUser)
|
||||||
|
.option("password", dbPass)
|
||||||
|
.option("driver", "org.postgresql.Driver")
|
||||||
|
.save()
|
||||||
|
|
||||||
|
archetypes.write
|
||||||
|
.mode("overwrite")
|
||||||
|
.format("jdbc")
|
||||||
|
.option("url", jdbcUrl)
|
||||||
|
.option("dbtable", "analytics_cluster_archetypes")
|
||||||
|
.option("user", dbUser)
|
||||||
|
.option("password", dbPass)
|
||||||
|
.option("driver", "org.postgresql.Driver")
|
||||||
|
.save()
|
||||||
|
|
||||||
private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
|
private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
|
||||||
val asWhite = games.select(
|
val asWhite = games.select(
|
||||||
F.col("white_id").as("player_id"),
|
F.col("white_id").as("player_id"),
|
||||||
|
|||||||
@@ -116,6 +116,16 @@ object PlayerGraphJob:
|
|||||||
.mode("overwrite")
|
.mode("overwrite")
|
||||||
.parquet(s"$outputDir/player_graph")
|
.parquet(s"$outputDir/player_graph")
|
||||||
|
|
||||||
|
result.write
|
||||||
|
.mode("overwrite")
|
||||||
|
.format("jdbc")
|
||||||
|
.option("url", jdbcUrl)
|
||||||
|
.option("dbtable", "analytics_player_graph")
|
||||||
|
.option("user", dbUser)
|
||||||
|
.option("password", dbPass)
|
||||||
|
.option("driver", "org.postgresql.Driver")
|
||||||
|
.save()
|
||||||
|
|
||||||
// How many players belong to each connected component?
|
// How many players belong to each connected component?
|
||||||
// A large dominant component + many singletons is the expected shape.
|
// A large dominant component + many singletons is the expected shape.
|
||||||
val componentSizes = result
|
val componentSizes = result
|
||||||
|
|||||||
@@ -83,3 +83,13 @@ object PlayerStatsJob:
|
|||||||
stats.write
|
stats.write
|
||||||
.mode("overwrite")
|
.mode("overwrite")
|
||||||
.parquet(s"$outputDir/player_stats")
|
.parquet(s"$outputDir/player_stats")
|
||||||
|
|
||||||
|
stats.write
|
||||||
|
.mode("overwrite")
|
||||||
|
.format("jdbc")
|
||||||
|
.option("url", jdbcUrl)
|
||||||
|
.option("dbtable", "analytics_player_stats")
|
||||||
|
.option("user", dbUser)
|
||||||
|
.option("password", dbPass)
|
||||||
|
.option("driver", "org.postgresql.Driver")
|
||||||
|
.save()
|
||||||
|
|||||||
Reference in New Issue
Block a user