@@ -37,6 +37,7 @@ class ShardManager(
3737 .succeed(OffsetDateTime .now())
3838 .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
3939 )
40+ _ <- ManagerMetrics .pods.increment
4041 _ <- eventsHub.publish(ShardingEvent .PodRegistered (pod.address))
4142 _ <- ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))
4243 _ <- persistPods.forkDaemon
@@ -45,7 +46,8 @@ class ShardManager(
4546 def notifyUnhealthyPod (podAddress : PodAddress ): UIO [Unit ] =
4647 ZIO
4748 .whenZIO(stateRef.get.map(_.pods.contains(podAddress))) {
48- eventsHub.publish(ShardingEvent .PodHealthChecked (podAddress)) *>
49+ ManagerMetrics .podHealthChecked.tagged(" pod_address" , podAddress.toString).increment *>
50+ eventsHub.publish(ShardingEvent .PodHealthChecked (podAddress)) *>
4951 ZIO .unlessZIO(healthApi.isAlive(podAddress))(
5052 ZIO .logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
5153 )
@@ -73,6 +75,7 @@ class ShardManager(
7375 )
7476 )
7577 }
78+ _ <- ManagerMetrics .pods.decrement
7679 _ <- eventsHub.publish(ShardingEvent .PodUnregistered (podAddress))
7780 _ <- eventsHub
7881 .publish(ShardingEvent .ShardsUnassigned (podAddress, unassignments))
@@ -92,7 +95,8 @@ class ShardManager(
9295 decideAssignmentsForUnassignedShards(state)
9396 else decideAssignmentsForUnbalancedShards(state, config.rebalanceRate)
9497 areChanges = assignments.nonEmpty || unassignments.nonEmpty
95- _ <- ZIO .logDebug(s " Rebalancing (rebalanceImmediately= $rebalanceImmediately) " ).when(areChanges)
98+ _ <- (ZIO .logDebug(s " Rebalancing (rebalanceImmediately= $rebalanceImmediately) " ) *>
99+ ManagerMetrics .rebalances.increment).when(areChanges)
96100 // ping pods first to make sure they are ready and remove those who aren't
97101 failedPingedPods <- ZIO
98102 .foreachPar(assignments.keySet ++ unassignments.keySet)(pod =>
@@ -114,9 +118,11 @@ class ShardManager(
114118 (podApi.unassignShards(pod, shards) *> updateShardsState(shards, None )).foldZIO(
115119 _ => ZIO .succeed((Set (pod), shards)),
116120 _ =>
117- eventsHub
118- .publish(ShardingEvent .ShardsUnassigned (pod, shards))
119- .as((Set .empty, Set .empty))
121+ ManagerMetrics .assignedShards.tagged(" pod_address" , pod.toString).decrementBy(shards.size) *>
122+ ManagerMetrics .unassignedShards.incrementBy(shards.size) *>
123+ eventsHub
124+ .publish(ShardingEvent .ShardsUnassigned (pod, shards))
125+ .as((Set .empty, Set .empty))
120126 )
121127 }
122128 .map(_.unzip)
@@ -131,7 +137,12 @@ class ShardManager(
131137 .foreachPar(filteredAssignments.toList) { case (pod, shards) =>
132138 (podApi.assignShards(pod, shards) *> updateShardsState(shards, Some (pod))).foldZIO(
133139 _ => ZIO .succeed(Set (pod)),
134- _ => eventsHub.publish(ShardingEvent .ShardsAssigned (pod, shards)).as(Set .empty)
140+ _ =>
141+ ManagerMetrics .assignedShards
142+ .tagged(" pod_address" , pod.toString)
143+ .incrementBy(shards.size) *>
144+ ManagerMetrics .unassignedShards.decrementBy(shards.size) *>
145+ eventsHub.publish(ShardingEvent .ShardsAssigned (pod, shards)).as(Set .empty)
135146 )
136147 }
137148 .map(_.flatten.toSet)
@@ -201,6 +212,15 @@ object ShardManager {
201212 filteredPods.map { case (k, v) => k -> PodWithMetadata (v, cdt) },
202213 (1 to config.numberOfShards).map(_ -> None ).toMap ++ filteredAssignments
203214 )
215+ _ <- ManagerMetrics .pods.incrementBy(initialState.pods.size)
216+ _ <- ZIO .foreachDiscard(initialState.shards) { case (_, podAddressOpt) =>
217+ podAddressOpt match {
218+ case Some (podAddress) =>
219+ ManagerMetrics .assignedShards.tagged(" pod_address" , podAddress.toString).increment
220+ case None =>
221+ ManagerMetrics .unassignedShards.increment
222+ }
223+ }
204224 state <- Ref .Synchronized .make(initialState)
205225 rebalanceSemaphore <- Semaphore .make(1 )
206226 eventsHub <- Hub .unbounded[ShardingEvent ]
0 commit comments