Skip to content

Commit da29481

Browse files
committed
wip
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 9850bd2 commit da29481

File tree

7 files changed

+43
-69
lines changed

7 files changed

+43
-69
lines changed

rsocket-core/src/main/java/io/rsocket/RSocket.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,12 @@ default double availability() {
8686

8787
@Override
8888
default void dispose() {
89-
dispose(true);
89+
9090
}
9191

92-
default void dispose(boolean force) {}
92+
default void disposeGracefully() {
93+
94+
}
9395

9496
@Override
9597
default boolean isDisposed() {

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import java.util.function.BiConsumer;
4646
import java.util.function.Consumer;
4747
import java.util.function.Supplier;
48-
49-
import org.reactivestreams.Publisher;
5048
import reactor.core.Disposable;
5149
import reactor.core.publisher.Mono;
5250
import reactor.util.annotation.Nullable;
@@ -179,11 +177,6 @@ public RSocketConnector dataMimeType(String dataMimeType) {
179177
return this;
180178
}
181179

182-
183-
public RSocketConnector gracefullShutdownTimeout(Supplier<? extends Publisher<?>> decider) {
184-
decider.get().subscribe(() -> );
185-
}
186-
187180
/**
188181
* Set the MIME type to use for formatting payload metadata on the established connection. This is
189182
* set in the initial {@code SETUP} frame sent to the server.

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ public void dispose() {
191191
tryShutdown();
192192
}
193193

194+
@Override
195+
public void disposeGracefully() {
196+
super.terminate();
197+
super.onAllStreamsTerminated()
198+
}
199+
194200
@Override
195201
public boolean isDisposed() {
196202
return terminationError != null;

rsocket-core/src/main/java/io/rsocket/core/RequestResponseRequesterMono.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,13 @@
2727
import io.netty.util.IllegalReferenceCountException;
2828
import io.rsocket.DuplexConnection;
2929
import io.rsocket.Payload;
30-
import io.rsocket.exceptions.CanceledException;
3130
import io.rsocket.frame.CancelFrameCodec;
3231
import io.rsocket.frame.FrameType;
3332
import io.rsocket.frame.decoder.PayloadDecoder;
3433
import io.rsocket.plugins.RequestInterceptor;
3534
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3635
import org.reactivestreams.Subscription;
3736
import reactor.core.CoreSubscriber;
38-
import reactor.core.Disposable;
39-
import reactor.core.Disposables;
4037
import reactor.core.Exceptions;
4138
import reactor.core.Scannable;
4239
import reactor.core.publisher.Mono;
@@ -45,7 +42,7 @@
4542
import reactor.util.annotation.Nullable;
4643

4744
final class RequestResponseRequesterMono extends Mono<Payload>
48-
implements RequesterFrameHandler, LeasePermitHandler, Subscription, Scannable, Disposable {
45+
implements RequesterFrameHandler, LeasePermitHandler, Subscription, Scannable {
4946

5047
final ByteBufAllocator allocator;
5148
final Payload payload;
@@ -258,32 +255,6 @@ public final void cancel() {
258255
}
259256
}
260257

261-
@Override
262-
public void dispose() {
263-
long previousState = markTerminated(STATE, this);
264-
if (isTerminated(previousState)) {
265-
return;
266-
}
267-
268-
if (isFirstFrameSent(previousState)) {
269-
final int streamId = this.streamId;
270-
this.requesterResponderSupport.remove(streamId, this);
271-
272-
ReassemblyUtils.synchronizedRelease(this, previousState);
273-
274-
this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
275-
276-
final RequestInterceptor requestInterceptor = this.requestInterceptor;
277-
if (requestInterceptor != null) {
278-
requestInterceptor.onCancel(streamId, FrameType.REQUEST_RESPONSE);
279-
}
280-
} else if (!isReadyToSendFirstFrame(previousState)) {
281-
this.payload.release();
282-
}
283-
284-
this.actual.onError(new CanceledException("Forcefully disposed"));
285-
}
286-
287258
@Override
288259
public final void handlePayload(Payload value) {
289260
if (this.done) {

rsocket-core/src/main/java/io/rsocket/core/RequestResponseResponderSubscriber.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,13 @@
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
4040
import reactor.core.CoreSubscriber;
41-
import reactor.core.Disposable;
4241
import reactor.core.publisher.Mono;
4342
import reactor.core.publisher.Operators;
4443
import reactor.util.annotation.Nullable;
4544
import reactor.util.context.Context;
4645

4746
final class RequestResponseResponderSubscriber
48-
implements ResponderFrameHandler, CoreSubscriber<Payload>, Disposable {
47+
implements ResponderFrameHandler, CoreSubscriber<Payload> {
4948

5049
static final Logger logger = LoggerFactory.getLogger(RequestResponseResponderSubscriber.class);
5150

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.netty.util.collection.IntObjectMap;
66
import io.rsocket.DuplexConnection;
77
import io.rsocket.RSocket;
8+
import io.rsocket.exceptions.CanceledException;
89
import io.rsocket.frame.decoder.PayloadDecoder;
910
import io.rsocket.plugins.RequestInterceptor;
1011
import java.util.Objects;
@@ -96,7 +97,9 @@ public int getNextStreamId() {
9697
final StreamIdSupplier streamIdSupplier = this.streamIdSupplier;
9798
if (streamIdSupplier != null) {
9899
synchronized (this) {
99-
if
100+
if (this.terminating) {
101+
throw new CanceledException("Disposed");
102+
}
100103
return streamIdSupplier.nextStreamId(this.activeStreams);
101104
}
102105
} else {
@@ -116,6 +119,10 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
116119
if (streamIdSupplier != null) {
117120
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
118121
synchronized (this) {
122+
if (this.terminating) {
123+
throw new CanceledException("Disposed");
124+
}
125+
119126
final int streamId = streamIdSupplier.nextStreamId(activeStreams);
120127

121128
activeStreams.put(streamId, frameHandler);
@@ -128,9 +135,10 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
128135
}
129136

130137
public synchronized boolean add(int streamId, FrameHandler frameHandler) {
131-
if (this.terminated) {
132-
throwing
138+
if (this.terminating) {
139+
throw new CanceledException("Disposed");
133140
}
141+
134142
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
135143
// copy of Map.putIfAbsent(key, value) without `streamId` boxing
136144
final FrameHandler previousHandler = activeStreams.get(streamId);
@@ -160,14 +168,27 @@ public synchronized FrameHandler get(int streamId) {
160168
* @return {@code true} if there is {@link FrameHandler} for the given {@code streamId} and the
161169
* instance equals to the passed one
162170
*/
163-
public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
164-
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
165-
// copy of Map.remove(key, value) without `streamId` boxing
166-
final FrameHandler curValue = activeStreams.get(streamId);
167-
if (!Objects.equals(curValue, frameHandler)) {
168-
return false;
171+
public boolean remove(int streamId, FrameHandler frameHandler) {
172+
final boolean terminated;
173+
synchronized (this) {
174+
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
175+
// copy of Map.remove(key, value) without `streamId` boxing
176+
final FrameHandler curValue = activeStreams.get(streamId);
177+
if (!Objects.equals(curValue, frameHandler)) {
178+
return false;
179+
}
180+
activeStreams.remove(streamId);
181+
if (activeStreams.size() == 0) {
182+
terminated = true;
183+
this.terminated = true;
184+
} else {
185+
terminated = false;
186+
}
187+
}
188+
189+
if (terminated) {
190+
onAllStreamsTerminatedSink.tryEmitEmpty();
169191
}
170-
activeStreams.remove(streamId);
171192
return true;
172193
}
173194

@@ -186,8 +207,6 @@ public void terminate() {
186207

187208
if (terminated) {
188209
onAllStreamsTerminatedSink.tryEmitEmpty();
189-
} else {
190-
191210
}
192211
}
193212

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,12 @@
1616

1717
package io.rsocket.transport.local;
1818

19-
import io.netty.buffer.ByteBufUtil;
20-
import io.netty.buffer.Unpooled;
21-
import io.rsocket.frame.FrameHeaderCodec;
22-
import io.rsocket.frame.FrameType;
2319
import io.rsocket.test.TransportTest;
2420
import java.time.Duration;
2521
import java.util.UUID;
2622

2723
final class LocalResumableTransportTest implements TransportTest {
2824

29-
public static void main(String[] args) {
30-
31-
System.out.println(
32-
FrameHeaderCodec.frameType(
33-
Unpooled.copiedBuffer(
34-
ByteBufUtil.decodeHexDump("000000003800000000000007ef4a"))));
35-
System.out.println(
36-
FrameHeaderCodec.frameType(
37-
Unpooled.copiedBuffer(
38-
ByteBufUtil.decodeHexDump("000000002c0000000004"))));
39-
}
40-
4125
private final TransportPair transportPair =
4226
new TransportPair<>(
4327
() -> "test-" + UUID.randomUUID(),

0 commit comments

Comments
 (0)