@@ -57,7 +57,7 @@ class Sharding private (
5757 shardManager.unregister(address).catchAllCause(ZIO .logErrorCause(" Error during unregister" , _))
5858 )
5959
60- private def isSingletonNode : UIO [Boolean ] =
60+ val isSingletonNode : UIO [Boolean ] =
6161 // Start singletons on the pod hosting shard 1.
6262 shardAssignments.get.map(_.get(1 ).contains(address))
6363
@@ -127,7 +127,15 @@ class Sharding private (
127127 pod = shards.get(shardId)
128128 } yield pod.contains(address)
129129
130- def getPods : UIO [Set [PodAddress ]] =
130+ val getAssignments : UIO [Map [ShardId , PodAddress ]] =
131+ shardAssignments.get
132+
133+ val thisPodAssignments : UIO [Chunk [ShardId ]] =
134+ getAssignments.map(a =>
135+ Chunk .fromIterable(a.view.collect { case (shardId, addr) if addr == this .address => shardId })
136+ )
137+
138+ val getPods : UIO [Set [PodAddress ]] =
131139 shardAssignments.get.map(_.values.toSet)
132140
133141 private def updateAssignments (
@@ -622,6 +630,12 @@ object Sharding {
622630 def registerScoped : RIO [Sharding with Scope , Unit ] =
623631 Sharding .register.withFinalizer(_ => Sharding .unregister)
624632
633+ /**
634+ * Returns true if current node contains the singletons
635+ */
636+ def isSingletonNode : RIO [Sharding , Boolean ] =
637+ ZIO .serviceWithZIO[Sharding ](_.isSingletonNode)
638+
625639 /**
626640 * Start a computation that is guaranteed to run only on a single pod.
627641 * Each pod should call `registerSingleton` but only a single pod will actually run it at any given time.
@@ -676,6 +690,20 @@ object Sharding {
676690 ): URIO [Sharding , Broadcaster [Msg ]] =
677691 ZIO .serviceWith[Sharding ](_.broadcaster(topicType, sendTimeout))
678692
693+ /**
694+ * Get the list of shards and the pod that holds them.
695+ *
696+ * Note: ShardId may not show up if the shard is not assigned to any pod.
697+ */
698+ def getAssignments : RIO [Sharding , Map [ShardId , PodAddress ]] =
699+ ZIO .serviceWithZIO[Sharding ](_.getAssignments)
700+
701+ /**
702+ * Get the list of shards currently assigned to the current pod
703+ */
704+ def thisPodAssignments : RIO [Sharding , Chunk [ShardId ]] =
705+ ZIO .serviceWithZIO[Sharding ](_.thisPodAssignments)
706+
679707 /**
680708 * Get the list of pods currently registered to the Shard Manager
681709 */
0 commit comments