fix: remove corrupted instances immediately and evict dead instances
Problem: Dead instances pile up indefinitely, and a failed metadata parse leaves stale data in the registry. The only existing cleanup mechanism is eviction by heartbeat age.

Changes:
1. Remove the instance from the registry on parse failure (corrupted metadata is unrecoverable).
2. Evict instances with state="DEAD" on the next health check (previously, only heartbeat age triggered eviction).

This prevents:
- Memory leaks from accumulating dead/corrupted instances
- Stale data persisting after parse failures
- Dead instances blocking resources indefinitely

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
+13
-5
@@ -92,7 +92,9 @@ class InstanceRegistry:
|
|||||||
Uni.createFrom().item(())
|
Uni.createFrom().item(())
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
log.warnf(ex, "Failed to parse instance metadata for %s", instanceId)
|
log.warnf(ex, "Failed to parse instance metadata for %s — removing from registry", instanceId)
|
||||||
|
instances.remove(instanceId)
|
||||||
|
meterRegistry.counter("nowchess.coordinator.instances.removed").increment()
|
||||||
Uni.createFrom().item(())
|
Uni.createFrom().item(())
|
||||||
}
|
}
|
||||||
.onFailure()
|
.onFailure()
|
||||||
@@ -112,15 +114,21 @@ class InstanceRegistry:
|
|||||||
val stale = instances.asScala
|
val stale = instances.asScala
|
||||||
.collect { case (id, inst) =>
|
.collect { case (id, inst) =>
|
||||||
try
|
try
|
||||||
if Instant.parse(inst.lastHeartbeat).isBefore(cutoff) then Some(id)
|
val isHeartbeatStale = Instant.parse(inst.lastHeartbeat).isBefore(cutoff)
|
||||||
else None
|
val isDead = inst.state == "DEAD"
|
||||||
|
if isHeartbeatStale || isDead then Some(id) else None
|
||||||
catch case _: Exception => None
|
catch case _: Exception => None
|
||||||
}
|
}
|
||||||
.flatten
|
.flatten
|
||||||
.toList
|
.toList
|
||||||
stale.foreach { id =>
|
stale.foreach { id =>
|
||||||
instances.remove(id)
|
val inst = Option(instances.remove(id))
|
||||||
meterRegistry.counter("nowchess.coordinator.instances.evicted").increment()
|
meterRegistry.counter("nowchess.coordinator.instances.evicted").increment()
|
||||||
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
inst.foreach { i =>
|
||||||
|
if i.state == "DEAD" then
|
||||||
|
log.warnf("Evicted dead instance %s", id)
|
||||||
|
else
|
||||||
|
log.warnf("Evicted stale instance %s (heartbeat older than %s)", id, maxAge)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
stale
|
stale
|
||||||
|
|||||||
Reference in New Issue
Block a user