Skip to content

Commit a8c471c

Browse files
committed
Fix ReactiveStreamsConsumer for error handling
To support `onErrorContinue()` logic for the plain `Subscriber` we need to wrap its `onNext()` into a `try..catch` and respective `errorHandler` in the `ReactiveStreamsConsumer`
1 parent 6591ce9 commit a8c471c

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -166,21 +166,25 @@ protected void doStart() {
166166
this.subscription =
167167
Flux.from(this.publisher)
168168
.flatMap(this.reactiveMessageHandler::handleMessage)
169-
.onErrorContinue(this::onError)
169+
.onErrorContinue((ex, data) -> this.errorHandler.handleError(ex))
170170
.subscribe();
171171
}
172172
else if (this.subscriber != null) {
173-
Flux.from(this.publisher)
174-
.doOnSubscribe((subs) -> this.subscription = subs::cancel)
175-
.onErrorContinue(this::onError)
176-
.subscribe(this.subscriber);
173+
this.subscription =
174+
Flux.from(this.publisher)
175+
.doOnComplete(this.subscriber::onComplete)
176+
.doOnSubscribe(this.subscriber::onSubscribe)
177+
.subscribe((data) -> {
178+
try {
179+
this.subscriber.onNext(data);
180+
}
181+
catch (Exception ex) {
182+
this.errorHandler.handleError(ex);
183+
}
184+
});
177185
}
178186
}
179187

180-
private void onError(Throwable ex, Object data) {
181-
this.errorHandler.handleError(ex);
182-
}
183-
184188
@Override
185189
protected void doStop() {
186190
if (this.subscription != null) {

0 commit comments

Comments
 (0)