Skip to content

Commit 4f466f6

Browse files
authored
perf changes (#552)
* reset to snapshots for next release * Switching FrameType flags back to bits (#548) * switching frame type flag back to bits so we don't have to traverse an array when looking up flags * jmh test * Added an EMPTY flag * changes from performance testing for Netty transport, and underlying RSocket core protocol * removing unused imports * switched to netty's IntObjectMap Signed-off-by: Robert Roeser <[email protected]>
1 parent d15bd91 commit 4f466f6

File tree

8 files changed

+383
-51
lines changed

8 files changed

+383
-51
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
# limitations under the License.
1313
#
1414

15-
version=0.11.13.BUILD-SNAPSHOT
15+
version=0.11.14.BUILD-SNAPSHOT

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,27 @@
1616

1717
package io.rsocket;
1818

19+
import io.netty.util.collection.IntObjectHashMap;
1920
import io.rsocket.exceptions.ConnectionErrorException;
2021
import io.rsocket.exceptions.Exceptions;
2122
import io.rsocket.framing.FrameType;
2223
import io.rsocket.internal.LimitableRequestPublisher;
2324
import io.rsocket.internal.UnboundedProcessor;
25+
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.Mono;
29+
import reactor.core.publisher.SignalType;
30+
import reactor.core.publisher.UnicastProcessor;
2431

2532
import java.nio.channels.ClosedChannelException;
2633
import java.time.Duration;
34+
import java.util.Collections;
35+
import java.util.Map;
2736
import java.util.concurrent.atomic.AtomicBoolean;
2837
import java.util.concurrent.atomic.AtomicReference;
2938
import java.util.function.Consumer;
3039
import java.util.function.Function;
31-
import org.jctools.maps.NonBlockingHashMapLong;
32-
import org.reactivestreams.Publisher;
33-
import org.reactivestreams.Subscriber;
34-
import reactor.core.publisher.*;
3540

3641
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
3742
class RSocketClient implements RSocket {
@@ -40,8 +45,8 @@ class RSocketClient implements RSocket {
4045
private final Function<Frame, ? extends Payload> frameDecoder;
4146
private final Consumer<Throwable> errorConsumer;
4247
private final StreamIdSupplier streamIdSupplier;
43-
private final NonBlockingHashMapLong<LimitableRequestPublisher> senders;
44-
private final NonBlockingHashMapLong<UnicastProcessor<Payload>> receivers;
48+
private final Map<Integer, LimitableRequestPublisher> senders;
49+
private final Map<Integer, UnicastProcessor<Payload>> receivers;
4550
private final UnboundedProcessor<Frame> sendProcessor;
4651
private KeepAliveHandler keepAliveHandler;
4752
private final Lifecycle lifecycle = new Lifecycle();
@@ -69,8 +74,8 @@ class RSocketClient implements RSocket {
6974
this.frameDecoder = frameDecoder;
7075
this.errorConsumer = errorConsumer;
7176
this.streamIdSupplier = streamIdSupplier;
72-
this.senders = new NonBlockingHashMapLong<>(256);
73-
this.receivers = new NonBlockingHashMapLong<>(256);
77+
this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
78+
this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());
7479

7580
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
7681
this.sendProcessor = new UnboundedProcessor<>();

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,29 @@
1616

1717
package io.rsocket;
1818

19-
import static io.rsocket.Frame.Request.initialRequestN;
20-
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
21-
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
22-
19+
import io.netty.util.collection.IntObjectHashMap;
2320
import io.rsocket.exceptions.ApplicationErrorException;
2421
import io.rsocket.exceptions.ConnectionErrorException;
2522
import io.rsocket.framing.FrameType;
2623
import io.rsocket.internal.LimitableRequestPublisher;
2724
import io.rsocket.internal.UnboundedProcessor;
28-
import java.util.function.Consumer;
29-
import java.util.function.Function;
30-
import org.jctools.maps.NonBlockingHashMapLong;
3125
import org.reactivestreams.Publisher;
3226
import org.reactivestreams.Subscriber;
3327
import org.reactivestreams.Subscription;
3428
import reactor.core.Disposable;
35-
import reactor.core.publisher.*;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Mono;
31+
import reactor.core.publisher.SignalType;
32+
import reactor.core.publisher.UnicastProcessor;
33+
34+
import java.util.Collections;
35+
import java.util.Map;
36+
import java.util.function.Consumer;
37+
import java.util.function.Function;
38+
39+
import static io.rsocket.Frame.Request.initialRequestN;
40+
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
41+
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
3642

3743
/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
3844
class RSocketServer implements RSocket {
@@ -42,8 +48,8 @@ class RSocketServer implements RSocket {
4248
private final Function<Frame, ? extends Payload> frameDecoder;
4349
private final Consumer<Throwable> errorConsumer;
4450

45-
private final NonBlockingHashMapLong<Subscription> sendingSubscriptions;
46-
private final NonBlockingHashMapLong<UnicastProcessor<Payload>> channelProcessors;
51+
private final Map<Integer, Subscription> sendingSubscriptions;
52+
private final Map<Integer, UnicastProcessor<Payload>> channelProcessors;
4753

4854
private final UnboundedProcessor<Frame> sendProcessor;
4955
private KeepAliveHandler keepAliveHandler;
@@ -69,8 +75,8 @@ class RSocketServer implements RSocket {
6975
this.requestHandler = requestHandler;
7076
this.frameDecoder = frameDecoder;
7177
this.errorConsumer = errorConsumer;
72-
this.sendingSubscriptions = new NonBlockingHashMapLong<>();
73-
this.channelProcessors = new NonBlockingHashMapLong<>();
78+
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
79+
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
7480

7581
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
7682
// connections

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
package io.rsocket.internal;
1818

1919
import io.netty.util.ReferenceCountUtil;
20-
import java.util.Objects;
21-
import java.util.Queue;
22-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2420
import org.reactivestreams.Subscriber;
2521
import org.reactivestreams.Subscription;
2622
import reactor.core.CoreSubscriber;
@@ -32,6 +28,11 @@
3228
import reactor.util.concurrent.Queues;
3329
import reactor.util.context.Context;
3430

31+
import java.util.Objects;
32+
import java.util.Queue;
33+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
35+
3536
/**
3637
* A Processor implementation that takes a custom queue and allows only a single subscriber.
3738
*

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import reactor.core.Disposable;
3434
import reactor.core.publisher.Flux;
3535
import reactor.core.publisher.Mono;
36+
import reactor.core.scheduler.Schedulers;
3637
import reactor.test.StepVerifier;
3738

3839
public interface TransportTest {
@@ -164,16 +165,23 @@ default void requestChannel3() {
164165
.expectComplete()
165166
.verify(getTimeout());
166167
}
167-
168+
168169
@DisplayName("makes 1 requestChannel request with 512 payloads")
169170
@Test
170171
default void requestChannel512() {
171172
Flux<Payload> payloads = Flux.range(0, 512).map(this::createTestPayload);
172-
173+
174+
Flux.range(0, 1024)
175+
.flatMap(v -> Mono.fromRunnable(()-> check(payloads)).subscribeOn(Schedulers.elastic()), 12)
176+
.blockLast();
177+
}
178+
179+
default void check(Flux<Payload> payloads) {
173180
getClient()
174181
.requestChannel(payloads)
175182
.as(StepVerifier::create)
176183
.expectNextCount(512)
184+
.as("expected 512 items")
177185
.expectComplete()
178186
.verify(getTimeout());
179187
}
@@ -233,7 +241,7 @@ default void requestStream10_000() {
233241
.expectComplete()
234242
.verify(getTimeout());
235243
}
236-
244+
237245
@DisplayName("makes 1 requestStream request and receives 5 responses")
238246
@Test
239247
default void requestStream5() {

0 commit comments

Comments
 (0)