Skip to content

Commit 8ae24c6

Browse files
authored
loadbalancer reconnects when downstream connection goes away (#547)
* updates to netty's duplex connection * loadbalancer reconnects when downstream connection goes away * fixed loadbalancer tests
1 parent 4f466f6 commit 8ae24c6

File tree

10 files changed

+640
-562
lines changed

10 files changed

+640
-562
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ atlassian-ide-plugin.xml
6565

6666
# NetBeans specific files/directories
6767
.nbattrs
68-
/bin
68+
**/bin/*
6969

7070
#.gitignore in subdirectory
7171
.gitignore

rsocket-core/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ dependencies {
2727
api 'io.netty:netty-buffer'
2828
api 'io.projectreactor:reactor-core'
2929

30-
implementation 'org.jctools:jctools-core'
3130
implementation 'org.slf4j:slf4j-api'
3231

3332
compileOnly 'com.google.code.findbugs:jsr305'

rsocket-core/src/main/java/io/rsocket/StreamIdSupplier.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,31 @@
1616

1717
package io.rsocket;
1818

19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
1921
final class StreamIdSupplier {
2022

21-
private int streamId;
23+
private static final AtomicIntegerFieldUpdater<StreamIdSupplier> STREAM_ID =
24+
AtomicIntegerFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
25+
private volatile int streamId;
2226

2327
private StreamIdSupplier(int streamId) {
2428
this.streamId = streamId;
2529
}
2630

27-
synchronized int nextStreamId() {
28-
streamId += 2;
29-
return streamId;
30-
}
31-
32-
synchronized boolean isBeforeOrCurrent(int streamId) {
33-
return this.streamId >= streamId && streamId > 0;
34-
}
35-
3631
static StreamIdSupplier clientSupplier() {
3732
return new StreamIdSupplier(-1);
3833
}
3934

4035
static StreamIdSupplier serverSupplier() {
4136
return new StreamIdSupplier(0);
4237
}
38+
39+
int nextStreamId() {
40+
return STREAM_ID.addAndGet(this, 2);
41+
}
42+
43+
boolean isBeforeOrCurrent(int streamId) {
44+
return this.streamId >= streamId && streamId > 0;
45+
}
4346
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,24 @@
1616

1717
package io.rsocket.fragmentation;
1818

19-
import static io.rsocket.fragmentation.FrameReassembler.createFrameReassembler;
20-
import static io.rsocket.util.AbstractionLeakingFrameUtils.toAbstractionLeakingFrame;
21-
2219
import io.netty.buffer.ByteBufAllocator;
2320
import io.netty.buffer.PooledByteBufAllocator;
21+
import io.netty.util.collection.IntObjectHashMap;
22+
import io.netty.util.collection.LongObjectHashMap;
2423
import io.rsocket.DuplexConnection;
2524
import io.rsocket.Frame;
2625
import io.rsocket.util.AbstractionLeakingFrameUtils;
2726
import io.rsocket.util.NumberUtils;
28-
import java.util.Objects;
29-
import org.jctools.maps.NonBlockingHashMapLong;
3027
import org.reactivestreams.Publisher;
3128
import reactor.core.publisher.Flux;
3229
import reactor.core.publisher.Mono;
3330

31+
import java.util.Collection;
32+
import java.util.Objects;
33+
34+
import static io.rsocket.fragmentation.FrameReassembler.createFrameReassembler;
35+
import static io.rsocket.util.AbstractionLeakingFrameUtils.toAbstractionLeakingFrame;
36+
3437
/**
3538
* A {@link DuplexConnection} implementation that fragments and reassembles {@link Frame}s.
3639
*
@@ -46,8 +49,7 @@ public final class FragmentationDuplexConnection implements DuplexConnection {
4649

4750
private final FrameFragmenter frameFragmenter;
4851

49-
private final NonBlockingHashMapLong<FrameReassembler> frameReassemblers =
50-
new NonBlockingHashMapLong<>();
52+
private final IntObjectHashMap<FrameReassembler> frameReassemblers = new IntObjectHashMap<>();
5153

5254
/**
5355
* Creates a new instance.
@@ -85,7 +87,16 @@ public FragmentationDuplexConnection(
8587

8688
delegate
8789
.onClose()
88-
.doFinally(signalType -> frameReassemblers.values().forEach(FrameReassembler::dispose))
90+
.doFinally(
91+
signalType -> {
92+
Collection<FrameReassembler> values;
93+
synchronized (this) {
94+
values = frameReassemblers.values();
95+
}
96+
for (FrameReassembler reassembler : values) {
97+
reassembler.dispose();
98+
}
99+
})
89100
.subscribe();
90101
}
91102

@@ -134,9 +145,13 @@ private Flux<Frame> toFragmentedFrames(int streamId, io.rsocket.framing.Frame fr
134145
}
135146

136147
private Mono<Frame> toReassembledFrames(int streamId, io.rsocket.framing.Frame fragment) {
137-
FrameReassembler frameReassembler =
138-
frameReassemblers.computeIfAbsent(
139-
(long) streamId, i -> createFrameReassembler(byteBufAllocator));
148+
FrameReassembler frameReassembler;
149+
150+
synchronized (this) {
151+
frameReassembler =
152+
frameReassemblers.computeIfAbsent(
153+
streamId, i -> createFrameReassembler(byteBufAllocator));
154+
}
140155

141156
return Mono.justOrEmpty(frameReassembler.reassemble(fragment))
142157
.map(frame -> toAbstractionLeakingFrame(byteBufAllocator, streamId, frame));

0 commit comments

Comments
 (0)