Skip to content

Commit 78a747a

Browse files
improves DuplexConnection api and reworks Resumability (#923)
Co-authored-by: Rossen Stoyanchev <[email protected]>
1 parent 4606f25 commit 78a747a

File tree

66 files changed

+2765
-2730
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2765
-2730
lines changed

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,28 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import java.net.SocketAddress;
2222
import java.nio.channels.ClosedChannelException;
23-
import org.reactivestreams.Publisher;
2423
import org.reactivestreams.Subscriber;
2524
import reactor.core.publisher.Flux;
26-
import reactor.core.publisher.Mono;
2725

2826
/** Represents a connection with input/output that the protocol uses. */
2927
public interface DuplexConnection extends Availability, Closeable {
3028

3129
/**
32-
* Sends the source of Frames on this connection and returns the {@code Publisher} representing
33-
* the result of this send.
30+
* Delivers the given frame to the underlying transport connection. This method is non-blocking
31+
* and can be safely executed from multiple threads. This method does not provide any flow-control
32+
* mechanism.
3433
*
35-
* <p><strong>Flow control</strong>
36-
*
37-
* <p>The passed {@code Publisher} must
38-
*
39-
* @param frames Stream of {@code Frame}s to send on the connection.
40-
* @return {@code Publisher} that completes when all the frames are written on the connection
41-
* successfully and errors when it fails.
42-
* @throws NullPointerException if {@code frames} is {@code null}
34+
* @param streamId to which the given frame relates
35+
* @param frame with the encoded content
4336
*/
44-
Mono<Void> send(Publisher<ByteBuf> frames);
37+
void sendFrame(int streamId, ByteBuf frame);
4538

4639
/**
47-
* Sends a single {@code Frame} on this connection and returns the {@code Publisher} representing
48-
* the result of this send.
40+
* Send an error frame and after it is successfully sent, close the connection.
4941
*
50-
* @param frame {@code Frame} to send.
51-
* @return {@code Publisher} that completes when the frame is written on the connection
52-
* successfully and errors when it fails.
42+
* @param errorException to encode in the error frame
5343
*/
54-
default Mono<Void> sendOne(ByteBuf frame) {
55-
return send(Mono.just(frame));
56-
}
44+
void sendErrorAndClose(RSocketErrorException errorException);
5745

5846
/**
5947
* Returns a stream of all {@code Frame}s received on this connection.

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

Lines changed: 15 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.Closeable;
2222
import io.rsocket.DuplexConnection;
23+
import io.rsocket.RSocketErrorException;
2324
import io.rsocket.frame.FrameHeaderCodec;
24-
import io.rsocket.frame.FrameUtil;
2525
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
2626
import io.rsocket.plugins.InitializingInterceptorRegistry;
2727
import java.net.SocketAddress;
2828
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
29-
import org.reactivestreams.Publisher;
3029
import org.reactivestreams.Subscription;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
3330
import reactor.core.CoreSubscriber;
3431
import reactor.core.publisher.Flux;
3532
import reactor.core.publisher.Mono;
@@ -50,67 +47,40 @@
5047
*/
5148
class ClientServerInputMultiplexer implements CoreSubscriber<ByteBuf>, Closeable {
5249

53-
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
54-
private static final InitializingInterceptorRegistry emptyInterceptorRegistry =
55-
new InitializingInterceptorRegistry();
56-
57-
private final InternalDuplexConnection setupReceiver;
5850
private final InternalDuplexConnection serverReceiver;
5951
private final InternalDuplexConnection clientReceiver;
60-
private final DuplexConnection setupConnection;
6152
private final DuplexConnection serverConnection;
6253
private final DuplexConnection clientConnection;
6354
private final DuplexConnection source;
6455
private final boolean isClient;
6556

6657
private Subscription s;
67-
private boolean setupReceived;
6858

6959
private Throwable t;
7060

7161
private volatile int state;
7262
private static final AtomicIntegerFieldUpdater<ClientServerInputMultiplexer> STATE =
7363
AtomicIntegerFieldUpdater.newUpdater(ClientServerInputMultiplexer.class, "state");
7464

75-
public ClientServerInputMultiplexer(DuplexConnection source) {
76-
this(source, emptyInterceptorRegistry, false);
77-
}
78-
7965
public ClientServerInputMultiplexer(
8066
DuplexConnection source, InitializingInterceptorRegistry registry, boolean isClient) {
8167
this.source = source;
8268
this.isClient = isClient;
83-
source = registry.initConnection(Type.SOURCE, source);
8469

85-
if (!isClient) {
86-
setupReceiver = new InternalDuplexConnection(this, source);
87-
setupConnection = registry.initConnection(Type.SETUP, setupReceiver);
88-
} else {
89-
setupReceiver = null;
90-
setupConnection = null;
91-
}
92-
serverReceiver = new InternalDuplexConnection(this, source);
93-
clientReceiver = new InternalDuplexConnection(this, source);
94-
serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
95-
clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
70+
this.serverReceiver = new InternalDuplexConnection(this, source);
71+
this.clientReceiver = new InternalDuplexConnection(this, source);
72+
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
73+
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
9674
}
9775

98-
public DuplexConnection asClientServerConnection() {
99-
return source;
100-
}
101-
102-
public DuplexConnection asServerConnection() {
76+
DuplexConnection asServerConnection() {
10377
return serverConnection;
10478
}
10579

106-
public DuplexConnection asClientConnection() {
80+
DuplexConnection asClientConnection() {
10781
return clientConnection;
10882
}
10983

110-
public DuplexConnection asSetupConnection() {
111-
return setupConnection;
112-
}
113-
11484
@Override
11585
public void dispose() {
11686
source.dispose();
@@ -130,12 +100,7 @@ public Mono<Void> onClose() {
130100
public void onSubscribe(Subscription s) {
131101
if (Operators.validate(this.s, s)) {
132102
this.s = s;
133-
if (isClient) {
134-
s.request(Long.MAX_VALUE);
135-
} else {
136-
// request first SetupFrame
137-
s.request(1);
138-
}
103+
s.request(Long.MAX_VALUE);
139104
}
140105
}
141106

@@ -145,12 +110,6 @@ public void onNext(ByteBuf frame) {
145110
final Type type;
146111
if (streamId == 0) {
147112
switch (FrameHeaderCodec.frameType(frame)) {
148-
case SETUP:
149-
case RESUME:
150-
case RESUME_OK:
151-
type = Type.SETUP;
152-
setupReceived = true;
153-
break;
154113
case LEASE:
155114
case KEEPALIVE:
156115
case ERROR:
@@ -164,19 +123,8 @@ public void onNext(ByteBuf frame) {
164123
} else {
165124
type = Type.CLIENT;
166125
}
167-
if (!isClient && type != Type.SETUP && !setupReceived) {
168-
final IllegalStateException error =
169-
new IllegalStateException("SETUP or LEASE frame must be received before any others.");
170-
this.s.cancel();
171-
onError(error);
172-
}
173126

174127
switch (type) {
175-
case SETUP:
176-
final InternalDuplexConnection setupReceiver = this.setupReceiver;
177-
setupReceiver.onNext(frame);
178-
setupReceiver.onComplete();
179-
break;
180128
case CLIENT:
181129
clientReceiver.onNext(frame);
182130
break;
@@ -193,16 +141,6 @@ public void onComplete() {
193141
return;
194142
}
195143

196-
if (!isClient) {
197-
if (!setupReceived) {
198-
setupReceiver.onComplete();
199-
}
200-
201-
if (previousState == 1) {
202-
return;
203-
}
204-
}
205-
206144
if (clientReceiver.isSubscribed()) {
207145
clientReceiver.onComplete();
208146
}
@@ -220,16 +158,6 @@ public void onError(Throwable t) {
220158
return;
221159
}
222160

223-
if (!isClient) {
224-
if (!setupReceived) {
225-
setupReceiver.onError(t);
226-
}
227-
228-
if (previousState == 1) {
229-
return;
230-
}
231-
}
232-
233161
if (clientReceiver.isSubscribed()) {
234162
clientReceiver.onError(t);
235163
}
@@ -244,17 +172,8 @@ boolean notifyRequested() {
244172
return false;
245173
}
246174

247-
if (isClient) {
248-
if (currentState == 2) {
249-
source.receive().subscribe(this);
250-
}
251-
} else {
252-
if (currentState == 1) {
253-
source.receive().subscribe(this);
254-
} else if (currentState == 3) {
255-
// means setup was consumed and we got request from client and server multiplexers
256-
s.request(Long.MAX_VALUE);
257-
}
175+
if (currentState == 2) {
176+
source.receive().subscribe(this);
258177
}
259178

260179
return true;
@@ -280,7 +199,6 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
280199
implements Subscription, DuplexConnection {
281200
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
282201
private final DuplexConnection source;
283-
private final boolean debugEnabled;
284202

285203
private volatile int state;
286204
static final AtomicIntegerFieldUpdater<InternalDuplexConnection> STATE =
@@ -292,7 +210,6 @@ public InternalDuplexConnection(
292210
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
293211
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
294212
this.source = source;
295-
this.debugEnabled = LOGGER.isDebugEnabled();
296213
}
297214

298215
@Override
@@ -340,32 +257,18 @@ void onError(Throwable t) {
340257
}
341258

342259
@Override
343-
public Mono<Void> send(Publisher<ByteBuf> frame) {
344-
if (debugEnabled) {
345-
return Flux.from(frame)
346-
.doOnNext(f -> LOGGER.debug("sending -> " + FrameUtil.toString(f)))
347-
.as(source::send);
348-
}
349-
350-
return source.send(frame);
260+
public void sendFrame(int streamId, ByteBuf frame) {
261+
source.sendFrame(streamId, frame);
351262
}
352263

353264
@Override
354-
public Mono<Void> sendOne(ByteBuf frame) {
355-
if (debugEnabled) {
356-
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
357-
}
358-
359-
return source.sendOne(frame);
265+
public void sendErrorAndClose(RSocketErrorException e) {
266+
source.sendErrorAndClose(e);
360267
}
361268

362269
@Override
363270
public Flux<ByteBuf> receive() {
364-
if (debugEnabled) {
365-
return this.doOnNext(frame -> LOGGER.debug("receiving -> " + FrameUtil.toString(frame)));
366-
} else {
367-
return this;
368-
}
271+
return this;
369272
}
370273

371274
@Override
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.rsocket.core;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.rsocket.DuplexConnection;
6+
import java.nio.channels.ClosedChannelException;
7+
import reactor.core.publisher.Mono;
8+
import reactor.util.function.Tuple2;
9+
import reactor.util.function.Tuples;
10+
11+
abstract class ClientSetup {
12+
abstract Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection);
13+
}
14+
15+
class DefaultClientSetup extends ClientSetup {
16+
17+
@Override
18+
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
19+
return Mono.create(
20+
sink -> sink.onRequest(__ -> sink.success(Tuples.of(Unpooled.EMPTY_BUFFER, connection))));
21+
}
22+
}
23+
24+
class ResumableClientSetup extends ClientSetup {
25+
26+
@Override
27+
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
28+
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
29+
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
30+
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
31+
}
32+
}

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020
import static io.rsocket.core.SendUtils.sendReleasingPayload;
2121
import static io.rsocket.core.StateUtils.*;
2222

23-
import io.netty.buffer.ByteBuf;
2423
import io.netty.buffer.ByteBufAllocator;
2524
import io.netty.util.IllegalReferenceCountException;
25+
import io.rsocket.DuplexConnection;
2626
import io.rsocket.Payload;
2727
import io.rsocket.frame.FrameType;
28-
import io.rsocket.internal.UnboundedProcessor;
2928
import java.time.Duration;
3029
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3130
import org.reactivestreams.Subscription;
@@ -50,15 +49,15 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
5049
final int mtu;
5150
final int maxFrameLength;
5251
final RequesterResponderSupport requesterResponderSupport;
53-
final UnboundedProcessor<ByteBuf> sendProcessor;
52+
final DuplexConnection connection;
5453

5554
FireAndForgetRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
5655
this.allocator = requesterResponderSupport.getAllocator();
5756
this.payload = payload;
5857
this.mtu = requesterResponderSupport.getMtu();
5958
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
6059
this.requesterResponderSupport = requesterResponderSupport;
61-
this.sendProcessor = requesterResponderSupport.getSendProcessor();
60+
this.connection = requesterResponderSupport.getDuplexConnection();
6261
}
6362

6463
@Override
@@ -106,7 +105,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
106105
}
107106

108107
sendReleasingPayload(
109-
streamId, FrameType.REQUEST_FNF, mtu, p, this.sendProcessor, this.allocator, true);
108+
streamId, FrameType.REQUEST_FNF, mtu, p, this.connection, this.allocator, true);
110109
} catch (Throwable e) {
111110
lazyTerminate(STATE, this);
112111
actual.onError(e);
@@ -169,7 +168,7 @@ public Void block() {
169168
FrameType.REQUEST_FNF,
170169
this.mtu,
171170
this.payload,
172-
this.sendProcessor,
171+
this.connection,
173172
this.allocator,
174173
true);
175174
} catch (Throwable e) {

0 commit comments

Comments
 (0)