Skip to content

Commit c540c88

Browse files
committed
Merge branch 'master' into enh_external_hosts
# Conflicts: # cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
2 parents 838b36f + 71fe298 commit c540c88

File tree

8 files changed

+85
-197
lines changed

8 files changed

+85
-197
lines changed

cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public final class MembershipConfig implements Cloneable {
2828
private int syncInterval = DEFAULT_SYNC_INTERVAL;
2929
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
3030
private int suspicionMult = DEFAULT_SUSPICION_MULT;
31-
private int removedMembersHistorySize = 42;
3231
private String namespace = "default";
3332

3433
public MembershipConfig() {}
@@ -158,22 +157,6 @@ public MembershipConfig namespace(String namespace) {
158157
return m;
159158
}
160159

161-
public int removedMembersHistorySize() {
162-
return removedMembersHistorySize;
163-
}
164-
165-
/**
166-
* Setter for {@code removedMembersHistorySize}.
167-
*
168-
* @param removedMembersHistorySize history size for remove members
169-
* @return new {@code MembershipConfig} instance
170-
*/
171-
public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) {
172-
MembershipConfig m = clone();
173-
m.removedMembersHistorySize = removedMembersHistorySize;
174-
return m;
175-
}
176-
177160
@Override
178161
public MembershipConfig clone() {
179162
try {
@@ -191,7 +174,6 @@ public String toString() {
191174
.add("syncTimeout=" + syncTimeout)
192175
.add("suspicionMult=" + suspicionMult)
193176
.add("namespace='" + namespace + "'")
194-
.add("removedMembersHistorySize=" + removedMembersHistorySize)
195177
.toString();
196178
}
197179
}

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -455,13 +455,4 @@ private boolean isTransitPingAck(Message message) {
455455
return PING_ACK.equals(message.qualifier())
456456
&& message.<PingData>data().getOriginalIssuer() != null;
457457
}
458-
459-
/**
460-
* <b>NOTE:</b> this method is for testing purpose only.
461-
*
462-
* @return transport
463-
*/
464-
Transport getTransport() {
465-
return transport;
466-
}
467458
}

cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -371,22 +371,4 @@ private Set<String> getGossipsThatMostLikelyDisseminated(long period) {
371371
.map(gossipState -> gossipState.gossip().gossipId())
372372
.collect(Collectors.toSet());
373373
}
374-
375-
/**
376-
* <b>NOTE:</b> this method is for testing purpose only.
377-
*
378-
* @return transport
379-
*/
380-
Transport getTransport() {
381-
return transport;
382-
}
383-
384-
/**
385-
* <b>NOTE:</b> this method is for testing purpose only.
386-
*
387-
* @return local member
388-
*/
389-
Member getMember() {
390-
return localMember;
391-
}
392374
}

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 3 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,8 @@
3535
import java.util.Optional;
3636
import java.util.Set;
3737
import java.util.UUID;
38-
import java.util.concurrent.CopyOnWriteArrayList;
3938
import java.util.concurrent.ThreadLocalRandom;
4039
import java.util.concurrent.TimeUnit;
41-
import java.util.function.Predicate;
4240
import java.util.stream.Collectors;
4341
import org.slf4j.Logger;
4442
import org.slf4j.LoggerFactory;
@@ -50,6 +48,7 @@
5048
import reactor.core.publisher.Sinks;
5149
import reactor.core.scheduler.Scheduler;
5250

