Skip to content

Commit 14fe215

Browse files
committed
GH-3039: Move Recovery in BlockingQueueConsumer into stop()
Fixes: #3039 Issue link: #3039 Currently, the `BlockingQueueConsumer` initiates a Basic Recovery command on the channel for transactional consumer immediately after Basic Cancel. However, it is possible still to try to handle in-flight messages during `shutdownTimeout` in the listener container * Leave only Basic Cancel command in the `BlockingQueueConsumer.basicCancel()` API * Revert `BlockingQueueConsumer.nextMessage(timeout)` method logic to normal loop until message pulled from the in-memory cache is `null` * Call `basicCancel(true)` from the `stop()` is not cancelled yet * Perform `channel.basicRecover()` for transactional channel in the `stop()`. This `stop()` is usually called from the listener container when in-flight messages have not been processed during `shutdownTimeout` **Auto-cherry-pick to `3.2.x` & `3.1.x`**
1 parent 1dd3f4d commit 14fe215

File tree

1 file changed

+26
-40
lines changed

1 file changed

+26
-40
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -464,18 +464,19 @@ int getQueueCount() {
464464
}
465465

466466
protected void basicCancel() {
467-
basicCancel(false);
467+
basicCancel(true);
468468
}
469469

470470
protected void basicCancel(boolean expected) {
471471
this.normalCancel = expected;
472+
getConsumerTags()
473+
.forEach(consumerTag -> {
474+
if (this.channel.isOpen()) {
475+
RabbitUtils.cancel(this.channel, consumerTag);
476+
}
477+
});
472478
this.cancelled.set(true);
473479
this.abortStarted = System.currentTimeMillis();
474-
475-
Collection<String> consumerTags = getConsumerTags();
476-
if (!CollectionUtils.isEmpty(consumerTags)) {
477-
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
478-
}
479480
}
480481

481482
protected boolean hasDelivery() {
@@ -559,35 +560,12 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
559560
if (!this.missingQueues.isEmpty()) {
560561
checkMissingQueues();
561562
}
562-
563-
if (this.transactional && cancelled()) {
564-
throw consumerCancelledException(null);
565-
}
566-
else {
567-
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
568-
if (cancelled() && (message == null || this.transactional)) {
569-
Long deliveryTagToNack = null;
570-
if (message != null) {
571-
deliveryTagToNack = message.getMessageProperties().getDeliveryTag();
572-
}
573-
throw consumerCancelledException(deliveryTagToNack);
574-
}
575-
else {
576-
return message;
577-
}
578-
}
579-
}
580-
581-
private ConsumerCancelledException consumerCancelledException(@Nullable Long deliveryTagToNack) {
582-
this.activeObjectCounter.release(this);
583-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
584-
if (deliveryTagToNack != null) {
585-
rollbackOnExceptionIfNecessary(consumerCancelledException, deliveryTagToNack);
586-
}
587-
else {
588-
this.deliveryTags.clear();
563+
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
564+
if (message == null && this.cancelled.get()) {
565+
this.activeObjectCounter.release(this);
566+
throw new ConsumerCancelledException();
589567
}
590-
return consumerCancelledException;
568+
return message;
591569
}
592570

593571
/*
@@ -815,13 +793,21 @@ public void stop() {
815793
this.abortStarted = System.currentTimeMillis();
816794
}
817795
if (!cancelled()) {
818-
try {
819-
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
796+
basicCancel(true);
797+
}
798+
try {
799+
if (this.transactional) {
800+
/*
801+
* Re-queue in-flight messages if any
802+
* (after the consumer is cancelled to prevent the broker from simply sending them back to us).
803+
* Does not require a tx.commit.
804+
*/
805+
this.channel.basicRecover(true);
820806
}
821-
catch (Exception e) {
822-
if (logger.isDebugEnabled()) {
823-
logger.debug("Error closing consumer " + this, e);
824-
}
807+
}
808+
catch (Exception e) {
809+
if (logger.isDebugEnabled()) {
810+
logger.debug("Error closing consumer " + this, e);
825811
}
826812
}
827813
if (logger.isDebugEnabled()) {

0 commit comments

Comments
 (0)