Skip to content

Commit a800b31

Browse files
committed
GH-2941: Fix BlockingQueueConsumer race condition on cancel
Fixes: #2941 Issue link: #2941 Now `BlockingQueueConsumer.basicCancel()` performs `RabbitUtils.closeMessageConsumer()` to initiate `basicRecovery` on the transactional consumer to re-queue all the un-acked messages. However, there is a race condition when one in-flight message may still be delivered to the listener and then TX commit is initiated. There a `basicAck()` is initiated. However, such a tag might already be discarded because of the previous `basicRecovery`. Therefore, adjust `BlockingQueueConsumer.commitIfNecessary()` to skip `basicAck()` if locally transacted and already cancelled. Right, this may lead to the duplication delivery, but having abnormal shutdown situation we cannot guarantee that this message to commit has been processed properly. Also, adjust `BlockingQueueConsumer.nextMessage()` to rollback a message if consumer is cancelled instead of going through the loop via listener * Increase `replyTimeout` in the `EnableRabbitReturnTypesTests` for resource-sensitive builds
1 parent 88c655e commit a800b31

File tree

2 files changed

+41
-21
lines changed

2 files changed

+41
-21
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ public class BlockingQueueConsumer {
167167

168168
private long consumeDelay;
169169

170-
private java.util.function.Consumer<String> missingQueuePublisher = str -> { };
170+
private java.util.function.Consumer<String> missingQueuePublisher = str -> {
171+
};
171172

172173
private boolean globalQos;
173174

@@ -464,12 +465,13 @@ protected void basicCancel() {
464465

465466
protected void basicCancel(boolean expected) {
466467
this.normalCancel = expected;
468+
this.cancelled.set(true);
469+
this.abortStarted = System.currentTimeMillis();
470+
467471
Collection<String> consumerTags = getConsumerTags();
468472
if (!CollectionUtils.isEmpty(consumerTags)) {
469473
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
470474
}
471-
this.cancelled.set(true);
472-
this.abortStarted = System.currentTimeMillis();
473475
}
474476

475477
protected boolean hasDelivery() {
@@ -555,12 +557,26 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
555557
if (!this.missingQueues.isEmpty()) {
556558
checkMissingQueues();
557559
}
558-
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
559-
if (message == null && this.cancelled.get()) {
560+
if (!cancelled()) {
561+
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
562+
if (message != null && cancelled()) {
563+
this.activeObjectCounter.release(this);
564+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
565+
rollbackOnExceptionIfNecessary(consumerCancelledException,
566+
message.getMessageProperties().getDeliveryTag());
567+
throw consumerCancelledException;
568+
}
569+
else {
570+
return message;
571+
}
572+
}
573+
else {
574+
this.deliveryTags.clear();
560575
this.activeObjectCounter.release(this);
561-
throw new ConsumerCancelledException();
576+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
577+
rollbackOnExceptionIfNecessary(consumerCancelledException);
578+
throw consumerCancelledException;
562579
}
563-
return message;
564580
}
565581

566582
/*
@@ -785,7 +801,7 @@ public void stop() {
785801
if (this.abortStarted == 0) { // signal handle delivery to use offer
786802
this.abortStarted = System.currentTimeMillis();
787803
}
788-
if (!this.cancelled()) {
804+
if (!cancelled()) {
789805
try {
790806
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
791807
}
@@ -889,22 +905,25 @@ boolean commitIfNecessary(boolean localTx, boolean forceAck) {
889905
/*
890906
* If we have a TX Manager, but no TX, act like we are locally transacted.
891907
*/
892-
boolean isLocallyTransacted = localTx
893-
|| (this.transactional
894-
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
908+
boolean isLocallyTransacted =
909+
localTx ||
910+
(this.transactional &&
911+
TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
895912
try {
896913
boolean ackRequired = forceAck || (!this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual());
897914

898-
if (ackRequired && (!this.transactional || isLocallyTransacted)) {
899-
long deliveryTag = new ArrayList<>(this.deliveryTags).get(this.deliveryTags.size() - 1);
900-
try {
901-
this.channel.basicAck(deliveryTag, true);
902-
notifyMessageAckListener(true, deliveryTag, null);
903-
}
904-
catch (Exception e) {
905-
logger.error("Error acking.", e);
906-
notifyMessageAckListener(false, deliveryTag, e);
907-
}
915+
if (ackRequired && (!this.transactional || (isLocallyTransacted && !cancelled()))) {
916+
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
917+
deliveryTag.ifPresent((tag) -> {
918+
try {
919+
this.channel.basicAck(tag, true);
920+
notifyMessageAckListener(true, tag, null);
921+
}
922+
catch (Exception e) {
923+
logger.error("Error acking.", e);
924+
notifyMessageAckListener(false, tag, e);
925+
}
926+
});
908927
}
909928

910929
if (isLocallyTransacted) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitReturnTypesTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Cachi
113113
public RabbitTemplate template(CachingConnectionFactory cf, Jackson2JsonMessageConverter converter) {
114114
RabbitTemplate template = new RabbitTemplate(cf);
115115
template.setMessageConverter(converter);
116+
template.setReplyTimeout(30_000);
116117
return template;
117118
}
118119

0 commit comments

Comments
 (0)