feat(logging): deepen coordinator and core logging (NCS-72)
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
Coordinator: InstanceRegistry logs instance join/update/dead/remove; CoordinatorGrpcServer logs new stream + first heartbeat per instance; CoreGrpcClient logs channel open/evict and RPC success counts. Core: GameResource replaces println with logger; RedisGameRegistry logs store/Redis-load/DB-load and surfaces silent failures; CoordinatorServiceHandler logs inbound gRPC commands; IoGrpcClientWrapper and RuleSetGrpcAdapter wrap gRPC calls with WARN on failure. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -9,7 +9,6 @@ quarkus:
|
||||
server:
|
||||
port: 9086
|
||||
rest-client:
|
||||
connection-timeout: 5000
|
||||
read-timeout: 10000
|
||||
smallrye-openapi:
|
||||
info-title: NowChess Coordinator Service
|
||||
|
||||
+11
@@ -27,13 +27,24 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
|
||||
override def heartbeatStream(
|
||||
responseObserver: StreamObserver[CoordinatorCommand],
|
||||
): StreamObserver[HeartbeatFrame] =
|
||||
log.info("New heartbeat stream connection established")
|
||||
new StreamObserver[HeartbeatFrame]:
|
||||
// scalafix:off DisableSyntax.var
|
||||
private var lastInstanceId = ""
|
||||
private var firstFrameSeen = false
|
||||
// scalafix:on DisableSyntax.var
|
||||
|
||||
override def onNext(frame: HeartbeatFrame): Unit =
|
||||
lastInstanceId = frame.getInstanceId
|
||||
if !firstFrameSeen then
|
||||
firstFrameSeen = true
|
||||
log.infof(
|
||||
"First heartbeat from instance %s (host=%s http=%d grpc=%d)",
|
||||
frame.getInstanceId,
|
||||
frame.getHostname,
|
||||
frame.getHttpPort,
|
||||
frame.getGrpcPort,
|
||||
)
|
||||
instanceRegistry
|
||||
.updateInstanceFromRedis(frame.getInstanceId)
|
||||
.subscribe()
|
||||
|
||||
+19
-5
@@ -16,10 +16,18 @@ class CoreGrpcClient:
|
||||
private val channels = ConcurrentHashMap[String, ManagedChannel]()
|
||||
|
||||
private def getChannel(host: String, port: Int): ManagedChannel =
|
||||
channels.computeIfAbsent(s"$host:$port", _ => ManagedChannelBuilder.forAddress(host, port).usePlaintext().build())
|
||||
channels.computeIfAbsent(
|
||||
s"$host:$port",
|
||||
_ =>
|
||||
log.infof("Opening gRPC channel to %s:%d", host, port)
|
||||
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(),
|
||||
)
|
||||
|
||||
private def evictStaleChannel(host: String, port: Int): Unit =
|
||||
Option(channels.remove(s"$host:$port")).foreach(_.shutdownNow())
|
||||
Option(channels.remove(s"$host:$port")).foreach { ch =>
|
||||
log.infof("Evicting stale gRPC channel to %s:%d", host, port)
|
||||
ch.shutdownNow()
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
def shutdown(): Unit =
|
||||
@@ -33,7 +41,9 @@ class CoreGrpcClient:
|
||||
try
|
||||
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
|
||||
val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
||||
stub.batchResubscribeGames(request).getSubscribedCount
|
||||
val count = stub.batchResubscribeGames(request).getSubscribedCount
|
||||
log.debugf("batchResubscribeGames %s:%d — subscribed %d games", host, port, count)
|
||||
count
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.warnf(ex, "batchResubscribeGames RPC failed for %s:%d", host, port)
|
||||
@@ -44,7 +54,9 @@ class CoreGrpcClient:
|
||||
try
|
||||
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
|
||||
val request = UnsubscribeGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
||||
stub.unsubscribeGames(request).getUnsubscribedCount
|
||||
val count = stub.unsubscribeGames(request).getUnsubscribedCount
|
||||
log.debugf("unsubscribeGames %s:%d — unsubscribed %d games", host, port, count)
|
||||
count
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.warnf(ex, "unsubscribeGames RPC failed for %s:%d", host, port)
|
||||
@@ -55,7 +67,9 @@ class CoreGrpcClient:
|
||||
try
|
||||
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
|
||||
val request = EvictGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
|
||||
stub.evictGames(request).getEvictedCount
|
||||
val count = stub.evictGames(request).getEvictedCount
|
||||
log.debugf("evictGames %s:%d — evicted %d games", host, port, count)
|
||||
count
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.warnf(ex, "evictGames RPC failed for %s:%d", host, port)
|
||||
|
||||
+16
-5
@@ -41,19 +41,30 @@ class InstanceRegistry:
|
||||
.transformToUni { value =>
|
||||
try
|
||||
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
|
||||
val isNew = !instances.containsKey(instanceId)
|
||||
instances.put(instanceId, metadata)
|
||||
if isNew then
|
||||
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
|
||||
else
|
||||
log.debugf(
|
||||
"Instance %s updated (subscriptions=%d state=%s)",
|
||||
instanceId,
|
||||
metadata.subscriptionCount,
|
||||
metadata.state,
|
||||
)
|
||||
Uni.createFrom().item(())
|
||||
catch case ex: Exception =>
|
||||
log.warnf(ex, "Failed to parse instance metadata for %s", instanceId)
|
||||
Uni.createFrom().item(())
|
||||
catch
|
||||
case ex: Exception =>
|
||||
log.warnf(ex, "Failed to parse instance metadata for %s", instanceId)
|
||||
Uni.createFrom().item(())
|
||||
}
|
||||
.onFailure()
|
||||
.recoverWithItem(())
|
||||
|
||||
def markInstanceDead(instanceId: String): Unit =
|
||||
instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD"))
|
||||
()
|
||||
log.infof("Instance %s marked dead", instanceId)
|
||||
|
||||
def removeInstance(instanceId: String): Unit =
|
||||
instances.remove(instanceId)
|
||||
()
|
||||
log.infof("Instance %s removed from registry", instanceId)
|
||||
|
||||
Reference in New Issue
Block a user