Skip to content

Commit a4f014d

Browse files
authored
GH-1409: Fix Nacks for Async Replies
Resolves #1409 Normally, when message has a fatal exception (such as message conversion) `basicNack` with `multiple` true is used, to nack any previously unacked messages (e.g. when using batch size to limit the ack traffic). Even when using manual acks, fatal exceptions are nacked by the container because the user does not have access to the message. However, when using async replies, this has the side effect of nacking unprocessed messages. Detect whether async replies are being used and only nack individual records that cause fatal exceptions. Also, coerce the `AcknowledgeMode` to `MANUAL` for such listners. Add a test for both containers; send a good message followed by a bad one without actually completing the reply future. After the exception occurs and the container is stopped, there should be one messag in the queue. * Remove warning, deprecation; add docs. * Docs. **Cherry-pick to `2.3.x` & `2.2.x`**
1 parent 687b515 commit a4f014d

File tree

13 files changed

+328
-32
lines changed

13 files changed

+328
-32
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,16 @@ default void containerAckMode(AcknowledgeMode mode) {
4343
// NOSONAR - empty
4444
}
4545

46+
/**
47+
* Return true if this listener is request/reply and the replies are
48+
* async.
49+
* @return true for async replies.
50+
* @since 2.2.21
51+
*/
52+
default boolean isAsyncReplies() {
53+
return false;
54+
}
55+
4656
/**
4757
* Delivers a batch of messages.
4858
* @param messages the messages.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
255255

256256
private volatile boolean lazyLoad;
257257

258+
private boolean asyncReplies;
259+
258260
@Override
259261
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
260262
this.applicationEventPublisher = applicationEventPublisher;
@@ -439,6 +441,7 @@ public void setMessageListener(MessageListener messageListener) {
439441
this.messageListener = messageListener;
440442
this.isBatchListener = messageListener instanceof BatchMessageListener
441443
|| messageListener instanceof ChannelAwareBatchMessageListener;
444+
this.asyncReplies = messageListener.isAsyncReplies();
442445
}
443446

444447
/**
@@ -1016,10 +1019,12 @@ public boolean isPossibleAuthenticationFailureFatal() {
10161019
return this.possibleAuthenticationFailureFatal;
10171020
}
10181021

1019-
10201022
protected boolean isPossibleAuthenticationFailureFatalSet() {
10211023
return this.possibleAuthenticationFailureFatalSet;
10221024
}
1025+
protected boolean isAsyncReplies() {
1026+
return this.asyncReplies;
1027+
}
10231028

10241029
/**
10251030
* Set to true to automatically declare elements (queues, exchanges, bindings)
@@ -1220,6 +1225,9 @@ public void afterPropertiesSet() {
12201225
catch (IllegalStateException e) {
12211226
this.logger.debug("Could not enable micrometer timers", e);
12221227
}
1228+
if (this.isAsyncReplies() && !AcknowledgeMode.MANUAL.equals(this.acknowledgeMode)) {
1229+
this.acknowledgeMode = AcknowledgeMode.MANUAL;
1230+
}
12231231
}
12241232

12251233
@Override

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,17 @@ public synchronized void stop() {
779779
* @param ex the thrown application exception or error
780780
*/
781781
public void rollbackOnExceptionIfNecessary(Throwable ex) {
782+
rollbackOnExceptionIfNecessary(ex, -1);
783+
}
784+
785+
/**
786+
* Perform a rollback, handling rollback exceptions properly.
787+
* @param ex the thrown application exception or error
788+
* @param tag delivery tag; when specified (greater than or equal to 0) only that
789+
* message is nacked.
790+
* @since 2.2.21.
791+
*/
792+
public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) {
782793

783794
boolean ackRequired = !this.acknowledgeMode.isAutoAck()
784795
&& (!this.acknowledgeMode.isManual() || ContainerUtils.isRejectManual(ex));
@@ -790,14 +801,20 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) {
790801
RabbitUtils.rollbackIfNecessary(this.channel);
791802
}
792803
if (ackRequired) {
793-
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
794-
if (deliveryTag.isPresent()) {
795-
this.channel.basicNack(deliveryTag.getAsLong(), true,
796-
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
804+
if (tag < 0) {
805+
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
806+
if (deliveryTag.isPresent()) {
807+
this.channel.basicNack(deliveryTag.getAsLong(), true,
808+
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
809+
}
810+
if (this.transactional) {
811+
// Need to commit the reject (=nack)
812+
RabbitUtils.commitIfNecessary(this.channel);
813+
}
797814
}
798-
if (this.transactional) {
799-
// Need to commit the reject (=nack)
800-
RabbitUtils.commitIfNecessary(this.channel);
815+
else {
816+
this.channel.basicNack(tag, false,
817+
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
801818
}
802819
}
803820
}
@@ -806,7 +823,12 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) {
806823
throw RabbitExceptionTranslator.convertRabbitAccessException(e); // NOSONAR stack trace loss
807824
}
808825
finally {
809-
this.deliveryTags.clear();
826+
if (tag < 0) {
827+
this.deliveryTags.clear();
828+
}
829+
else {
830+
this.deliveryTags.remove(tag);
831+
}
810832
}
811833
}
812834

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,7 @@ private void rollback(long deliveryTag, Exception e) {
12141214
}
12151215
}
12161216
}
1217-
getChannel().basicNack(deliveryTag, true,
1217+
getChannel().basicNack(deliveryTag, !isAsyncReplies(),
12181218
ContainerUtils.shouldRequeue(isDefaultRequeueRejected(), e, this.logger));
12191219
}
12201220
catch (IOException e1) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,9 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
982982
}
983983
break;
984984
}
985+
long tagToRollback = isAsyncReplies()
986+
? message.getMessageProperties().getDeliveryTag()
987+
: -1;
985988
if (getTransactionManager() != null) {
986989
if (getTransactionAttribute().rollbackOn(ex)) {
987990
RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
@@ -994,7 +997,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
994997
* If we don't actually have a transaction, we have to roll back
995998
* manually. See prepareHolderForRollback().
996999
*/
997-
consumer.rollbackOnExceptionIfNecessary(ex);
1000+
consumer.rollbackOnExceptionIfNecessary(ex, tagToRollback);
9981001
}
9991002
throw ex; // encompassing transaction will handle the rollback.
10001003
}
@@ -1006,7 +1009,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
10061009
}
10071010
}
10081011
else {
1009-
consumer.rollbackOnExceptionIfNecessary(ex);
1012+
consumer.rollbackOnExceptionIfNecessary(ex, tagToRollback);
10101013
throw ex;
10111014
}
10121015
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.lang.reflect.Type;
2222
import java.lang.reflect.WildcardType;
2323
import java.util.Arrays;
24-
import java.util.function.Consumer;
2524

