fix(redis): prevent concurrent Redis heartbeat refreshes using AtomicBoolean
Build & Test (NowChessSystems) TeamCity build failed
Build & Test (NowChessSystems) TeamCity build failed
This commit is contained in:
+8
-2
@@ -11,6 +11,7 @@ import io.quarkus.redis.datasource.RedisDataSource
|
|||||||
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
import io.quarkus.redis.datasource.ReactiveRedisDataSource
|
||||||
import scala.compiletime.uninitialized
|
import scala.compiletime.uninitialized
|
||||||
import java.util.concurrent.{Executors, TimeUnit}
|
import java.util.concurrent.{Executors, TimeUnit}
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import org.jboss.logging.Logger
|
import org.jboss.logging.Logger
|
||||||
@@ -55,6 +56,7 @@ class InstanceHeartbeatService:
|
|||||||
private var serviceActive = false
|
private var serviceActive = false
|
||||||
private var shuttingDown = false
|
private var shuttingDown = false
|
||||||
// scalafix:on DisableSyntax.var
|
// scalafix:on DisableSyntax.var
|
||||||
|
private val redisHeartbeatPending = new AtomicBoolean(false)
|
||||||
|
|
||||||
def onStart(@Observes event: StartupEvent): Unit =
|
def onStart(@Observes event: StartupEvent): Unit =
|
||||||
if coordinatorEnabled then
|
if coordinatorEnabled then
|
||||||
@@ -171,6 +173,7 @@ class InstanceHeartbeatService:
|
|||||||
}
|
}
|
||||||
|
|
||||||
private def refreshRedisHeartbeat(): Unit =
|
private def refreshRedisHeartbeat(): Unit =
|
||||||
|
if !redisHeartbeatPending.compareAndSet(false, true) then return
|
||||||
try
|
try
|
||||||
val key = s"$redisPrefix:instances:$instanceId"
|
val key = s"$redisPrefix:instances:$instanceId"
|
||||||
|
|
||||||
@@ -191,11 +194,14 @@ class InstanceHeartbeatService:
|
|||||||
.setex(key, 5L, json)
|
.setex(key, 5L, json)
|
||||||
.subscribe()
|
.subscribe()
|
||||||
.`with`(
|
.`with`(
|
||||||
_ => (),
|
_ => redisHeartbeatPending.set(false),
|
||||||
(ex: Throwable) => log.warnf(ex, "Failed to refresh Redis heartbeat"),
|
(ex: Throwable) =>
|
||||||
|
redisHeartbeatPending.set(false)
|
||||||
|
log.warnf(ex, "Failed to refresh Redis heartbeat"),
|
||||||
)
|
)
|
||||||
catch
|
catch
|
||||||
case ex: Exception =>
|
case ex: Exception =>
|
||||||
|
redisHeartbeatPending.set(false)
|
||||||
log.warnf(ex, "Failed to serialize Redis heartbeat metadata")
|
log.warnf(ex, "Failed to serialize Redis heartbeat metadata")
|
||||||
|
|
||||||
private def getHostname: String =
|
private def getHostname: String =
|
||||||
|
|||||||
Reference in New Issue
Block a user