|
34 | 34 | import java.util.stream.Collectors; |
35 | 35 | import java.util.stream.Stream; |
36 | 36 | import javax.management.MBeanServer; |
37 | | -import javax.management.ObjectInstance; |
38 | 37 | import javax.management.ObjectName; |
39 | 38 | import javax.management.StandardMBean; |
40 | 39 | import org.slf4j.Logger; |
41 | 40 | import org.slf4j.LoggerFactory; |
42 | 41 | import reactor.core.Disposable; |
43 | 42 | import reactor.core.Disposables; |
| 43 | +import reactor.core.Exceptions; |
44 | 44 | import reactor.core.publisher.DirectProcessor; |
45 | 45 | import reactor.core.publisher.Flux; |
46 | 46 | import reactor.core.publisher.FluxSink; |
@@ -284,8 +284,9 @@ private Mono<Cluster> doStart0() { |
284 | 284 | .then(Mono.fromRunnable(() -> gossip.start())) |
285 | 285 | .then(Mono.fromRunnable(() -> metadataStore.start())) |
286 | 286 | .then(Mono.fromRunnable(this::startHandler)) |
287 | | - .then((membership.start())) |
288 | | - .then(startJmxMonitor()); |
| 287 | + .then(membership.start()) |
| 288 | + .then(Mono.fromRunnable(this::startJmxMonitor)) |
| 289 | + .then(); |
289 | 290 | }) |
290 | 291 | .thenReturn(this); |
291 | 292 | } |
@@ -334,19 +335,27 @@ private void startHandler() { |
334 | 335 | actionsDisposables.add(listenGossip().subscribe(handler::onGossip, this::onError)); |
335 | 336 | } |
336 | 337 |
|
337 | | - private Mono<Void> startJmxMonitor() { |
338 | | - return Mono.fromCallable(this::startJmxMonitor0).then(); |
339 | | - } |
340 | | - |
341 | | - private ObjectInstance startJmxMonitor0() throws Exception { |
| 338 | + private void startJmxMonitor() { |
342 | 339 | ClusterMonitorModel monitorModel = monitorModelBuilder.config(config).cluster(this).build(); |
| 340 | + JmxClusterMonitorMBean monitorMBean = new JmxClusterMonitorMBean(monitorModel); |
| 341 | + try { |
| 342 | + StandardMBean standardMBean = new StandardMBean(monitorMBean, ClusterMonitorMBean.class); |
| 343 | + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); |
| 344 | + ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id()); |
| 345 | + server.registerMBean(standardMBean, objectName); |
| 346 | + } catch (Exception ex) { |
| 347 | + throw Exceptions.propagate(ex); |
| 348 | + } |
| 349 | + } |
343 | 350 |
|
344 | | - JmxClusterMonitorMBean bean = new JmxClusterMonitorMBean(monitorModel); |
345 | | - StandardMBean standardMBean = new StandardMBean(bean, ClusterMonitorMBean.class); |
346 | | - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); |
347 | | - ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id()); |
348 | | - |
349 | | - return server.registerMBean(standardMBean, objectName); |
| 351 | + private void stopJmxMonitor() { |
| 352 | + try { |
| 353 | + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); |
| 354 | + ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id()); |
| 355 | + server.unregisterMBean(objectName); |
| 356 | + } catch (Exception ex) { |
| 357 | + throw Exceptions.propagate(ex); |
| 358 | + } |
350 | 359 | } |
351 | 360 |
|
352 | 361 | private void onError(Throwable th) { |
@@ -480,7 +489,11 @@ private Mono<Void> doShutdown() { |
480 | 489 | return Mono.defer( |
481 | 490 | () -> { |
482 | 491 | LOGGER.info("[{}] Cluster member is shutting down", localMember); |
483 | | - return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop()) |
| 492 | + return Flux.concatDelayError( |
| 493 | + leaveCluster(), |
| 494 | + dispose(), |
| 495 | + transport.stop(), |
| 496 | + Mono.fromRunnable(this::stopJmxMonitor)) |
484 | 497 | .then() |
485 | 498 | .doFinally(s -> scheduler.dispose()) |
486 | 499 | .doOnSuccess( |
|
0 commit comments