From e1d80b9331666feea191b1fd08aa762f3581c918 Mon Sep 17 00:00:00 2001 From: Janis Eccarius Date: Mon, 15 Jun 2026 22:15:24 +0200 Subject: [PATCH] feat(analytics): add Structured Streaming, MLlib clustering, GraphX jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new Spark jobs demonstrating complementary Spark pillars: LiveDashboardJob (Structured Streaming): - Simulates NowChess game-over event stream via rate source - Watermarking (45 s late-data tolerance) - Tumbling 1-min windows → append-mode Parquet output - Sliding 5-min/1-min windows → update-mode console output - Checkpointing for exactly-once fault tolerance - Production wiring comments show Kafka / spark-redis swap-in PlayerClusteringJob (MLlib): - Derives 4 player features from game_records via JDBC - VectorAssembler + StandardScaler + KMeans inside a Pipeline - ClusteringEvaluator (silhouette score) to measure quality - Per-cluster archetype averages show what each tier represents PlayerGraphJob (GraphX): - Builds directed player graph (vertices=players, edges=games) - PageRank — identifies most influential/active players - ConnectedComponents — finds isolated player communities - Bridges GraphX RDD results back to DataFrames via explicit schema (avoids spark.implicits._ which breaks Scala 3 → Spark 2.13 interop) Co-Authored-By: Claude Sonnet 4.6 --- modules/analytics/build.gradle.kts | 6 + .../nowchess/analytics/LiveDashboardJob.scala | 138 ++++++++++++++++ .../analytics/PlayerClusteringJob.scala | 154 ++++++++++++++++++ .../nowchess/analytics/PlayerGraphJob.scala | 151 +++++++++++++++++ 4 files changed, 449 insertions(+) create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/LiveDashboardJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala create mode 100644 modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala diff --git a/modules/analytics/build.gradle.kts b/modules/analytics/build.gradle.kts index 89e037c..88219a7 100644 --- a/modules/analytics/build.gradle.kts +++ b/modules/analytics/build.gradle.kts @@ -62,6 +62,12 @@ dependencies { compileOnly("org.apache.spark:spark-core_2.13:$sparkVersion") { exclude(group = "org.slf4j", module = "slf4j-log4j12") } + compileOnly("org.apache.spark:spark-mllib_2.13:$sparkVersion") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + } + compileOnly("org.apache.spark:spark-graphx_2.13:$sparkVersion") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + } // PostgreSQL JDBC driver bundled so it is available on executor classpath. implementation("org.postgresql:postgresql:42.7.4") diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/LiveDashboardJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/LiveDashboardJob.scala new file mode 100644 index 0000000..66260e6 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/LiveDashboardJob.scala @@ -0,0 +1,138 @@ +package de.nowchess.analytics + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions as F +import org.apache.spark.sql.streaming.Trigger + +/** Demonstrates Spark Structured Streaming on NowChess game-over events. + * + * Spark concepts shown: + * - Continuous micro-batch processing (`readStream`) + * - Watermarking for late-data tolerance (events up to 45 s late are accepted) + * - Tumbling window aggregations — fixed 1-minute buckets, zero overlap + * - Sliding window aggregations — 5-minute window, 1-minute slide + * - Append vs Update output modes and when each is valid + * - Exactly-once fault tolerance via checkpointing + * - Multiple concurrent streaming queries on the same session + * + * Production wiring: NowChess already publishes game-over events to a Redis Stream (`nowchess:game-over`, see + * GameRedisPublisher). Swap the `rate` source below for one of the production sources shown in the comment block. + */ +object LiveDashboardJob: + + def main(args: Array[String]): Unit = + val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-live-dashboard" + + val spark = SparkSession + .builder() + .appName("NowChess Live Dashboard") + .getOrCreate() + + run(spark, outputDir) + + def run(spark: SparkSession, outputDir: String): Unit = + + // ── Production sources (replace rate source with one of these) ───────── + // + // Kafka (via a Redis → Kafka bridge): + // spark.readStream + // .format("kafka") + // .option("kafka.bootstrap.servers", sys.env("KAFKA_BROKERS")) + // .option("subscribe", "nowchess.game-over") + // .load() + // .select(F.from_json(F.col("value").cast("string"), gameOverSchema).as("e")) + // .select("e.*") + // + // spark-redis (com.redislabs:spark-redis:3.1.0): + // spark.readStream + // .format("redis") + // .option("stream.keys", "nowchess:game-over") + // .schema(gameOverSchema) + // .load() + // ───────────────────────────────────────────────────────────────────── + + // Simulated stream: 10 game-over events / second. + // `rate` source emits (timestamp: Timestamp, value: Long) — Spark built-in, no deps. + val rawStream = spark.readStream + .format("rate") + .option("rowsPerSecond", "10") + .load() + + // Derive game-outcome columns from the monotonic counter. + // In production these come directly from the event payload. + val events = rawStream + .withColumn( + "result", + F.when(F.col("value") % 3 === 0L, "white") + .when(F.col("value") % 3 === 1L, "black") + .otherwise("draw"), + ) + .withColumn( + "termination", + F.when(F.col("value") % 4 === 0L, "checkmate") + .when(F.col("value") % 4 === 1L, "resignation") + .when(F.col("value") % 4 === 2L, "timeout") + .otherwise("agreement"), + ) + // Watermark: accept events up to 45 seconds late. + // Spark will not emit a window result until the watermark passes its end time. + .withWatermark("timestamp", "45 seconds") + + // ── Query 1: tumbling 1-minute windows ──────────────────────────────── + // Each window is a non-overlapping 60-second bucket. + // outputMode("append") only emits a window after the watermark seals it — + // guarantees that late arrivals were already counted before output. + val gamesByWindow = events + .groupBy(F.window(F.col("timestamp"), "1 minute"), F.col("result")) + .agg(F.count("*").as("games")) + .select( + F.col("window.start").as("window_start"), + F.col("window.end").as("window_end"), + F.col("result"), + F.col("games"), + ) + + // ── Query 2: sliding 5-minute / 1-minute windows ────────────────────── + // Each window covers 5 minutes of data, and a new window opens every minute. + // outputMode("update") emits a row whenever an existing window changes — + // gives a live rolling view of termination patterns. + val terminationTrend = events + .groupBy(F.window(F.col("timestamp"), "5 minutes", "1 minute")) + .agg( + F.count("*").as("total"), + F.sum(F.when(F.col("termination") === "checkmate", 1).otherwise(0)).as("checkmates"), + F.sum(F.when(F.col("termination") === "resignation", 1).otherwise(0)).as("resignations"), + F.sum(F.when(F.col("termination") === "timeout", 1).otherwise(0)).as("timeouts"), + ) + .withColumn( + "checkmate_pct", + F.round(F.col("checkmates") / F.col("total").cast("double") * 100, 1), + ) + .select( + F.col("window.start").as("window_start"), + F.col("total"), + F.col("checkmate_pct"), + F.col("resignations"), + F.col("timeouts"), + ) + + // Write sealed windows to Parquet — safe to query with any SQL engine. + gamesByWindow.writeStream + .outputMode("append") + .format("parquet") + .option("path", s"$outputDir/game_counts_by_window") + .option("checkpointLocation", s"$outputDir/_checkpoints/game_counts") + .trigger(Trigger.ProcessingTime("30 seconds")) + .start() + + // Print live rolling stats to the console every 10 seconds. + terminationTrend.writeStream + .outputMode("update") + .format("console") + .option("truncate", "false") + .option("numRows", "10") + .trigger(Trigger.ProcessingTime("10 seconds")) + .start() + + // Block until any query fails or the process is killed. + spark.streams.awaitAnyTermination() diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala new file mode 100644 index 0000000..ada64c2 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerClusteringJob.scala @@ -0,0 +1,154 @@ +package de.nowchess.analytics + +import org.apache.spark.ml.Pipeline +import org.apache.spark.ml.clustering.KMeans +import org.apache.spark.ml.evaluation.ClusteringEvaluator +import org.apache.spark.ml.feature.StandardScaler +import org.apache.spark.ml.feature.VectorAssembler +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions as F + +/** Clusters NowChess players into skill tiers using K-Means via MLlib. + * + * Spark / MLlib concepts shown: + * - Feature engineering from raw relational data (JDBC → DataFrame) + * - VectorAssembler — combine scalar columns into a dense feature vector + * - StandardScaler — zero-mean / unit-variance normalisation so that total_games (can be 1000+) does not dominate + * win_rate (0–1) + * - KMeans clustering — unsupervised partitioning into k skill tiers + * - Pipeline — compose transformers + estimator into a single reusable object + * - ClusteringEvaluator — silhouette score to assess cluster quality + * + * Features per player (all derived from game_records): total_games — how active the player is win_rate — overall + * strength avg_move_count — game-length preference (tactical vs positional) games_as_white_ratio — colour bias + * + * Output: Parquet: player_id + cluster (0..k-1) + feature values CSV: per-cluster archetype averages (interpret what + * each tier means) + */ +object PlayerClusteringJob: + + def main(args: Array[String]): Unit = + val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess") + val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess") + val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess") + val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-clusters" + val k = if args.length > 1 then args(1).toInt else 4 + + val spark = SparkSession + .builder() + .appName("NowChess Player Clustering") + .getOrCreate() + + run(spark, jdbcUrl, dbUser, dbPass, outputDir, k) + spark.stop() + + def run( + spark: SparkSession, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + k: Int, + ): Unit = + val games = spark.read + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "game_records") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .option("fetchsize", "10000") + .load() + .select("white_id", "black_id", "result", "move_count") + .filter(F.col("result").isNotNull) + + val playerStats = buildPlayerStats(games) + .filter(F.col("total_games") >= 5) + + val featureCols = Array("total_games", "win_rate", "avg_move_count", "games_as_white_ratio") + + val assembler = new VectorAssembler() + .setInputCols(featureCols) + .setOutputCol("raw_features") + .setHandleInvalid("skip") + + val scaler = new StandardScaler() + .setInputCol("raw_features") + .setOutputCol("features") + .setWithStd(true) + .setWithMean(true) + + val kmeans = new KMeans() + .setK(k) + .setSeed(42L) + .setFeaturesCol("features") + .setPredictionCol("cluster") + + val pipeline = new Pipeline().setStages(Array(assembler, scaler, kmeans)) + + val model = pipeline.fit(playerStats) + val predictions = model.transform(playerStats) + + val silhouette = new ClusteringEvaluator() + .setFeaturesCol("features") + .setPredictionCol("cluster") + .evaluate(predictions) + + println(s"[Clustering] k=$k silhouette=$silhouette") + + // Average feature values per cluster reveal what each tier represents. + // Example interpretation for k=4: + // Cluster 0: high total_games + high win_rate → experienced strong players + // Cluster 1: low total_games + low win_rate → beginners / casual + // Cluster 2: high total_games + mid win_rate → active intermediate + // Cluster 3: low total_games + high win_rate → strong but infrequent + val archetypes = predictions + .groupBy("cluster") + .agg( + F.count("*").as("player_count"), + F.round(F.avg("total_games"), 1).as("avg_total_games"), + F.round(F.avg("win_rate"), 3).as("avg_win_rate"), + F.round(F.avg("avg_move_count"), 1).as("avg_move_count"), + F.round(F.avg("games_as_white_ratio"), 3).as("avg_white_ratio"), + ) + .orderBy("cluster") + + archetypes.show(20, false) + + predictions + .select("player_id", "total_games", "win_rate", "avg_move_count", "cluster") + .write + .mode("overwrite") + .parquet(s"$outputDir/player_clusters") + + archetypes.write + .mode("overwrite") + .option("header", "true") + .csv(s"$outputDir/cluster_archetypes") + + private def buildPlayerStats(games: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = + val asWhite = games.select( + F.col("white_id").as("player_id"), + F.col("result"), + F.col("move_count"), + F.lit(1).as("is_white"), + ) + val asBlack = games.select( + F.col("black_id").as("player_id"), + F.col("result"), + F.col("move_count"), + F.lit(0).as("is_white"), + ) + + val won = (F.col("is_white") === 1 && F.col("result") === "white") + .or(F.col("is_white") === 0 && F.col("result") === "black") + + asWhite + .union(asBlack) + .groupBy("player_id") + .agg( + F.count("*").as("total_games"), + F.round(F.sum(F.when(won, 1.0).otherwise(0.0)) / F.count("*"), 3).as("win_rate"), + F.round(F.avg(F.col("move_count")), 1).as("avg_move_count"), + F.round(F.avg(F.col("is_white").cast("double")), 3).as("games_as_white_ratio"), + ) diff --git a/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala new file mode 100644 index 0000000..f4b7155 --- /dev/null +++ b/modules/analytics/src/main/scala/de/nowchess/analytics/PlayerGraphJob.scala @@ -0,0 +1,151 @@ +package de.nowchess.analytics + +import org.apache.spark.graphx.Edge +import org.apache.spark.graphx.Graph +import org.apache.spark.graphx.VertexId +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions as F +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.StructType + +/** Models the NowChess player network as a directed graph and runs GraphX analytics. + * + * Spark / GraphX concepts shown: + * - Building a Graph from RDDs derived from a JDBC DataFrame + * - PageRank — measures a player's "influence"; high score = many games against other high-ranked players (analogous + * to web link authority) + * - Connected Components — finds isolated player communities; players who have never played anyone from another + * component cannot be linked + * - Converting GraphX results back to DataFrames for SQL-style joins and output + * + * Graph model: Vertices: one per unique player (vertex ID = hashCode of player UUID string) Edges: one per completed + * game (white → black), attributed with result + * + * Note: hashCode gives a 32-bit → 64-bit vertex ID; collision probability is negligible for typical player counts. For + * millions of players, replace with MLlib StringIndexer to generate collision-free Long IDs. + */ +object PlayerGraphJob: + + def main(args: Array[String]): Unit = + val jdbcUrl = sys.env.getOrElse("NOWCHESS_JDBC_URL", "jdbc:postgresql://localhost:5432/nowchess") + val dbUser = sys.env.getOrElse("NOWCHESS_DB_USER", "nowchess") + val dbPass = sys.env.getOrElse("NOWCHESS_DB_PASS", "nowchess") + val outputDir = if args.length > 0 then args(0) else "/tmp/nowchess-player-graph" + + val spark = SparkSession + .builder() + .appName("NowChess Player Graph Analytics") + .getOrCreate() + + run(spark, jdbcUrl, dbUser, dbPass, outputDir) + spark.stop() + + def run( + spark: SparkSession, + jdbcUrl: String, + dbUser: String, + dbPass: String, + outputDir: String, + ): Unit = + val gamesRdd: RDD[Row] = spark.read + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "game_records") + .option("user", dbUser) + .option("password", dbPass) + .option("driver", "org.postgresql.Driver") + .option("fetchsize", "10000") + .load() + .select("white_id", "black_id", "result") + .filter(F.col("result").isNotNull) + .rdd + + val toVid: String => VertexId = s => s.hashCode.toLong + + // Each row contributes two vertex entries (white and black player). + val vertices: RDD[(VertexId, String)] = gamesRdd + .flatMap { row => + Seq( + (toVid(row.getString(0)), row.getString(0)), + (toVid(row.getString(1)), row.getString(1)), + ) + } + .distinct() + + // Directed edge white → black, labelled with the game result. + val edges: RDD[Edge[String]] = gamesRdd.map { row => + Edge(toVid(row.getString(0)), toVid(row.getString(1)), row.getString(2)) + } + + val graph: Graph[String, String] = Graph(vertices, edges) + + println(s"[Graph] vertices=${graph.numVertices} edges=${graph.numEdges}") + + // ── PageRank ──────────────────────────────────────────────────────────── + // Convergence tolerance 0.01 — lower = more iterations = more accurate. + // Returns Graph[Double, Double]; vertex attribute = PageRank score. + val pageRanks: RDD[(VertexId, Double)] = graph.pageRank(0.01).vertices + + // ── Connected Components ──────────────────────────────────────────────── + // Returns Graph[VertexId, ED]; vertex attribute = minimum vertex ID in + // the component (serves as a stable component label). + val components: RDD[(VertexId, VertexId)] = graph.connectedComponents().vertices + + // Convert each RDD result to a DataFrame so we can join with SQL semantics. + val vertexDf = rddToFrame(spark, vertices, "player_id", StringType) + val pageRankDf = rddToFrame(spark, pageRanks, "page_rank", DoubleType) + val componentDf = rddToFrame(spark, components, "component_id", LongType) + + val result = vertexDf + .join(pageRankDf, "vertex_id") + .join(componentDf, "vertex_id") + .drop("vertex_id") + .withColumn("page_rank", F.round(F.col("page_rank"), 4)) + .orderBy(F.desc("page_rank")) + + println("[Graph] Top 20 players by PageRank:") + result.show(20, false) + + result.write + .mode("overwrite") + .parquet(s"$outputDir/player_graph") + + // How many players belong to each connected component? + // A large dominant component + many singletons is the expected shape. + val componentSizes = result + .groupBy("component_id") + .agg(F.count("*").as("player_count")) + .orderBy(F.desc("player_count")) + + println("[Graph] Connected component sizes:") + componentSizes.show(10, false) + + componentSizes.write + .mode("overwrite") + .option("header", "true") + .csv(s"$outputDir/component_sizes") + + // Build a two-column DataFrame (vertex_id: Long, valueCol: valueType) from an RDD. + // Used to bridge GraphX RDD results into the DataFrame API without implicits. + private def rddToFrame[T]( + spark: SparkSession, + rdd: RDD[(VertexId, T)], + valueCol: String, + valueType: DataType, + ): org.apache.spark.sql.DataFrame = + val schema = StructType( + List( + StructField("vertex_id", LongType, nullable = false), + StructField(valueCol, valueType, nullable = false), + ), + ) + spark.createDataFrame( + rdd.map { case (vid, v) => Row.fromSeq(Seq[Any](vid, v)) }, + schema, + )