Skip to content

Commit 7b20b4c

Browse files
authored
Merge pull request #276 from scalecube/develop
New release
2 parents 791bc0e + 4943db4 commit 7b20b4c

File tree

4 files changed

+93
-28
lines changed

4 files changed

+93
-28
lines changed

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ private Mono<Cluster> doStart0() {
232232
return TransportImpl.bind(config.transportConfig())
233233
.flatMap(
234234
transport1 -> {
235-
localMember = createLocalMember(transport1.address().port());
235+
localMember = createLocalMember(transport1.address());
236236
transport = new SenderAwareTransport(transport1, localMember.address());
237237

238238
cidGenerator = new CorrelationIdGenerator(localMember.id());
@@ -350,18 +350,17 @@ private Flux<MembershipEvent> listenMembership() {
350350
* Creates and prepares local cluster member. An address of member that's being constructed may be
351351
* overriden from config variables.
352352
*
353-
* @param listenPort transport listen port
353+
* @param address transport address
354354
* @return local cluster member with cluster address and cluster member id
355355
*/
356-
private Member createLocalMember(int listenPort) {
357-
String localAddress = Address.getLocalIpAddress().getHostAddress();
358-
Integer port = Optional.ofNullable(config.memberPort()).orElse(listenPort);
356+
private Member createLocalMember(Address address) {
357+
int port = Optional.ofNullable(config.memberPort()).orElse(address.port());
359358

360359
// calculate local member cluster address
361360
Address memberAddress =
362361
Optional.ofNullable(config.memberHost())
363-
.map(memberHost -> Address.create(memberHost, port))
364-
.orElseGet(() -> Address.create(localAddress, listenPort));
362+
.map(host -> Address.create(host, port))
363+
.orElseGet(() -> Address.create(address.host(), port));
365364

366365
return new Member(Member.generateId(), config.memberAlias(), memberAddress);
367366
}
@@ -507,11 +506,11 @@ public boolean isShutdown() {
507506
private static class SenderAwareTransport implements Transport {
508507

509508
private final Transport transport;
510-
private final Address sender;
509+
private final Address address;
511510

512-
private SenderAwareTransport(Transport transport, Address sender) {
511+
private SenderAwareTransport(Transport transport, Address address) {
513512
this.transport = Objects.requireNonNull(transport);
514-
this.sender = Objects.requireNonNull(sender);
513+
this.address = Objects.requireNonNull(address);
515514
}
516515

517516
@Override
@@ -545,7 +544,7 @@ public Flux<Message> listen() {
545544
}
546545

547546
private Message enhanceWithSender(Message message) {
548-
return Message.with(message).sender(sender).build();
547+
return Message.with(message).sender(address).build();
549548
}
550549
}
551550
}

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.scalecube.cluster.transport.api.Message;
1818
import io.scalecube.cluster.transport.api.Transport;
1919
import io.scalecube.net.Address;
20+
import java.net.InetAddress;
2021
import java.nio.ByteBuffer;
2122
import java.time.Duration;
2223
import java.util.ArrayList;
@@ -166,15 +167,40 @@ public MembershipProtocolImpl(
166167
.subscribe(this::onMemberRemoved)));
167168
}
168169

169-
// Remove duplicates and local address
170+
// Remove duplicates and local address(es)
170171
private List<Address> cleanUpSeedMembers(Collection<Address> seedMembers) {
172+
InetAddress localIpAddress = Address.getLocalIpAddress();
173+
174+
String hostAddress = localIpAddress.getHostAddress();
175+
String hostName = localIpAddress.getHostName();
176+
177+
Address memberAddr = localMember.address();
178+
Address transportAddr = transport.address();
179+
Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port());
180+
Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port());
181+
Address memberAddByHostName = Address.create(hostName, memberAddr.port());
182+
Address transportAddrByHostName = Address.create(hostName, transportAddr.port());
183+
171184
return new LinkedHashSet<>(seedMembers)
172185
.stream()
173-
.filter(address -> !address.equals(localMember.address()))
174-
.filter(address -> !address.equals(transport.address()))
186+
.filter(addr -> checkAddressesNotEqual(addr, memberAddr))
187+
.filter(addr -> checkAddressesNotEqual(addr, transportAddr))
188+
.filter(addr -> checkAddressesNotEqual(addr, memberAddrByHostAddress))
189+
.filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostAddress))
190+
.filter(addr -> checkAddressesNotEqual(addr, memberAddByHostName))
191+
.filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostName))
175192
.collect(Collectors.toList());
176193
}
177194

