Skip to content

Commit cd94127

Browse files
committed
Some clean up for Retry functionality
* Add `whats-new.adoc` section about Core Retry migration * Remove `SendRetryContextAccessor` as out-of-use API * Remove `MessageRecoveryCallback` in favor of `MessageRecoverer` * Improve `ReplyFailureException` to carry a `replyTo` property * Use `ReplyFailureException` together with a `replyTo` in the `AbstractAdaptableMessageListener` when retry is exhausted * Improve Javadoc of the `MessageKeyGenerator` to justify a null from the `getKey()` contract * Fix typos in the Javadocs of the affected classes
1 parent 346d3ae commit cd94127

File tree

23 files changed

+261
-376
lines changed

23 files changed

+261
-376
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/SendRetryContextAccessor.java

Lines changed: 0 additions & 69 deletions
This file was deleted.

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamRetryOperationsInterceptorFactoryBean.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@
3030
* Convenient factory bean for creating a stateless retry interceptor for use in a
3131
* {@link StreamListenerContainer} when consuming native stream messages, giving you a
3232
* large amount of control over the behavior of a container when a listener fails. To
33-
* control the number of retry attempt or the backoff in between attempts, supply a
33+
* control the number of retry attempts or the backoff in between attempts, supply a
3434
* customized {@link RetryOperations}. Stateless retry is appropriate if your listener can
3535
* be called repeatedly between failures with no side effects. The semantics of stateless
3636
* retry mean that a listener exception is not propagated to the container until the retry
3737
* attempts are exhausted. When the retry attempts are exhausted it can be processed using
3838
* a {@link StreamMessageRecoverer} if one is provided.
3939
*
4040
* @author Gary Russell
41+
* @author Stephane Nicoll
4142
*/
4243
public class StreamRetryOperationsInterceptorFactoryBean extends StatelessRetryOperationsInterceptorFactoryBean {
4344

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
/**
7575
* @author Gary Russell
7676
* @author Artem Bilan
77+
* @author Stephane Nicoll
78+
*
7779
* @since 2.4
7880
*
7981
*/
@@ -147,6 +149,7 @@ private Map<String, Object> queueInfo(String queueName) throws URISyntaxExceptio
147149
.accept(MediaType.APPLICATION_JSON)
148150
.retrieve()
149151
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
152+
150153
})
151154
.block(Duration.ofSeconds(10));
152155
}
@@ -233,20 +236,13 @@ public void start() {
233236
}
234237

