diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 301abbf728..e9d89b1947 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -84,6 +84,7 @@ import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.rabbit.support.ActiveObjectCounter; import org.springframework.amqp.rabbit.support.ConsumerCancelledException; import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; import org.springframework.amqp.rabbit.support.Delivery; @@ -158,6 +159,7 @@ * @author Alexey Platonov * @author Leonardo Ferreira * @author Ngoc Nhan + * @author Jeongjun Min * * @since 1.0 */ @@ -189,6 +191,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count private final AtomicInteger activeTemplateCallbacks = new AtomicInteger(); + private final ActiveObjectCounter pendingRepliesCounter = new ActiveObjectCounter<>(); + private final ConcurrentMap publisherConfirmChannels = new ConcurrentHashMap<>(); private final Map replyHolder = new ConcurrentHashMap<>(); @@ -2071,6 +2075,7 @@ private DirectReplyToMessageListenerContainer createReplyToContainer(ConnectionF messageTag = String.valueOf(this.messageTagProvider.incrementAndGet()); } saveAndSetProperties(message, pendingReply, messageTag); + this.pendingRepliesCounter.add(pendingReply); this.replyHolder.put(messageTag, pendingReply); if (noCorrelation) { this.replyHolder.put(channel, pendingReply); @@ -2150,11 +2155,19 @@ private void saveAndSetProperties(final Message message, final PendingReply pend /** * Subclasses can implement this to be notified that a reply has timed out. + * The default implementation also releases the counter for pending replies. + * Subclasses should call {@code super.replyTimedOut(correlationId)} if they + * override this method and wish to maintain this behavior. * @param correlationId the correlationId * @since 2.1.2 */ protected void replyTimedOut(@Nullable String correlationId) { - // NOSONAR + if (correlationId != null) { + Object pending = this.replyHolder.get(correlationId); + if (pending != null) { + this.pendingRepliesCounter.release(pending); + } + } } /** @@ -2666,6 +2679,11 @@ public String getUUID() { return this.uuid; } + @Override + public ActiveObjectCounter getPendingReplyCounter() { + return this.pendingRepliesCounter; + } + @Override public void onMessage(Message message, @Nullable Channel channel) { if (logger.isTraceEnabled()) { @@ -2696,6 +2714,7 @@ public void onMessage(Message message, @Nullable Channel channel) { else { restoreProperties(message, pendingReply); pendingReply.reply(message); + this.pendingRepliesCounter.release(pendingReply); } } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 4849d9b279..66240d2366 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -87,6 +87,7 @@ * @author Jeonggi Kim * @author Java4ye * @author Thomas Badie + * @author Jeongjun Min * * @since 1.0 */ @@ -692,6 +693,10 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { Runnable awaitShutdown = () -> { logger.info("Waiting for workers to finish."); try { + if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) { + awaitPendingReplies(listenerContainerAware); + } + boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS); if (finished) { logger.info("Successfully waited for workers to finish."); @@ -738,6 +743,22 @@ private void runCallbackIfNotNull(@Nullable Runnable callback) { } } + private void awaitPendingReplies(ListenerContainerAware listener) throws InterruptedException { + ActiveObjectCounter replyCounter = listener.getPendingReplyCounter(); + + if (replyCounter != null && replyCounter.getCount() > 0) { + if (logger.isInfoEnabled()) { + logger.info("Waiting for pending replies: " + replyCounter.getCount()); + } + if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) { + if (logger.isWarnEnabled()) { + logger.warn("Shutdown timeout expired, but " + replyCounter.getCount() + + " pending replies still remain."); + } + } + } + } + private boolean isActive(BlockingQueueConsumer consumer) { boolean consumerActive; this.consumersLock.lock(); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java index 392a476189..57f94bbeea 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java @@ -25,6 +25,7 @@ * interface can have configuration verified during initialization. * * @author Gary Russell + * @author Jeongjun Min * @since 1.5 * */ @@ -39,4 +40,12 @@ public interface ListenerContainerAware { @Nullable Collection expectedQueueNames(); + /** + * Return a counter for pending replies, if any. + * @return the counter, or null. + * @since 4.0 + */ + default @Nullable ActiveObjectCounter getPendingReplyCounter() { + return null; + } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 80f8622cad..07630ccd5e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -62,7 +62,9 @@ import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; +import org.springframework.amqp.rabbit.support.ActiveObjectCounter; import org.springframework.amqp.utils.test.TestUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.DirectFieldAccessor; @@ -102,6 +104,7 @@ * @author Yansong Ren * @author Tim Bourquin * @author Jeonggi Kim + * @author Jeongjun Min */ public class SimpleMessageListenerContainerTests { @@ -716,6 +719,45 @@ void testWithConsumerStartWhenNotActive() { assertThat(start.getCount()).isEqualTo(0L); } + @Test + @SuppressWarnings("unchecked") + void testShutdownWithPendingReplies() { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + Connection connection = mock(Connection.class); + Channel channel = mock(Channel.class); + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createChannel(false)).willReturn(channel); + given(channel.isOpen()).willReturn(true); + + RabbitTemplate template = new RabbitTemplate(connectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueueNames("shutdown.test.queue"); + container.setMessageListener(template); + + template.setReplyAddress(container.getQueueNames()[0]); + + long shutdownTimeout = 500L; + container.setShutdownTimeout(shutdownTimeout); + + ActiveObjectCounter replyCounter = template.getPendingReplyCounter(); + assertThat(replyCounter).isNotNull(); + + Object pending = new Object(); + replyCounter.add(pending); + assertThat(replyCounter.getCount()).isEqualTo(1); + + Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class)); + new DirectFieldAccessor(container).setPropertyValue("logger", logger); + + container.start(); + + container.stop(); + + await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> + verify(logger).warn("Shutdown timeout expired, but 1 pending replies still remain.") + ); + } + private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, final boolean cancel, final CountDownLatch latch) { return invocation -> { diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 2d3e195783..69dbc72f70 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -36,4 +36,9 @@ There is no need to keep out-dated utilities and recommendation is to migrate to The Jackson 2 has been deprecated for removal in whole Spring portfolio. Respective new classes have been introduced to support Jackson 3. -See xref:amqp/message-converters.adoc[] for more information. \ No newline at end of file +See xref:amqp/message-converters.adoc[] for more information. + +[[x40-smlc-changes]] +=== MessageListenerContainer Changes + +The `SimpleMessageListenerContainer` now awaits at most `shutdownTimeout` for pending replies from the provided `RabbitTemplate` listener on its shutdown.