Skip to content

Commit 9d22b41

Browse files
committed
Added timestamp to the MembershipEvent
1 parent 121b62f commit 9d22b41

File tree

5 files changed

+32
-12
lines changed

5 files changed

+32
-12
lines changed

cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipEvent.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.scalecube.cluster.Member;
44
import java.nio.ByteBuffer;
5+
import java.time.Instant;
56
import java.util.Objects;
67
import java.util.StringJoiner;
78

@@ -21,37 +22,41 @@ public enum Type {
2122
private final Member member;
2223
private final ByteBuffer oldMetadata;
2324
private final ByteBuffer newMetadata;
25+
private final long timestamp;
2426

2527
private MembershipEvent(
26-
Type type, Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata) {
28+
Type type, Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata, long timestamp) {
2729
this.type = type;
2830
this.member = member;
2931
this.oldMetadata = oldMetadata;
3032
this.newMetadata = newMetadata;
33+
this.timestamp = timestamp;
3134
}
3235

3336
/**
3437
* Creates REMOVED membership event with cluster member and its metadata (optional).
3538
*
3639
* @param member cluster member; not null
3740
* @param metadata member metadata; optional
41+
* @param timestamp event timestamp
3842
* @return membership event
3943
*/
40-
public static MembershipEvent createRemoved(Member member, ByteBuffer metadata) {
44+
public static MembershipEvent createRemoved(Member member, ByteBuffer metadata, long timestamp) {
4145
Objects.requireNonNull(member, "member must be not null");
42-
return new MembershipEvent(Type.REMOVED, member, metadata, null);
46+
return new MembershipEvent(Type.REMOVED, member, metadata, null, timestamp);
4347
}
4448

4549
/**
4650
* Creates ADDED membership event with cluster member and its metadata.
4751
*
4852
* @param member cluster memeber; not null
4953
* @param metadata member metadata; not null
54+
* @param timestamp event timestamp
5055
* @return membership event
5156
*/
52-
public static MembershipEvent createAdded(Member member, ByteBuffer metadata) {
57+
public static MembershipEvent createAdded(Member member, ByteBuffer metadata, long timestamp) {
5358
Objects.requireNonNull(member, "member must be not null");
54-
return new MembershipEvent(Type.ADDED, member, null, metadata);
59+
return new MembershipEvent(Type.ADDED, member, null, metadata, timestamp);
5560
}
5661

5762
/**
@@ -60,12 +65,13 @@ public static MembershipEvent createAdded(Member member, ByteBuffer metadata) {
6065
* @param member cluster member; not null
6166
* @param oldMetadata previous metadata; not null
6267
* @param newMetadata new metadata; not null
68+
* @param timestamp event timestamp
6369
* @return membership event
6470
*/
6571
public static MembershipEvent createUpdated(
66-
Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata) {
72+
Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata, long timestamp) {
6773
Objects.requireNonNull(member, "member must be not null");
68-
return new MembershipEvent(Type.UPDATED, member, oldMetadata, newMetadata);
74+
return new MembershipEvent(Type.UPDATED, member, oldMetadata, newMetadata, timestamp);
6975
}
7076

7177
public Type type() {
@@ -96,16 +102,25 @@ public ByteBuffer newMetadata() {
96102
return newMetadata;
97103
}
98104

105+
public long timestamp() {
106+
return timestamp;
107+
}
108+
99109
@Override
100110
public String toString() {
101111
return new StringJoiner(", ", MembershipEvent.class.getSimpleName() + "[", "]")
102112
.add("type=" + type)
103113
.add("member=" + member)
104114
.add("oldMetadata=" + metadataAsString(oldMetadata))
105115
.add("newMetadata=" + metadataAsString(newMetadata))
116+
.add("timestamp=" + timestampAsString(timestamp))
106117
.toString();
107118
}
108119

120+
private String timestampAsString(long timestamp) {
121+
return Instant.ofEpochMilli(timestamp).toString();
122+
}
123+
109124
private String metadataAsString(ByteBuffer metadata) {
110125
if (metadata == null) {
111126
return null;

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,8 @@ private Mono<Void> onDeadMemberDetected(MembershipRecord r) {
583583
membershipTable.remove(member.id());
584584
// removed
585585
ByteBuffer metadata0 = metadataStore.removeMetadata(member);
586-
MembershipEvent event = MembershipEvent.createRemoved(member, metadata0);
586+
MembershipEvent event =
587+
MembershipEvent.createRemoved(member, metadata0, System.currentTimeMillis());
587588
LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event);
588589
sink.next(event);
589590
});
@@ -597,10 +598,11 @@ private void onAliveMemberDetected(
597598
boolean memberExists = members.containsKey(member.id());
598599

599600
MembershipEvent event = null;
601+
long timestamp = System.currentTimeMillis();
600602
if (!memberExists) {
601-
event = MembershipEvent.createAdded(member, metadata1);
603+
event = MembershipEvent.createAdded(member, metadata1, timestamp);
602604
} else if (!metadata1.equals(metadata0)) {
603-
event = MembershipEvent.createUpdated(member, metadata0, metadata1);
605+
event = MembershipEvent.createUpdated(member, metadata0, metadata1, timestamp);
604606
}
605607

606608
members.put(member.id(), member);

cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ private FailureDetectorImpl createFd(
416416
Flux.fromIterable(addresses)
417417
.filter(address -> !transport.address().equals(address))
418418
.map(address -> new Member("member-" + address.port(), null, address))
419-
.map(member -> MembershipEvent.createAdded(member, null));
419+
.map(member -> MembershipEvent.createAdded(member, null, 0));
420420

421421
CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id());
422422

cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
262262
Flux.fromIterable(members)
263263
.filter(address -> !transport.address().equals(address))
264264
.map(address -> new Member("member-" + address.port(), null, address))
265-
.map(member -> MembershipEvent.createAdded(member, null));
265+
.map(member -> MembershipEvent.createAdded(member, null, 0));
266266

267267
GossipProtocolImpl gossipProtocol =
268268
new GossipProtocolImpl(localMember, transport, membershipFlux, gossipConfig, scheduler);

examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public static void main(String[] args) throws Exception {
2626
// Alice init cluster
2727
Cluster alice =
2828
new ClusterImpl()
29+
.config(opts -> opts.memberAlias("Alice"))
2930
.config(opts -> opts.metadata(Collections.singletonMap("name", "Alice")))
3031
.handler(
3132
cluster -> {
@@ -42,6 +43,7 @@ public void onMembershipEvent(MembershipEvent event) {
4243
// Bob join cluster
4344
Cluster bob =
4445
new ClusterImpl()
46+
.config(opts -> opts.memberAlias("Bob"))
4547
.config(opts -> opts.metadata(Collections.singletonMap("name", "Bob")))
4648
.membership(opts -> opts.seedMembers(alice.address()))
4749
.handler(
@@ -59,6 +61,7 @@ public void onMembershipEvent(MembershipEvent event) {
5961
// Carol join cluster
6062
Cluster carol =
6163
new ClusterImpl()
64+
.config(opts -> opts.memberAlias("Carol"))
6265
.config(opts -> opts.metadata(Collections.singletonMap("name", "Carol")))
6366
.membership(opts -> opts.seedMembers(alice.address(), bob.address()))
6467
.handler(

0 commit comments

Comments
 (0)