Skip to content

Commit a869e8a

Browse files
artembilangaryrussell
authored andcommitted
Revise retry for IntegrationReactiveUtils (#3543)
Related to: spring-cloud/stream-applications#156 The `IntegrationReactiveUtils` uses a general `Flux.retry()` operator to always retry for all the errors. On the other hand it has only a `.doOnError(MessagingException.class)` which leads swallowing all the other exceptions from logs and handling. * Replace with `retryWhen()` for the `MessagingException` predicate failing for all other exceptions. The end-user may add their own `retry` or error handling mechanism to the returned `Flux` from the `IntegrationReactiveUtils` **Cherry-pick to 5.4.x**
1 parent c392d38 commit a869e8a

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
import reactor.core.publisher.Mono;
3838
import reactor.core.publisher.Sinks;
3939
import reactor.core.scheduler.Schedulers;
40+
import reactor.util.retry.Retry;
4041

4142
/**
4243
* Utilities for adapting integration components to/from reactive types.
@@ -100,7 +101,7 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
100101
Mono.delay(ctx.getOrDefault(DELAY_WHEN_EMPTY_KEY,
101102
DEFAULT_DELAY_WHEN_EMPTY)))))
102103
.repeat()
103-
.retry();
104+
.retryWhen(Retry.indefinitely().filter(MessagingException.class::isInstance));
104105
}
105106

106107
/**
Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import org.junit.jupiter.api.Test;
2728

29+
import org.springframework.integration.core.MessageSource;
2830
import org.springframework.integration.util.IntegrationReactiveUtils;
31+
import org.springframework.messaging.MessagingException;
2932
import org.springframework.messaging.support.GenericMessage;
3033

3134
import reactor.core.Disposable;
@@ -41,7 +44,7 @@
4144
*
4245
* @since 5.1.9
4346
*/
44-
class MessageChannelReactiveUtilsTests {
47+
class IntegrationReactiveUtilsTests {
4548

4649
@Test
4750
void testBackpressureWithSubscribableChannel() {
@@ -116,4 +119,28 @@ void testPublisherPayloadWithNullChannel() throws InterruptedException {
116119
assertThat(publisherSubscribed.await(10, TimeUnit.SECONDS)).isTrue();
117120
}
118121

122+
123+
@Test
124+
void testRetryOnMessagingExceptionOnly() {
125+
AtomicInteger retryAttempts = new AtomicInteger(3);
126+
AtomicReference<Throwable> finalException = new AtomicReference<>();
127+
MessageSource<?> messageSource =
128+
() -> {
129+
if (retryAttempts.getAndDecrement() > 0) {
130+
throw new MessagingException("retryable MessagingException");
131+
}
132+
else {
133+
throw new RuntimeException("non-retryable RuntimeException");
134+
}
135+
};
136+
137+
StepVerifier.create(IntegrationReactiveUtils.messageSourceToFlux(messageSource).doOnError(finalException::set))
138+
.expectSubscription()
139+
.expectErrorMessage("non-retryable RuntimeException")
140+
.verify(Duration.ofSeconds(1));
141+
142+
assertThat(retryAttempts.get()).isEqualTo(-1);
143+
assertThat(finalException.get()).hasMessage("non-retryable RuntimeException");
144+
}
145+
119146
}

0 commit comments

Comments
 (0)