2625
import org.apache.commons.logging.Log;
2726
import org.apache.commons.logging.LogFactory;
@@ -56,7 +55,6 @@
5655
import org.springframework.util.concurrent.ListenableFuture;
5756

5857
import com.rabbitmq.client.Channel;
59-
import reactor.core.publisher.Mono;
6058

6159
/**
6260
* An abstract {@link org.springframework.amqp.core.MessageListener} adapter providing the
@@ -81,7 +79,7 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe
8179

8280
private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
8381

84-
private static final boolean monoPresent = // NOSONAR - lower case
82+
static final boolean monoPresent = // NOSONAR - lower case, protected
8583
ClassUtils.isPresent("reactor.core.publisher.Mono", ChannelAwareMessageListener.class.getClassLoader());
8684

8785
/**
@@ -695,19 +693,4 @@ public Object getResult() {
695693

696694
}
697695

698-
private static class MonoHandler { // NOSONAR - pointless to name it ..Utils|Helper
699-
700-
static boolean isMono(Object result) {
701-
return result instanceof Mono;
702-
}
703-
704-
@SuppressWarnings("unchecked")
705-
static void subscribe(Object returnValue, Consumer<? super Object> success,
706-
Consumer<? super Throwable> failure, Runnable completeConsumer) {
707-
708-
((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);
709-
}
710-
711-
}
712-
713696
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.lang.reflect.Method;
2121
import java.util.ArrayList;
2222
import java.util.Arrays;
23+
import java.util.Iterator;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.concurrent.ConcurrentHashMap;
@@ -44,6 +45,7 @@
4445
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
4546
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
4647
import org.springframework.util.Assert;
48+
import org.springframework.util.concurrent.ListenableFuture;
4749
import org.springframework.validation.Validator;
4850

4951

@@ -84,6 +86,8 @@ public class DelegatingInvocableHandler {
8486

8587
private final PayloadValidator validator;
8688

89+
private final boolean asyncReplies;
90+
8791
/**
8892
* Construct an instance with the supplied handlers for the bean.
8993
* @param handlers the handlers.
@@ -132,9 +136,19 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
132136
this.resolver = beanExpressionResolver;
133137
this.beanExpressionContext = beanExpressionContext;
134138
this.validator = validator == null ? null : new PayloadValidator(validator);
139+
boolean asyncReplies;
140+
asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler);
141+
Iterator<InvocableHandlerMethod> iterator = handlers.iterator();
142+
while (iterator.hasNext()) {
143+
asyncReplies |= isAsyncReply(iterator.next());
144+
}
145+
this.asyncReplies = asyncReplies;
135146
}
136147

137-
148+
private boolean isAsyncReply(InvocableHandlerMethod method) {
149+
return (AbstractAdaptableMessageListener.monoPresent && MonoHandler.isMono(method.getMethod().getReturnType()))
150+
|| ListenableFuture.class.isAssignableFrom(method.getMethod().getReturnType());
151+
}
138152

139153
/**
140154
* @return the bean
@@ -143,6 +157,15 @@ public Object getBean() {
143157
return this.bean;
144158
}
145159

160+
/**
161+
* Return true if any handler method has an async reply type.
162+
* @return the asyncReply.
163+
* @since 2.2.21
164+
*/
165+
public boolean isAsyncReplies() {
166+
return this.asyncReplies;
167+
}
168+
146169
/**
147170
* Invoke the method with the given message.
148171
* @param message the message.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.lang.Nullable;
2323
import org.springframework.messaging.Message;
2424
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
25+
import org.springframework.util.concurrent.ListenableFuture;
2526

2627
/**
2728
* A wrapper for either an {@link InvocableHandlerMethod} or
@@ -38,13 +39,18 @@ public class HandlerAdapter {
3839

3940
private final DelegatingInvocableHandler delegatingHandler;
4041

42+
private final boolean asyncReplies;
43+
4144
/**
4245
* Construct an instance with the provided method.
4346
* @param invokerHandlerMethod the method.
4447
*/
4548
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4649
this.invokerHandlerMethod = invokerHandlerMethod;
4750
this.delegatingHandler = null;
51+
this.asyncReplies = (AbstractAdaptableMessageListener.monoPresent
52+
&& MonoHandler.isMono(invokerHandlerMethod.getMethod().getReturnType()))
53+
|| ListenableFuture.class.isAssignableFrom(invokerHandlerMethod.getMethod().getReturnType());
4854
}
4955

