@@ -37,7 +37,8 @@ class Sharding private (
3737 shardManager.register(address)
3838
3939 val unregister : Task [Unit ] =
40- ZIO .logDebug(s " Stopping local entities " ) *>
40+ shardManager.getAssignments *> // ping the shard manager first to stop if it's not available
41+ ZIO .logDebug(s " Stopping local entities " ) *>
4142 isShuttingDownRef.set(true ) *>
4243 entityStates.get.flatMap(states => ZIO .foreachDiscard(states.values)(_.entityManager.terminateAllEntities)) *>
4344 ZIO .logDebug(s " Unregistering pod $address to Shard Manager " ) *>
@@ -108,23 +109,30 @@ class Sharding private (
108109 def getPods : UIO [Set [PodAddress ]] =
109110 shardAssignments.get.map(_.values.toSet)
110111
112+ private def updateAssignments (
113+ assignmentsOpt : Map [ShardId , Option [PodAddress ]],
114+ fromShardManager : Boolean
115+ ): UIO [Unit ] = {
116+ val assignments = assignmentsOpt.flatMap { case (k, v) => v.map(k -> _) }
117+ ZIO .logDebug(" Received new shard assignments" ) *>
118+ (if (fromShardManager) shardAssignments.update(map => if (map.isEmpty) assignments else map)
119+ else
120+ shardAssignments.update(map =>
121+ // we keep self assignments (we don't override them with the new assignments
122+ // because only the Shard Manager is able to change assignments of the current node, via assign/unassign
123+ assignments.filter { case (_, pod) => pod != address } ++
124+ map.filter { case (_, pod) => pod == address }
125+ ))
126+ }
127+
111128 private [shardcake] val refreshAssignments : ZIO [Scope , Nothing , Unit ] = {
112129 val assignmentStream =
113130 ZStream .fromZIO(
114131 shardManager.getAssignments.map(_ -> true ) // first, get the assignments from the shard manager directly
115132 ) ++
116133 storage.assignmentsStream.map(_ -> false ) // then, get assignments changes from Redis
117134 assignmentStream.mapZIO { case (assignmentsOpt, fromShardManager) =>
118- val assignments = assignmentsOpt.flatMap { case (k, v) => v.map(k -> _) }
119- ZIO .logDebug(" Received new shard assignments" ) *>
120- (if (fromShardManager) shardAssignments.update(map => if (map.isEmpty) assignments else map)
121- else
122- shardAssignments.update(map =>
123- // we keep self assignments (we don't override them with the new assignments
124- // because only the Shard Manager is able to change assignments of the current node, via assign/unassign
125- assignments.filter { case (_, pod) => pod != address } ++
126- map.filter { case (_, pod) => pod == address }
127- ))
135+ updateAssignments(assignmentsOpt, fromShardManager)
128136 }.runDrain
129137 }.retry(Schedule .fixed(config.refreshAssignmentsRetryInterval))
130138 .interruptible
@@ -230,7 +238,12 @@ class Sharding private (
230238 )
231239 .map(_ isEqual cdt)
232240 )
233- ZIO .whenZIO(notify)(shardManager.notifyUnhealthyPod(pod).forkDaemon)
241+ ZIO .whenZIO(notify)(
242+ (shardManager.notifyUnhealthyPod(pod) *>
243+ // just in case we missed the update from the pubsub, refresh assignments
244+ shardManager.getAssignments
245+ .flatMap(updateAssignments(_, fromShardManager = true ))).forkDaemon
246+ )
234247 }
235248 }
236249 .flatMap(ZIO .foreach(_)(serialization.decode[Res ]))
0 commit comments