Skip to content

Commit e9425c5

Browse files
committed
wip
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent da29481 commit e9425c5

File tree

4 files changed

+24
-26
lines changed

4 files changed

+24
-26
lines changed

rsocket-core/src/main/java/io/rsocket/RSocket.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,9 @@ default double availability() {
8585
}
8686

8787
@Override
88-
default void dispose() {
88+
default void dispose() {}
8989

90-
}
91-
92-
default void disposeGracefully() {
93-
94-
}
90+
default void disposeGracefully() {}
9591

9692
@Override
9793
default boolean isDisposed() {

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.function.Supplier;
4848
import reactor.core.Disposable;
4949
import reactor.core.publisher.Mono;
50+
import reactor.core.publisher.Sinks;
5051
import reactor.util.annotation.Nullable;
5152
import reactor.util.function.Tuples;
5253
import reactor.util.retry.Retry;
@@ -655,6 +656,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
655656
requesterLeaseTracker = null;
656657
}
657658

659+
Sinks.Empty<Void> requesterSink = Sinks.empty();
660+
Sinks.Empty<Void> responderSink = Sinks.empty();
661+
658662
RSocket rSocketRequester =
659663
new RSocketRequester(
660664
multiplexer.asClientConnection(),
@@ -667,7 +671,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
667671
(int) keepAliveMaxLifeTime.toMillis(),
668672
keepAliveHandler,
669673
interceptors::initRequesterRequestInterceptor,
670-
requesterLeaseTracker);
674+
requesterLeaseTracker,
675+
requesterSink,
676+
requesterSink.asMono().and(responderSink.asMono()));
671677

672678
RSocket wrappedRSocketRequester =
673679
interceptors.initRequester(rSocketRequester);

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
6666
RSocketRequester.class, Throwable.class, "terminationError");
6767

6868
@Nullable private final RequesterLeaseTracker requesterLeaseTracker;
69+
private Mono<Void> onAllClose;
6970
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
70-
private final Sinks.Empty<Void> onClose;
7171

7272
RSocketRequester(
7373
DuplexConnection connection,
@@ -80,18 +80,21 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
8080
int keepAliveAckTimeout,
8181
@Nullable KeepAliveHandler keepAliveHandler,
8282
Function<RSocket, RequestInterceptor> requestInterceptorFunction,
83-
@Nullable RequesterLeaseTracker requesterLeaseTracker) {
83+
@Nullable RequesterLeaseTracker requesterLeaseTracker,
84+
Sinks.Empty<Void> onClose,
85+
Mono<Void> onAllClose) {
8486
super(
8587
mtu,
8688
maxFrameLength,
8789
maxInboundPayloadSize,
8890
payloadDecoder,
8991
connection,
9092
streamIdSupplier,
91-
requestInterceptorFunction);
93+
requestInterceptorFunction,
94+
onClose);
9295

9396
this.requesterLeaseTracker = requesterLeaseTracker;
94-
this.onClose = Sinks.empty();
97+
this.onAllClose = onAllClose;
9598

9699
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
97100
connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
@@ -188,13 +191,12 @@ public double availability() {
188191

189192
@Override
190193
public void dispose() {
191-
tryShutdown();
194+
getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
192195
}
193196

194197
@Override
195198
public void disposeGracefully() {
196199
super.terminate();
197-
super.onAllStreamsTerminated()
198200
}
199201

200202
@Override
@@ -204,7 +206,7 @@ public boolean isDisposed() {
204206

205207
@Override
206208
public Mono<Void> onClose() {
207-
return onClose.asMono();
209+
return onAllClose;
208210
}
209211

210212
private void handleIncomingFrames(ByteBuf frame) {
@@ -340,7 +342,6 @@ private void terminate(Throwable e) {
340342
if (keepAliveFramesAcceptor != null) {
341343
keepAliveFramesAcceptor.dispose();
342344
}
343-
getDuplexConnection().dispose();
344345
final RequestInterceptor requestInterceptor = getRequestInterceptor();
345346
if (requestInterceptor != null) {
346347
requestInterceptor.dispose();

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import io.rsocket.plugins.RequestInterceptor;
1111
import java.util.Objects;
1212
import java.util.function.Function;
13-
14-
import reactor.core.publisher.Mono;
1513
import reactor.core.publisher.Sinks;
1614
import reactor.util.annotation.Nullable;
1715

@@ -23,7 +21,7 @@ class RequesterResponderSupport {
2321
private final PayloadDecoder payloadDecoder;
2422
private final ByteBufAllocator allocator;
2523
private final DuplexConnection connection;
26-
private final Sinks.Empty<Void> onAllStreamsTerminatedSink;
24+
private final Sinks.Empty<Void> onClose;
2725
@Nullable private final RequestInterceptor requestInterceptor;
2826

2927
@Nullable final StreamIdSupplier streamIdSupplier;
@@ -39,7 +37,8 @@ public RequesterResponderSupport(
3937
PayloadDecoder payloadDecoder,
4038
DuplexConnection connection,
4139
@Nullable StreamIdSupplier streamIdSupplier,
42-
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
40+
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
41+
Sinks.Empty<Void> onClose) {
4342

4443
this.activeStreams = new IntObjectHashMap<>();
4544
this.mtu = mtu;
@@ -49,7 +48,7 @@ public RequesterResponderSupport(
4948
this.allocator = connection.alloc();
5049
this.streamIdSupplier = streamIdSupplier;
5150
this.connection = connection;
52-
this.onAllStreamsTerminatedSink = Sinks.empty();
51+
this.onClose = onClose;
5352
this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this);
5453
}
5554

@@ -187,7 +186,7 @@ public boolean remove(int streamId, FrameHandler frameHandler) {
187186
}
188187

189188
if (terminated) {
190-
onAllStreamsTerminatedSink.tryEmitEmpty();
189+
onClose.tryEmitEmpty();
191190
}
192191
return true;
193192
}
@@ -206,11 +205,7 @@ public void terminate() {
206205
}
207206

208207
if (terminated) {
209-
onAllStreamsTerminatedSink.tryEmitEmpty();
208+
onClose.tryEmitEmpty();
210209
}
211210
}
212-
213-
public Mono<Void> onAllStreamsTerminated() {
214-
return onAllStreamsTerminatedSink.asMono();
215-
}
216211
}

0 commit comments

Comments
 (0)