5056
/**
@@ -54,6 +60,7 @@ public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
5460
public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) {
5561
this.invokerHandlerMethod = null;
5662
this.delegatingHandler = delegatingHandler;
63+
this.asyncReplies = delegatingHandler.isAsyncReplies();
5764
}
5865

5966
/**
@@ -139,6 +146,15 @@ public Object getBean() {
139146
}
140147
}
141148

149+
/**
150+
* Return true if any handler method has an async reply type.
151+
* @return the asyncReply.
152+
* @since 2.2.21
153+
*/
154+
public boolean isAsyncReplies() {
155+
return this.asyncReplies;
156+
}
157+
142158
/**
143159
* Build an {@link InvocationResult} for the result and inbound payload.
144160
* @param result the result.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ protected HandlerAdapter getHandlerAdapter() {
107107
return this.handlerAdapter;
108108
}
109109

110+
@Override
111+
public boolean isAsyncReplies() {
112+
return this.handlerAdapter.isAsyncReplies();
113+
}
114+
110115
/**
111116
* Set the {@link AmqpHeaderMapper} implementation to use to map the standard
112117
* AMQP headers. By default, a {@link org.springframework.amqp.support.SimpleAmqpHeaderMapper
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener.adapter;
18+
19+
import java.util.function.Consumer;
20+
21+
import reactor.core.publisher.Mono;
22+
23+
/**
24+
* Class to prevent direct links to {@link Mono}.
25+
* @author Gary Russell
26+
* @since 2.2.21
27+
*/
28+
final class MonoHandler { // NOSONAR - pointless to name it ..Utils|Helper
29+
30+
private MonoHandler() {
31+
}
32+
33+
static boolean isMono(Object result) {
34+
return result instanceof Mono;
35+
}
36+
37+
@SuppressWarnings("unchecked")
38+
static void subscribe(Object returnValue, Consumer<? super Object> success,
39+
Consumer<? super Throwable> failure, Runnable completeConsumer) {
40+
41+
((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);
42+
}
43+
44+
}

0 commit comments

Comments
 (0)