Skip to content

Commit 452b618

Browse files
committed
Fix missed events in EventHandlers when flatMapping.
An error during event handling for a single event doesn't kill the sub- scription, but prevents the remaining events of the current "chunk" to be processed, so we need to ignore failures for single events. closes #1128
1 parent c0e47fa commit 452b618

File tree

7 files changed

+108
-22
lines changed

7 files changed

+108
-22
lines changed

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/EndpointDetectionTrigger.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import reactor.core.scheduler.Schedulers;
2626

2727
import org.reactivestreams.Publisher;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
public class EndpointDetectionTrigger extends AbstractEventHandler<InstanceEvent> {
32+
private static final Logger log = LoggerFactory.getLogger(EndpointDetectionTrigger.class);
3033
private final EndpointDetector endpointDetector;
3134

3235
public EndpointDetectionTrigger(EndpointDetector endpointDetector, Publisher<InstanceEvent> publisher) {
@@ -45,6 +48,9 @@ protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
4548
}
4649

4750
protected Mono<Void> detectEndpoints(InstanceEvent event) {
48-
return endpointDetector.detectEndpoints(event.getInstance());
51+
return this.endpointDetector.detectEndpoints(event.getInstance()).onErrorResume(e -> {
52+
log.warn("Unexpected error while detecting endpoints for {}", event.getInstance(), e);
53+
return Mono.empty();
54+
});
4955
}
5056
}

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/InfoUpdateTrigger.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828

2929
import java.time.Duration;
3030
import org.reactivestreams.Publisher;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
public class InfoUpdateTrigger extends AbstractEventHandler<InstanceEvent> {
35+
private static final Logger log = LoggerFactory.getLogger(InfoUpdateTrigger.class);
3336
private final InfoUpdater infoUpdater;
3437
private final IntervalCheck intervalCheck;
3538

@@ -51,7 +54,10 @@ protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
5154
}
5255

5356
protected Mono<Void> updateInfo(InstanceId instanceId) {
54-
return this.infoUpdater.updateInfo(instanceId).doFinally(s -> this.intervalCheck.markAsChecked(instanceId));
57+
return this.infoUpdater.updateInfo(instanceId).onErrorResume(e -> {
58+
log.warn("Unexpected error while updating info for {}", instanceId, e);
59+
return Mono.empty();
60+
}).doFinally(s -> this.intervalCheck.markAsChecked(instanceId));
5561
}
5662

5763
@Override

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727

2828
import java.time.Duration;
2929
import org.reactivestreams.Publisher;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
public class StatusUpdateTrigger extends AbstractEventHandler<InstanceEvent> {
34+
private static final Logger log = LoggerFactory.getLogger(StatusUpdateTrigger.class);
3235
private final StatusUpdater statusUpdater;
3336
private final IntervalCheck intervalCheck;
3437

@@ -49,7 +52,10 @@ protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
4952
}
5053

5154
protected Mono<Void> updateStatus(InstanceId instanceId) {
52-
return this.statusUpdater.updateStatus(instanceId).doFinally(s -> this.intervalCheck.markAsChecked(instanceId));
55+
return this.statusUpdater.updateStatus(instanceId).onErrorResume(e -> {
56+
log.warn("Unexpected error while updating status for {}", instanceId, e);
57+
return Mono.empty();
58+
}).doFinally(s -> this.intervalCheck.markAsChecked(instanceId));
5359
}
5460

5561
@Override

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/AbstractEventHandlerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,17 @@ private TestEventHandler(Publisher<InstanceEvent> publisher) {
102102
protected Publisher<Void> handle(Flux<InstanceRegisteredEvent> publisher) {
103103
return publisher.flatMap(event -> {
104104
if (event.equals(errorEvent)) {
105-
throw (new IllegalStateException("Error"));
105+
return Mono.error(new IllegalStateException("Error"));
106106
} else {
107107
log.info("Event {}", event);
108-
sink.next(event);
108+
this.sink.next(event);
109109
return Mono.empty();
110110
}
111-
});
111+
}).then();
112112
}
113113

