feat(tournament): federate tournaments across clusters with DB replication
Build & Test (NowChessSystems) TeamCity build failed

- Replicate newly created tournaments to all registered remote servers,
  persisting them with originServerUrl so the remote can proxy mutations back
- Route all mutation endpoints (join/start/terminate/withdraw) through
  originServerUrl when set, instead of trying local state first
- Fix tournament event stream to proxy remote tournaments (was 404 before)
- Official bot now routes all calls through TOURNAMENT_SERVICE_URL (local
  tournament service) instead of calling remote cluster directly
- Bot parks on local account service + all registered remote servers on startup
- Add TOURNAMENT_SELF_URL env var so each cluster knows its own public URL

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Janis Eccarius
2026-06-21 21:23:49 +02:00
parent 97015cb95e
commit 5b000a6e5f
11 changed files with 289 additions and 93 deletions
@@ -26,6 +26,8 @@ nowchess:
prefix: nowchess
internal:
secret: 123abc
tournament:
service-url: http://localhost:8086
"%deployed":
quarkus:
@@ -49,3 +51,5 @@ nowchess:
prefix: ${REDIS_PREFIX:nowchess}
internal:
secret: ${INTERNAL_SECRET}
tournament:
service-url: ${TOURNAMENT_SERVICE_URL:http://localhost:8086}
@@ -25,15 +25,13 @@ class TournamentJoinResource:
@POST
@Path("/join-tournament")
def joinTournament(req: JoinTournamentRequest): Response =
val serverUrl = req.serverUrl.filter(_.nonEmpty).getOrElse(player.defaultServerUrl)
val difficulty = if req.difficulty.nonEmpty then req.difficulty else "medium"
log.infof(
"Official bot join requested — tournament=%s difficulty=%s server=%s",
"Official bot join requested — tournament=%s difficulty=%s",
req.tournamentId,
difficulty,
serverUrl,
)
player.joinTournament(req.tournamentId, req.botToken, difficulty, serverUrl) match
player.joinTournament(req.tournamentId, req.botToken, difficulty) match
case Right(botId) =>
val resp = JoinTournamentResponse(botId, difficulty, "joining")
Response.ok(resp).build()
@@ -20,7 +20,7 @@ object TournamentBotConfig:
tournamentId <- env.get("TOURNAMENT_ID").filter(_.nonEmpty)
token <- env.get("TOURNAMENT_BOT_TOKEN").filter(_.nonEmpty)
botId <- jwtSubject(token)
serverUrl = env.getOrElse("TOURNAMENT_SERVER_URL", "http://141.37.123.132:8086")
serverUrl = env.getOrElse("TOURNAMENT_SERVICE_URL", "http://localhost:8086")
difficulty = env.getOrElse("TOURNAMENT_BOT_DIFFICULTY", "medium")
yield TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty)
@@ -38,8 +38,8 @@ class TournamentBotGamePlayer:
@volatile private var running = true
// scalafix:on DisableSyntax.var
val defaultServerUrl: String =
System.getenv().asScala.getOrElse("TOURNAMENT_SERVER_URL", "http://141.37.123.132:8086")
val tournamentServiceUrl: String =
System.getenv().asScala.getOrElse("TOURNAMENT_SERVICE_URL", "http://localhost:8086")
@PostConstruct
def initialize(): Unit =
@@ -52,31 +52,41 @@ class TournamentBotGamePlayer:
startAsync(cfg)
private def parkOnStartup(): Unit =
park(defaultServerUrl, "expert") match
case Some(id) => log.infof("Parked expert bot on %s as id %s", defaultServerUrl, id)
case None => log.warnf("Failed to park expert bot on %s", defaultServerUrl)
private def park(serverUrl: String, difficulty: String): Option[String] =
System.getenv().asScala.get("TOURNAMENT_BOT_TOKEN").filter(_.nonEmpty).flatMap { token =>
Try {
val body = s"""{"name":"${botName(difficulty)}"}"""
val response = client
.target(serverUrl)
.path("api")
.path("bots")
.request(MediaType.APPLICATION_JSON)
.header("Authorization", s"Bearer $token")
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
if response.getStatus == 201 || response.getStatus == 200 then
val id = objectMapper.readTree(response.readEntity(classOf[String])).path("id").asText()
response.close()
Option(id).filter(_.nonEmpty)
else {
log.warnf("Parking bot %s returned status %d", botName(difficulty), response.getStatus); response.close();
None
val token = System.getenv().asScala.get("TOURNAMENT_BOT_TOKEN").filter(_.nonEmpty)
token match
case None => log.warn("TOURNAMENT_BOT_TOKEN not set — skipping park")
case Some(tok) =>
val localAccountUrl = System.getenv().asScala.getOrElse("ACCOUNT_SERVICE_URL", "http://localhost:8083")
BotController.listBots.foreach(diff => parkOn(localAccountUrl, diff, tok))
fetchRemoteServers().foreach { serverUrl =>
BotController.listBots.foreach(diff => parkOn(serverUrl, diff, tok))
}
}.getOrElse(None)
}
private def fetchRemoteServers(): List[String] =
Try {
val response = client.target(tournamentServiceUrl)
.path("api").path("tournament").path("servers")
.request(MediaType.APPLICATION_JSON).get()
if response.getStatus == 200 then
val node = objectMapper.readTree(response.readEntity(classOf[String]))
response.close()
node.path("servers").elements().asScala.toList.map(_.path("url").asText()).filter(_.nonEmpty)
else { response.close(); Nil }
}.getOrElse(Nil)
private def parkOn(serverUrl: String, difficulty: String, token: String): Unit =
Try {
val body = s"""{"name":"${botName(difficulty)}"}"""
val response = client.target(serverUrl).path("api").path("bots")
.request(MediaType.APPLICATION_JSON)
.header("Authorization", s"Bearer $token")
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
if response.getStatus == 201 || response.getStatus == 200 then
val id = objectMapper.readTree(response.readEntity(classOf[String])).path("id").asText()
log.infof("Parked bot %s on %s as id %s", botName(difficulty), serverUrl, id)
else log.warnf("Park %s on %s returned status %d", botName(difficulty), serverUrl, response.getStatus)
response.close()
}.failed.foreach(ex => log.warnf(ex, "Failed to park %s on %s", botName(difficulty), serverUrl))
private def botName(difficulty: String): String = s"NowChess ${difficulty.capitalize}"
@@ -84,7 +94,6 @@ class TournamentBotGamePlayer:
tournamentId: String,
botToken: Option[String],
difficulty: String,
serverUrl: String,
): Either[String, String] =
val resolvedToken = botToken.filter(_.nonEmpty)
.orElse(System.getenv().asScala.get("TOURNAMENT_BOT_TOKEN").filter(_.nonEmpty))
@@ -94,7 +103,7 @@ class TournamentBotGamePlayer:
TournamentBotConfig.jwtSubject(token) match
case None => Left("Invalid bot token — could not extract subject")
case Some(botId) =>
val cfg = TournamentBotConfig(serverUrl, tournamentId, token, botId, difficulty)
val cfg = TournamentBotConfig(tournamentServiceUrl, tournamentId, token, botId, difficulty)
if join(cfg) then
startAsync(cfg)
Right(botId)
@@ -27,6 +27,8 @@ nowchess:
prefix: ${REDIS_PREFIX:nowchess}
internal:
secret: ${INTERNAL_SECRET:123abc}
tournament:
self-url: ""
mp:
jwt:
@@ -46,6 +48,9 @@ mp:
hibernate-orm:
schema-management:
strategy: update
nowchess:
tournament:
self-url: ${TOURNAMENT_SELF_URL:}
"%test":
quarkus:
@@ -26,6 +26,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
classOf[RoundPairingsDto],
classOf[ErrorDto],
classOf[OkDto],
classOf[ReplicateTournamentRequest],
classOf[CorePlayerInfo],
classOf[CoreTimeControl],
classOf[CoreCreateGameRequest],
@@ -30,4 +30,7 @@ class Tournament:
var startsAt: Instant = uninitialized
var winnerId: String = uninitialized
var winnerName: String = uninitialized
@Column(nullable = true)
var originServerUrl: String = null
// scalafix:on
@@ -1,5 +1,7 @@
package de.nowchess.tournament.dto
import java.time.Instant
case class BotRef(id: String, name: String)
case class Clock(limit: Int, increment: Int)
@@ -72,3 +74,15 @@ case class RoundPairingsDto(round: Int, pairings: List[PairingDto])
case class ErrorDto(error: String)
case class OkDto(ok: Boolean = true)
case class ReplicateTournamentRequest(
id: String,
fullName: String,
nbRounds: Int,
clockLimit: Int,
clockIncrement: Int,
rated: Boolean,
createdBy: String,
startsAt: Instant,
status: String,
)
@@ -9,12 +9,12 @@ import de.nowchess.tournament.service.{
TournamentService,
TournamentStreamManager,
}
import io.smallrye.mutiny.Multi
import jakarta.annotation.security.{PermitAll, RolesAllowed}
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.{Context, HttpHeaders, MediaType, Response, StreamingOutput}
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.eclipse.microprofile.jwt.JsonWebToken
import org.jboss.logging.Logger
import scala.compiletime.uninitialized
@@ -36,6 +36,9 @@ class TournamentResource:
@Inject var externalClient: ExternalTournamentClient = uninitialized
@Inject var objectMapper: ObjectMapper = uninitialized
@Context var headers: HttpHeaders = uninitialized
@ConfigProperty(name = "nowchess.tournament.self-url", defaultValue = "")
var selfUrl: String = uninitialized
// scalafix:on
@GET
@@ -85,6 +88,11 @@ class TournamentResource:
val userId = Option(jwt.getSubject).getOrElse("")
val form = CreateTournamentForm(name, nbRounds, clockLimit, clockIncrement, rated)
val t = tournamentService.create(userId, form)
if selfUrl.nonEmpty then
registry.serverUrls().foreach { remoteUrl =>
if !externalClient.replicateTournament(remoteUrl, toReplicateRequest(t), selfUrl) then
log.warnf("Failed to replicate tournament %s to %s", t.id, remoteUrl)
}
Response.status(Response.Status.CREATED).entity(tournamentService.toDto(t)).build()
@GET
@@ -100,33 +108,59 @@ class TournamentResource:
.flatMap(url => externalClient.fetch(url, id).map(node => Response.ok(node).build()))
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
@POST
@Path("/replicate")
@PermitAll
def replicate(req: ReplicateTournamentRequest): Response =
val originUrl = Option(headers.getHeaderString("X-Origin-Url")).getOrElse("")
if originUrl.isEmpty then
Response.status(Response.Status.BAD_REQUEST).entity(ErrorDto("Missing X-Origin-Url header")).build()
else
tournamentService.get(req.id) match
case Some(_) => Response.status(Response.Status.CONFLICT).entity(ErrorDto("Tournament already exists")).build()
case None =>
tournamentService.replicate(req, originUrl)
Response.status(Response.Status.CREATED).build()
@DELETE
@Path("/{id}")
@RolesAllowed(Array("**"))
def terminate(@PathParam("id") id: String): Response =
val userId = Option(jwt.getSubject).getOrElse("")
tournamentService.terminate(id, userId) match
case Right(_) => Response.noContent().build()
case Left(error) => errorResponse(error)
tournamentService.get(id).flatMap(t => Option(t.originServerUrl)) match
case Some(originUrl) =>
val auth = Option(headers.getHeaderString("Authorization"))
val (status, body) = externalClient.proxyPost(originUrl, s"api/tournament/$id", auth)
Response.status(status).entity(body).build()
case None =>
tournamentService.terminate(id, userId) match
case Right(_) => Response.noContent().build()
case Left(error) => errorResponse(error)
@POST
@Path("/{id}/start")
@RolesAllowed(Array("**"))
def start(@PathParam("id") id: String): Response =
val userId = Option(jwt.getSubject).getOrElse("")
tournamentService.start(id, userId) match
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/start", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
tournamentService.get(id).flatMap(t => Option(t.originServerUrl)) match
case Some(originUrl) =>
val auth = Option(headers.getHeaderString("Authorization"))
val (status, body) = externalClient.proxyPost(originUrl, s"api/tournament/$id/start", auth)
Response.status(status).entity(body).build()
case None =>
tournamentService.start(id, userId) match
case Right(t) => Response.ok(tournamentService.toDto(t)).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/start", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@POST
@Path("/{id}/join")
@@ -136,21 +170,27 @@ class TournamentResource:
if tokenType != "bot" then
Response.status(Response.Status.FORBIDDEN).entity(ErrorDto("Only bots can join tournaments")).build()
else
val botId = Option(jwt.getSubject).getOrElse("")
val botName = Option(jwt.getClaim[AnyRef]("name")).map(_.toString).getOrElse(botId)
tournamentService.join(id, botId, botName) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/join", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
tournamentService.get(id).flatMap(t => Option(t.originServerUrl)) match
case Some(originUrl) =>
val auth = Option(headers.getHeaderString("Authorization"))
val (status, body) = externalClient.proxyPost(originUrl, s"api/tournament/$id/join", auth)
Response.status(status).entity(body).build()
case None =>
val botId = Option(jwt.getSubject).getOrElse("")
val botName = Option(jwt.getClaim[AnyRef]("name")).map(_.toString).getOrElse(botId)
tournamentService.join(id, botId, botName) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/join", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@POST
@Path("/{id}/withdraw")
@@ -160,20 +200,26 @@ class TournamentResource:
if tokenType != "bot" then
Response.status(Response.Status.FORBIDDEN).entity(ErrorDto("Only bots can withdraw")).build()
else
val botId = Option(jwt.getSubject).getOrElse("")
tournamentService.withdraw(id, botId) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/withdraw", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
tournamentService.get(id).flatMap(t => Option(t.originServerUrl)) match
case Some(originUrl) =>
val auth = Option(headers.getHeaderString("Authorization"))
val (status, body) = externalClient.proxyPost(originUrl, s"api/tournament/$id/withdraw", auth)
Response.status(status).entity(body).build()
case None =>
val botId = Option(jwt.getSubject).getOrElse("")
tournamentService.withdraw(id, botId) match
case Right(_) => Response.ok(OkDto()).build()
case Left(error) =>
error match
case TournamentError.NotFound(_) =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.map { url =>
val (status, body) = externalClient.proxyPost(url, s"api/tournament/$id/withdraw", auth)
Response.status(status).entity(body).build()
}
.getOrElse(errorResponse(error))
case _ => errorResponse(error)
@GET
@Path("/{id}/results")
@@ -236,15 +282,81 @@ class TournamentResource:
@Path("/{id}/stream")
@RolesAllowed(Array("**"))
@Produces(Array("application/x-ndjson"))
def stream(@PathParam("id") id: String): Multi[String] =
def stream(@PathParam("id") id: String): Response =
tournamentService.get(id) match
case None => Multi.createFrom().failure(new NotFoundException(s"Tournament $id not found"))
case Some(t) if Option(t.originServerUrl).isDefined =>
val auth = Option(headers.getHeaderString("Authorization"))
externalClient.proxyGetStream(t.originServerUrl, s"api/tournament/$id/stream", auth)
.map { inputStream =>
Response
.ok(new StreamingOutput {
def write(output: java.io.OutputStream): Unit =
val buf = new Array[Byte](4096)
// scalafix:off DisableSyntax.var
var n = inputStream.read(buf)
while n >= 0 do
output.write(buf, 0, n)
output.flush()
n = inputStream.read(buf)
// scalafix:on
})
.`type`("application/x-ndjson")
.build()
}
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id stream unavailable")).build())
case Some(_) =>
val botId = Option(jwt.getSubject).getOrElse("")
Multi.createFrom().emitter[String] { emitter =>
streamManager.register(id, botId, emitter)
emitter.onTermination(() => streamManager.unregister(id, botId, emitter))
val queue = new java.util.concurrent.LinkedBlockingQueue[Option[String]]()
val emitter = new io.smallrye.mutiny.subscription.MultiEmitter[String] {
def emit(item: String): io.smallrye.mutiny.subscription.MultiEmitter[String] =
queue.put(Some(item)); this
def fail(failure: Throwable): Unit = queue.put(None)
def complete(): Unit = queue.put(None)
def requested(): Long = Long.MaxValue
def isCancelled: Boolean = false
def onTermination(
onTermination: java.lang.Runnable,
): io.smallrye.mutiny.subscription.MultiEmitter[String] = this
}
streamManager.register(id, botId, emitter)
Response
.ok(new StreamingOutput {
def write(output: java.io.OutputStream): Unit =
try
// scalafix:off DisableSyntax.var
var cont = true
while cont do
queue.take() match
case None => cont = false
case Some(line) =>
output.write((line + "\n").getBytes("UTF-8"))
output.flush()
// scalafix:on
finally streamManager.unregister(id, botId, emitter)
})
.`type`("application/x-ndjson")
.build()
case None =>
val auth = Option(headers.getHeaderString("Authorization"))
resolveServer(id)
.flatMap(url => externalClient.proxyGetStream(url, s"api/tournament/$id/stream", auth))
.map { inputStream =>
Response
.ok(new StreamingOutput {
def write(output: java.io.OutputStream): Unit =
val buf = new Array[Byte](4096)
// scalafix:off DisableSyntax.var
var n = inputStream.read(buf)
while n >= 0 do
output.write(buf, 0, n)
output.flush()
n = inputStream.read(buf)
// scalafix:on
})
.`type`("application/x-ndjson")
.build()
}
.getOrElse(Response.status(Response.Status.NOT_FOUND).entity(ErrorDto(s"Tournament $id not found")).build())
@GET
@Path("/{id}/game/{gameId}")
@@ -297,15 +409,31 @@ class TournamentResource:
.getOrElse(Response.status(Response.Status.NOT_FOUND).build())
private def resolveServer(tournamentId: String): Option[String] =
registry.findServerUrl(tournamentId).orElse {
registry
.serverUrls()
.find(url => externalClient.fetch(url, tournamentId).isDefined)
.map { url =>
registry.bindTournament(tournamentId, url)
url
}
}
tournamentService.get(tournamentId)
.flatMap(t => Option(t.originServerUrl))
.orElse(registry.findServerUrl(tournamentId))
.orElse {
registry
.serverUrls()
.find(url => externalClient.fetch(url, tournamentId).isDefined)
.map { url =>
registry.bindTournament(tournamentId, url)
url
}
}
private def toReplicateRequest(t: de.nowchess.tournament.domain.Tournament): ReplicateTournamentRequest =
ReplicateTournamentRequest(
id = t.id,
fullName = t.fullName,
nbRounds = t.nbRounds,
clockLimit = t.clockLimit,
clockIncrement = t.clockIncrement,
rated = t.rated,
createdBy = t.createdBy,
startsAt = Option(t.startsAt).getOrElse(java.time.Instant.now()),
status = t.status,
)
private def errorResponse(error: TournamentError): Response =
val status = error match
@@ -1,6 +1,7 @@
package de.nowchess.tournament.service
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import de.nowchess.tournament.dto.ReplicateTournamentRequest
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.client.{Client, ClientBuilder, Entity}
@@ -66,6 +67,21 @@ class ExternalTournamentClient:
client.close()
}.getOrElse((502, """{"error":"External server unreachable"}"""))
def replicateTournament(serverUrl: String, req: ReplicateTournamentRequest, selfUrl: String): Boolean =
Try {
val client = buildClient()
val body = objectMapper.writeValueAsString(req)
val response = client
.target(s"$serverUrl/api/tournament/replicate")
.request(MediaType.APPLICATION_JSON)
.header("X-Origin-Url", selfUrl)
.post(Entity.entity(body, MediaType.APPLICATION_JSON))
try response.getStatus / 100 == 2
finally
response.close()
client.close()
}.getOrElse(false)
def proxyGetStream(serverUrl: String, path: String, authHeader: Option[String]): Option[java.io.InputStream] =
Try {
val client = buildClient()
@@ -9,6 +9,7 @@ import de.nowchess.tournament.dto.{
Clock,
CreateTournamentForm,
PairingDto,
ReplicateTournamentRequest,
ResultDto,
Standing,
TournamentDto,
@@ -61,6 +62,23 @@ class TournamentService:
tournamentRepository.persist(t)
t
@Transactional
def replicate(req: ReplicateTournamentRequest, originServerUrl: String): Tournament =
val t = new Tournament()
t.id = req.id
t.fullName = req.fullName
t.nbRounds = req.nbRounds
t.clockLimit = req.clockLimit
t.clockIncrement = req.clockIncrement
t.rated = req.rated
t.status = req.status
t.currentRound = 0
t.createdBy = req.createdBy
t.startsAt = req.startsAt
t.originServerUrl = originServerUrl
tournamentRepository.persist(t)
t
def get(id: String): Option[Tournament] =
tournamentRepository.findOptById(id)