Skip to content

Commit 95677cc

Browse files
committed
Prevent binary queue processing stream from stopping in case of error
1 parent 0dd18cf commit 95677cc

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,18 @@ class Sharding private (
257257
serialization
258258
.decode[Req](msg.body)
259259
.map(req => Some((req, msg.entityId, p, msg.replyId)))
260-
.catchAll(p.fail(_).as(None))
260+
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).as(None))
261261
}
262262
.collectSome
263263
.runForeach { case (msg, entityId, p, replyId) =>
264264
Promise
265265
.make[Throwable, Option[Any]]
266266
.flatMap(p2 =>
267-
entityManager.send(entityId, msg, replyId, p2).catchAll(p.fail) *>
268-
p2.await.flatMap(
269-
ZIO.foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork
270-
)
267+
entityManager.send(entityId, msg, replyId, p2) *>
268+
p2.await
269+
.flatMap(ZIO.foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork)
271270
)
271+
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).unit)
272272
}
273273
.forkScoped
274274
} yield ()

0 commit comments

Comments
 (0)