235238
private void clean(Environment env) {
236-
try {
237-
env.deleteStream("test.stream.queue1");
238-
}
239-
catch (Exception e) {
240-
}
241-
try {
242-
env.deleteStream("test.stream.queue2");
243-
}
244-
catch (Exception e) {
245-
}
246-
try {
247-
env.deleteStream("stream.created.over.amqp");
248-
}
249-
catch (Exception e) {
239+
String[] streamsToDelete = {"test.stream.queue1", "test.stream.queue2", "stream.created.over.amqp"};
240+
for (String streamToDelete : streamsToDelete) {
241+
try {
242+
env.deleteStream(streamToDelete);
243+
}
244+
catch (Exception e) {
245+
}
250246
}
251247
}
252248

@@ -274,11 +270,10 @@ RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerF
274270
RabbitListenerContainerFactory<StreamListenerContainer> observableFactory(Environment env) {
275271
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
276272
factory.setObservationEnabled(true);
277-
factory.setConsumerCustomizer((id, builder) -> {
278-
builder.name(id)
279-
.offset(OffsetSpecification.first())
280-
.manualTrackingStrategy();
281-
});
273+
factory.setConsumerCustomizer((id, builder) ->
274+
builder.name(id)
275+
.offset(OffsetSpecification.first())
276+
.manualTrackingStrategy());
282277
return factory;
283278
}
284279

@@ -328,11 +323,10 @@ RabbitListenerContainerFactory<StreamListenerContainer> nativeObsFactory(Environ
328323
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
329324
factory.setNativeListener(true);
330325
factory.setObservationEnabled(true);
331-
factory.setConsumerCustomizer((id, builder) -> {
332-
builder.name(id)
333-
.offset(OffsetSpecification.first())
334-
.manualTrackingStrategy();
335-
});
326+
factory.setConsumerCustomizer((id, builder) ->
327+
builder.name(id)
328+
.offset(OffsetSpecification.first())
329+
.manualTrackingStrategy());
336330
return factory;
337331
}
338332

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRetryOperationsInterceptorFactoryBean.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* Convenient base class for interceptor factories.
2828
*
2929
* @author Dave Syer
30+
* @author Stephane Nicoll
3031
*
3132
*/
3233
public abstract class AbstractRetryOperationsInterceptorFactoryBean implements FactoryBean<Advice> {
@@ -51,8 +52,4 @@ public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
5152
return this.messageRecoverer;
5253
}
5354

54-
public boolean isSingleton() {
55-
return true;
56-
}
57-
5855
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
3030
import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener;
3131
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
32-
import org.springframework.amqp.rabbit.retry.MessageRecoveryCallback;
32+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
3333
import org.springframework.amqp.utils.JavaUtils;
3434
import org.springframework.beans.BeansException;
3535
import org.springframework.context.ApplicationContext;
@@ -45,6 +45,7 @@
4545
* @author Gary Russell
4646
* @author Ngoc Nhan
4747
* @author Artem Bilan
48+
* @author Stephane Nicoll
4849
*
4950
* @since 2.4
5051
*
@@ -58,7 +59,7 @@ public abstract class BaseRabbitListenerContainerFactory<C extends MessageListen
5859

5960
private @Nullable RetryTemplate retryTemplate;
6061

61-
private @Nullable MessageRecoveryCallback recoveryCallback;
62+
private @Nullable MessageRecoverer recoveryCallback;
6263

6364
private Advice @Nullable [] adviceChain;
6465

@@ -93,9 +94,9 @@ public void setDefaultRequeueRejected(Boolean requeueRejected) {
9394
}
9495

9596
/**
96-
* Set post processors that will be applied before sending replies; added to each
97+
* Set post-processors that will be applied before sending replies; added to each
9798
* message listener adapter.
98-
* @param postProcessors the post processors.
99+
* @param postProcessors the post-processors.
99100
* @see AbstractAdaptableMessageListener#setBeforeSendReplyPostProcessors(MessagePostProcessor...)
100101
*/
101102
public void setBeforeSendReplyPostProcessors(MessagePostProcessor... postProcessors) {
@@ -108,30 +109,30 @@ public void setBeforeSendReplyPostProcessors(MessagePostProcessor... postProcess
108109
* Set a {@link RetryTemplate} to use when sending replies; added to each message
109110
* listener adapter.
110111
* @param retryTemplate the template.
111-
* @see #setReplyRecoveryCallback(MessageRecoveryCallback)
112+
* @see #setReplyRecoveryCallback(MessageRecoverer)
112113
* @see AbstractAdaptableMessageListener#setRetryTemplate(RetryTemplate)
113114
*/
114115
public void setRetryTemplate(RetryTemplate retryTemplate) {
115116
this.retryTemplate = retryTemplate;
116117
}
117118

118119
/**
119-
* Set a {@link MessageRecoveryCallback} to invoke when retries are exhausted. Added to
120+
* Set a {@link MessageRecoverer} to invoke when retries are exhausted. Added to
120121
* each message listener adapter. Only used if a {@link #setRetryTemplate(RetryTemplate)
121122
* retryTemplate} is provided.
122123
* @param recoveryCallback the recovery callback.
123124
* @see #setRetryTemplate(RetryTemplate)
124-
* @see AbstractAdaptableMessageListener#setRecoveryCallback(MessageRecoveryCallback)
125+
* @see AbstractAdaptableMessageListener#setRecoveryCallback(MessageRecoverer)
125126
*/
126-
public void setReplyRecoveryCallback(MessageRecoveryCallback recoveryCallback) {
127+
public void setReplyRecoveryCallback(MessageRecoverer recoveryCallback) {
127128
this.recoveryCallback = recoveryCallback;
128129
}
129130

130131
/**
131-
* Set a function to provide a reply post processor; it will be used if there is no
132+
* Set a function to provide a reply post-processor; it will be used if there is no
132133
* replyPostProcessor on the rabbit listener annotation. The input parameter is the
133134
* listener id.
134-
* @param replyPostProcessorProvider the post processor.
135+
* @param replyPostProcessorProvider the post-processor.
135136
* @since 3.0
136137
*/
137138
public void setReplyPostProcessorProvider(

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RetryInterceptorBuilder.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
* @author James Carr
6363
* @author Gary Russell
6464
* @author Artem Bilan
65+
* @author Stephane Nicoll
6566
*
6667
* @since 1.3
6768
*
@@ -100,13 +101,14 @@ protected final B _this() { // NOSONAR starts with _
100101
}
101102

102103
/**
103-
* Apply the retry policy - cannot be used if a custom retry template has been provided, or if retry
104+
* Apply the retry policy - cannot be used if a custom retry template has been provided or if the retry
104105
* policy has been customized already.
105106
* @param policy The policy.
106107
* @return this.
107108
*/
108109
public B retryPolicy(RetryPolicy policy) {
109-
Assert.isTrue(!this.templateAltered, "cannot set the retry policy if max attempts or back off policy or options changed");
110+
Assert.isTrue(!this.templateAltered,
111+
"cannot set the retry policy if max attempts or back off policy or options changed");
110112
this.retryPolicy = policy;
111113
this.retryPolicySet = true;
112114
this.templateAltered = true;
@@ -135,7 +137,7 @@ public B maxAttempts(int maxAttempts) {
135137
}
136138

137139
/**
138-
* Apply the backoff options. Cannot be used if a custom retry operations, or back off policy has been set.
140+
* Apply the backoff options. Cannot be used if a custom retry operations or back off policy has been set.
139141
* @param initialInterval The initial interval.
140142
* @param multiplier The multiplier.
141143
* @param maxInterval The max interval.
@@ -185,8 +187,8 @@ public static final class StatefulRetryInterceptorBuilder
185187
}
186188

187189
/**
188-
* Stateful retry requires messages to be identifiable. Default is to use the message id header; use a custom
189-
* implementation if the message id is not present or not reliable.
190+
* Stateful retry requires messages to be identifiable. The default is to use the message id header;
191+
* use a custom implementation if the message id is not present or not reliable.
190192
* @param messageKeyGenerator The key generator.
191193
* @return this.
192194
*/
@@ -196,7 +198,7 @@ public StatefulRetryInterceptorBuilder messageKeyGenerator(MessageKeyGenerator m
196198
}
197199

198200
/**
199-
* Apply a custom new message identifier. Default is to use the redelivered header.
201+
* Apply a custom new message identifier. The default is to use the redelivered header.
200202
* @param newMessageIdentifier The new message identifier.
201203
* @return this.
202204
*/

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
*
4343
* @author Juergen Hoeller
4444
* @author Stephane Nicoll
45+
*
46+
* @since 4.0
4547
*/
4648
public final class StatefulRetryOperationsInterceptor implements MethodInterceptor {
4749

@@ -60,6 +62,7 @@ public final class StatefulRetryOperationsInterceptor implements MethodIntercept
6062
StatefulRetryOperationsInterceptor(MessageKeyGenerator messageKeyGenerator,
6163
NewMessageIdentifier newMessageIdentifier, @Nullable RetryPolicy retryPolicy,
6264
@Nullable MessageRecoverer messageRecoverer) {
65+
6366
this.messageKeyGenerator = messageKeyGenerator;
6467
this.newMessageIdentifier = newMessageIdentifier;
6568
this.retryPolicy = (retryPolicy != null) ? retryPolicy : RetryPolicy.builder().build();
@@ -71,7 +74,7 @@ public final class StatefulRetryOperationsInterceptor implements MethodIntercept
7174
@Nullable Object[] args = invocation.getArguments();
7275
Message message = argToMessage(args);
7376
Object key = this.messageKeyGenerator.getKey(message);
74-
if (key == null) { // state cannot be managed without a key
77+
if (key == null) { // Means no retry for this message anymore.
7578
return invocation.proceed();
7679
}
7780
RetryState retryState = this.cache.get(key);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,19 @@
2626
/**
2727
* Convenient factory bean for creating a stateful retry interceptor for use in a message listener container, giving you
2828
* a large amount of control over the behaviour of a container when a listener fails. To control the number of retry
29-
* attempt or the backoff in between attempts, supply a customized {@link RetryPolicy}. Stateful retry is appropriate
30-
* if your listener is using a transactional resource that needs to be rollback on an exception (e.g. a stateful
29+
* attempts or the backoff in between attempts, supply a customized {@link RetryPolicy}. Stateful retry is appropriate
30+
* if your listener is using a transactional resource that needs to be rolled back on an exception (e.g. a stateful
3131
* connection to a back end server). JPA is the canonical example. The semantics of stateful retry mean that a listener
32-
* exception is propagated to the container, so that it can force a rollback. When the message is redelivered it has to
32+
* exception is propagated to the container so that it can force a rollback. When the message is redelivered it has to
3333
* be recognised (hence the {@link MessageKeyGenerator} strategy), and when the retry attempts are exhausted it will be
34-
* processed using a {@link MessageRecoverer} if one is provided, in a new transaction. If a recoverer is not provided
34+
* processed using a {@link MessageRecoverer} if one is provided, in a new transaction. If a recoverer is not provided,
3535
* the message will be logged and dropped.
3636
*
3737
* @author Dave Syer
3838
* @author Gary Russell
3939
* @author Ngoc Nhan
4040
* @author Artem Bilan
41+
* @author Stephane Nicoll
4142
*
4243
* @see RetryPolicy#shouldRetry(Throwable)
4344
*/

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
*
3434
* @author Juergen Hoeller
3535
* @author Stephane Nicoll
36+
*
37+
* @since 4.0
3638
*/
3739
public final class StatelessRetryOperationsInterceptor implements MethodInterceptor {
3840

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* Convenient factory bean for creating a stateless retry interceptor for use in a message listener container, giving
3434
* you a large amount of control over the behaviour of a container when a listener fails. To control the number of retry
35-
* attempt or the backoff in between attempts, supply a customized {@link RetryPolicy}. Stateless retry is appropriate
35+
* attempts or the backoff in between attempts, supply a customized {@link RetryPolicy}. Stateless retry is appropriate
3636
* if your listener can be called repeatedly between failures with no side effects. The semantics of stateless retry
3737
* mean that a listener exception is not propagated to the container until the retry attempts are exhausted. When the
3838
* retry attempts are exhausted it can be processed using a {@link MessageRecoverer} if one is provided, in the same
@@ -41,6 +41,7 @@
4141
*
4242
* @author Dave Syer
4343
* @author Gary Russell
44+
* @author Stephane Nicoll
4445
*
4546
* @see RetryOperations#execute(Retryable)
4647
*/

0 commit comments

Comments
 (0)