Skip to content

Commit 1ce38e5

Browse files
t-beckmannjoshiste
authored andcommitted
Properly dispose schedulers on shutdown
With this commit all schedulers are disposed on shutdown so the jvm can exit gracefully.
1 parent bb5a9c4 commit 1ce38e5

File tree

5 files changed

+83
-8
lines changed

5 files changed

+83
-8
lines changed

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/notify/NotificationTrigger.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,39 @@
2020
import de.codecentric.boot.admin.server.services.AbstractEventHandler;
2121
import reactor.core.publisher.Flux;
2222
import reactor.core.publisher.Mono;
23+
import reactor.core.scheduler.Scheduler;
2324
import reactor.core.scheduler.Schedulers;
2425

2526
import org.reactivestreams.Publisher;
2627

2728
public class NotificationTrigger extends AbstractEventHandler<InstanceEvent> {
2829
private final Notifier notifier;
30+
private Scheduler scheduler;
2931

3032
public NotificationTrigger(Notifier notifier, Publisher<InstanceEvent> publisher) {
3133
super(publisher, InstanceEvent.class);
3234
this.notifier = notifier;
3335
}
3436

37+
@Override
38+
public void start() {
39+
assert scheduler == null;
40+
scheduler = Schedulers.newSingle("notifications");
41+
super.start();
42+
}
43+
44+
@Override
45+
public void stop() {
46+
super.stop();
47+
if (scheduler != null) {
48+
scheduler.dispose();
49+
scheduler = null;
50+
}
51+
}
52+
3553
@Override
3654
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
37-
return publisher.subscribeOn(Schedulers.newSingle("notifications")).flatMap(this::sendNotifications);
55+
return publisher.subscribeOn(scheduler).flatMap(this::sendNotifications);
3856
}
3957

4058
protected Mono<Void> sendNotifications(InstanceEvent event) {

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/notify/RemindingNotifier.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import reactor.core.Disposable;
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
28+
import reactor.core.scheduler.Scheduler;
2829
import reactor.core.scheduler.Schedulers;
2930

3031
import java.time.Duration;
@@ -51,6 +52,7 @@ public class RemindingNotifier extends AbstractEventNotifier {
5152
private String[] reminderStatuses = {"DOWN", "OFFLINE"};
5253
@Nullable
5354
private Disposable subscription;
55+
private Scheduler scheduler;
5456

5557
public RemindingNotifier(Notifier delegate, InstanceRepository repository) {
5658
super(repository);
@@ -69,9 +71,10 @@ public Mono<Void> doNotify(InstanceEvent event, Instance instance) {
6971
}));
7072
}
7173

72-
7374
public void start() {
74-
this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
75+
assert scheduler == null;
76+
scheduler = Schedulers.newSingle("reminders");
77+
this.subscription = Flux.interval(this.checkReminderInverval, scheduler)
7578
.log(log.getName(), Level.FINEST)
7679
.doOnSubscribe(s -> log.debug("Started reminders"))
7780
.flatMap(i -> this.sendReminders())
@@ -87,6 +90,10 @@ public void stop() {
8790
log.debug("stopped reminders");
8891
this.subscription.dispose();
8992
}
93+
if (scheduler != null) {
94+
scheduler.dispose();
95+
scheduler = null;
96+
}
9097
}
9198

