File tree Expand file tree Collapse file tree 1 file changed +23
-0
lines changed Expand file tree Collapse file tree 1 file changed +23
-0
lines changed Original file line number Diff line number Diff line change @@ -255,6 +255,29 @@ class RSocketRequester extends RSocket {
255255 responder.metadataPush (metadataPushFrame.payload).then ((value) => {});
256256 }
257257 break ;
258+ case frame_types.REQUEST_STREAM :
259+ var requestStreamFrame = frame as RequestStreamFrame ;
260+ var requesterStreamId = header.streamId;
261+ if (responder != null && requestStreamFrame.payload != null ) {
262+ responder.requestStream (requestStreamFrame.payload).listen ((payload) {
263+ connection.write (FrameCodec .encodePayloadFrame (
264+ requesterStreamId, false , payload));
265+ //encodePayloadFrame(requesterStreamId, false, payload)
266+ }, onDone: () {
267+ connection.write (
268+ FrameCodec .encodePayloadFrame (requesterStreamId, true , null ));
269+ }, onError: (Object error) {
270+ if (error is RSocketException ) {
271+ var e = error;
272+ connection.write (FrameCodec .encodeErrorFrame (
273+ requesterStreamId, e.code, e.message));
274+ } else {
275+ connection.write (FrameCodec .encodeErrorFrame (requesterStreamId,
276+ RSocketErrorCode .APPLICATION_ERROR , error.toString ()));
277+ }
278+ });
279+ }
280+ break ;
258281 default :
259282 }
260283 }
You can’t perform that action at this time.
0 commit comments