Skip to content

Commit 9850bd2

Browse files
committed
introduces disposable channels
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 870f25a commit 9850bd2

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed

rsocket-core/src/main/java/io/rsocket/core/RequestResponseRequesterMono.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import io.netty.util.IllegalReferenceCountException;
2828
import io.rsocket.DuplexConnection;
2929
import io.rsocket.Payload;
30+
import io.rsocket.exceptions.CanceledException;
3031
import io.rsocket.frame.CancelFrameCodec;
3132
import io.rsocket.frame.FrameType;
3233
import io.rsocket.frame.decoder.PayloadDecoder;
3334
import io.rsocket.plugins.RequestInterceptor;
3435
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3536
import org.reactivestreams.Subscription;
3637
import reactor.core.CoreSubscriber;
38+
import reactor.core.Disposable;
39+
import reactor.core.Disposables;
3740
import reactor.core.Exceptions;
3841
import reactor.core.Scannable;
3942
import reactor.core.publisher.Mono;
@@ -42,7 +45,7 @@
4245
import reactor.util.annotation.Nullable;
4346

4447
final class RequestResponseRequesterMono extends Mono<Payload>
45-
implements RequesterFrameHandler, LeasePermitHandler, Subscription, Scannable {
48+
implements RequesterFrameHandler, LeasePermitHandler, Subscription, Scannable, Disposable {
4649

4750
final ByteBufAllocator allocator;
4851
final Payload payload;
@@ -255,6 +258,32 @@ public final void cancel() {
255258
}
256259
}
257260

261+
@Override
262+
public void dispose() {
263+
long previousState = markTerminated(STATE, this);
264+
if (isTerminated(previousState)) {
265+
return;
266+
}
267+
268+
if (isFirstFrameSent(previousState)) {
269+
final int streamId = this.streamId;
270+
this.requesterResponderSupport.remove(streamId, this);
271+
272+
ReassemblyUtils.synchronizedRelease(this, previousState);
273+
274+
this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
275+
276+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
277+
if (requestInterceptor != null) {
278+
requestInterceptor.onCancel(streamId, FrameType.REQUEST_RESPONSE);
279+
}
280+
} else if (!isReadyToSendFirstFrame(previousState)) {
281+
this.payload.release();
282+
}
283+
284+
this.actual.onError(new CanceledException("Forcefully disposed"));
285+
}
286+
258287
@Override
259288
public final void handlePayload(Payload value) {
260289
if (this.done) {

rsocket-core/src/main/java/io/rsocket/core/RequestResponseResponderSubscriber.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
4040
import reactor.core.CoreSubscriber;
41+
import reactor.core.Disposable;
4142
import reactor.core.publisher.Mono;
4243
import reactor.core.publisher.Operators;
4344
import reactor.util.annotation.Nullable;
4445
import reactor.util.context.Context;
4546

4647
final class RequestResponseResponderSubscriber
47-
implements ResponderFrameHandler, CoreSubscriber<Payload> {
48+
implements ResponderFrameHandler, CoreSubscriber<Payload>, Disposable {
4849

4950
static final Logger logger = LoggerFactory.getLogger(RequestResponseResponderSubscriber.class);
5051

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import io.rsocket.plugins.RequestInterceptor;
1010
import java.util.Objects;
1111
import java.util.function.Function;
12+
13+
import reactor.core.publisher.Mono;
14+
import reactor.core.publisher.Sinks;
1215
import reactor.util.annotation.Nullable;
1316

1417
class RequesterResponderSupport {
@@ -19,12 +22,14 @@ class RequesterResponderSupport {
1922
private final PayloadDecoder payloadDecoder;
2023
private final ByteBufAllocator allocator;
2124
private final DuplexConnection connection;
25+
private final Sinks.Empty<Void> onAllStreamsTerminatedSink;
2226
@Nullable private final RequestInterceptor requestInterceptor;
2327

2428
@Nullable final StreamIdSupplier streamIdSupplier;
2529
final IntObjectMap<FrameHandler> activeStreams;
2630

2731
boolean terminating;
32+
boolean terminated;
2833

2934
public RequesterResponderSupport(
3035
int mtu,
@@ -43,6 +48,7 @@ public RequesterResponderSupport(
4348
this.allocator = connection.alloc();
4449
this.streamIdSupplier = streamIdSupplier;
4550
this.connection = connection;
51+
this.onAllStreamsTerminatedSink = Sinks.empty();
4652
this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this);
4753
}
4854

@@ -122,8 +128,8 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
122128
}
123129

124130
public synchronized boolean add(int streamId, FrameHandler frameHandler) {
125-
if (this.terminating) {
126-
throw
131+
if (this.terminated) {
132+
throwing
127133
}
128134
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
129135
// copy of Map.putIfAbsent(key, value) without `streamId` boxing
@@ -165,7 +171,27 @@ public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
165171
return true;
166172
}
167173

168-
public synchronized void terminate() {
169-
this.terminating = true;
174+
public void terminate() {
175+
final boolean terminated;
176+
synchronized (this) {
177+
this.terminating = true;
178+
179+
if (activeStreams.size() == 0) {
180+
terminated = true;
181+
this.terminated = true;
182+
} else {
183+
terminated = false;
184+
}
185+
}
186+
187+
if (terminated) {
188+
onAllStreamsTerminatedSink.tryEmitEmpty();
189+
} else {
190+
191+
}
192+
}
193+
194+
public Mono<Void> onAllStreamsTerminated() {
195+
return onAllStreamsTerminatedSink.asMono();
170196
}
171197
}

0 commit comments

Comments
 (0)