@@ -30,25 +30,34 @@ class ShardManager(
3030 ZStream .fromHub(eventsHub)
3131
3232 def register (pod : Pod ): UIO [Unit ] =
33- ZIO .logInfo(s " Registering $pod" ) *>
34- (stateRef
35- .updateAndGetZIO(state =>
36- ZIO
37- .succeed(OffsetDateTime .now())
38- .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
39- )
40- .flatMap(state => ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))) *>
41- persistPods.forkDaemon).unit
33+ for {
34+ _ <- ZIO .logInfo(s " Registering $pod" )
35+ state <- stateRef.updateAndGetZIO(state =>
36+ ZIO
37+ .succeed(OffsetDateTime .now())
38+ .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
39+ )
40+ _ <- eventsHub.publish(ShardingEvent .PodRegistered (pod.address))
41+ _ <- ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))
42+ _ <- persistPods.forkDaemon
43+ } yield ()
4244
4345 def notifyUnhealthyPod (podAddress : PodAddress ): UIO [Unit ] =
4446 ZIO
4547 .whenZIO(stateRef.get.map(_.pods.contains(podAddress))) {
46- ZIO .unlessZIO(healthApi.isAlive(podAddress))(
47- ZIO .logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
48- )
48+ eventsHub.publish(ShardingEvent .PodHealthChecked (podAddress)) *>
49+ ZIO .unlessZIO(healthApi.isAlive(podAddress))(
50+ ZIO .logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
51+ )
4952 }
5053 .unit
5154
55+ def checkAllPodsHealth : UIO [Unit ] =
56+ for {
57+ pods <- stateRef.get.map(_.pods.keySet)
58+ _ <- ZIO .foreachParDiscard(pods)(notifyUnhealthyPod).withParallelism(4 )
59+ } yield ()
60+
5261 def unregister (podAddress : PodAddress ): UIO [Unit ] =
5362 ZIO
5463 .whenZIO(stateRef.get.map(_.pods.contains(podAddress))) {
@@ -64,6 +73,7 @@ class ShardManager(
6473 )
6574 )
6675 }
76+ _ <- eventsHub.publish(ShardingEvent .PodUnregistered (podAddress))
6777 _ <- eventsHub
6878 .publish(ShardingEvent .ShardsUnassigned (podAddress, unassignments))
6979 .when(unassignments.nonEmpty)
@@ -242,6 +252,9 @@ object ShardManager {
242252 object ShardingEvent {
243253 case class ShardsAssigned (pod : PodAddress , shards : Set [ShardId ]) extends ShardingEvent
244254 case class ShardsUnassigned (pod : PodAddress , shards : Set [ShardId ]) extends ShardingEvent
255+ case class PodRegistered (pod : PodAddress ) extends ShardingEvent
256+ case class PodUnregistered (pod : PodAddress ) extends ShardingEvent
257+ case class PodHealthChecked (pod : PodAddress ) extends ShardingEvent
245258 }
246259
247260 def decideAssignmentsForUnassignedShards (
0 commit comments