File tree Expand file tree Collapse file tree 3 files changed +19
-3
lines changed Expand file tree Collapse file tree 3 files changed +19
-3
lines changed Original file line number Diff line number Diff line change @@ -45,7 +45,10 @@ class CompleterSubscriber implements Subscriber {
4545}
4646
4747class StreamSubscriber implements Subscriber {
48- StreamController controller = StreamController ();
48+ final StreamController controller;
49+
50+ StreamSubscriber ({FutureOr <void > onCancel () = null })
51+ : controller = StreamController (onCancel: onCancel);
4952
5053 @override
5154 void onNext (Payload value) {
@@ -123,7 +126,10 @@ class RSocketRequester extends RSocket {
123126 var streamId = streamIdSupplier.nextStreamId (senders);
124127 connection.write (FrameCodec .encodeRequestStreamFrame (
125128 streamId, MAX_REQUEST_N_SIZE , payload));
126- var streamSubscriber = StreamSubscriber ();
129+ var streamSubscriber = StreamSubscriber (onCancel: () {
130+ connection.write (FrameCodec .encodeCancelFrame (streamId));
131+ senders.remove (streamId);
132+ });
127133 senders[streamId] = streamSubscriber;
128134 return streamSubscriber.payloadStream ();
129135 };
Original file line number Diff line number Diff line change @@ -472,6 +472,16 @@ class FrameCodec {
472472 refillFrameLength (frameBuffer);
473473 return frameBuffer.toUint8Array ();
474474 }
475+
476+ static Uint8List encodeCancelFrame (int streamId) {
477+ var frameBuffer = RSocketByteBuffer ();
478+ frameBuffer.writeI24 (0 ); // frame length
479+ frameBuffer.writeI32 (streamId); //stream id
480+ frameBuffer.writeI8 (frame_types.CANCEL << 2 );
481+ frameBuffer.writeI8 (0 );
482+ refillFrameLength (frameBuffer);
483+ return frameBuffer.toUint8Array ();
484+ }
475485}
476486
477487Payload decodePayload (
Original file line number Diff line number Diff line change @@ -7,6 +7,6 @@ issue_tracker: https://github.com/rsocket/rsocket-dart/issues
77environment :
88 sdk : ' >=2.7.0 <3.0.0'
99dependencies :
10- rxdart : ^0.27.0
10+ rxdart : ^0.27.1
1111dev_dependencies :
1212 test : ^1.6.0
You can’t perform that action at this time.
0 commit comments