Skip to content

Commit ad9db19

Browse files
committed
WIP
1 parent 93cf26a commit ad9db19

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44
import io.scalecube.cluster.transport.api.Transport;
55
import io.scalecube.net.Address;
66
import java.util.List;
7+
import java.util.Map;
8+
import java.util.concurrent.ConcurrentHashMap;
79
import java.util.concurrent.atomic.AtomicInteger;
810
import reactor.core.publisher.Mono;
911

1012
public class TransportWrapper {
1113

1214
private final Transport transport;
1315

16+
private final Map<Member, Integer> addressIndexByMember = new ConcurrentHashMap<>();
17+
1418
public TransportWrapper(Transport transport) {
1519
this.transport = transport;
1620
}
@@ -26,13 +30,22 @@ public Mono<Message> requestResponse(Member member, Message request) {
2630
return Mono.defer(
2731
() -> {
2832
final List<Address> addresses = member.addresses();
29-
final AtomicInteger currentIndex = new AtomicInteger();
33+
final int numRetries = addresses.size() - 1;
34+
final Integer index = addressIndexByMember.getOrDefault(member, 0);
35+
final AtomicInteger currentIndex = new AtomicInteger(index);
36+
3037
return Mono.defer(
3138
() -> {
32-
final Address address = addresses.get(currentIndex.getAndIncrement());
39+
int increment = currentIndex.getAndIncrement();
40+
41+
if (increment == addresses.size()) {
42+
currentIndex.set(increment = 0);
43+
}
44+
45+
final Address address = addresses.get(increment);
3346
return transport.requestResponse(address, request);
3447
})
35-
.retry(addresses.size() - 1);
48+
.retry(numRetries);
3649
});
3750
}
3851

cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@ public TransportWrapperTest() {
5050
@Nested
5151
class RequestResponseTests {
5252

53+
@Test
54+
void requestResponseShouldWorkIndex2() {
55+
final List<Address> addresses = Collections.singletonList(Address.from("test:0"));
56+
final Member member = new Member("test", null, addresses, "namespace");
57+
58+
when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response));
59+
60+
StepVerifier.create(transportWrapper.requestResponse(member, request))
61+
.assertNext(message -> Assertions.assertSame(response, message, "response"))
62+
.thenCancel()
63+
.verify();
64+
65+
}
66+
5367
@Test
5468
void requestResponseShouldWork() {
5569
final List<Address> addresses = Collections.singletonList(Address.from("test:0"));

0 commit comments

Comments
 (0)