This commit is contained in:
+15
-2
@@ -22,7 +22,7 @@ import org.jboss.logging.Logger
|
||||
import scala.compiletime.uninitialized
|
||||
import scala.jdk.CollectionConverters.*
|
||||
import scala.util.Try
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}
|
||||
import java.util.function.Consumer
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -46,6 +46,10 @@ class GameRedisSubscriberManager:
|
||||
|
||||
private val c2sListeners = new ConcurrentHashMap[String, ReactivePubSubCommands.ReactiveRedisSubscriber]()
|
||||
private val s2cObservers = new ConcurrentHashMap[String, Observer]()
|
||||
// Per-game single-thread executor so c2s messages are handled off the Vert.x
|
||||
// event loop (handleConnected/handleMove make blocking gRPC + Redis calls) while
|
||||
// staying ordered per game.
|
||||
private val c2sExecutors = new ConcurrentHashMap[String, ExecutorService]()
|
||||
|
||||
// scalafix:off DisableSyntax.var
|
||||
private var clockExpireSubscriber: Option[ReactivePubSubCommands.ReactiveRedisSubscriber] = None
|
||||
@@ -95,7 +99,14 @@ class GameRedisSubscriberManager:
|
||||
obs.emitInitialWriteback()
|
||||
heartbeatServiceOpt.foreach(_.addGameSubscription(gameId))
|
||||
|
||||
val handler: Consumer[String] = msg => handleC2sMessage(gameId, msg)
|
||||
val executor = c2sExecutors.computeIfAbsent(gameId, _ => Executors.newSingleThreadExecutor())
|
||||
val handler: Consumer[String] = msg =>
|
||||
val task = new Runnable:
|
||||
def run(): Unit =
|
||||
try handleC2sMessage(gameId, msg)
|
||||
catch case ex: Exception => log.warnf(ex, "Error handling c2s message for game %s", gameId)
|
||||
Try(executor.execute(task))
|
||||
()
|
||||
try
|
||||
val subscriber = reactiveRedis
|
||||
.pubsub(classOf[String])
|
||||
@@ -113,6 +124,7 @@ class GameRedisSubscriberManager:
|
||||
Option(s2cObservers.remove(gameId)).foreach { obs =>
|
||||
registry.get(gameId).foreach(_.engine.unsubscribe(obs))
|
||||
}
|
||||
Option(c2sExecutors.remove(gameId)).foreach(_.shutdownNow())
|
||||
|
||||
heartbeatServiceOpt.foreach(_.removeGameSubscription(gameId))
|
||||
log.debugf("Unsubscribed from game %s", gameId)
|
||||
@@ -187,3 +199,4 @@ class GameRedisSubscriberManager:
|
||||
clockExpireSubscriber.foreach(_.unsubscribe(clockExpireChannel).await().indefinitely())
|
||||
c2sListeners.forEach((gameId, subscriber) => subscriber.unsubscribe(c2sTopic(gameId)).await().indefinitely())
|
||||
s2cObservers.forEach((gameId, obs) => registry.get(gameId).foreach(_.engine.unsubscribe(obs)))
|
||||
c2sExecutors.forEach((_, executor) => executor.shutdownNow())
|
||||
|
||||
Reference in New Issue
Block a user