feat(logging): add DEBUG/INFO/WARN logging across services (NCS-72) (#41)
Build & Test (NowChessSystems) TeamCity build finished

Reviewed-on: #41
Co-authored-by: Janis <janis.e.20@gmx.de>
Co-committed-by: Janis <janis.e.20@gmx.de>
This commit was merged in pull request #41.
This commit is contained in:
2026-05-02 17:33:27 +02:00
committed by Janis
parent 3c47d2b8c9
commit 804a4bf179
17 changed files with 321 additions and 107 deletions
@@ -9,7 +9,6 @@ quarkus:
server:
port: 9086
rest-client:
connection-timeout: 5000
read-timeout: 10000
smallrye-openapi:
info-title: NowChess Coordinator Service
@@ -27,13 +27,24 @@ class CoordinatorGrpcServer extends CoordinatorServiceGrpc.CoordinatorServiceImp
override def heartbeatStream(
responseObserver: StreamObserver[CoordinatorCommand],
): StreamObserver[HeartbeatFrame] =
log.info("New heartbeat stream connection established")
new StreamObserver[HeartbeatFrame]:
// scalafix:off DisableSyntax.var
private var lastInstanceId = ""
private var firstFrameSeen = false
// scalafix:on DisableSyntax.var
override def onNext(frame: HeartbeatFrame): Unit =
lastInstanceId = frame.getInstanceId
if !firstFrameSeen then
firstFrameSeen = true
log.infof(
"First heartbeat from instance %s (host=%s http=%d grpc=%d)",
frame.getInstanceId,
frame.getHostname,
frame.getHttpPort,
frame.getGrpcPort,
)
instanceRegistry
.updateInstanceFromRedis(frame.getInstanceId)
.subscribe()
@@ -16,10 +16,18 @@ class CoreGrpcClient:
private val channels = ConcurrentHashMap[String, ManagedChannel]()
private def getChannel(host: String, port: Int): ManagedChannel =
channels.computeIfAbsent(s"$host:$port", _ => ManagedChannelBuilder.forAddress(host, port).usePlaintext().build())
channels.computeIfAbsent(
s"$host:$port",
_ =>
log.infof("Opening gRPC channel to %s:%d", host, port)
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(),
)
private def evictStaleChannel(host: String, port: Int): Unit =
Option(channels.remove(s"$host:$port")).foreach(_.shutdownNow())
Option(channels.remove(s"$host:$port")).foreach { ch =>
log.infof("Evicting stale gRPC channel to %s:%d", host, port)
ch.shutdownNow()
}
@PreDestroy
def shutdown(): Unit =
@@ -33,7 +41,9 @@ class CoreGrpcClient:
try
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
val request = BatchResubscribeRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
stub.batchResubscribeGames(request).getSubscribedCount
val count = stub.batchResubscribeGames(request).getSubscribedCount
log.debugf("batchResubscribeGames %s:%d — subscribed %d games", host, port, count)
count
catch
case ex: Exception =>
log.warnf(ex, "batchResubscribeGames RPC failed for %s:%d", host, port)
@@ -44,7 +54,9 @@ class CoreGrpcClient:
try
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
val request = UnsubscribeGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
stub.unsubscribeGames(request).getUnsubscribedCount
val count = stub.unsubscribeGames(request).getUnsubscribedCount
log.debugf("unsubscribeGames %s:%d — unsubscribed %d games", host, port, count)
count
catch
case ex: Exception =>
log.warnf(ex, "unsubscribeGames RPC failed for %s:%d", host, port)
@@ -55,7 +67,9 @@ class CoreGrpcClient:
try
val stub = CoordinatorServiceGrpc.newBlockingStub(getChannel(host, port))
val request = EvictGamesRequest.newBuilder().addAllGameIds(gameIds.asJava).build()
stub.evictGames(request).getEvictedCount
val count = stub.evictGames(request).getEvictedCount
log.debugf("evictGames %s:%d — evicted %d games", host, port, count)
count
catch
case ex: Exception =>
log.warnf(ex, "evictGames RPC failed for %s:%d", host, port)
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import de.nowchess.coordinator.dto.InstanceMetadata
import java.util.concurrent.ConcurrentHashMap
import io.smallrye.mutiny.Uni
import org.jboss.logging.Logger
@ApplicationScoped
class InstanceRegistry:
@@ -18,6 +19,7 @@ class InstanceRegistry:
private var redisPrefix = "nowchess"
// scalafix:on DisableSyntax.var
private val log = Logger.getLogger(classOf[InstanceRegistry])
private val mapper = ObjectMapper()
private val instances = ConcurrentHashMap[String, InstanceMetadata]()
@@ -39,17 +41,30 @@ class InstanceRegistry:
.transformToUni { value =>
try
val metadata = mapper.readValue(value, classOf[InstanceMetadata])
val isNew = !instances.containsKey(instanceId)
instances.put(instanceId, metadata)
if isNew then
log.infof("Instance %s joined registry (subscriptions=%d)", instanceId, metadata.subscriptionCount)
else
log.debugf(
"Instance %s updated (subscriptions=%d state=%s)",
instanceId,
metadata.subscriptionCount,
metadata.state,
)
Uni.createFrom().item(())
catch case _: Exception => Uni.createFrom().item(())
catch
case ex: Exception =>
log.warnf(ex, "Failed to parse instance metadata for %s", instanceId)
Uni.createFrom().item(())
}
.onFailure()
.recoverWithItem(())
def markInstanceDead(instanceId: String): Unit =
instances.computeIfPresent(instanceId, (_, inst) => inst.copy(state = "DEAD"))
()
log.infof("Instance %s marked dead", instanceId)
def removeInstance(instanceId: String): Unit =
instances.remove(instanceId)
()
log.infof("Instance %s removed from registry", instanceId)