Skip to content

Commit ce0542e

Browse files
committed
Prevent binary queue processing stream from stopping in case of error
1 parent 1d062c9 commit ce0542e

File tree

4 files changed

+8
-8
lines changed

4 files changed

+8
-8
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,18 +264,18 @@ class Sharding private (
264264
serialization
265265
.decode[Req](msg.body)
266266
.map(req => Some((req, msg.entityId, p, msg.replyId)))
267-
.catchAll(p.fail(_).as(None))
267+
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).as(None))
268268
}
269269
.collectSome
270270
.foreach { case (msg, entityId, p, replyId) =>
271271
Promise
272272
.make[Throwable, Option[Any]]
273273
.flatMap(p2 =>
274-
entityManager.send(entityId, msg, replyId, p2).catchAll(p.fail) *>
275-
p2.await.flatMap(
276-
ZIO.foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork
277-
)
274+
entityManager.send(entityId, msg, replyId, p2) *>
275+
p2.await
276+
.flatMap(ZIO.foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork)
278277
)
278+
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).unit)
279279
}
280280
.forkManaged
281281
} yield ()

examples/src/main/scala/example/complex/GuildApp.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ object GuildApp extends zio.App {
1717

1818
val program =
1919
(Sharding.registerEntity(Guild, behavior) *> Sharding.registerManaged).use { _ =>
20-
Sharding.messenger(Guild).map { guild =>
20+
Sharding.messenger(Guild).flatMap { guild =>
2121
for {
2222
user1 <- random.nextUUID.map(_.toString)
2323
user2 <- random.nextUUID.map(_.toString)

examples/src/main/scala/example/simple/GuildApp.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import zio.random.Random
1111
object GuildApp extends zio.App {
1212
val program =
1313
(Sharding.registerEntity(Guild, behavior) *> Sharding.registerManaged).use { _ =>
14-
Sharding.messenger(Guild).map { guild =>
14+
Sharding.messenger(Guild).flatMap { guild =>
1515
for {
1616
_ <- guild.send("guild1")(Join("user1", _)).debug
1717
_ <- guild.send("guild1")(Join("user2", _)).debug

examples/src/main/scala/example/simple/ShardManagerApp.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ object ShardManagerApp extends zio.App {
1111
val logging = Logging.debug
1212
val pods = ZLayer.succeed(GrpcConfig.default) ++ logging >>> GrpcPods.live // use gRPC protocol
1313
val health = pods >>> PodsHealth.local // just ping a pod to see if it's alive
14-
val storage = Storage.memory // / store data in memory
14+
val storage = Storage.memory // store data in memory
1515
val layer =
1616
ZLayer.succeed(ManagerConfig.default) ++ Clock.live ++ Console.live ++ logging ++ pods ++ health ++ storage >+>
1717
ShardManager.live // Shard Manager logic

0 commit comments

Comments
 (0)