@@ -27,7 +27,19 @@ trait Messenger[-Msg] {
2727 * streaming responses. See `sendStreamAutoRestart` for an alternative that will automatically restart the stream
2828 * in case of rebalance.
2929 */
30- def sendStream [Res ](entityId : String )(msg : StreamReplier [Res ] => Msg ): Task [ZStream [Any , Throwable , Res ]]
30+ def sendAndReceiveStream [Res ](entityId : String )(msg : StreamReplier [Res ] => Msg ): Task [ZStream [Any , Throwable , Res ]]
31+
32+ /**
33+ * Send a stream of messages.
34+ */
35+ def sendStream (entityId : String )(messages : ZStream [Any , Throwable , Msg ]): Task [Unit ]
36+
37+ /**
38+ * Send a stream of messages and receive a stream of responses of type `Res`.
39+ */
40+ def sendStreamAndReceiveStream [Res ](entityId : String )(
41+ messages : StreamReplier [Res ] => ZStream [Any , Throwable , Msg ]
42+ ): Task [ZStream [Any , Throwable , Res ]]
3143
3244 /**
3345 * Send a message and receive a stream of responses of type `Res` while restarting the stream when the remote entity
@@ -38,11 +50,42 @@ trait Messenger[-Msg] {
3850 * cursor from the responses so that when the remote entity is rebalanced, a new message can be sent with the right
3951 * cursor according to what we've seen in the previous stream of responses.
4052 */
41- def sendStreamAutoRestart [Cursor , Res ](entityId : String , cursor : Cursor )(msg : (Cursor , StreamReplier [Res ]) => Msg )(
53+ def sendAndReceiveStreamAutoRestart [Cursor , Res ](entityId : String , cursor : Cursor )(
54+ msg : (Cursor , StreamReplier [Res ]) => Msg
55+ )(
56+ updateCursor : (Cursor , Res ) => Cursor
57+ ): ZStream [Any , Throwable , Res ] =
58+ ZStream
59+ .unwrap(sendAndReceiveStream[Res ](entityId)(msg(cursor, _)))
60+ .either
61+ .mapAccum(cursor) {
62+ case (c, Right (res)) => updateCursor(c, res) -> Right (res)
63+ case (c, Left (err)) => (c, Left (c -> err))
64+ }
65+ .flatMap {
66+ case Right (res) => ZStream .succeed(res)
67+ case Left ((lastSeenCursor, StreamCancelled )) =>
68+ ZStream .execute(ZIO .sleep(200 .millis)) ++
69+ sendAndReceiveStreamAutoRestart(entityId, lastSeenCursor)(msg)(updateCursor)
70+ case Left ((_, err)) => ZStream .fail(err)
71+ }
72+
73+ /**
74+ * Send a stream of messages and receive a stream of responses of type `Res` while restarting the stream when the
75+ * remote entity is rebalanced.
76+ *
77+ * To do so, we need a "cursor" so the stream of responses can be restarted where it ended before the rebalance. That
78+ * is, the first message sent to the remote entity contains the given initial cursor value and we extract an updated
79+ * cursor from the responses so that when the remote entity is rebalanced, a new message can be sent with the right
80+ * cursor according to what we've seen in the previous stream of responses.
81+ */
82+ def sendStreamAndReceiveStreamAutoRestart [Cursor , Res ](entityId : String , cursor : Cursor )(
83+ msg : (Cursor , StreamReplier [Res ]) => ZStream [Any , Throwable , Msg ]
84+ )(
4285 updateCursor : (Cursor , Res ) => Cursor
4386 ): ZStream [Any , Throwable , Res ] =
4487 ZStream
45- .unwrap(sendStream [Res ](entityId)(msg(cursor, _)))
88+ .unwrap(sendStreamAndReceiveStream [Res ](entityId)(msg(cursor, _)))
4689 .either
4790 .mapAccum(cursor) {
4891 case (c, Right (res)) => updateCursor(c, res) -> Right (res)
@@ -52,7 +95,7 @@ trait Messenger[-Msg] {
5295 case Right (res) => ZStream .succeed(res)
5396 case Left ((lastSeenCursor, StreamCancelled )) =>
5497 ZStream .execute(ZIO .sleep(200 .millis)) ++
55- sendStreamAutoRestart (entityId, lastSeenCursor)(msg)(updateCursor)
98+ sendStreamAndReceiveStreamAutoRestart (entityId, lastSeenCursor)(msg)(updateCursor)
5699 case Left ((_, err)) => ZStream .fail(err)
57100 }
58101}
0 commit comments