Skip to content

Commit cb545bc

Browse files
authored
Merge pull request #261 from scalecube/refactoring/remove-jmx-method-duplicates
Refactor Jmx support, add attributes, shorten implementation
2 parents 0e21295 + 14dd8a5 commit cb545bc

File tree

10 files changed

+311
-232
lines changed

10 files changed

+311
-232
lines changed

cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public final class MembershipConfig implements Cloneable {
2929
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
3030
private int suspicionMult = DEFAULT_SUSPICION_MULT;
3131
private String syncGroup = "default";
32+
private int removedMembersHistorySize = 42;
3233

3334
public MembershipConfig() {}
3435

@@ -157,6 +158,22 @@ public MembershipConfig syncGroup(String syncGroup) {
157158
return m;
158159
}
159160

161+
public int removedMembersHistorySize() {
162+
return removedMembersHistorySize;
163+
}
164+
165+
/**
166+
* Sets a removedMembersHistorySize.
167+
*
168+
* @param removedMembersHistorySize history size for remove members
169+
* @return new {@code MembershipConfig} instance
170+
*/
171+
public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) {
172+
MembershipConfig m = clone();
173+
m.removedMembersHistorySize = removedMembersHistorySize;
174+
return m;
175+
}
176+
160177
@Override
161178
public MembershipConfig clone() {
162179
try {
@@ -174,6 +191,7 @@ public String toString() {
174191
.add("syncTimeout=" + syncTimeout)
175192
.add("suspicionMult=" + suspicionMult)
176193
.add("syncGroup='" + syncGroup + "'")
194+
.add("removedMembersHistorySize=" + removedMembersHistorySize)
177195
.toString();
178196
}
179197
}

cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 27 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99
import io.scalecube.cluster.membership.MembershipProtocolImpl;
1010
import io.scalecube.cluster.metadata.MetadataStore;
1111
import io.scalecube.cluster.metadata.MetadataStoreImpl;
12+
import io.scalecube.cluster.monitor.ClusterMonitorMBean;
13+
import io.scalecube.cluster.monitor.ClusterMonitorModel;
14+
import io.scalecube.cluster.monitor.JmxClusterMonitorMBean;
1215
import io.scalecube.cluster.transport.api.Message;
1316
import io.scalecube.cluster.transport.api.Transport;
1417
import io.scalecube.cluster.transport.api.TransportConfig;
1518
import io.scalecube.net.Address;
1619
import io.scalecube.transport.netty.TransportImpl;
20+
import java.lang.management.ManagementFactory;
1721
import java.util.Collection;
1822
import java.util.Collections;
1923
import java.util.Objects;
@@ -23,6 +27,10 @@
2327
import java.util.function.UnaryOperator;
2428
import java.util.stream.Collectors;
2529
import java.util.stream.Stream;
30+
import javax.management.MBeanServer;
31+
import javax.management.ObjectInstance;
32+
import javax.management.ObjectName;
33+
import javax.management.StandardMBean;
2634
import org.slf4j.Logger;
2735
import org.slf4j.LoggerFactory;
2836
import reactor.core.Disposable;
@@ -82,6 +90,7 @@ public final class ClusterImpl implements Cluster {
8290
private MetadataStore metadataStore;
8391
private Scheduler scheduler;
8492
private CorrelationIdGenerator cidGenerator;
93+
private ClusterMonitorModel.Builder monitorModelBuilder;
8594

8695
public ClusterImpl() {
8796
this(ClusterConfig.defaultConfig());
@@ -228,6 +237,7 @@ private Mono<Cluster> doStart0() {
228237

229238
cidGenerator = new CorrelationIdGenerator(localMember.id());
230239
scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
240+
monitorModelBuilder = new ClusterMonitorModel.Builder();
231241

232242
failureDetector =
233243
new FailureDetectorImpl(
@@ -259,14 +269,16 @@ private Mono<Cluster> doStart0() {
259269
metadataStore,
260270
config,
261271
scheduler,
262-
cidGenerator);
272+
cidGenerator,
273+
monitorModelBuilder);
263274

264275
actionsDisposables.add(
276+
// Retransmit inner membership events to public api layer
265277
membership
266278
.listen()
267279
/*.publishOn(scheduler)*/
268-
// dont uncomment, already beign executed inside sc-cluster thread
269-
.subscribe(membershipSink::next, this::onError));
280+
// Dont uncomment, already beign executed inside sc-cluster thread
281+
.subscribe(membershipSink::next, this::onError, membershipSink::complete));
270282

271283
return Mono.fromRunnable(() -> failureDetector.start())
272284
.then(Mono.fromRunnable(() -> gossip.start()))
@@ -301,7 +313,18 @@ private void startHandler() {
301313
}
302314

303315
private Mono<Void> startJmxMonitor() {
304-
return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then();
316+
return Mono.fromCallable(this::startJmxMonitor0).then();
317+
}
318+
319+
private ObjectInstance startJmxMonitor0() throws Exception {
320+
ClusterMonitorModel monitorModel = monitorModelBuilder.config(config).cluster(this).build();
321+
322+
JmxClusterMonitorMBean bean = new JmxClusterMonitorMBean(monitorModel);
323+
StandardMBean standardMBean = new StandardMBean(bean, ClusterMonitorMBean.class);
324+
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
325+
ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id());
326+
327+
return server.registerMBean(standardMBean, objectName);
305328
}
306329

307330
private void onError(Throwable th) {
@@ -481,85 +504,6 @@ public boolean isShutdown() {
481504
return onShutdown.isDisposed();
482505
}
483506

484-
@SuppressWarnings("unused")
485-
public interface MonitorMBean {
486-
487-
Collection<String> getId();
488-
489-
String getIdAsString();
490-
491-
Collection<String> getAlias();
492-
493-
String getAliasAsString();
494-
495-
Collection<String> getAddress();
496-
497-
String getAddressAsString();
498-
499-
Collection<String> getMetadata();
500-
501-
String getMetadataAsString();
502-
}
503-
504-
public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean {
505-
506-
private final ClusterImpl cluster;
507-
508-
private JmxMonitorMBean(ClusterImpl cluster) {
509-
this.cluster = cluster;
510-
}
511-
512-
@Override
513-
public Collection<String> getId() {
514-
return Collections.singleton(getIdAsString());
515-
}
516-
517-
@Override
518-
public String getIdAsString() {
519-
return cluster.member().id();
520-
}
521-
522-
@Override
523-
public Collection<String> getAlias() {
524-
return Collections.singleton(getAliasAsString());
525-
}
526-
527-
@Override
528-
public String getAliasAsString() {
529-
return cluster.member().alias();
530-
}
531-
532-
@Override
533-
public Collection<String> getAddress() {
534-
return Collections.singleton(getAddressAsString());
535-
}
536-
537-
@Override
538-
public String getAddressAsString() {
539-
return String.valueOf(cluster.member().address());
540-
}
541-
542-
@Override
543-
public Collection<String> getMetadata() {
544-
return Collections.singletonList(getMetadataAsString());
545-
}
546-
547-
@Override
548-
public String getMetadataAsString() {
549-
return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null));
550-
}
551-
552-
@Override
553-
protected Class getBeanType() {
554-
return MonitorMBean.class;
555-
}
556-
557-
@Override
558-
protected String getObjectName() {
559-
return "io.scalecube.cluster:name=Cluster@" + cluster.member().id();
560-
}
561-
}
562-
563507
private static class SenderAwareTransport implements Transport {
564508

565509
private final Transport transport;

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,11 @@ public FailureDetectorImpl(
8989
// Subscribe
9090
actionsDisposables.addAll(
9191
Arrays.asList(
92-
membershipProcessor //
92+
membershipProcessor // Listen membership events to update remoteMembers
9393
.publishOn(scheduler)
9494
.subscribe(this::onMemberEvent, this::onError),
9595
transport
96-
.listen() //
96+
.listen() // Listen failure detector requests
9797
.publishOn(scheduler)
9898
.subscribe(this::onMessage, this::onError)));
9999
}

cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ public GossipProtocolImpl(
9191
// Subscribe
9292
actionsDisposables.addAll(
9393
Arrays.asList(
94-
membershipProcessor //
94+
membershipProcessor // Listen membership events to update remoteMembers
9595
.publishOn(scheduler)
9696
.subscribe(this::onMemberEvent, this::onError),
9797
transport
98-
.listen()
98+
.listen() // Listen gossip requests
9999
.publishOn(scheduler)
100100
.filter(this::isGossipReq)
101101
.subscribe(this::onGossipReq, this::onError)));

0 commit comments

Comments
 (0)