Skip to content

Commit 8b5769a

Browse files
committed
1 parent b04c7df commit 8b5769a

File tree

4 files changed

+79
-64
lines changed

4 files changed

+79
-64
lines changed

moss-core/src/main/java/de/codecentric/boot/admin/server/notify/RemindingNotifier.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.scheduler.Scheduler;
2929
import reactor.core.scheduler.Schedulers;
30+
import reactor.retry.Retry;
3031

3132
import java.time.Duration;
3233
import java.time.Instant;
@@ -73,14 +74,16 @@ public Mono<Void> doNotify(InstanceEvent event, Instance instance) {
7374
public void start() {
7475
Scheduler scheduler = Schedulers.newSingle("reminders");
7576
this.subscription = Flux.interval(this.checkReminderInverval, scheduler)
76-
.log(log.getName(), Level.FINEST)
77-
.doOnSubscribe(s -> log.debug("Started reminders"))
78-
.flatMap(i -> this.sendReminders())
79-
.onErrorContinue((ex, value) -> log.warn(
80-
"Unexpected error while sending reminders",
81-
ex
82-
)).doFinally(s -> scheduler.dispose())
83-
.subscribe();
77+
.log(log.getName(), Level.FINEST)
78+
.doOnSubscribe(s -> log.debug("Started reminders"))
79+
.flatMap(i -> this.sendReminders())
80+
.retryWhen(Retry.any()
81+
.retryMax(Long.MAX_VALUE)
82+
.doOnRetry(ctx -> log.warn("Unexpected error when sending reminders",
83+
ctx.exception()
84+
)))
85+
.doFinally(s -> scheduler.dispose())
86+
.subscribe();
8487
}
8588

8689
public void stop() {
@@ -94,16 +97,17 @@ protected Mono<Void> sendReminders() {
9497
Instant now = Instant.now();
9598

9699
return Flux.fromIterable(this.reminders.values())
97-
.filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
98-
.flatMap(reminder -> delegate.notify(reminder.getEvent())
99-
.doOnSuccess(signal -> reminder.setLastNotification(now)))
100-
.then();
100+
.filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
101+
.flatMap(reminder -> delegate.notify(reminder.getEvent())
102+
.doOnSuccess(signal -> reminder.setLastNotification(now)))
103+
.then();
101104
}
102105

103106
protected boolean shouldStartReminder(InstanceEvent event) {
104107
if (event instanceof InstanceStatusChangedEvent) {
105108
return Arrays.binarySearch(reminderStatuses,
106-
((InstanceStatusChangedEvent) event).getStatusInfo().getStatus()) >= 0;
109+
((InstanceStatusChangedEvent) event).getStatusInfo().getStatus()
110+
) >= 0;
107111
}
108112
return false;
109113
}
@@ -114,7 +118,8 @@ protected boolean shouldEndReminder(InstanceEvent event) {
114118
}
115119
if (event instanceof InstanceStatusChangedEvent) {
116120
return Arrays.binarySearch(reminderStatuses,
117-
((InstanceStatusChangedEvent) event).getStatusInfo().getStatus()) < 0;
121+
((InstanceStatusChangedEvent) event).getStatusInfo().getStatus()
122+
) < 0;
118123
}
119124
return false;
120125
}

moss-core/src/main/java/de/codecentric/boot/admin/server/services/AbstractEventHandler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2018 the original author or authors.
2+
* Copyright 2014-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
2020
import reactor.core.Disposable;
2121
import reactor.core.publisher.Flux;
22+
import reactor.retry.Retry;
2223

