@@ -43,7 +43,8 @@ class Sharding private (
4343 shardManager.register(address)
4444
4545 val unregister : Task [Unit ] =
46- logger.logDebug(s " Stopping local entities " ) *>
46+ shardManager.getAssignments *> // ping the shard manager first to stop if it's not available
47+ logger.logDebug(s " Stopping local entities " ) *>
4748 isShuttingDownRef.set(true ) *>
4849 entityStates.get.flatMap(states => ZIO .foreach_(states.values)(_.entityManager.terminateAllEntities)) *>
4950 logger.logDebug(s " Unregistering pod $address to Shard Manager " ) *>
@@ -111,23 +112,30 @@ class Sharding private (
111112 def getPods : UIO [Set [PodAddress ]] =
112113 shardAssignments.get.map(_.values.toSet)
113114
115+ private def updateAssignments (
116+ assignmentsOpt : Map [ShardId , Option [PodAddress ]],
117+ fromShardManager : Boolean
118+ ): UIO [Unit ] = {
119+ val assignments = assignmentsOpt.flatMap { case (k, v) => v.map(k -> _) }
120+ logger.logDebug(" Received new shard assignments" ) *>
121+ (if (fromShardManager) shardAssignments.update(map => if (map.isEmpty) assignments else map)
122+ else
123+ shardAssignments.update(map =>
124+ // we keep self assignments (we don't override them with the new assignments
125+ // because only the Shard Manager is able to change assignments of the current node, via assign/unassign
126+ assignments.filter { case (_, pod) => pod != address } ++
127+ map.filter { case (_, pod) => pod == address }
128+ ))
129+ }
130+
114131 private [shardcake] val refreshAssignments : ZManaged [Clock , Nothing , Unit ] = {
115132 val assignmentStream =
116133 ZStream .fromEffect(
117134 shardManager.getAssignments.map(_ -> true ) // first, get the assignments from the shard manager directly
118135 ) ++
119136 storage.assignmentsStream.map(_ -> false ) // then, get assignments changes from Redis
120137 assignmentStream.mapM { case (assignmentsOpt, fromShardManager) =>
121- val assignments = assignmentsOpt.flatMap { case (k, v) => v.map(k -> _) }
122- logger.logDebug(" Received new shard assignments" ) *>
123- (if (fromShardManager) shardAssignments.update(map => if (map.isEmpty) assignments else map)
124- else
125- shardAssignments.update(map =>
126- // we keep self assignments (we don't override them with the new assignments
127- // because only the Shard Manager is able to change assignments of the current node, via assign/unassign
128- assignments.filter { case (_, pod) => pod != address } ++
129- map.filter { case (_, pod) => pod == address }
130- ))
138+ updateAssignments(assignmentsOpt, fromShardManager)
131139 }.runDrain
132140 }.retry(Schedule .fixed(config.refreshAssignmentsRetryInterval))
133141 .interruptible
@@ -237,7 +245,12 @@ class Sharding private (
237245 )
238246 .map(_ isEqual cdt)
239247 )
240- ZIO .whenM(notify)(shardManager.notifyUnhealthyPod(pod).forkDaemon)
248+ ZIO .whenM(notify)(
249+ (shardManager.notifyUnhealthyPod(pod) *>
250+ // just in case we missed the update from the pubsub, refresh assignments
251+ shardManager.getAssignments
252+ .flatMap(updateAssignments(_, fromShardManager = true ))).forkDaemon
253+ )
241254 }
242255 }
243256 .flatMap(ZIO .foreach(_)(serialization.decode[Res ]))
0 commit comments