Skip to content

Commit 6087886

Browse files
garyrussellartembilan
authored andcommitted
GH-1058: Fix Reject/Requeue for Async Returns
Fixes #1058 The listener adapter did not honor the exception types used to complete the async result and always requeued. Use trusty on travis.
1 parent 47e5e17 commit 6087886

File tree

6 files changed

+131
-4
lines changed

6 files changed

+131
-4
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
dist: trusty
12
language: java
23
jdk: oraclejdk8
34
sudo: false

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
396396
.acceptIfNotNull(this.beforeSendReplyPostProcessors, messageListener::setBeforeSendReplyPostProcessors)
397397
.acceptIfNotNull(this.retryTemplate, messageListener::setRetryTemplate)
398398
.acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null, this.recoveryCallback,
399-
messageListener::setRecoveryCallback);
399+
messageListener::setRecoveryCallback)
400+
.acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected);
400401
}
401402
initializeContainer(instance, endpoint);
402403

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.amqp.core.Message;
3333
import org.springframework.amqp.core.MessagePostProcessor;
3434
import org.springframework.amqp.core.MessageProperties;
35+
import org.springframework.amqp.rabbit.listener.ContainerUtils;
3536
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
3637
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
3738
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
@@ -114,6 +115,7 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe
114115

115116
private boolean isManualAck;
116117

118+
private boolean defaultRequeueRejected = true;
117119