51+
@SuppressWarnings({"FieldCanBeLocal", "unused"})
5352
public final class MembershipProtocolImpl implements MembershipProtocol {
5453

5554
private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class);
@@ -85,7 +84,6 @@ private enum MembershipUpdateReason {
8584

8685
private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
8786
private final Map<String, Member> members = new HashMap<>();
88-
private final List<MembershipEvent> removedMembersHistory = new CopyOnWriteArrayList<>();
8987
private final Set<String> aliveEmittedSet = new HashSet<>();
9088

9189
// Subject
@@ -159,10 +157,8 @@ public MembershipProtocolImpl(
159157
.publishOn(scheduler)
160158
.subscribe(
161159
this::onMembershipGossip,
162-
ex -> LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)),
163-
listen() // Listen removed members for monitoring
164-
.filter(MembershipEvent::isRemoved)
165-
.subscribe(this::onMemberRemoved)));
160+
ex ->
161+
LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex))));
166162
}
167163

168164
// Remove duplicates and local address(es)
@@ -893,102 +889,4 @@ private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
893889
.then();
894890
});
895891
}
896-
897-
/**
898-
* <b>NOTE:</b> this method is for testing purpose only.
899-
*
900-
* @return failure detector
901-
*/
902-
FailureDetector getFailureDetector() {
903-
return failureDetector;
904-
}
905-
906-
/**
907-
* <b>NOTE:</b> this method is for testing purpose only.
908-
*
909-
* @return gossip
910-
*/
911-
GossipProtocol getGossipProtocol() {
912-
return gossipProtocol;
913-
}
914-
915-
/**
916-
* <b>NOTE:</b> this method is for testing purpose only.
917-
*
918-
* @return transport
919-
*/
920-
Transport getTransport() {
921-
return transport;
922-
}
923-
924-
/**
925-
* <b>NOTE:</b> this method is for testing purpose only.
926-
*
927-
* @return metadataStore
928-
*/
929-
MetadataStore getMetadataStore() {
930-
return metadataStore;
931-
}
932-
933-
/**
934-
* <b>NOTE:</b> this method is for testing purpose only.
935-
*
936-
* @return transport
937-
*/
938-
List<MembershipRecord> getMembershipRecords() {
939-
return Collections.unmodifiableList(new ArrayList<>(membershipTable.values()));
940-
}
941-
942-
// ===============================================================
943-
// ============== Helper Methods for Monitoring ==================
944-
// ===============================================================
945-
946-
private int getIncarnation() {
947-
return membershipTable.get(localMember.id()).incarnation();
948-
}
949-
950-
private List<Member> getAliveMembers() {
951-
return findRecordsByCondition(MembershipRecord::isAlive);
952-
}
953-
954-
private List<Member> getSuspectedMembers() {
955-
return findRecordsByCondition(MembershipRecord::isSuspect);
956-
}
957-
958-
private List<Member> getRemovedMembers() {
959-
return removedMembersHistory.stream().map(MembershipEvent::member).collect(Collectors.toList());
960-
}
961-
962-
private List<Member> findRecordsByCondition(Predicate<MembershipRecord> condition) {
963-
return getMembershipRecords().stream()
964-
.filter(condition)
965-
.map(MembershipRecord::member)
966-
.collect(Collectors.toList());
967-
}
968-
969-
private void onMemberRemoved(MembershipEvent event) {
970-
int s = membershipConfig.removedMembersHistorySize();
971-
if (s <= 0) {
972-
return;
973-
}
974-
removedMembersHistory.add(event);
975-
if (removedMembersHistory.size() > s) {
976-
removedMembersHistory.remove(0);
977-
}
978-
}
979-
980-
public static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
981-
return send(transport, addresses, 0, request);
982-
}
983-
984-
private static Mono<Void> send(
985-
Transport transport, List<Address> addresses, int currentIndex, Message request) {
986-
if (currentIndex >= addresses.size()) {
987-
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
988-
}
989-
990-
return transport
991-
.send(addresses.get(currentIndex), request)
992-
.onErrorResume(th -> send(transport, addresses, currentIndex + 1, request));
993-
}
994892
}

