File tree Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Original file line number Diff line number Diff line change @@ -45,7 +45,10 @@ class CompleterSubscriber implements Subscriber {
4545}
4646
4747class StreamSubscriber implements Subscriber {
48- StreamController controller = StreamController ();
48+ final StreamController controller;
49+
50+ StreamSubscriber ({FutureOr <void > onCancel () = null })
51+ : controller = StreamController (onCancel: onCancel);
4952
5053 @override
5154 void onNext (Payload value) {
@@ -123,7 +126,10 @@ class RSocketRequester extends RSocket {
123126 var streamId = streamIdSupplier.nextStreamId (senders);
124127 connection.write (FrameCodec .encodeRequestStreamFrame (
125128 streamId, MAX_REQUEST_N_SIZE , payload));
126- var streamSubscriber = StreamSubscriber ();
129+ var streamSubscriber = StreamSubscriber (onCancel: () {
130+ connection.write (FrameCodec .encodeCancelFrame (streamId));
131+ senders.remove (streamId);
132+ });
127133 senders[streamId] = streamSubscriber;
128134 return streamSubscriber.payloadStream ();
129135 };
You can’t perform that action at this time.
0 commit comments