Skip to content

Commit 2df9fde

Browse files
committed
Enhanced cluster.jmx-monitor: exposed few more properties; changed constructor for Member; removed memberId instead started to use memberAlias
1 parent 2e3d8f1 commit 2df9fde

File tree

12 files changed

+94
-83
lines changed

12 files changed

+94
-83
lines changed

cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public final class ClusterConfig implements Cloneable {
3535
private MetadataEncoder metadataEncoder = MetadataEncoder.INSTANCE;
3636
private MetadataDecoder metadataDecoder = MetadataDecoder.INSTANCE;
3737

38-
private String memberId;
38+
private String memberAlias;
3939
private String memberHost;
4040
private Integer memberPort;
4141

@@ -168,19 +168,19 @@ public ClusterConfig memberHost(String memberHost) {
168168
return c;
169169
}
170170

171-
public String memberId() {
172-
return memberId;
171+
public String memberAlias() {
172+
return memberAlias;
173173
}
174174

175175
/**
176-
* Sets a memberId.
176+
* Sets a memberAlias.
177177
*
178-
* @param memberId member id
178+
* @param memberAlias member alias
179179
* @return new {@code ClusterConfig} instance
180180
*/
181-
public ClusterConfig memberId(String memberId) {
181+
public ClusterConfig memberAlias(String memberAlias) {
182182
ClusterConfig c = clone();
183-
c.memberId = memberId;
183+
c.memberAlias = memberAlias;
184184
return c;
185185
}
186186

@@ -285,7 +285,7 @@ public String toString() {
285285
.add("metadataTimeout=" + metadataTimeout)
286286
.add("metadataEncoder=" + metadataEncoder)
287287
.add("metadataDecoder=" + metadataDecoder)
288-
.add("memberId='" + memberId + "'")
288+
.add("memberAlias='" + memberAlias + "'")
289289
.add("memberHost='" + memberHost + "'")
290290
.add("memberPort=" + memberPort)
291291
.add("transportConfig=" + transportConfig)

cluster-api/src/main/java/io/scalecube/cluster/Member.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,41 +11,38 @@
1111
public final class Member {
1212

1313
private String id;
14+
private String alias;
1415
private Address address;
1516

1617
/** Instantiates empty member for deserialization purpose. */
1718
Member() {}
1819

1920
/**
20-
* Create instance of cluster member by given address; member id will be generated by {@link
21-
* #generateId()}.
22-
*
23-
* @param address address on which given member listens for incoming messages
24-
*/
25-
public Member(Address address) {
26-
this(generateId(), address);
27-
}
28-
29-
/**
30-
* Create instance of cluster member with given parameters.
21+
* Constructor.
3122
*
3223
* @param id member id
33-
* @param address address on which given member listens for incoming messages
24+
* @param alias member alias (optional)
25+
* @param address member address
3426
*/
35-
public Member(String id, Address address) {
36-
this.id = Objects.requireNonNull(id);
37-
this.address = Objects.requireNonNull(address);
27+
public Member(String id, String alias, Address address) {
28+
this.id = Objects.requireNonNull(id, "member id");
29+
this.alias = alias; // optional
30+
this.address = Objects.requireNonNull(address, "member address");
3831
}
3932

4033
public String id() {
4134
return id;
4235
}
4336

37+
public String alias() {
38+
return alias;
39+
}
40+
4441
public Address address() {
4542
return address;
4643
}
4744

48-
private static String generateId() {
45+
public static String generateId() {
4946
return Long.toHexString(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
5047
}
5148

@@ -68,6 +65,10 @@ public int hashCode() {
6865

6966
@Override
7067
public String toString() {
71-
return id + ":" + address.port();
68+
if (alias == null) {
69+
return id + "@" + address;
70+
} else {
71+
return alias + "/" + id + "@" + address;
72+
}
7273
}
7374
}

cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ public abstract class AbstractMonitorMBean {
1212
* Registers monitor mbean.
1313
*
1414
* @param other monitor mbean instance
15-
* @throws Exception in case of error
1615
* @return object instance
16+
* @throws Exception in case of error
1717
*/
1818
public static ObjectInstance register(AbstractMonitorMBean other) throws Exception {
1919
Object bean = other.getBeanType().cast(other);

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -340,11 +340,7 @@ private Member createLocalMember(int listenPort) {
340340
.map(memberHost -> Address.create(memberHost, port))
341341
.orElseGet(() -> Address.create(localAddress, listenPort));
342342

343-
if (config.memberId() != null) {
344-
return new Member(config.memberId(), memberAddress);
345-
} else {
346-
return new Member(memberAddress);
347-
}
343+
return new Member(Member.generateId(), config.memberAlias(), memberAddress);
348344
}
349345

350346
@Override
@@ -492,6 +488,10 @@ public interface MonitorMBean {
492488

493489
String getIdAsString();
494490

491+
Collection<String> getAlias();
492+
493+
String getAliasAsString();
494+
495495
Collection<String> getAddress();
496496

497497
String getAddressAsString();
@@ -511,33 +511,42 @@ private JmxMonitorMBean(ClusterImpl cluster) {
511511

512512
@Override
513513
public Collection<String> getId() {
514-
return Collections.singleton(cluster.member().id());
514+
return Collections.singleton(getIdAsString());
515515
}
516516

517517
@Override
518518
public String getIdAsString() {
519-
return getId().iterator().next();
519+
return cluster.member().id();
520+
}
521+
522+
@Override
523+
public Collection<String> getAlias() {
524+
return Collections.singleton(getAliasAsString());
525+
}
526+
527+
@Override
528+
public String getAliasAsString() {
529+
return cluster.member().alias();
520530
}
521531

522532
@Override
523533
public Collection<String> getAddress() {
524-
return Collections.singleton(String.valueOf(cluster.member().address()));
534+
return Collections.singleton(getAddressAsString());
525535
}
526536

527537
@Override
528538
public String getAddressAsString() {
529-
return getAddress().iterator().next();
539+
return String.valueOf(cluster.member().address());
530540
}
531541

532542
@Override
533543
public Collection<String> getMetadata() {
534-
return Collections.singletonList(
535-
String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null)));
544+
return Collections.singletonList(getMetadataAsString());
536545
}
537546

538547
@Override
539548
public String getMetadataAsString() {
540-
return getMetadata().iterator().next();
549+
return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null));
541550
}
542551

543552
@Override

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason
482482
() -> {
483483
Objects.requireNonNull(r1, "Membership record can't be null");
484484
// Get current record
485-
MembershipRecord r0 = membershipTable.get(r1.id());
485+
MembershipRecord r0 = membershipTable.get(r1.member().id());
486486

487487
// Check if new record r1 overrides existing membership record r0
488488
if (r1.equals(r0) || !r1.isOverrides(r0)) {
@@ -509,7 +509,7 @@ private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason
509509

510510
if (r1.isSuspect()) {
511511
// Update membership and schedule/cancel suspicion timeout task
512-
membershipTable.put(r1.id(), r1);
512+
membershipTable.put(r1.member().id(), r1);
513513
scheduleSuspicionTimeoutTask(r1);
514514
spreadMembershipGossipUnlessGossiped(r1, reason);
515515
}
@@ -530,7 +530,7 @@ private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason
530530
.doOnSuccess(
531531
metadata1 -> {
532532
// If metadata was received then member is Alive
533-
cancelSuspicionTimeoutTask(r1.id());
533+
cancelSuspicionTimeoutTask(r1.member().id());
534534
spreadMembershipGossipUnlessGossiped(r1, reason);
535535
// Update membership
536536
ByteBuffer metadata0 = metadataStore.updateMetadata(r1.member(), metadata1);
@@ -567,28 +567,32 @@ private Mono<Void> onSelfMemberDetected(
567567
});
568568
}
569569

570-
private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {
570+
private Mono<Void> onDeadMemberDetected(MembershipRecord r) {
571571
return Mono.fromRunnable(
572572
() -> {
573-
cancelSuspicionTimeoutTask(r1.id());
574-
if (!members.containsKey(r1.id())) {
573+
final Member member = r.member();
574+
575+
cancelSuspicionTimeoutTask(member.id());
576+
577+
if (!members.containsKey(member.id())) {
575578
return;
576579
}
580+
577581
// Update membership
578-
members.remove(r1.id());
579-
membershipTable.remove(r1.id());
582+
members.remove(member.id());
583+
membershipTable.remove(member.id());
580584
// removed
581-
ByteBuffer metadata0 = metadataStore.removeMetadata(r1.member());
582-
MembershipEvent event = MembershipEvent.createRemoved(r1.member(), metadata0);
585+
ByteBuffer metadata0 = metadataStore.removeMetadata(member);
586+
MembershipEvent event = MembershipEvent.createRemoved(member, metadata0);
583587
LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event);
584588
sink.next(event);
585589
});
586590
}
587591

588592
private void onAliveMemberDetected(
589-
MembershipRecord r1, ByteBuffer metadata0, ByteBuffer metadata1) {
593+
MembershipRecord r, ByteBuffer metadata0, ByteBuffer metadata1) {
590594

591-
final Member member = r1.member();
595+
final Member member = r.member();
592596

593597
boolean memberExists = members.containsKey(member.id());
594598

@@ -600,7 +604,7 @@ private void onAliveMemberDetected(
600604
}
601605

602606
members.put(member.id(), member);
603-
membershipTable.put(member.id(), r1);
607+
membershipTable.put(member.id(), r);
604608

605609
if (event != null) {
606610
LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event);
@@ -616,15 +620,15 @@ private void cancelSuspicionTimeoutTask(String memberId) {
616620
}
617621
}
618622

619-
private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
623+
private void scheduleSuspicionTimeoutTask(MembershipRecord r) {
620624
long suspicionTimeout =
621625
ClusterMath.suspicionTimeout(
622626
membershipConfig.suspicionMult(),
623627
membershipTable.size(),
624628
failureDetectorConfig.pingInterval());
625629

626630
suspicionTimeoutTasks.computeIfAbsent(
627-
record.id(),
631+
r.member().id(),
628632
id -> {
629633
LOGGER.debug(
630634
"Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", id, suspicionTimeout);
@@ -635,29 +639,28 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
635639

636640
private void onSuspicionTimeout(String memberId) {
637641
suspicionTimeoutTasks.remove(memberId);
638-
MembershipRecord record = membershipTable.get(memberId);
639-
if (record != null) {
640-
LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", record);
641-
MembershipRecord deadRecord =
642-
new MembershipRecord(record.member(), DEAD, record.incarnation());
642+
MembershipRecord r = membershipTable.get(memberId);
643+
if (r != null) {
644+
LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", r);
645+
MembershipRecord deadRecord = new MembershipRecord(r.member(), DEAD, r.incarnation());
643646
updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT)
644647
.subscribe(null, this::onError);
645648
}
646649
}
647650

648651
private void spreadMembershipGossipUnlessGossiped(
649-
MembershipRecord r1, MembershipUpdateReason reason) {
652+
MembershipRecord r, MembershipUpdateReason reason) {
650653
// Spread gossip (unless already gossiped)
651654
if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP
652655
&& reason != MembershipUpdateReason.INITIAL_SYNC) {
653-
spreadMembershipGossip(r1).doOnError(this::onErrorIgnore).subscribe();
656+
spreadMembershipGossip(r).doOnError(this::onErrorIgnore).subscribe();
654657
}
655658
}
656659

657-
private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
660+
private Mono<Void> spreadMembershipGossip(MembershipRecord r) {
658661
return Mono.defer(
659662
() -> {
660-
Message msg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
663+
Message msg = Message.withData(r).qualifier(MEMBERSHIP_GOSSIP).build();
661664
LOGGER.debug("Spead membreship: {} with gossip", msg);
662665
return gossipProtocol
663666
.spread(msg)
@@ -792,7 +795,7 @@ public String getDeadMembersAsString() {
792795
private List<String> findRecordsByCondition(Predicate<MembershipRecord> condition) {
793796
return membershipProtocol.getMembershipRecords().stream()
794797
.filter(condition)
795-
.map(record -> new Member(record.id(), record.address()))
798+
.map(MembershipRecord::member)
796799
.map(Member::toString)
797800
.collect(Collectors.toList());
798801
}

cluster/src/main/java/io/scalecube/cluster/membership/MembershipRecord.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static io.scalecube.cluster.membership.MemberStatus.SUSPECT;
66

77
import io.scalecube.cluster.Member;
8-
import io.scalecube.net.Address;
98
import java.util.Objects;
109

1110
/** Cluster membership record which represents member, status, and incarnation. */
@@ -29,14 +28,6 @@ public Member member() {
2928
return member;
3029
}
3130

32-
public String id() {
33-
return member.id();
34-
}
35-
36-
public Address address() {
37-
return member.address();
38-
}
39-
4031
public MemberStatus status() {
4132
return status;
4233
}

cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,13 @@ private FailureDetectorImpl createFd(Transport transport, List<Address> members)
409409
private FailureDetectorImpl createFd(
410410
Transport transport, List<Address> addresses, FailureDetectorConfig config) {
411411

412-
Member localMember = new Member("member-" + transport.address().port(), transport.address());
412+
Member localMember =
413+
new Member("member-" + transport.address().port(), null, transport.address());
413414

414415
Flux<MembershipEvent> membershipFlux =
415416
Flux.fromIterable(addresses)
416417
.filter(address -> !transport.address().equals(address))
417-
.map(address -> new Member("member-" + address.port(), address))
418+
.map(address -> new Member("member-" + address.port(), null, address))
418419
.map(member -> MembershipEvent.createAdded(member, null));
419420

420421
CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id());

cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,13 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
255255
.gossipInterval(gossipInterval)
256256
.gossipRepeatMult(gossipRepeatMultiplier);
257257

258-
Member localMember = new Member("member-" + transport.address().port(), transport.address());
258+
Member localMember =
259+
new Member("member-" + transport.address().port(), null, transport.address());
259260

260261
Flux<MembershipEvent> membershipFlux =
261262
Flux.fromIterable(members)
262263
.filter(address -> !transport.address().equals(address))
263-
.map(address -> new Member("member-" + address.port(), address))
264+
.map(address -> new Member("member-" + address.port(), null, address))
264265
.map(member -> MembershipEvent.createAdded(member, null));
265266

266267
GossipProtocolImpl gossipProtocol =

0 commit comments

Comments
 (0)