cluster/src/test/java/io/scalecube/cluster/BaseTest.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.scalecube.cluster.transport.api.TransportConfig;
77
import io.scalecube.cluster.utils.NetworkEmulatorTransport;
88
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
9+
import java.lang.reflect.Field;
910
import java.time.Duration;
1011
import java.util.concurrent.TimeUnit;
1112
import org.junit.jupiter.api.AfterEach;
@@ -30,32 +31,43 @@ public final void baseTearDown(TestInfo testInfo) {
3031
LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****");
3132
}
3233

33-
protected void awaitSeconds(long seconds) {
34+
public static <T> T getField(Object obj, String fieldName) {
35+
try {
36+
final Field field = obj.getClass().getDeclaredField(fieldName);
37+
field.setAccessible(true);
38+
//noinspection unchecked
39+
return (T) field.get(obj);
40+
} catch (Exception ex) {
41+
throw new RuntimeException(ex);
42+
}
43+
}
44+
45+
public static void awaitSeconds(long seconds) {
3446
try {
3547
TimeUnit.SECONDS.sleep(seconds);
3648
} catch (InterruptedException e) {
3749
throw Exceptions.propagate(e);
3850
}
3951
}
4052

41-
protected void awaitSuspicion(int clusterSize) {
53+
public static void awaitSuspicion(int clusterSize) {
4254
int defaultSuspicionMult = MembershipConfig.DEFAULT_SUSPICION_MULT;
4355
int pingInterval = MembershipProtocolTest.PING_INTERVAL;
4456
long suspicionTimeoutSec =
4557
ClusterMath.suspicionTimeout(defaultSuspicionMult, clusterSize, pingInterval) / 1000;
4658
awaitSeconds(suspicionTimeoutSec + 2);
4759
}
4860

49-
protected NetworkEmulatorTransport createTransport() {
61+
public static NetworkEmulatorTransport createTransport() {
5062
return createTransport(TransportConfig.defaultConfig());
5163
}
5264

53-
protected NetworkEmulatorTransport createTransport(TransportConfig transportConfig) {
65+
public static NetworkEmulatorTransport createTransport(TransportConfig transportConfig) {
5466
return new NetworkEmulatorTransport(
5567
Transport.bindAwait(transportConfig.transportFactory(new TcpTransportFactory())));
5668
}
5769

58-
protected void destroyTransport(Transport transport) {
70+
public static void destroyTransport(Transport transport) {
5971
if (transport == null || transport.isStopped()) {
6072
return;
6173
}

cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -421,22 +421,22 @@ private FailureDetectorImpl createFd(
421421
return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler);
422422
}
423423

424-
private void start(List<FailureDetectorImpl> fdetectors) {
424+
private static void start(List<FailureDetectorImpl> fdetectors) {
425425
for (FailureDetectorImpl fd : fdetectors) {
426426
fd.start();
427427
}
428428
}
429429

430-
private void stop(List<FailureDetectorImpl> fdetectors) {
430+
private static void stop(List<FailureDetectorImpl> fdetectors) {
431431
for (FailureDetectorImpl fd : fdetectors) {
432432
fd.stop();
433433
}
434434
for (FailureDetectorImpl fd : fdetectors) {
435-
destroyTransport(fd.getTransport());
435+
destroyTransport(BaseTest.getField(fd, "transport"));
436436
}
437437
}
438438

439-
private void assertStatus(
439+
private static void assertStatus(
440440
Address address,
441441
MemberStatus status,
442442
Collection<FailureDetectorEvent> events,
@@ -462,10 +462,11 @@ private void assertStatus(
462462
}
463463
}
464464

465-
private Future<List<FailureDetectorEvent>> listenNextEventFor(
465+
private static Future<List<FailureDetectorEvent>> listenNextEventFor(
466466
FailureDetectorImpl fd, List<Address> addresses) {
467+
final Transport transport = BaseTest.getField(fd, "transport");
467468
addresses = new ArrayList<>(addresses);
468-
addresses.remove(fd.getTransport().address()); // exclude self
469+
addresses.remove(transport.address()); // exclude self
469470
if (addresses.isEmpty()) {
470471
throw new IllegalArgumentException();
471472
}
@@ -482,15 +483,16 @@ private Future<List<FailureDetectorEvent>> listenNextEventFor(
482483
return allOf(resultFuture);
483484
}
484485

485-
private Collection<FailureDetectorEvent> awaitEvents(Future<List<FailureDetectorEvent>> events) {
486+
private static Collection<FailureDetectorEvent> awaitEvents(
487+
Future<List<FailureDetectorEvent>> events) {
486488
try {
487489
return events.get(10, TimeUnit.SECONDS);
488490
} catch (Exception e) {
489491
throw new RuntimeException(e);
490492
}
491493
}
492494

493-
private <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
495+
private static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
494496
CompletableFuture<Void> allFuturesResult =
495497
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
496498
return allFuturesResult.thenApply(

cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,19 +125,20 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
125125
final CountDownLatch latch = new CountDownLatch(membersNum - 1);
126126
final Map<Member, Member> receivers = new ConcurrentHashMap<>();
127127
final AtomicBoolean doubleDelivery = new AtomicBoolean(false);
128-
for (final GossipProtocolImpl protocol : gossipProtocols) {
129-
protocol
128+
for (final GossipProtocolImpl gossipProtocol : gossipProtocols) {
129+
gossipProtocol
130130
.listen()
131131
.subscribe(
132132
gossip -> {
133+
final Member localMember = BaseTest.getField(gossipProtocol, "localMember");
134+
final Transport transport = BaseTest.getField(gossipProtocol, "transport");
135+
133136
if (gossipData.equals(gossip.data())) {
134-
boolean firstTimeAdded =
135-
receivers.put(protocol.getMember(), protocol.getMember()) == null;
137+
boolean firstTimeAdded = receivers.put(localMember, localMember) == null;
136138
if (firstTimeAdded) {
137139
latch.countDown();
138140
} else {
139-
LOGGER.error(
140-
"Delivered gossip twice to: {}", protocol.getTransport().address());
141+
LOGGER.error("Delivered gossip twice to: {}", transport.address());
141142
doubleDelivery.set(true);
142143
}
143144
}
@@ -214,7 +215,7 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
214215
private LongSummaryStatistics computeMessageSentStats(List<GossipProtocolImpl> gossipProtocols) {
215216
List<Long> messageSentPerNode = new ArrayList<>(gossipProtocols.size());
216217
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
217-
NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport();
218+
final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport");
218219
messageSentPerNode.add(transport.networkEmulator().totalMessageSentCount());
219220
}
220221
return messageSentPerNode.stream().mapToLong(v -> v).summaryStatistics();
@@ -223,7 +224,7 @@ private LongSummaryStatistics computeMessageSentStats(List<GossipProtocolImpl> g
223224
private LongSummaryStatistics computeMessageLostStats(List<GossipProtocolImpl> gossipProtocols) {
224225
List<Long> messageLostPerNode = new ArrayList<>(gossipProtocols.size());
225226
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
226-
NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport();
227+
final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport");
227228
messageLostPerNode.add(transport.networkEmulator().totalOutboundMessageLostCount());
228229
}
229230
return messageLostPerNode.stream().mapToLong(v -> v).summaryStatistics();
@@ -274,7 +275,7 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
274275
return gossipProtocol;
275276
}
276277

277-
private void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
278+
private static void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
278279
// Stop all gossip protocols
279280
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
280281
gossipProtocol.stop();
@@ -283,7 +284,7 @@ private void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
283284
// Stop all transports
284285
List<Mono<Void>> futures = new ArrayList<>();
285286
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
286-
futures.add(gossipProtocol.getTransport().stop());
287+
futures.add(BaseTest.<Transport>getField(gossipProtocol, "transport").stop());
287288
}
288289

289290
try {

0 commit comments

Comments
 (0)