fix(middleware): update paths for bot generation and stockfish configuration
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
refactor(bru): standardize authentication settings across requests chore: add coordinator base URL to configuration files
This commit is contained in:
+11
-9
@@ -34,16 +34,18 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
||||
|
||||
override def onNext(frame: HeartbeatFrame): Unit =
|
||||
lastInstanceId = frame.getInstanceId
|
||||
try
|
||||
instanceRegistry.updateInstanceFromRedis(frame.getInstanceId)
|
||||
log.debugf(
|
||||
"Received heartbeat from %s with %d subscriptions",
|
||||
frame.getInstanceId,
|
||||
frame.getSubscriptionCount,
|
||||
instanceRegistry
|
||||
.updateInstanceFromRedis(frame.getInstanceId)
|
||||
.subscribe()
|
||||
.`with`(
|
||||
_ =>
|
||||
log.debugf(
|
||||
"Received heartbeat from %s with %d subscriptions",
|
||||
frame.getInstanceId,
|
||||
frame.getSubscriptionCount,
|
||||
),
|
||||
ex => log.warnf(ex, "Failed to process heartbeat from %s", frame.getInstanceId),
|
||||
)
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.warnf(ex, "Failed to process heartbeat from %s", frame.getInstanceId)
|
||||
|
||||
override def onError(t: Throwable): Unit =
|
||||
log.warnf(t, "Heartbeat stream error for instance %s", lastInstanceId)
|
||||
|
||||
+15
-10
@@ -2,19 +2,20 @@ package de.nowchess.coordinator.service
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped
|
||||
import jakarta.inject.Inject
|
||||
import io.quarkus.redis.datasource.RedisDataSource
|
||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.compiletime.uninitialized
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.nowchess.coordinator.dto.InstanceMetadata
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import io.smallrye.mutiny.Uni
|
||||
|
||||
@ApplicationScoped
|
||||
class InstanceRegistry:
|
||||
// scalafix:off DisableSyntax.var
|
||||
@Inject
|
||||
private var redis: RedisDataSource = uninitialized
|
||||
private var redisPrefix = "nowchess"
|
||||
private var redis: ReactiveRedisDataSource = uninitialized
|
||||
private var redisPrefix = "nowchess"
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
private val mapper = ObjectMapper()
|
||||
@@ -29,14 +30,18 @@ class InstanceRegistry:
|
||||
def getAllInstances: List[InstanceMetadata] =
|
||||
instances.values.asScala.toList
|
||||
|
||||
def updateInstanceFromRedis(instanceId: String): Unit =
|
||||
def updateInstanceFromRedis(instanceId: String): Uni[Unit] =
|
||||
val key = s"$redisPrefix:instances:$instanceId"
|
||||
Option(redis.value(classOf[String]).get(key)).foreach { value =>
|
||||
try
|
||||
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
||||
instances.put(instanceId, metadata)
|
||||
catch case _: Exception => ()
|
||||
}
|
||||
redis.value(classOf[String])
|
||||
.get(key)
|
||||
.onItem().transformToUni { value =>
|
||||
try
|
||||
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
||||
instances.put(instanceId, metadata)
|
||||
Uni.createFrom().item(())
|
||||
catch case _: Exception => Uni.createFrom().item(())
|
||||
}
|
||||
.onFailure().recoverWithItem(())
|
||||
|
||||
def markInstanceDead(instanceId: String): Unit =
|
||||
instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD"))
|
||||
|
||||
Reference in New Issue
Block a user