Skip to content

Commit d415359

Browse files
committed
Improve parallelism of message deserialization
1 parent 65597c9 commit d415359

File tree

1 file changed

+12
-15
lines changed

1 file changed

+12
-15
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,19 @@ class Sharding private (
275275
_ <- ZStream
276276
.fromQueue(binaryQueue)
277277
.mapZIO { case (msg, p) =>
278-
serialization
279-
.decode[Req](msg.body)
280-
.map(req => Some((req, msg.entityId, p, msg.replyId)))
281-
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).as(None))
282-
}
283-
.collectSome
284-
.runForeach { case (msg, entityId, p, replyId) =>
285-
Promise
286-
.make[Throwable, Option[Any]]
287-
.flatMap(p2 =>
288-
entityManager.send(entityId, msg, replyId, p2) *>
289-
p2.await
290-
.flatMap(ZIO.foreach(_)(serialization.encode).flatMap(p.succeed(_)).catchAll(p.fail(_)).fork)
291-
)
292-
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash).unit)
278+
(for {
279+
req <- serialization.decode[Req](msg.body)
280+
p2 <- Promise.make[Throwable, Option[Any]]
281+
_ <- entityManager.send(msg.entityId, req, msg.replyId, p2)
282+
resOption <- p2.await
283+
res <- ZIO.foreach(resOption)(serialization.encode)
284+
_ <- p.succeed(res)
285+
} yield ())
286+
.catchAllCause((cause: Cause[Throwable]) => p.fail(cause.squash))
287+
.fork
288+
.unit
293289
}
290+
.runDrain
294291
.forkScoped
295292
} yield ()
296293
}

0 commit comments

Comments
 (0)