@@ -33,24 +33,33 @@ class ShardManager(
3333 ZStream .fromHub(eventsHub)
3434
3535 def register (pod : Pod ): UIO [Unit ] =
36- logger.logInfo(s " Registering $pod" ) *>
37- (stateRef
38- .updateAndGet(state =>
39- ZIO
40- .succeed(OffsetDateTime .now())
41- .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
42- )
43- .flatMap(state => ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))) *>
44- persistPods.forkDaemon).unit
36+ for {
37+ _ <- logger.logInfo(s " Registering $pod" )
38+ state <- stateRef.updateAndGet(state =>
39+ ZIO
40+ .succeed(OffsetDateTime .now())
41+ .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
42+ )
43+ _ <- eventsHub.publish(ShardingEvent .PodRegistered (pod.address))
44+ _ <- ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))
45+ _ <- persistPods.forkDaemon
46+ } yield ()
4547
4648 def notifyUnhealthyPod (podAddress : PodAddress ): UIO [Unit ] =
4749 ZIO
4850 .whenM(stateRef.get.map(_.pods.contains(podAddress))) {
49- ZIO .unlessM(healthApi.isAlive(podAddress))(
50- logger.logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
51- )
51+ eventsHub.publish(ShardingEvent .PodHealthChecked (podAddress)) *>
52+ ZIO .unlessM(healthApi.isAlive(podAddress))(
53+ logger.logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
54+ )
5255 }
5356
57+ def checkAllPodsHealth : UIO [Unit ] =
58+ for {
59+ pods <- stateRef.get.map(_.pods.keySet)
60+ _ <- ZIO .foreachParN_(4 )(pods)(notifyUnhealthyPod)
61+ } yield ()
62+
5463 def unregister (podAddress : PodAddress ): UIO [Unit ] =
5564 ZIO
5665 .whenM(stateRef.get.map(_.pods.contains(podAddress))) {
@@ -68,6 +77,7 @@ class ShardManager(
6877 )
6978 )
7079 }
80+ _ <- eventsHub.publish(ShardingEvent .PodUnregistered (podAddress))
7181 _ <- eventsHub
7282 .publish(ShardingEvent .ShardsUnassigned (podAddress, unassignments))
7383 .when(unassignments.nonEmpty)
@@ -259,6 +269,9 @@ object ShardManager {
259269 object ShardingEvent {
260270 case class ShardsAssigned (pod : PodAddress , shards : Set [ShardId ]) extends ShardingEvent
261271 case class ShardsUnassigned (pod : PodAddress , shards : Set [ShardId ]) extends ShardingEvent
272+ case class PodRegistered (pod : PodAddress ) extends ShardingEvent
273+ case class PodUnregistered (pod : PodAddress ) extends ShardingEvent
274+ case class PodHealthChecked (pod : PodAddress ) extends ShardingEvent
262275 }
263276
264277 def decideAssignmentsForUnassignedShards (
0 commit comments