2324
import java.util.logging.Level;
2425
import javax.annotation.Nullable;
@@ -40,12 +41,15 @@ protected AbstractEventHandler(Publisher<InstanceEvent> publisher, Class<T> even
4041

4142
public void start() {
4243
subscription = Flux.from(publisher)
43-
.log(log.getName(), Level.FINEST)
44-
.doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
45-
.ofType(eventType)
46-
.cast(eventType).transform(this::handle)
47-
.onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
48-
.subscribe();
44+
.log(log.getName(), Level.FINEST)
45+
.doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
46+
.ofType(eventType)
47+
.cast(eventType)
48+
.transform(this::handle)
49+
.retryWhen(Retry.any()
50+
.retryMax(Long.MAX_VALUE)
51+
.doOnRetry(ctx -> log.warn("Unexpected error", ctx.exception())))
52+
.subscribe();
4953
}
5054

5155
protected abstract Publisher<Void> handle(Flux<T> publisher);

moss-core/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2018 the original author or authors.
2+
* Copyright 2014-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import reactor.core.publisher.Mono;
2626
import reactor.core.scheduler.Scheduler;
2727
import reactor.core.scheduler.Schedulers;
28+
import reactor.retry.Retry;
2829

2930
import java.time.Duration;
3031
import java.time.Instant;
@@ -45,7 +46,6 @@ public class StatusUpdateTrigger extends AbstractEventHandler<InstanceEvent> {
4546
@Nullable
4647
private Disposable intervalSubscription;
4748

48-
4949
public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> publisher) {
5050
super(publisher, InstanceEvent.class);
5151
this.statusUpdater = statusUpdater;
@@ -56,23 +56,27 @@ public void start() {
5656
super.start();
5757
Scheduler scheduler = Schedulers.newSingle("status-monitor");
5858
intervalSubscription = Flux.interval(updateInterval)
59-
.doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
60-
.log(log.getName(), Level.FINEST).subscribeOn(scheduler)
61-
.concatMap(i -> this.updateStatusForAllInstances())
62-
.onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
63-
ex
64-
)).doFinally(s -> scheduler.dispose())
65-
.subscribe();
59+
.doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
60+
.log(log.getName(), Level.FINEST)
61+
.subscribeOn(scheduler)
62+
.concatMap(i -> this.updateStatusForAllInstances())
63+
.retryWhen(Retry.any()
64+
.retryMax(Long.MAX_VALUE)
65+
.doOnRetry(ctx -> log.warn("Unexpected error when updating statuses",
66+
ctx.exception()
67+
)))
68+
.doFinally(s -> scheduler.dispose())
69+
.subscribe();
6670
}
6771

6872
@Override
6973
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
7074
Scheduler scheduler = Schedulers.newSingle("status-updater");
7175
return publisher.subscribeOn(scheduler)
72-
.filter(event -> event instanceof InstanceRegisteredEvent ||
73-
event instanceof InstanceRegistrationUpdatedEvent)
74-
.flatMap(event -> updateStatus(event.getInstance()))
75-
.doFinally(s -> scheduler.dispose());
76+
.filter(event -> event instanceof InstanceRegisteredEvent ||
77+
event instanceof InstanceRegistrationUpdatedEvent)
78+
.flatMap(event -> updateStatus(event.getInstance()))
79+
.doFinally(s -> scheduler.dispose());
7680
}
7781

7882
@Override
@@ -87,10 +91,10 @@ protected Mono<Void> updateStatusForAllInstances() {
8791
log.debug("Updating status for all instances");
8892
Instant expiryInstant = Instant.now().minus(statusLifetime);
8993
return Flux.fromIterable(lastQueried.entrySet())
90-
.filter(e -> e.getValue().isBefore(expiryInstant))
91-
.map(Map.Entry::getKey)
92-
.flatMap(this::updateStatus)
93-
.then();
94+
.filter(e -> e.getValue().isBefore(expiryInstant))
95+
.map(Map.Entry::getKey)
96+
.flatMap(this::updateStatus)
97+
.then();
9498
}
9599

