Skip to content

Commit 14dd8a5

Browse files
committed
Done with refactoring jmx; added more jmx attributes
1 parent bee9df9 commit 14dd8a5

File tree

10 files changed

+311
-178
lines changed

10 files changed

+311
-178
lines changed

cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public final class MembershipConfig implements Cloneable {
2929
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
3030
private int suspicionMult = DEFAULT_SUSPICION_MULT;
3131
private String syncGroup = "default";
32+
private int removedMembersHistorySize = 42;
3233

3334
public MembershipConfig() {}
3435

@@ -157,6 +158,22 @@ public MembershipConfig syncGroup(String syncGroup) {
157158
return m;
158159
}
159160

161+
public int removedMembersHistorySize() {
162+
return removedMembersHistorySize;
163+
}
164+
165+
/**
166+
* Sets a removedMembersHistorySize.
167+
*
168+
* @param removedMembersHistorySize history size for remove members
169+
* @return new {@code MembershipConfig} instance
170+
*/
171+
public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) {
172+
MembershipConfig m = clone();
173+
m.removedMembersHistorySize = removedMembersHistorySize;
174+
return m;
175+
}
176+
160177
@Override
161178
public MembershipConfig clone() {
162179
try {
@@ -174,6 +191,7 @@ public String toString() {
174191
.add("syncTimeout=" + syncTimeout)
175192
.add("suspicionMult=" + suspicionMult)
176193
.add("syncGroup='" + syncGroup + "'")
194+
.add("removedMembersHistorySize=" + removedMembersHistorySize)
177195
.toString();
178196
}
179197
}

cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99
import io.scalecube.cluster.membership.MembershipProtocolImpl;
1010
import io.scalecube.cluster.metadata.MetadataStore;
1111
import io.scalecube.cluster.metadata.MetadataStoreImpl;
12+
import io.scalecube.cluster.monitor.ClusterMonitorMBean;
13+
import io.scalecube.cluster.monitor.ClusterMonitorModel;
14+
import io.scalecube.cluster.monitor.JmxClusterMonitorMBean;
1215
import io.scalecube.cluster.transport.api.Message;
1316
import io.scalecube.cluster.transport.api.Transport;
1417
import io.scalecube.cluster.transport.api.TransportConfig;
1518
import io.scalecube.net.Address;
1619
import io.scalecube.transport.netty.TransportImpl;
20+
import java.lang.management.ManagementFactory;
1721
import java.util.Collection;
1822
import java.util.Collections;
1923
import java.util.Objects;
@@ -23,6 +27,10 @@
2327
import java.util.function.UnaryOperator;
2428
import java.util.stream.Collectors;
2529
import java.util.stream.Stream;
30+
import javax.management.MBeanServer;
31+
import javax.management.ObjectInstance;
32+
import javax.management.ObjectName;
33+
import javax.management.StandardMBean;
2634
import org.slf4j.Logger;
2735
import org.slf4j.LoggerFactory;
2836
import reactor.core.Disposable;
@@ -82,6 +90,7 @@ public final class ClusterImpl implements Cluster {
8290
private MetadataStore metadataStore;
8391
private Scheduler scheduler;
8492
private CorrelationIdGenerator cidGenerator;
93+
private ClusterMonitorModel.Builder monitorModelBuilder;
8594

8695
public ClusterImpl() {
8796
this(ClusterConfig.defaultConfig());
@@ -228,6 +237,7 @@ private Mono<Cluster> doStart0() {
228237

229238
cidGenerator = new CorrelationIdGenerator(localMember.id());
230239
scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
240+
monitorModelBuilder = new ClusterMonitorModel.Builder();
231241

232242
failureDetector =
233243
new FailureDetectorImpl(
@@ -259,14 +269,16 @@ private Mono<Cluster> doStart0() {
259269
metadataStore,
260270
config,
261271
scheduler,
262-
cidGenerator);
272+
cidGenerator,
273+
monitorModelBuilder);
263274

264275
actionsDisposables.add(
276+
// Retransmit inner membership events to public api layer
265277
membership
266278
.listen()
267279
/*.publishOn(scheduler)*/
268-
// dont uncomment, already beign executed inside sc-cluster thread
269-
.subscribe(membershipSink::next, this::onError));
280+
// Dont uncomment, already beign executed inside sc-cluster thread
281+
.subscribe(membershipSink::next, this::onError, membershipSink::complete));
270282

271283
return Mono.fromRunnable(() -> failureDetector.start())
272284
.then(Mono.fromRunnable(() -> gossip.start()))
@@ -301,7 +313,18 @@ private void startHandler() {
301313
}
302314

303315
private Mono<Void> startJmxMonitor() {
304-
return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then();
316+
return Mono.fromCallable(this::startJmxMonitor0).then();
317+
}
318+
319+
private ObjectInstance startJmxMonitor0() throws Exception {
320+
ClusterMonitorModel monitorModel = monitorModelBuilder.config(config).cluster(this).build();
321+
322+
JmxClusterMonitorMBean bean = new JmxClusterMonitorMBean(monitorModel);
323+
StandardMBean standardMBean = new StandardMBean(bean, ClusterMonitorMBean.class);
324+
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
325+
ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id());
326+
327+
return server.registerMBean(standardMBean, objectName);
305328
}
306329

307330
private void onError(Throwable th) {
@@ -481,50 +504,6 @@ public boolean isShutdown() {
481504
return onShutdown.isDisposed();
482505
}
483506

484-
@SuppressWarnings("unused")
485-
public interface MonitorMBean {
486-
487-
int getClusterSize();
488-
489-
String getMember();
490-
491-
String getMetadata();
492-
}
493-
494-
public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean {
495-
496-
private final ClusterImpl cluster;
497-
498-
private JmxMonitorMBean(ClusterImpl cluster) {
499-
this.cluster = cluster;
500-
}
501-
502-
@Override
503-
public int getClusterSize() {
504-
return cluster.otherMembers().size() + 1;
505-
}
506-
507-
@Override
508-
public String getMember() {
509-
return cluster.member().toString();
510-
}
511-
512-
@Override
513-
public String getMetadata() {
514-
return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null));
515-
}
516-
517-
@Override
518-
protected Class getBeanType() {
519-
return MonitorMBean.class;
520-
}
521-
522-
@Override
523-
protected String getObjectName() {
524-
return "io.scalecube.cluster:name=Cluster@" + cluster.member().id();
525-
}
526-
}
527-
528507
private static class SenderAwareTransport implements Transport {
529508

530509
private final Transport transport;

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,11 @@ public FailureDetectorImpl(
8989
// Subscribe
9090
actionsDisposables.addAll(
9191
Arrays.asList(
92-
membershipProcessor //
92+
membershipProcessor // Listen membership events to update remoteMembers
9393
.publishOn(scheduler)
9494
.subscribe(this::onMemberEvent, this::onError),
9595
transport
96-
.listen() //
96+
.listen() // Listen failure detector requests
9797
.publishOn(scheduler)
9898
.subscribe(this::onMessage, this::onError)));
9999
}

cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ public GossipProtocolImpl(
9191
// Subscribe
9292
actionsDisposables.addAll(
9393
Arrays.asList(
94-
membershipProcessor //
94+
membershipProcessor // Listen membership events to update remoteMembers
9595
.publishOn(scheduler)
9696
.subscribe(this::onMemberEvent, this::onError),
9797
transport
98-
.listen()
98+
.listen() // Listen gossip requests
9999
.publishOn(scheduler)
100100
.filter(this::isGossipReq)
101101
.subscribe(this::onGossipReq, this::onError)));

0 commit comments

Comments
 (0)