195+
private boolean checkAddressesNotEqual(Address address0, Address address1) {
196+
if (!address0.equals(address1)) {
197+
return true;
198+
} else {
199+
LOGGER.warn("Filtering out seed address: {}", address0);
200+
return false;
201+
}
202+
}
203+
178204
@Override
179205
public Flux<MembershipEvent> listen() {
180206
return subject.onBackpressureBuffer();

cluster/src/test/java/io/scalecube/cluster/ClusterTest.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.scalecube.cluster.membership.MembershipEvent.Type;
99
import io.scalecube.cluster.metadata.SimpleMapMetadataCodec;
1010
import io.scalecube.net.Address;
11+
import java.net.InetAddress;
1112
import java.time.Duration;
1213
import java.util.ArrayList;
1314
import java.util.Collection;
@@ -29,6 +30,7 @@
2930
public class ClusterTest extends BaseTest {
3031

3132
public static final Duration TIMEOUT = Duration.ofSeconds(30);
33+
public static final int CONNECT_TIMEOUT = 3000;
3234

3335
@Test
3436
public void testMembersAccessFromScheduler() {
@@ -50,29 +52,53 @@ public void testMembersAccessFromScheduler() {
5052
}
5153

5254
@Test
53-
public void testJoinLocalhostIgnored() {
54-
Address[] addresses = {Address.from("localhost:4801"), Address.from("127.0.0.1:4801")};
55+
public void testJoinLocalhostIgnored() throws InterruptedException {
56+
InetAddress localIpAddress = Address.getLocalIpAddress();
57+
Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 4801);
58+
Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 4801);
59+
Address[] addresses = {
60+
Address.from("localhost:4801"),
61+
Address.from("127.0.0.1:4801"),
62+
Address.from("127.0.1.1:4801"),
63+
localAddressByHostname,
64+
localAddressByIp
65+
};
5566

5667
// Start seed node
5768
Cluster seedNode =
5869
new ClusterImpl()
59-
.transport(opts -> opts.port(4801).connectTimeout(500))
70+
.transport(opts -> opts.port(4801).connectTimeout(CONNECT_TIMEOUT))
6071
.membership(opts -> opts.seedMembers(addresses))
6172
.startAwait();
6273

74+
Thread.sleep(CONNECT_TIMEOUT);
75+
6376
Collection<Member> otherMembers = seedNode.otherMembers();
6477
assertEquals(0, otherMembers.size(), "otherMembers: " + otherMembers);
6578
}
6679

6780
@Test
68-
public void testJoinLocalhostIgnoredWithOverride() {
81+
public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException {
82+
InetAddress localIpAddress = Address.getLocalIpAddress();
83+
Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 7878);
84+
Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 7878);
85+
Address[] addresses = {
86+
Address.from("localhost:7878"),
87+
Address.from("127.0.0.1:7878"),
88+
Address.from("127.0.1.1:7878"),
89+
localAddressByHostname,
90+
localAddressByIp
91+
};
92+
6993
// Start seed node
7094
Cluster seedNode =
7195
new ClusterImpl(new ClusterConfig().memberHost("localhost").memberPort(7878))
72-
.transport(opts -> opts.port(7878).connectTimeout(500))
73-
.membership(opts -> opts.seedMembers(Address.from("localhost:7878")))
96+
.transport(opts -> opts.port(7878).connectTimeout(CONNECT_TIMEOUT))
97+
.membership(opts -> opts.seedMembers(addresses))
7498
.startAwait();
7599

100+
Thread.sleep(CONNECT_TIMEOUT);
101+
76102
Collection<Member> otherMembers = seedNode.otherMembers();
77103
assertEquals(0, otherMembers.size(), "otherMembers: " + otherMembers);
78104
}

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.scalecube.cluster.transport.api.Transport;
1717
import io.scalecube.cluster.transport.api.TransportConfig;
1818
import io.scalecube.net.Address;
19+
import java.net.InetAddress;
1920
import java.util.Map;
2021
import java.util.Objects;
2122
import java.util.concurrent.ConcurrentHashMap;
@@ -98,7 +99,7 @@ public TransportImpl(TransportConfig config) {
9899
*/
99100
private TransportImpl(DisposableServer server, TransportImpl other) {
100101
this.server = server;
101-
this.address = Address.create(server.address().getHostString(), server.address().getPort());
102+
this.address = prepareAddress(server);
102103
this.config = other.config;
103104
this.loopResources = other.loopResources;
104105
this.messagesSubject = other.messagesSubject;
@@ -116,6 +117,16 @@ private TransportImpl(DisposableServer server, TransportImpl other) {
116117
.subscribe(null, ex -> LOGGER.warn("Exception occurred on transport stop: " + ex));
117118
}
118119

120+
private static Address prepareAddress(DisposableServer server) {
121+
InetAddress address = server.address().getAddress();
122+
int port = server.address().getPort();
123+
if (address.isAnyLocalAddress()) {
124+
return Address.create(Address.getLocalIpAddress().getHostAddress(), port);
125+
} else {
126+
return Address.create(address.getHostAddress(), port);
127+
}
128+
}
129+
119130
/**
120131
* Init transport with the default configuration synchronously. Starts to accept connections on
121132
* local address.
@@ -171,15 +182,22 @@ public Mono<Transport> bind0() {
171182
.handle(this::onMessage)
172183
.bind()
173184
.doOnSuccess(
174-
server ->
175-
LOGGER.debug("Bound cluster transport on {}:{}", server.host(), server.port()))
185+
server -> {
186+
InetAddress address = server.address().getAddress();
187+
if (address.isAnyLocalAddress()) {
188+
LOGGER.debug("Bound cluster transport on *:{}", server.port());
189+
} else {
190+
LOGGER.debug(
191+
"Bound cluster transport on {}:{}", address.getHostAddress(), server.port());
192+
}
193+
})
176194
.doOnError(
177195
ex ->
178196
LOGGER.error(
179197
"Failed to bind cluster transport on port={}, cause: {}",
180198
config.port(),
181199
ex.toString()))
182-
.map(this::onBind);
200+
.map(server -> new TransportImpl(server, this));
183201
}
184202

185203
@Override
@@ -270,10 +288,6 @@ private Message toMessage(ByteBuf byteBuf) {
270288
}
271289
}
272290

273-
private TransportImpl onBind(DisposableServer server) {
274-
return new TransportImpl(server, this);
275-
}
276-
277291
private Mono<? extends Void> send0(Connection conn, Message message) {
278292
// do send
279293
return conn.outbound()

0 commit comments

Comments
 (0)