96100
protected Mono<Void> updateStatus(InstanceId instanceId) {

moss-core/src/test/java/de/codecentric/boot/admin/server/services/AbstractEventHandlerTest.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2018 the original author or authors.
2+
* Copyright 2014-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import de.codecentric.boot.admin.server.domain.values.Registration;
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.FluxSink;
26+
import reactor.core.publisher.Mono;
2627
import reactor.core.publisher.UnicastProcessor;
2728
import reactor.test.StepVerifier;
2829
import reactor.test.publisher.TestPublisher;
@@ -37,15 +38,15 @@ public class AbstractEventHandlerTest {
3738
private static final Logger log = LoggerFactory.getLogger(AbstractEventHandlerTest.class);
3839
private static final Registration registration = Registration.create("foo", "http://health").build();
3940
private static final InstanceRegisteredEvent event = new InstanceRegisteredEvent(InstanceId.of("id"),
40-
0L,
41-
registration
41+
0L,
42+
registration
4243
);
4344
private static final InstanceRegisteredEvent errorEvent = new InstanceRegisteredEvent(InstanceId.of("err"),
44-
0L,
45-
registration
45+
0L,
46+
registration
4647
);
4748
private static final InstanceDeregisteredEvent ignoredEvent = new InstanceDeregisteredEvent(InstanceId.of("id"),
48-
1L
49+
1L
4950
);
5051

5152
@Test
@@ -56,15 +57,15 @@ public void should_resubscribe_after_error() {
5657
eventHandler.start();
5758

5859
StepVerifier.create(eventHandler.getFlux())
59-
.expectSubscription()
60-
.then(() -> testPublisher.next(event))
61-
.expectNext(event)
62-
.then(() -> testPublisher.next(errorEvent))
63-
.expectNoEvent(Duration.ofMillis(100L))
64-
.then(() -> testPublisher.next(event))
65-
.expectNext(event)
66-
.thenCancel()
67-
.verify(Duration.ofSeconds(5));
60+
.expectSubscription()
61+
.then(() -> testPublisher.next(event))
62+
.expectNext(event)
63+
.then(() -> testPublisher.next(errorEvent))
64+
.expectNoEvent(Duration.ofMillis(100L))
65+
.then(() -> testPublisher.next(event))
66+
.expectNext(event)
67+
.thenCancel()
68+
.verify(Duration.ofSeconds(5));
6869

6970
}
7071

@@ -76,21 +77,21 @@ public void should_filter() {
7677
eventHandler.start();
7778

7879
StepVerifier.create(eventHandler.getFlux())
79-
.expectSubscription()
80-
.then(() -> testPublisher.next(event))
81-
.expectNext(event)
82-
.then(() -> testPublisher.next(ignoredEvent))
83-
.expectNoEvent(Duration.ofMillis(100L))
84-
.thenCancel()
85-
.verify(Duration.ofSeconds(5));
80+
.expectSubscription()
81+
.then(() -> testPublisher.next(event))
82+
.expectNext(event)
83+
.then(() -> testPublisher.next(ignoredEvent))
84+
.expectNoEvent(Duration.ofMillis(100L))
85+
.thenCancel()
86+
.verify(Duration.ofSeconds(5));
8687
}
8788

8889

8990
private static class TestEventHandler extends AbstractEventHandler<InstanceRegisteredEvent> {
9091
private final FluxSink<InstanceEvent> sink;
9192
private final Flux<InstanceEvent> flux;
9293

93-
protected TestEventHandler(Publisher<InstanceEvent> publisher) {
94+
private TestEventHandler(Publisher<InstanceEvent> publisher) {
9495
super(publisher, InstanceRegisteredEvent.class);
9596
UnicastProcessor<InstanceEvent> processor = UnicastProcessor.create();
9697
this.sink = processor.sink();
@@ -99,14 +100,15 @@ protected TestEventHandler(Publisher<InstanceEvent> publisher) {
99100

100101
@Override
101102
protected Publisher<Void> handle(Flux<InstanceRegisteredEvent> publisher) {
102-
return publisher.doOnNext(event -> {
103+
return publisher.flatMap(event -> {
103104
if (event.equals(errorEvent)) {
104-
throw new IllegalStateException("Error");
105+
throw (new IllegalStateException("Error"));
105106
} else {
106107
log.info("Event {}", event);
107108
sink.next(event);
109+
return Mono.empty();
108110
}
109-
}).then();
111+
});
110112
}
111113

112114
public Flux<InstanceEvent> getFlux() {

0 commit comments

Comments
 (0)