Skip to content

Commit d2c8d13

Browse files
authored
make sure frames are emitted in the correct order (#427)
1 parent 2456e49 commit d2c8d13

File tree

2 files changed

+19
-15
lines changed

2 files changed

+19
-15
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@
1616

1717
package io.rsocket;
1818

19+
import static io.rsocket.util.ExceptionUtil.noStacktrace;
20+
1921
import io.netty.buffer.Unpooled;
2022
import io.netty.util.collection.IntObjectHashMap;
2123
import io.rsocket.exceptions.ConnectionException;
2224
import io.rsocket.exceptions.Exceptions;
2325
import io.rsocket.internal.LimitableRequestPublisher;
2426
import io.rsocket.util.PayloadImpl;
25-
import org.reactivestreams.Publisher;
26-
import org.reactivestreams.Subscriber;
27-
import reactor.core.Disposable;
28-
import reactor.core.publisher.*;
29-
30-
import javax.annotation.Nullable;
3127
import java.nio.channels.ClosedChannelException;
3228
import java.time.Duration;
3329
import java.util.Collection;
@@ -36,8 +32,11 @@
3632
import java.util.function.Consumer;
3733
import java.util.function.Function;
3834
import java.util.function.Supplier;
39-
40-
import static io.rsocket.util.ExceptionUtil.noStacktrace;
35+
import javax.annotation.Nullable;
36+
import org.reactivestreams.Publisher;
37+
import org.reactivestreams.Subscriber;
38+
import reactor.core.Disposable;
39+
import reactor.core.publisher.*;
4140

4241
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
4342
class RSocketClient implements RSocket {
@@ -53,7 +52,7 @@ class RSocketClient implements RSocket {
5352
private final IntObjectHashMap<Subscriber<Payload>> receivers;
5453
private final AtomicInteger missedAckCounter;
5554

56-
private final EmitterProcessor<Frame> sendProcessor;
55+
private final FluxProcessor<Frame, Frame> sendProcessor;
5756

5857
private @Nullable Disposable keepAliveSendSub;
5958
private volatile long timeLastTickSentMs;
@@ -82,7 +81,7 @@ class RSocketClient implements RSocket {
8281

8382
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
8483
// connections
85-
this.sendProcessor = EmitterProcessor.create();
84+
this.sendProcessor = EmitterProcessor.<Frame>create().serialize();
8685

8786
if (!Duration.ZERO.equals(tickPeriod)) {
8887
long ackTimeoutMs = ackTimeout.toMillis();
@@ -91,7 +90,7 @@ class RSocketClient implements RSocket {
9190
started
9291
.thenMany(Flux.interval(tickPeriod))
9392
.doOnSubscribe(s -> timeLastTickSentMs = System.currentTimeMillis())
94-
.flatMap(i -> sendKeepAlive(ackTimeoutMs, missedAcks))
93+
.concatMap(i -> sendKeepAlive(ackTimeoutMs, missedAcks))
9594
.doOnError(
9695
t -> {
9796
errorConsumer.accept(t);
@@ -425,7 +424,7 @@ protected void cleanup() {
425424
synchronized (RSocketClient.this) {
426425
subscribers = receivers.values();
427426
publishers = senders.values();
428-
427+
429428
senders.clear();
430429
receivers.clear();
431430
}

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class RSocketServer implements RSocket {
4545
private final IntObjectHashMap<Subscription> sendingSubscriptions;
4646
private final IntObjectHashMap<UnicastProcessor<Payload>> channelProcessors;
4747

48-
private final EmitterProcessor<Frame> sendProcessor;
48+
private final FluxProcessor<Frame, Frame> sendProcessor;
4949
private Disposable receiveDisposable;
5050

5151
RSocketServer(
@@ -58,7 +58,7 @@ class RSocketServer implements RSocket {
5858

5959
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
6060
// connections
61-
this.sendProcessor = EmitterProcessor.create();
61+
this.sendProcessor = EmitterProcessor.<Frame>create().serialize();
6262

6363
connection
6464
.send(sendProcessor)
@@ -67,7 +67,12 @@ class RSocketServer implements RSocket {
6767
.subscribe();
6868

6969
this.receiveDisposable =
70-
connection.receive().flatMap(this::handleFrame).doOnError(errorConsumer).then().subscribe();
70+
connection
71+
.receive()
72+
.flatMapSequential(this::handleFrame)
73+
.doOnError(errorConsumer)
74+
.then()
75+
.subscribe();
7176

7277
this.connection
7378
.onClose()

0 commit comments

Comments
 (0)