Skip to content

Commit d8c7ae7

Browse files
authored
Revise retry for IntegrationReactiveUtils (#3543)
Related to: spring-cloud/stream-applications#156 The `IntegrationReactiveUtils` uses a general `Flux.retry()` operator to always retry for all the errors. On the other hand it has only a `.doOnError(MessagingException.class)` which leads swallowing all the other exceptions from logs and handling. * Replace with `retryWhen()` for the `MessagingException` predicate failing for all other exceptions. The end-user may add their own `retry` or error handling mechanism to the returned `Flux` from the `IntegrationReactiveUtils` **Cherry-pick to 5.4.x**
1 parent aa32270 commit d8c7ae7

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
import reactor.core.publisher.Mono;
3838
import reactor.core.publisher.Sinks;
3939
import reactor.core.scheduler.Schedulers;
40+
import reactor.util.retry.Retry;
4041

4142
/**
4243
* Utilities for adapting integration components to/from reactive types.
@@ -100,7 +101,7 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
100101
Mono.delay(ctx.getOrDefault(DELAY_WHEN_EMPTY_KEY,
101102
DEFAULT_DELAY_WHEN_EMPTY)))))
102103
.repeat()
103-
.retry();
104+
.retryWhen(Retry.indefinitely().filter(MessagingException.class::isInstance));
104105
}
105106

106107
/**
Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import org.junit.jupiter.api.AfterAll;
2728
import org.junit.jupiter.api.Test;
2829

30+
import org.springframework.integration.core.MessageSource;
2931
import org.springframework.integration.util.IntegrationReactiveUtils;
32+
import org.springframework.messaging.MessagingException;
3033
import org.springframework.messaging.support.GenericMessage;
3134

3235
import reactor.core.Disposable;
@@ -43,7 +46,7 @@
4346
*
4447
* @since 5.1.9
4548
*/
46-
class MessageChannelReactiveUtilsTests {
49+
class IntegrationReactiveUtilsTests {
4750

4851
private static final Scheduler SCHEDULER = Schedulers.boundedElastic();
4952

@@ -125,4 +128,28 @@ void testPublisherPayloadWithNullChannel() throws InterruptedException {
125128
assertThat(publisherSubscribed.await(10, TimeUnit.SECONDS)).isTrue();
126129
}
127130

131+
132+
@Test
133+
void testRetryOnMessagingExceptionOnly() {
134+
AtomicInteger retryAttempts = new AtomicInteger(3);
135+
AtomicReference<Throwable> finalException = new AtomicReference<>();
136+
MessageSource<?> messageSource =
137+
() -> {
138+
if (retryAttempts.getAndDecrement() > 0) {
139+
throw new MessagingException("retryable MessagingException");
140+
}
141+
else {
142+
throw new RuntimeException("non-retryable RuntimeException");
143+
}
144+
};
145+
146+
StepVerifier.create(IntegrationReactiveUtils.messageSourceToFlux(messageSource).doOnError(finalException::set))
147+
.expectSubscription()
148+
.expectErrorMessage("non-retryable RuntimeException")
149+
.verify(Duration.ofSeconds(1));
150+
151+
assertThat(retryAttempts.get()).isEqualTo(-1);
152+
assertThat(finalException.get()).hasMessage("non-retryable RuntimeException");
153+
}
154+
128155
}

0 commit comments

Comments
 (0)