114114
public Flux<InstanceEvent> getFlux() {
115-
return flux;
115+
return this.flux;
116116
}
117117
}
118118

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/EndpointDetectionTriggerTest.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2018 the original author or authors.
2+
* Copyright 2014-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,44 +47,74 @@ public class EndpointDetectionTriggerTest {
4747

4848
@Before
4949
public void setUp() throws Exception {
50-
when(detector.detectEndpoints(any(InstanceId.class))).thenReturn(Mono.empty());
51-
trigger = new EndpointDetectionTrigger(detector, events.flux());
52-
trigger.start();
50+
when(this.detector.detectEndpoints(any(InstanceId.class))).thenReturn(Mono.empty());
51+
this.trigger = new EndpointDetectionTrigger(this.detector, this.events.flux());
52+
this.trigger.start();
5353
Thread.sleep(50L); //wait for subscription
5454
}
5555

5656
@Test
5757
public void should_detect_on_status_changed() {
5858
//when status-change event is emitted
59-
events.next(new InstanceStatusChangedEvent(instance.getId(), instance.getVersion(), StatusInfo.ofDown()));
59+
this.events.next(new InstanceStatusChangedEvent(this.instance.getId(),
60+
this.instance.getVersion(),
61+
StatusInfo.ofDown()
62+
));
6063
//then should update
61-
verify(detector, times(1)).detectEndpoints(instance.getId());
64+
verify(this.detector, times(1)).detectEndpoints(this.instance.getId());
6265
}
6366

6467
@Test
6568
public void should_detect_on_registration_updated() {
6669
//when status-change event is emitted
67-
events.next(
68-
new InstanceRegistrationUpdatedEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
70+
this.events.next(new InstanceRegistrationUpdatedEvent(this.instance.getId(),
71+
this.instance.getVersion(),
72+
this.instance.getRegistration()
73+
));
6974
//then should update
70-
verify(detector, times(1)).detectEndpoints(instance.getId());
75+
verify(this.detector, times(1)).detectEndpoints(this.instance.getId());
7176
}
7277

7378
@Test
7479
public void should_not_detect_on_non_relevant_event() {
7580
//when some non-status-change event is emitted
76-
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
81+
this.events.next(new InstanceRegisteredEvent(this.instance.getId(),
82+
this.instance.getVersion(),
83+
this.instance.getRegistration()
84+
));
7785
//then should not update
78-
verify(detector, never()).detectEndpoints(instance.getId());
86+
verify(this.detector, never()).detectEndpoints(this.instance.getId());
7987
}
8088

8189
@Test
8290
public void should_not_detect_on_trigger_stopped() {
8391
//when registered event is emitted but the trigger has been stopped
84-
trigger.stop();
85-
clearInvocations(detector);
86-
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
92+
this.trigger.stop();
93+
clearInvocations(this.detector);
94+
this.events.next(new InstanceRegisteredEvent(this.instance.getId(),
95+
this.instance.getVersion(),
96+
this.instance.getRegistration()
97+
));
8798
//then should not update
88-
verify(detector, never()).detectEndpoints(instance.getId());
99+
verify(this.detector, never()).detectEndpoints(this.instance.getId());
100+
}
101+
102+
@Test
103+
public void should_continue_detection_after_error() throws InterruptedException {
104+
//when status-change event is emitted and an error is emitted
105+
when(this.detector.detectEndpoints(any())).thenReturn(Mono.error(IllegalStateException::new))
106+
.thenReturn(Mono.empty());
107+
108+
this.events.next(new InstanceStatusChangedEvent(this.instance.getId(),
109+
this.instance.getVersion(),
110+
StatusInfo.ofDown()
111+
));
112+
this.events.next(new InstanceStatusChangedEvent(this.instance.getId(),
113+
this.instance.getVersion(),
114+
StatusInfo.ofUp()
115+
));
116+
117+
//then should update
118+
verify(this.detector, times(2)).detectEndpoints(this.instance.getId());
89119
}
90120
}

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/InfoUpdateTriggerTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,23 @@ public void should_not_update_on_non_relevant_event() {
149149
//then should not update
150150
verify(this.updater, never()).updateInfo(this.instance.getId());
151151
}
152+
153+
@Test
154+
public void should_continue_update_after_error() throws InterruptedException {
155+
//when status-change event is emitted and an error is emitted
156+
when(this.updater.updateInfo(any())).thenReturn(Mono.error(IllegalStateException::new))
157+
.thenReturn(Mono.empty());
158+
159+
this.events.next(new InstanceStatusChangedEvent(this.instance.getId(),
160+
this.instance.getVersion(),
161+
StatusInfo.ofDown()
162+
));
163+
this.events.next(new InstanceStatusChangedEvent(this.instance.getId(),
164+
this.instance.getVersion(),
165+
StatusInfo.ofUp()
166+
));
167+
168+
//then should update
169+
verify(this.updater, times(2)).updateInfo(this.instance.getId());
170+
}
152171
}

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/StatusUpdateTriggerTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,23 @@ public void should_not_update_on_non_relevant_event() {
132132
//then should not update
133133
verify(this.updater, never()).updateStatus(this.instance.getId());
134134
}
135+
136+
@Test
137+
public void should_continue_update_after_error() throws InterruptedException {
138+
//when status-change event is emitted and an error is emitted
139+
when(this.updater.updateStatus(any())).thenReturn(Mono.error(IllegalStateException::new))
140+
.thenReturn(Mono.empty());
141+
142+
this.events.next(new InstanceRegistrationUpdatedEvent(this.instance.getId(),
143+
this.instance.getVersion(),
144+
this.instance.getRegistration()
145+
));
146+
this.events.next(new InstanceRegistrationUpdatedEvent(this.instance.getId(),
147+
this.instance.getVersion(),
148+
this.instance.getRegistration()
149+
));
150+
151+
//then should update
152+
verify(this.updater, times(2)).updateStatus(this.instance.getId());
153+
}
135154
}

0 commit comments

Comments
 (0)