Skip to content

Commit 8d9dc8e

Browse files
committed
Improve parallelism of message deserialization
1 parent 6842062 commit 8d9dc8e

File tree

1 file changed

+12
-15
lines changed

1 file changed

+12
-15
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -282,22 +282,19 @@ class Sharding private (
282282
_ <- ZStream
283283
.fromQueue(binaryQueue)
284284
.mapM { case (msg, p) =>
285-
serialization
286-
.decode[Req](msg.body)
287-
.map(req => Some((req, msg.entityId, p, msg.replyId)))
288-
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).as(None))
289-
}
290-
.collectSome
291-
.foreach { case (msg, entityId, p, replyId) =>
292-
Promise
293-
.make[Throwable, Option[Any]]
294-
.flatMap(p2 =>
295-
entityManager.send(entityId, msg, replyId, p2) *>
296-
p2.await
297-
.flatMap(ZIO.foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork)
298-
)
299-
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).unit)
285+
(for {
286+
req <- serialization.decode[Req](msg.body)
287+
p2 <- Promise.make[Throwable, Option[Any]]
288+
_ <- entityManager.send(msg.entityId, req, msg.replyId, p2)
289+
resOption <- p2.await
290+
res <- ZIO.foreach(resOption)(serialization.encode)
291+
_ <- p.succeed(res)
292+
} yield ())
293+
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash))
294+
.fork
295+
.unit
300296
}
297+
.runDrain
301298
.forkManaged
302299
} yield ()
303300
}

0 commit comments

Comments
 (0)