@@ -32,7 +32,8 @@ class Sharding private (
3232 storage : Storage ,
3333 logger : Logging ,
3434 random : Random .Service ,
35- clock : Clock .Service
35+ clock : Clock .Service ,
36+ serialization : Serialization
3637) { self =>
3738 private [shardcake] def getShardId (entityId : String ): ShardId =
3839 math.abs(entityId.hashCode % config.numberOfShards) + 1
@@ -167,42 +168,12 @@ class Sharding private (
167168 ZIO .whenCase(promises.get(replier.id)) { case Some (p) => p.succeed(Some (reply)) }.as(promises - replier.id)
168169 )
169170
170- def registerEntity [R , Req : Tag ](
171- entityType : EntityType [Req ],
172- behavior : (String , Dequeue [Req ]) => RIO [R , Nothing ],
173- terminateMessage : Promise [Nothing , Unit ] => Option [Req ] = (_ : Promise [Nothing , Unit ]) => None
174- ): ZManaged [Has [Serialization ] with Clock with R , Nothing , Messenger [Req ]] =
175- for {
176- serialization <- ZIO .service[Serialization ].toManaged_
177- entityManager <- EntityManager .make(behavior, terminateMessage, self, config).toManaged_
178- binaryQueue <- Queue .unbounded[(BinaryMessage , Promise [Throwable , Option [Array [Byte ]]])].toManaged(_.shutdown)
179- _ <- entityStates.update(_.updated(entityType.name, EntityState (binaryQueue, entityManager))).toManaged_
180- _ <- ZStream
181- .fromQueue(binaryQueue)
182- .mapM { case (msg, p) =>
183- serialization
184- .decode[Req ](msg.body)
185- .map(req => Some ((req, msg.entityId, p, msg.replyId)))
186- .catchAll(p.fail(_).as(None ))
187- }
188- .collectSome
189- .foreach { case (msg, entityId, p, replyId) =>
190- Promise
191- .make[Throwable , Option [Any ]]
192- .flatMap(p2 =>
193- entityManager.send(entityId, msg, replyId, p2).catchAll(p.fail) *>
194- p2.await.flatMap(
195- ZIO .foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork
196- )
197- )
198- }
199- .forkManaged
200- } yield new Messenger [Req ] {
201-
202- def sendDiscard (entityId : String )(msg : Req ): UIO [Unit ] =
171+ def messenger [Msg ](entityType : EntityType [Msg ]): Messenger [Msg ] =
172+ new Messenger [Msg ] {
173+ def sendDiscard (entityId : String )(msg : Msg ): UIO [Unit ] =
203174 sendMessage(entityId, msg, None ).timeout(config.sendTimeout).provideLayer(Clock .live).forkDaemon.unit
204175
205- def send [Res ](entityId : String )(msg : Replier [Res ] => Req ): Task [Res ] =
176+ def send [Res ](entityId : String )(msg : Replier [Res ] => Msg ): Task [Res ] =
206177 random.nextUUID.flatMap { uuid =>
207178 val body = msg(Replier (uuid.toString))
208179 sendMessage[Res ](entityId, body, Some (uuid.toString)).flatMap {
@@ -214,8 +185,9 @@ class Sharding private (
214185 .interruptible
215186 }
216187
217- private def sendMessage [Res ](entityId : String , msg : Req , replyId : Option [String ]): Task [Option [Res ]] = {
218- val shardId = getShardId(entityId)
188+ private def sendMessage [Res ](entityId : String , msg : Msg , replyId : Option [String ]): Task [Option [Res ]] = {
189+ val shardId = getShardId(entityId)
190+
219191 def trySend : Task [Option [Res ]] =
220192 for {
221193 shards <- shardAssignments.get
@@ -228,8 +200,16 @@ class Sharding private (
228200 Promise
229201 .make[Throwable , Option [Any ]]
230202 .flatMap(p =>
231- entityManager.send(entityId, msg, replyId, p) *>
232- p.await.map(_.asInstanceOf [Option [Res ]])
203+ entityStates.get.flatMap(s =>
204+ ZIO
205+ .foreach(s.get(entityType.name))(
206+ _.entityManager
207+ .asInstanceOf [EntityManager [Msg ]]
208+ .send(entityId, msg, replyId, p) *>
209+ p.await.map(_.asInstanceOf [Option [Res ]])
210+ )
211+ .map(_.flatten)
212+ )
233213 )
234214 } else {
235215 serialization
@@ -244,7 +224,8 @@ class Sharding private (
244224 .updateAndGet(old =>
245225 if (
246226 old.plusNanos(config.unhealthyPodReportInterval.toNanos) isBefore cdt
247- ) cdt
227+ )
228+ cdt
248229 else old
249230 )
250231 .map(_ isEqual cdt)
@@ -267,6 +248,37 @@ class Sharding private (
267248 trySend
268249 }
269250 }
251+
252+ def registerEntity [R , Req : Tag ](
253+ entityType : EntityType [Req ],
254+ behavior : (String , Dequeue [Req ]) => RIO [R , Nothing ],
255+ terminateMessage : Promise [Nothing , Unit ] => Option [Req ] = (_ : Promise [Nothing , Unit ]) => None
256+ ): ZManaged [Clock with R , Nothing , Unit ] =
257+ for {
258+ entityManager <- EntityManager .make(behavior, terminateMessage, self, config).toManaged_
259+ binaryQueue <- Queue .unbounded[(BinaryMessage , Promise [Throwable , Option [Array [Byte ]]])].toManaged(_.shutdown)
260+ _ <- entityStates.update(_.updated(entityType.name, EntityState (binaryQueue, entityManager))).toManaged_
261+ _ <- ZStream
262+ .fromQueue(binaryQueue)
263+ .mapM { case (msg, p) =>
264+ serialization
265+ .decode[Req ](msg.body)
266+ .map(req => Some ((req, msg.entityId, p, msg.replyId)))
267+ .catchAll(p.fail(_).as(None ))
268+ }
269+ .collectSome
270+ .foreach { case (msg, entityId, p, replyId) =>
271+ Promise
272+ .make[Throwable , Option [Any ]]
273+ .flatMap(p2 =>
274+ entityManager.send(entityId, msg, replyId, p2).catchAll(p.fail) *>
275+ p2.await.flatMap(
276+ ZIO .foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork
277+ )
278+ )
279+ }
280+ .forkManaged
281+ } yield ()
270282}
271283
272284object Sharding {
@@ -280,7 +292,7 @@ object Sharding {
280292 */
281293 val live : ZLayer [Has [Pods ] with Has [ShardManagerClient ] with Has [Storage ] with Has [Config ] with Has [
282294 Logging
283- ] with Clock with Random , Throwable , Has [Sharding ]] = (
295+ ] with Has [ Serialization ] with Clock with Random , Throwable , Has [Sharding ]] = (
284296 for {
285297 config <- ZManaged .service[Config ]
286298 pods <- ZManaged .service[Pods ]
@@ -289,6 +301,7 @@ object Sharding {
289301 logger <- ZManaged .service[Logging ]
290302 clock <- ZManaged .service[Clock .Service ]
291303 random <- ZManaged .service[Random .Service ]
304+ serialization <- ZManaged .service[Serialization ]
292305 shardsCache <- Ref .make(Map .empty[ShardId , PodAddress ]).toManaged_
293306 entityStates <- Ref .make[Map [String , EntityState ]](Map ()).toManaged_
294307 singletons <- RefM
@@ -319,7 +332,8 @@ object Sharding {
319332 storage,
320333 logger,
321334 random,
322- clock
335+ clock,
336+ serialization
323337 )
324338 _ <- sharding.refreshAssignments
325339 } yield sharding
@@ -360,11 +374,17 @@ object Sharding {
360374 entityType : EntityType [Req ],
361375 behavior : (String , Dequeue [Req ]) => RIO [R , Nothing ],
362376 terminateMessage : Promise [Nothing , Unit ] => Option [Req ] = (_ : Promise [Nothing , Unit ]) => None
363- ): URManaged [Has [Sharding ] with Has [ Serialization ] with R with Clock , Messenger [ Req ] ] =
377+ ): ZManaged [Has [Sharding ] with R with Clock , Nothing , Unit ] =
364378 for {
365- sharding <- ZIO .service[Sharding ].toManaged_
366- messenger <- sharding.registerEntity[R , Req ](entityType, behavior, terminateMessage)
367- } yield messenger
379+ sharding <- ZIO .service[Sharding ].toManaged_
380+ _ <- sharding.registerEntity[R , Req ](entityType, behavior, terminateMessage)
381+ } yield ()
382+
383+ /**
384+ * Get an object that allows sending messages to a given entity type.
385+ */
386+ def messenger [Msg ](entityType : EntityType [Msg ]): URIO [Has [Sharding ], Messenger [Msg ]] =
387+ ZIO .service[Sharding ].map(_.messenger(entityType))
368388
369389 /**
370390 * Get the number of pods currently registered to the Shard Manager
0 commit comments