118120
/**
119121
* Set the routing key to use when sending response messages.
@@ -260,6 +262,16 @@ protected MessageConverter getMessageConverter() {
260262
return this.messageConverter;
261263
}
262264

265+
/**
266+
* Set to the value of this listener's container equivalent property. Used when
267+
* rejecting from an async listener.
268+
* @param defaultRequeueRejected false to not requeue.
269+
* @since 2.1.8
270+
*/
271+
public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
272+
this.defaultRequeueRejected = defaultRequeueRejected;
273+
}
274+
263275
@Override
264276
public void containerAckMode(AcknowledgeMode mode) {
265277
this.isManualAck = AcknowledgeMode.MANUAL.equals(mode);
@@ -391,7 +403,8 @@ private void basicAck(Message request, Channel channel) {
391403
private void asyncFailure(Message request, Channel channel, Throwable t) {
392404
this.logger.error("Future or Mono was completed with an exception for " + request, t);
393405
try {
394-
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
406+
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false,
407+
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, t, this.logger));
395408
}
396409
catch (IOException e) {
397410
this.logger.error("Failed to nack message", e);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/AsyncListenerTests.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Collections;
23+
import java.util.HashMap;
2324
import java.util.List;
25+
import java.util.Map;
2426
import java.util.concurrent.CountDownLatch;
2527
import java.util.concurrent.TimeUnit;
2628
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,6 +31,8 @@
2931
import org.junit.Test;
3032
import org.junit.runner.RunWith;
3133

34+
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
35+
import org.springframework.amqp.ImmediateRequeueAmqpException;
3236
import org.springframework.amqp.core.AcknowledgeMode;
3337
import org.springframework.amqp.core.AnonymousQueue;
3438
import org.springframework.amqp.core.Queue;
@@ -88,6 +92,15 @@ public class AsyncListenerTests {
8892
@Autowired
8993
private Queue queue4;
9094

95+
@Autowired
96+
private Queue queue5;
97+
98+
@Autowired
99+
private Queue queue6;
100+
101+
@Autowired
102+
private Queue queue7;
103+
91104
@Autowired
92105
private Listener listener;
93106

@@ -108,6 +121,19 @@ public void testAsyncListener() throws Exception {
108121
assertThat(listener.latch4.await(10, TimeUnit.SECONDS));
109122
}
110123

124+
@Test
125+
public void testRouteToDLQ() throws InterruptedException {
126+
this.rabbitTemplate.convertAndSend(this.queue5.getName(), "foo");
127+
assertThat(this.listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
128+
this.rabbitTemplate.convertAndSend(this.queue6.getName(), "foo");
129+
assertThat(this.listener.latch6.await(10, TimeUnit.SECONDS)).isTrue();
130+
}
131+
132+
@Test
133+
public void testOverrideDontRequeue() throws Exception {
134+
assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue7.getName(), "foo")).isEqualTo("listen7");
135+
}
136+
111137
@Configuration
112138
@EnableRabbit
113139
public static class EnableRabbitConfig {
@@ -131,6 +157,17 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
131157
return factory;
132158
}
133159

160+
@Bean
161+
public SimpleRabbitListenerContainerFactory dontRequeueFactory() {
162+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
163+
factory.setConnectionFactory(rabbitConnectionFactory());
164+
factory.setMismatchedQueuesFatal(true);
165+
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
166+
factory.setMessageConverter(converter());
167+
factory.setDefaultRequeueRejected(false);
168+
return factory;
169+
}
170+
134171
@Bean
135172
public ConnectionFactory rabbitConnectionFactory() {
136173
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
@@ -180,6 +217,37 @@ public Queue queue4() {
180217
return new AnonymousQueue();
181218
}
182219

220+
@Bean
221+
public Queue queue5() {
222+
Map<String, Object> args = new HashMap<>();
223+
args.put("x-dead-letter-exchange", "");
224+
args.put("x-dead-letter-routing-key", queue5DLQ().getName());
225+
return new AnonymousQueue(args);
226+
}
227+
228+
@Bean
229+
public Queue queue5DLQ() {
230+
return new AnonymousQueue();
231+
}
232+
233+
@Bean
234+
public Queue queue6() {
235+
Map<String, Object> args = new HashMap<>();
236+
args.put("x-dead-letter-exchange", "");
237+
args.put("x-dead-letter-routing-key", queue6DLQ().getName());
238+
return new AnonymousQueue(args);
239+
}
240+
241+
@Bean
242+
public Queue queue6DLQ() {
243+
return new AnonymousQueue();
244+
}
245+
246+
@Bean
247+
public Queue queue7() {
248+
return new AnonymousQueue();
249+
}
250+
183251
@Bean
184252
public Listener listener() {
185253
return new Listener();
@@ -196,6 +264,12 @@ public static class Listener {
196264

197265
private final CountDownLatch latch4 = new CountDownLatch(1);
198266

267+
private final CountDownLatch latch5 = new CountDownLatch(1);
268+
269+
private final CountDownLatch latch6 = new CountDownLatch(1);
270+
271+
private final AtomicBoolean first7 = new AtomicBoolean(true);
272+
199273
@RabbitListener(id = "foo", queues = "#{queue1.name}")
200274
public ListenableFuture<String> listen1(String foo) {
201275
SettableListenableFuture<String> future = new SettableListenableFuture<>();
@@ -231,6 +305,43 @@ public ListenableFuture<Void> listen4(@SuppressWarnings("unused") String foo) {
231305
return future;
232306
}
233307

308+
@RabbitListener(id = "fiz", queues = "#{queue5.name}")
309+
public ListenableFuture<Void> listen5(@SuppressWarnings("unused") String foo) {
310+
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
311+
future.setException(new AmqpRejectAndDontRequeueException("asyncToDLQ"));
312+
return future;
313+
}
314+
315+
@RabbitListener(id = "buz", queues = "#{queue5DLQ.name}")
316+
public void listen5DLQ(@SuppressWarnings("unused") String foo) {
317+
this.latch5.countDown();
318+
}
319+
320+
@RabbitListener(id = "fix", queues = "#{queue6.name}", containerFactory = "dontRequeueFactory")
321+
public ListenableFuture<Void> listen6(@SuppressWarnings("unused") String foo) {
322+
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
323+
future.setException(new IllegalStateException("asyncDefaultToDLQ"));
324+
return future;
325+
}
326+
327+
@RabbitListener(id = "fox", queues = "#{queue6DLQ.name}")
328+
public void listen6DLQ(@SuppressWarnings("unused") String foo) {
329+
this.latch6.countDown();
330+
}
331+
332+
@RabbitListener(id = "overrideFactoryRequeue", queues = "#{queue7.name}",
333+
containerFactory = "dontRequeueFactory")
334+
public ListenableFuture<String> listen7(@SuppressWarnings("unused") String foo) {
335+
SettableListenableFuture<String> future = new SettableListenableFuture<>();
336+
if (this.first7.compareAndSet(true, false)) {
337+
future.setException(new ImmediateRequeueAmqpException("asyncOverrideDefaultToDLQ"));
338+
}
339+
else {
340+
future.set("listen7");
341+
}
342+
return future;
343+
}
344+
234345
}
235346

236347
}

src/reference/asciidoc/amqp.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2834,8 +2834,9 @@ Starting with version 2.1, `@RabbitListener` (and `@RabbitHandler`) methods can
28342834

28352835
IMPORTANT: The listener container factory must be configured with `AcknowledgeMode.MANUAL` so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.
28362836
When the async result is completed with an error, whether the message is requeued or not depends on the exception type thrown, the container configuration, and the container error handler.
2837-
By default, the message will be requeued, unless the container's `defaultRequeueRejected` property is set to `false`.
2837+
By default, the message will be requeued, unless the container's `defaultRequeueRejected` property is set to `false` (it is `true` by default).
28382838
If the async result is completed with an `AmqpRejectAndDontRequeueException`, the message will not be requeued.
2839+
If the container's `defaultRequeueRejected` property is `false`, you can override that by setting the future's exception to a `ImmediateRequeueException` and the message will be requeued.
28392840
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be acknowledged or requeued.
28402841

28412842
[[threading]]

src/reference/asciidoc/whats-new.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ See <<async-annotation-driven-enable>> for more information.
8686
===== Async `@RabbitListener` Return
8787

8888
`@RabbitListener` methods can now return `ListenableFuture<?>` or `Mono<?>`.
89-
See <<async-return>> for more information.
89+
See <<async-returns>> for more information.
9090

9191
===== Connection Factory Bean Changes
9292

0 commit comments

Comments
 (0)