Skip to content

Commit d75777d

Browse files
artembilangaryrussell
authored andcommitted
AMQP-813: Requeue after retry exhausted option
JIRA: https://jira.spring.io/browse/AMQP-813 * Add `ImmediateRequeueAmqpException` to let `ContainerUtils.shouldRequeue()` to return `true` immediately and requeue the message in the container * Add `ImmediateRequeueMessageRecoverer` to throw a mentioned above `ImmediateRequeueAmqpException` * Refactor `RetryInterceptorBuilder` to avoid duplicated code * Mention changes in the Docs * Mention `ImmediateRequeueAmqpException` in Docs alongside with the `defaultRequeueRejected` Doc Polishing
1 parent 88f57fa commit d75777d

File tree

7 files changed

+182
-90
lines changed

7 files changed

+182
-90
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp;
18+
19+
/**
20+
* The special {@link AmqpException} to be thrown from the listener (e.g. retry recoverer callback)
21+
* to {@code requeue) failed message.
22+
*
23+
* @author Artem Bilan
24+
*
25+
* @since 2.1
26+
*/
27+
@SuppressWarnings("serial")
28+
public class ImmediateRequeueAmqpException extends AmqpException {
29+
30+
public ImmediateRequeueAmqpException(String message) {
31+
super(message);
32+
}
33+
34+
public ImmediateRequeueAmqpException(Throwable cause) {
35+
super(cause);
36+
}
37+
38+
public ImmediateRequeueAmqpException(String message, Throwable cause) {
39+
super(message, cause);
40+
}
41+
42+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RetryInterceptorBuilder.java

Lines changed: 24 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,10 +63,12 @@
6363
*
6464
* @author James Carr
6565
* @author Gary Russell
66+
* @author Artem Bilan
67+
*
6668
* @since 1.3
6769
*
6870
*/
69-
public abstract class RetryInterceptorBuilder<T extends MethodInterceptor> {
71+
public abstract class RetryInterceptorBuilder<B extends RetryInterceptorBuilder<B, T>, T extends MethodInterceptor> {
7072

7173
private RetryOperations retryOperations;
7274

@@ -100,16 +102,21 @@ public static StatelessRetryInterceptorBuilder stateless() {
100102
return new StatelessRetryInterceptorBuilder();
101103
}
102104

105+
@SuppressWarnings("unchecked")
106+
protected final B _this() {
107+
return (B) this;
108+
}
109+
103110
/**
104111
* Apply the retry operations - once this is set, other properties can no longer be set; can't
105112
* be set if other properties have been applied.
106113
* @param retryOperations The retry operations.
107114
* @return this.
108115
*/
109-
public RetryInterceptorBuilder<T> retryOperations(RetryOperations retryOperations) {
116+
public B retryOperations(RetryOperations retryOperations) {
110117
Assert.isTrue(!this.templateAltered, "Cannot set retryOperations when the default has been modified");
111118
this.retryOperations = retryOperations;
112-
return this;
119+
return _this();
113120
}
114121

115122
/**
@@ -118,13 +125,13 @@ public RetryInterceptorBuilder<T> retryOperations(RetryOperations retryOperation
118125
* @param maxAttempts the max attempts.
119126
* @return this.
120127
*/
121-
public RetryInterceptorBuilder<T> maxAttempts(int maxAttempts) {
128+
public B maxAttempts(int maxAttempts) {
122129
Assert.isNull(this.retryOperations, "cannot alter the retry policy when a custom retryOperations has been set");
123130
Assert.isTrue(!this.retryPolicySet, "cannot alter the retry policy when a custom retryPolicy has been set");
124131
this.simpleRetryPolicy.setMaxAttempts(maxAttempts);
125132
this.retryTemplate.setRetryPolicy(this.simpleRetryPolicy);
126133
this.templateAltered = true;
127-
return this;
134+
return _this();
128135
}
129136

130137
/**
@@ -134,7 +141,7 @@ public RetryInterceptorBuilder<T> maxAttempts(int maxAttempts) {
134141
* @param maxInterval The max interval.
135142
* @return this.
136143
*/
137-
public RetryInterceptorBuilder<T> backOffOptions(long initialInterval, double multiplier, long maxInterval) {
144+
public B backOffOptions(long initialInterval, double multiplier, long maxInterval) {
138145
Assert.isNull(this.retryOperations, "cannot set the back off policy when a custom retryOperations has been set");
139146
Assert.isTrue(!this.backOffPolicySet, "cannot set the back off options when a back off policy has been set");
140147
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
@@ -144,7 +151,7 @@ public RetryInterceptorBuilder<T> backOffOptions(long initialInterval, double mu
144151
this.retryTemplate.setBackOffPolicy(policy);
145152
this.backOffOptionsSet = true;
146153
this.templateAltered = true;
147-
return this;
154+
return _this();
148155
}
149156

150157
/**
@@ -153,37 +160,37 @@ public RetryInterceptorBuilder<T> backOffOptions(long initialInterval, double mu
153160
* @param policy The policy.
154161
* @return this.
155162
*/
156-
public RetryInterceptorBuilder<T> retryPolicy(RetryPolicy policy) {
163+
public B retryPolicy(RetryPolicy policy) {
157164
Assert.isNull(this.retryOperations, "cannot set the retry policy when a custom retryOperations has been set");
158165
Assert.isTrue(!this.templateAltered, "cannot set the retry policy if max attempts or back off policy or options changed");
159166
this.retryTemplate.setRetryPolicy(policy);
160167
this.retryPolicySet = true;
161168
this.templateAltered = true;
162-
return this;
169+
return _this();
163170
}
164171

165172
/**
166173
* Apply the back off policy. Cannot be used if a custom retry operations, or back off policy has been applied.
167174
* @param policy The policy.
168175
* @return this.
169176
*/
170-
public RetryInterceptorBuilder<T> backOffPolicy(BackOffPolicy policy) {
177+
public B backOffPolicy(BackOffPolicy policy) {
171178
Assert.isNull(this.retryOperations, "cannot set the back off policy when a custom retryOperations has been set");
172179
Assert.isTrue(!this.backOffOptionsSet, "cannot set the back off policy when the back off policy options have been set");
173180
this.retryTemplate.setBackOffPolicy(policy);
174181
this.templateAltered = true;
175182
this.backOffPolicySet = true;
176-
return this;
183+
return _this();
177184
}
178185

179186
/**
180187
* Apply a Message recoverer - default is to log and discard after retry is exhausted.
181188
* @param recoverer The recoverer.
182189
* @return this.
183190
*/
184-
public RetryInterceptorBuilder<T> recoverer(MessageRecoverer recoverer) {
191+
public B recoverer(MessageRecoverer recoverer) {
185192
this.messageRecoverer = recoverer;
186-
return this;
193+
return _this();
187194
}
188195

189196
protected void applyCommonSettings(AbstractRetryOperationsInterceptorFactoryBean factoryBean) {
@@ -204,7 +211,8 @@ protected void applyCommonSettings(AbstractRetryOperationsInterceptorFactoryBean
204211
/**
205212
* Builder for a stateful interceptor.
206213
*/
207-
public static final class StatefulRetryInterceptorBuilder extends RetryInterceptorBuilder<StatefulRetryOperationsInterceptor> {
214+
public static final class StatefulRetryInterceptorBuilder
215+
extends RetryInterceptorBuilder<StatefulRetryInterceptorBuilder, StatefulRetryOperationsInterceptor> {
208216

209217
private final StatefulRetryOperationsInterceptorFactoryBean factoryBean =
210218
new StatefulRetryOperationsInterceptorFactoryBean();
@@ -238,44 +246,6 @@ public StatefulRetryInterceptorBuilder newMessageIdentifier(NewMessageIdentifier
238246
return this;
239247
}
240248

241-
@Override
242-
public StatefulRetryInterceptorBuilder retryOperations(
243-
RetryOperations retryOperations) {
244-
super.retryOperations(retryOperations);
245-
return this;
246-
}
247-
248-
@Override
249-
public StatefulRetryInterceptorBuilder maxAttempts(int maxAttempts) {
250-
super.maxAttempts(maxAttempts);
251-
return this;
252-
}
253-
254-
@Override
255-
public StatefulRetryInterceptorBuilder backOffOptions(long initialInterval,
256-
double multiplier, long maxInterval) {
257-
super.backOffOptions(initialInterval, multiplier, maxInterval);
258-
return this;
259-
}
260-
261-
@Override
262-
public StatefulRetryInterceptorBuilder retryPolicy(RetryPolicy policy) {
263-
super.retryPolicy(policy);
264-
return this;
265-
}
266-
267-
@Override
268-
public StatefulRetryInterceptorBuilder backOffPolicy(BackOffPolicy policy) {
269-
super.backOffPolicy(policy);
270-
return this;
271-
}
272-
273-
@Override
274-
public StatefulRetryInterceptorBuilder recoverer(MessageRecoverer recoverer) {
275-
super.recoverer(recoverer);
276-
return this;
277-
}
278-
279249
@Override
280250
public StatefulRetryOperationsInterceptor build() {
281251
this.applyCommonSettings(this.factoryBean);
@@ -295,7 +265,7 @@ public StatefulRetryOperationsInterceptor build() {
295265
* Builder for a stateless interceptor.
296266
*/
297267
public static final class StatelessRetryInterceptorBuilder
298-
extends RetryInterceptorBuilder<RetryOperationsInterceptor> {
268+
extends RetryInterceptorBuilder<StatelessRetryInterceptorBuilder, RetryOperationsInterceptor> {
299269

300270
private final StatelessRetryOperationsInterceptorFactoryBean factoryBean =
301271
new StatelessRetryOperationsInterceptorFactoryBean();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ContainerUtils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.apache.commons.logging.Log;
2020

2121
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
22+
import org.springframework.amqp.ImmediateRequeueAmqpException;
2223

2324
/**
2425
* Utility methods for listener containers.
2526
*
2627
* @author Gary Russell
28+
* @author Artem Bilan
2729
*
2830
* @since 2.1
2931
*
@@ -45,7 +47,8 @@ private ContainerUtils() {
4547
*/
4648
public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
4749
boolean shouldRequeue = defaultRequeueRejected ||
48-
throwable instanceof MessageRejectedWhileStoppingException;
50+
throwable instanceof MessageRejectedWhileStoppingException ||
51+
throwable instanceof ImmediateRequeueAmqpException;
4952
Throwable t = throwable;
5053
while (shouldRequeue && t != null) {
5154
if (t instanceof AmqpRejectAndDontRequeueException) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.retry;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
22+
import org.springframework.amqp.ImmediateRequeueAmqpException;
23+
import org.springframework.amqp.core.Message;
24+
25+
/**
26+
* The {@link MessageRecoverer} implementation to throw an {@link ImmediateRequeueAmqpException}
27+
* for subsequent requeuing in the listener container.
28+
*
29+
* @author Artem Bilan
30+
*
31+
* @since 2.1
32+
*/
33+
public class ImmediateRequeueMessageRecoverer implements MessageRecoverer {
34+
35+
protected Log logger = LogFactory.getLog(ImmediateRequeueMessageRecoverer.class);
36+
37+
@Override
38+
public void recover(Message message, Throwable cause) {
39+
if (this.logger.isWarnEnabled()) {
40+
this.logger.warn("Retries exhausted for message " + message + "; requeuing...", cause);
41+
}
42+
throw new ImmediateRequeueAmqpException(cause);
43+
}
44+
45+
}

0 commit comments

Comments
 (0)