9299
protected Mono<Void> sendReminders() {

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/EndpointDetectionTrigger.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,39 @@
2121
import de.codecentric.boot.admin.server.domain.events.InstanceStatusChangedEvent;
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
24+
import reactor.core.scheduler.Scheduler;
2425
import reactor.core.scheduler.Schedulers;
2526

2627
import org.reactivestreams.Publisher;
2728

2829
public class EndpointDetectionTrigger extends AbstractEventHandler<InstanceEvent> {
2930
private final EndpointDetector endpointDetector;
31+
private Scheduler scheduler;
3032

3133
public EndpointDetectionTrigger(EndpointDetector endpointDetector, Publisher<InstanceEvent> publisher) {
3234
super(publisher, InstanceEvent.class);
3335
this.endpointDetector = endpointDetector;
3436
}
3537

38+
@Override
39+
public void start() {
40+
assert scheduler == null;
41+
scheduler = Schedulers.newSingle("endpoint-detector");
42+
super.start();
43+
}
44+
45+
@Override
46+
public void stop() {
47+
super.stop();
48+
if (scheduler != null) {
49+
scheduler.dispose();
50+
scheduler = null;
51+
}
52+
}
53+
3654
@Override
3755
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
38-
return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector"))
56+
return publisher.subscribeOn(scheduler)
3957
.filter(event -> event instanceof InstanceStatusChangedEvent ||
4058
event instanceof InstanceRegistrationUpdatedEvent)
4159
.flatMap(this::detectEndpoints);

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/InfoUpdateTrigger.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,39 @@
2222
import de.codecentric.boot.admin.server.domain.events.InstanceStatusChangedEvent;
2323
import reactor.core.publisher.Flux;
2424
import reactor.core.publisher.Mono;
25+
import reactor.core.scheduler.Scheduler;
2526
import reactor.core.scheduler.Schedulers;
2627

2728
import org.reactivestreams.Publisher;
2829

2930
public class InfoUpdateTrigger extends AbstractEventHandler<InstanceEvent> {
3031
private final InfoUpdater infoUpdater;
32+
private Scheduler scheduler;
3133

3234
public InfoUpdateTrigger(InfoUpdater infoUpdater, Publisher<InstanceEvent> publisher) {
3335
super(publisher, InstanceEvent.class);
3436
this.infoUpdater = infoUpdater;
3537
}
3638

39+
@Override
40+
public void start() {
41+
assert scheduler == null;
42+
scheduler = Schedulers.newSingle("info-updater");
43+
super.start();
44+
}
45+
46+
@Override
47+
public void stop() {
48+
super.stop();
49+
if (scheduler != null) {
50+
scheduler.dispose();
51+
scheduler = null;
52+
}
53+
}
54+
3755
@Override
3856
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
39-
return publisher.subscribeOn(Schedulers.newSingle("info-updater"))
57+
return publisher.subscribeOn(scheduler)
4058
.filter(event -> event instanceof InstanceEndpointsDetectedEvent ||
4159
event instanceof InstanceStatusChangedEvent ||
4260
event instanceof InstanceRegistrationUpdatedEvent)

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import reactor.core.Disposable;
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.Mono;
26+
import reactor.core.scheduler.Scheduler;
2627
import reactor.core.scheduler.Schedulers;
2728

2829
import java.time.Duration;
@@ -43,7 +44,8 @@ public class StatusUpdateTrigger extends AbstractEventHandler<InstanceEvent> {
4344
private Duration statusLifetime = Duration.ofSeconds(10);
4445
@Nullable
4546
private Disposable intervalSubscription;
46-
47+
private Scheduler statusMonitorScheduler;
48+
private Scheduler statusUpdaterScheduler;
4749

4850
public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> publisher) {
4951
super(publisher, InstanceEvent.class);
@@ -52,11 +54,15 @@ public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent>
5254

5355
@Override
5456
public void start() {
57+
assert statusMonitorScheduler == null;
58+
statusMonitorScheduler = Schedulers.newSingle("status-monitor");
59+
assert statusUpdaterScheduler == null;
60+
statusUpdaterScheduler = Schedulers.newSingle("status-updater");
5561
super.start();
5662
intervalSubscription = Flux.interval(updateInterval)
5763
.doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
5864
.log(log.getName(), Level.FINEST)
59-
.subscribeOn(Schedulers.newSingle("status-monitor"))
65+
.subscribeOn(statusMonitorScheduler)
6066
.concatMap(i -> this.updateStatusForAllInstances())
6167
.onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
6268
ex
@@ -66,7 +72,7 @@ public void start() {
6672

6773
@Override
6874
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
69-
return publisher.subscribeOn(Schedulers.newSingle("status-updater"))
75+
return publisher.subscribeOn(statusUpdaterScheduler)
7076
.filter(event -> event instanceof InstanceRegisteredEvent ||
7177
event instanceof InstanceRegistrationUpdatedEvent)
7278
.flatMap(event -> updateStatus(event.getInstance()));
@@ -78,6 +84,14 @@ public void stop() {
7884
if (intervalSubscription != null) {
7985
intervalSubscription.dispose();
8086
}
87+
if (statusUpdaterScheduler != null) {
88+
statusUpdaterScheduler.dispose();
89+
statusUpdaterScheduler = null;
90+
}
91+
if (statusMonitorScheduler != null) {
92+
statusMonitorScheduler.dispose();
93+
statusMonitorScheduler = null;
94+
}
8195
}
8296

8397
protected Mono<Void> updateStatusForAllInstances() {

0 commit comments

Comments
 (0)