From 368fe549aef16a8b8939d9994e1678ee7d8d5ef4 Mon Sep 17 00:00:00 2001 From: Jeongjun Min Date: Wed, 6 Aug 2025 23:22:19 +0900 Subject: [PATCH 1/6] GH-3031: Defer SMLC shutdown for pending replies This commit introduces a mechanism to delay the shutdown of a SimpleMessageListenerContainer if there are pending replies for request/reply operations. A new functional interface, `PendingReplyProvider`, is introduced and can be set on the container. `RabbitTemplate` now exposes a `getPendingReplyCount()` method to serve as this provider. When the provider is set, the container will wait up to the configured `shutdownTimeout` for the pending reply count to drop to zero before proceeding with the consumer cancellation. Signed-off-by: Jeongjun Min --- .../amqp/rabbit/core/RabbitTemplate.java | 11 ++++ .../SimpleMessageListenerContainer.java | 62 +++++++++++++++++++ .../listener/api/PendingReplyProvider.java | 19 ++++++ .../SimpleMessageListenerContainerTests.java | 38 ++++++++++++ 4 files changed, 130 insertions(+) create mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 301abbf728..c2ee95fdc8 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -960,6 +960,17 @@ public int getUnconfirmedCount() { .sum(); } + /** + * Return the number of pending replies in flight. + * Used to defer shutdown of a listener container if pending replies are present. + * @return the number of pending replies. + * @since 4.0 + * @see org.springframework.amqp.rabbit.listener.api.PendingReplyProvider + */ + public int getPendingReplyCount() { + return this.replyHolder.size(); + } + /** * When using receive methods with a non-zero timeout, a * {@link com.rabbitmq.client.Consumer} is created to receive the message. Use this diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 4849d9b279..2fc26ec9bb 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -56,6 +56,7 @@ import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory; import org.springframework.amqp.rabbit.connection.SimpleResourceHolder; import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener; +import org.springframework.amqp.rabbit.listener.api.PendingReplyProvider; import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException; import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException; import org.springframework.amqp.rabbit.support.ActiveObjectCounter; @@ -130,6 +131,10 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long batchReceiveTimeout; + private long pendingReplyCheckInterval = 200L; + + private @Nullable PendingReplyProvider pendingReplyProvider; + private @Nullable Set consumers; private @Nullable Integer declarationRetries; @@ -357,6 +362,28 @@ public void setBatchReceiveTimeout(long batchReceiveTimeout) { this.batchReceiveTimeout = batchReceiveTimeout; } + /** + * Set the interval for checking for pending replies during shutdown. + * Default is 200ms. + * @param pendingReplyCheckInterval the interval in milliseconds. + * @since 4.0 + */ + public void setPendingReplyCheckInterval(long pendingReplyCheckInterval) { + this.pendingReplyCheckInterval = pendingReplyCheckInterval; + } + + /** + * Set a provider for the number of pending replies. + * When set, the container will wait for pending replies during shutdown, + * up to the {@link #setShutdownTimeout(long) shutdownTimeout}. + * @param pendingReplyProvider the pending reply provider. + * @since 4.0 + * @see org.springframework.amqp.rabbit.core.RabbitTemplate#getPendingReplyCount() + */ + public void setPendingReplyProvider(PendingReplyProvider pendingReplyProvider) { + this.pendingReplyProvider = pendingReplyProvider; + } + /** * This property has several functions. *

@@ -652,6 +679,8 @@ private void waitForConsumersToStart(Set process @Override protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { + waitForPendingReplies(); + Thread thread = this.containerStoppingForAbort.get(); if (thread != null && !thread.equals(Thread.currentThread())) { logger.info("Shutdown ignored - container is stopping due to an aborted consumer"); @@ -732,6 +761,39 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { } } + /** + * Wait for pending replies if a pending reply provider is configured. + */ + private void waitForPendingReplies() { + PendingReplyProvider provider = this.pendingReplyProvider; + if (provider != null && getShutdownTimeout() > 0) { + long deadline = System.currentTimeMillis() + getShutdownTimeout(); + try { + while (isRunning() && System.currentTimeMillis() < deadline) { + int pendingCount = provider.getPendingReplyCount(); + if (pendingCount <= 0) { + if (logger.isInfoEnabled()) { + logger.info("No pending replies detected, proceeding with shutdown."); + } + return; + } + if (logger.isInfoEnabled()) { + logger.info("Waiting for " + pendingCount + " pending replies before final shutdown..."); + } + Thread.sleep(this.pendingReplyCheckInterval); + } + int remaining = provider.getPendingReplyCount(); + if (remaining > 0) { + logger.warn("Shutdown timeout expired, but " + remaining + " pending replies still remain."); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while waiting for pending replies."); + } + } + } + private void runCallbackIfNotNull(@Nullable Runnable callback) { if (callback != null) { callback.run(); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java new file mode 100644 index 0000000000..32960d3260 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java @@ -0,0 +1,19 @@ +package org.springframework.amqp.rabbit.listener.api; + +/** + * A functional interface to provide the number of pending replies, + * used to delay listener container shutdown. + * + * @author Jeongjun Min + * @since 4.0 + * @see org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPendingReplyProvider(PendingReplyProvider) + */ +@FunctionalInterface +public interface PendingReplyProvider { + + /** + * Return the number of pending replies. + * @return the number of pending replies. + */ + int getPendingReplyCount(); +} \ No newline at end of file diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 80f8622cad..c9da3f4d15 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -62,6 +63,7 @@ import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.utils.test.TestUtils; import org.springframework.aop.support.AopUtils; @@ -799,4 +801,40 @@ public void execute(Runnable task) { } + @Test + @SuppressWarnings("unchecked") + void testShutdownWithPendingReplies() { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + Connection connection = mock(Connection.class); + Channel channel = mock(Channel.class); + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createChannel(false)).willReturn(channel); + given(channel.isOpen()).willReturn(true); + + RabbitTemplate template = new RabbitTemplate(connectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueueNames("foo"); + container.setMessageListener(mock(MessageListener.class)); + + long shutdownTimeout = 2000L; + long checkInterval = 500L; + container.setShutdownTimeout(shutdownTimeout); + container.setPendingReplyCheckInterval(checkInterval); + container.setPendingReplyProvider(template::getPendingReplyCount); + + Map replyHolder = (Map) ReflectionTestUtils.getField(template, "replyHolder"); + assertThat(replyHolder).isNotNull(); + replyHolder.put("foo", new CompletableFuture()); + + assertThat(template.getPendingReplyCount()).isEqualTo(1); + + container.start(); + + long startTime = System.currentTimeMillis(); + container.stop(); + long stopDuration = System.currentTimeMillis() - startTime; + + assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500); + assertThat(template.getPendingReplyCount()).isEqualTo(1); + } } From 1a48e51a91faa8b18eca74ea8db88612b3e5d660 Mon Sep 17 00:00:00 2001 From: Jeongjun Min Date: Thu, 7 Aug 2025 00:23:39 +0900 Subject: [PATCH 2/6] Fix checkstyle violations Signed-off-by: Jeongjun Min --- .../listener/api/PendingReplyProvider.java | 18 ++++- .../SimpleMessageListenerContainerTests.java | 73 ++++++++++--------- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java index 32960d3260..d305bf499a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java @@ -1,3 +1,19 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.amqp.rabbit.listener.api; /** @@ -16,4 +32,4 @@ public interface PendingReplyProvider { * @return the number of pending replies. */ int getPendingReplyCount(); -} \ No newline at end of file +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index c9da3f4d15..881e89f7db 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -718,6 +718,43 @@ void testWithConsumerStartWhenNotActive() { assertThat(start.getCount()).isEqualTo(0L); } + @Test + @SuppressWarnings("unchecked") + void testShutdownWithPendingReplies() { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + Connection connection = mock(Connection.class); + Channel channel = mock(Channel.class); + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createChannel(false)).willReturn(channel); + given(channel.isOpen()).willReturn(true); + + RabbitTemplate template = new RabbitTemplate(connectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueueNames("foo"); + container.setMessageListener(mock(MessageListener.class)); + + long shutdownTimeout = 2000L; + long checkInterval = 500L; + container.setShutdownTimeout(shutdownTimeout); + container.setPendingReplyCheckInterval(checkInterval); + container.setPendingReplyProvider(template::getPendingReplyCount); + + Map replyHolder = (Map) ReflectionTestUtils.getField(template, "replyHolder"); + assertThat(replyHolder).isNotNull(); + replyHolder.put("foo", new CompletableFuture()); + + assertThat(template.getPendingReplyCount()).isEqualTo(1); + + container.start(); + + long startTime = System.currentTimeMillis(); + container.stop(); + long stopDuration = System.currentTimeMillis() - startTime; + + assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500); + assertThat(template.getPendingReplyCount()).isEqualTo(1); + } + private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, final boolean cancel, final CountDownLatch latch) { return invocation -> { @@ -801,40 +838,4 @@ public void execute(Runnable task) { } - @Test - @SuppressWarnings("unchecked") - void testShutdownWithPendingReplies() { - ConnectionFactory connectionFactory = mock(ConnectionFactory.class); - Connection connection = mock(Connection.class); - Channel channel = mock(Channel.class); - given(connectionFactory.createConnection()).willReturn(connection); - given(connection.createChannel(false)).willReturn(channel); - given(channel.isOpen()).willReturn(true); - - RabbitTemplate template = new RabbitTemplate(connectionFactory); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); - container.setQueueNames("foo"); - container.setMessageListener(mock(MessageListener.class)); - - long shutdownTimeout = 2000L; - long checkInterval = 500L; - container.setShutdownTimeout(shutdownTimeout); - container.setPendingReplyCheckInterval(checkInterval); - container.setPendingReplyProvider(template::getPendingReplyCount); - - Map replyHolder = (Map) ReflectionTestUtils.getField(template, "replyHolder"); - assertThat(replyHolder).isNotNull(); - replyHolder.put("foo", new CompletableFuture()); - - assertThat(template.getPendingReplyCount()).isEqualTo(1); - - container.start(); - - long startTime = System.currentTimeMillis(); - container.stop(); - long stopDuration = System.currentTimeMillis() - startTime; - - assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500); - assertThat(template.getPendingReplyCount()).isEqualTo(1); - } } From 7a732db6f550f1011de3e6462450379c3c54c615 Mon Sep 17 00:00:00 2001 From: Jeongjun Min Date: Thu, 7 Aug 2025 02:42:45 +0900 Subject: [PATCH 3/6] Apply review suggestions for shutdown deferral Refactor the shutdown deferral mechanism based on pull request feedback: - Replace the custom `PendingReplyProvider` with the existing `ListenerContainerAware` interface and `ActiveObjectCounter`. - Move the waiting logic into the `awaitShutdown` runnable for better consistency with the existing shutdown process. Signed-off-by: Jeongjun Min --- .../amqp/rabbit/core/RabbitTemplate.java | 31 ++++--- .../SimpleMessageListenerContainer.java | 80 +++++-------------- .../listener/api/PendingReplyProvider.java | 35 -------- .../support/ListenerContainerAware.java | 9 +++ .../SimpleMessageListenerContainerTests.java | 24 +++--- 5 files changed, 59 insertions(+), 120 deletions(-) delete mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index c2ee95fdc8..362fa50a88 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -84,6 +84,7 @@ import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.rabbit.support.ActiveObjectCounter; import org.springframework.amqp.rabbit.support.ConsumerCancelledException; import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; import org.springframework.amqp.rabbit.support.Delivery; @@ -189,6 +190,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count private final AtomicInteger activeTemplateCallbacks = new AtomicInteger(); + private final ActiveObjectCounter pendingRepliesCounter = new ActiveObjectCounter<>(); + private final ConcurrentMap publisherConfirmChannels = new ConcurrentHashMap<>(); private final Map replyHolder = new ConcurrentHashMap<>(); @@ -960,17 +963,6 @@ public int getUnconfirmedCount() { .sum(); } - /** - * Return the number of pending replies in flight. - * Used to defer shutdown of a listener container if pending replies are present. - * @return the number of pending replies. - * @since 4.0 - * @see org.springframework.amqp.rabbit.listener.api.PendingReplyProvider - */ - public int getPendingReplyCount() { - return this.replyHolder.size(); - } - /** * When using receive methods with a non-zero timeout, a * {@link com.rabbitmq.client.Consumer} is created to receive the message. Use this @@ -2082,6 +2074,7 @@ private DirectReplyToMessageListenerContainer createReplyToContainer(ConnectionF messageTag = String.valueOf(this.messageTagProvider.incrementAndGet()); } saveAndSetProperties(message, pendingReply, messageTag); + this.pendingRepliesCounter.add(pendingReply); this.replyHolder.put(messageTag, pendingReply); if (noCorrelation) { this.replyHolder.put(channel, pendingReply); @@ -2161,11 +2154,19 @@ private void saveAndSetProperties(final Message message, final PendingReply pend /** * Subclasses can implement this to be notified that a reply has timed out. + * The default implementation also releases the counter for pending replies. + * Subclasses should call {@code super.replyTimedOut(correlationId)} if they + * override this method and wish to maintain this behavior. * @param correlationId the correlationId * @since 2.1.2 */ protected void replyTimedOut(@Nullable String correlationId) { - // NOSONAR + if (correlationId != null) { + Object pending = this.replyHolder.get(correlationId); + if (pending != null) { + this.pendingRepliesCounter.release(pending); + } + } } /** @@ -2677,6 +2678,11 @@ public String getUUID() { return this.uuid; } + @Override + public ActiveObjectCounter getPendingReplyCounter() { + return this.pendingRepliesCounter; + } + @Override public void onMessage(Message message, @Nullable Channel channel) { if (logger.isTraceEnabled()) { @@ -2707,6 +2713,7 @@ public void onMessage(Message message, @Nullable Channel channel) { else { restoreProperties(message, pendingReply); pendingReply.reply(message); + this.pendingRepliesCounter.release(pendingReply); } } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 2fc26ec9bb..63af38676c 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -56,7 +56,6 @@ import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory; import org.springframework.amqp.rabbit.connection.SimpleResourceHolder; import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener; -import org.springframework.amqp.rabbit.listener.api.PendingReplyProvider; import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException; import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException; import org.springframework.amqp.rabbit.support.ActiveObjectCounter; @@ -131,10 +130,6 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long batchReceiveTimeout; - private long pendingReplyCheckInterval = 200L; - - private @Nullable PendingReplyProvider pendingReplyProvider; - private @Nullable Set consumers; private @Nullable Integer declarationRetries; @@ -362,28 +357,6 @@ public void setBatchReceiveTimeout(long batchReceiveTimeout) { this.batchReceiveTimeout = batchReceiveTimeout; } - /** - * Set the interval for checking for pending replies during shutdown. - * Default is 200ms. - * @param pendingReplyCheckInterval the interval in milliseconds. - * @since 4.0 - */ - public void setPendingReplyCheckInterval(long pendingReplyCheckInterval) { - this.pendingReplyCheckInterval = pendingReplyCheckInterval; - } - - /** - * Set a provider for the number of pending replies. - * When set, the container will wait for pending replies during shutdown, - * up to the {@link #setShutdownTimeout(long) shutdownTimeout}. - * @param pendingReplyProvider the pending reply provider. - * @since 4.0 - * @see org.springframework.amqp.rabbit.core.RabbitTemplate#getPendingReplyCount() - */ - public void setPendingReplyProvider(PendingReplyProvider pendingReplyProvider) { - this.pendingReplyProvider = pendingReplyProvider; - } - /** * This property has several functions. *

@@ -679,8 +652,6 @@ private void waitForConsumersToStart(Set process @Override protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { - waitForPendingReplies(); - Thread thread = this.containerStoppingForAbort.get(); if (thread != null && !thread.equals(Thread.currentThread())) { logger.info("Shutdown ignored - container is stopping due to an aborted consumer"); @@ -721,6 +692,24 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { Runnable awaitShutdown = () -> { logger.info("Waiting for workers to finish."); try { + ActiveObjectCounter replyCounter = null; + Object listener = getMessageListener(); + if (listener instanceof ListenerContainerAware) { + replyCounter = ((ListenerContainerAware) listener).getPendingReplyCounter(); + } + + if (replyCounter != null) { + if (logger.isInfoEnabled()) { + logger.info("Waiting for pending replies: " + replyCounter.getCount()); + } + if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) { + if (logger.isWarnEnabled()) { + logger.warn("Shutdown timeout expired, but " + replyCounter.getCount() + + " pending replies still remain."); + } + } + } + boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS); if (finished) { logger.info("Successfully waited for workers to finish."); @@ -761,39 +750,6 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { } } - /** - * Wait for pending replies if a pending reply provider is configured. - */ - private void waitForPendingReplies() { - PendingReplyProvider provider = this.pendingReplyProvider; - if (provider != null && getShutdownTimeout() > 0) { - long deadline = System.currentTimeMillis() + getShutdownTimeout(); - try { - while (isRunning() && System.currentTimeMillis() < deadline) { - int pendingCount = provider.getPendingReplyCount(); - if (pendingCount <= 0) { - if (logger.isInfoEnabled()) { - logger.info("No pending replies detected, proceeding with shutdown."); - } - return; - } - if (logger.isInfoEnabled()) { - logger.info("Waiting for " + pendingCount + " pending replies before final shutdown..."); - } - Thread.sleep(this.pendingReplyCheckInterval); - } - int remaining = provider.getPendingReplyCount(); - if (remaining > 0) { - logger.warn("Shutdown timeout expired, but " + remaining + " pending replies still remain."); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("Interrupted while waiting for pending replies."); - } - } - } - private void runCallbackIfNotNull(@Nullable Runnable callback) { if (callback != null) { callback.run(); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java deleted file mode 100644 index d305bf499a..0000000000 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/PendingReplyProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2025-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.amqp.rabbit.listener.api; - -/** - * A functional interface to provide the number of pending replies, - * used to delay listener container shutdown. - * - * @author Jeongjun Min - * @since 4.0 - * @see org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPendingReplyProvider(PendingReplyProvider) - */ -@FunctionalInterface -public interface PendingReplyProvider { - - /** - * Return the number of pending replies. - * @return the number of pending replies. - */ - int getPendingReplyCount(); -} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java index 392a476189..0ade526e70 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java @@ -39,4 +39,13 @@ public interface ListenerContainerAware { @Nullable Collection expectedQueueNames(); + /** + * Return a counter for pending replies, if any. + * @return the counter, or null. + * @since 4.0 + */ + @Nullable + default ActiveObjectCounter getPendingReplyCounter() { + return null; + } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 881e89f7db..89b7b2cdf7 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -65,6 +64,7 @@ import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; +import org.springframework.amqp.rabbit.support.ActiveObjectCounter; import org.springframework.amqp.utils.test.TestUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.DirectFieldAccessor; @@ -730,20 +730,21 @@ void testShutdownWithPendingReplies() { RabbitTemplate template = new RabbitTemplate(connectionFactory); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); - container.setQueueNames("foo"); - container.setMessageListener(mock(MessageListener.class)); + container.setQueueNames("shutdown.test.queue"); + container.setMessageListener(template); + + template.setReplyAddress(container.getQueueNames()[0]); long shutdownTimeout = 2000L; - long checkInterval = 500L; container.setShutdownTimeout(shutdownTimeout); - container.setPendingReplyCheckInterval(checkInterval); - container.setPendingReplyProvider(template::getPendingReplyCount); - Map replyHolder = (Map) ReflectionTestUtils.getField(template, "replyHolder"); - assertThat(replyHolder).isNotNull(); - replyHolder.put("foo", new CompletableFuture()); + ActiveObjectCounter replyCounter = + (ActiveObjectCounter) ReflectionTestUtils.getField(template, "pendingRepliesCounter"); + assertThat(replyCounter).isNotNull(); - assertThat(template.getPendingReplyCount()).isEqualTo(1); + Object pending = new Object(); + replyCounter.add(pending); + assertThat(replyCounter.getCount()).isEqualTo(1); container.start(); @@ -751,8 +752,9 @@ void testShutdownWithPendingReplies() { container.stop(); long stopDuration = System.currentTimeMillis() - startTime; + replyCounter.release(pending); + assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500); - assertThat(template.getPendingReplyCount()).isEqualTo(1); } private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, From cc2e48c254b73b000b9976c17e957dad7d764557 Mon Sep 17 00:00:00 2001 From: Jeongjun Min Date: Thu, 7 Aug 2025 19:33:49 +0900 Subject: [PATCH 4/6] Refactor shutdown deferral logic based on review feedback - Move @Nullable to type declaration - Use pattern matching for ListenerContainerAware - Skip waiting when replyCounter is null or count is zero - Extract shutdown wait logic into separate method - Add author tag and update whats-new.adoc Signed-off-by: Jeongjun Min --- .../amqp/rabbit/core/RabbitTemplate.java | 1 + .../SimpleMessageListenerContainer.java | 35 ++++++++++--------- .../support/ListenerContainerAware.java | 4 +-- .../SimpleMessageListenerContainerTests.java | 1 + .../antora/modules/ROOT/pages/whats-new.adoc | 7 +++- 5 files changed, 29 insertions(+), 19 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 362fa50a88..e9d89b1947 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -159,6 +159,7 @@ * @author Alexey Platonov * @author Leonardo Ferreira * @author Ngoc Nhan + * @author Jeongjun Min * * @since 1.0 */ diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 63af38676c..66240d2366 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -87,6 +87,7 @@ * @author Jeonggi Kim * @author Java4ye * @author Thomas Badie + * @author Jeongjun Min * * @since 1.0 */ @@ -692,22 +693,8 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) { Runnable awaitShutdown = () -> { logger.info("Waiting for workers to finish."); try { - ActiveObjectCounter replyCounter = null; - Object listener = getMessageListener(); - if (listener instanceof ListenerContainerAware) { - replyCounter = ((ListenerContainerAware) listener).getPendingReplyCounter(); - } - - if (replyCounter != null) { - if (logger.isInfoEnabled()) { - logger.info("Waiting for pending replies: " + replyCounter.getCount()); - } - if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) { - if (logger.isWarnEnabled()) { - logger.warn("Shutdown timeout expired, but " + replyCounter.getCount() - + " pending replies still remain."); - } - } + if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) { + awaitPendingReplies(listenerContainerAware); } boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS); @@ -756,6 +743,22 @@ private void runCallbackIfNotNull(@Nullable Runnable callback) { } } + private void awaitPendingReplies(ListenerContainerAware listener) throws InterruptedException { + ActiveObjectCounter replyCounter = listener.getPendingReplyCounter(); + + if (replyCounter != null && replyCounter.getCount() > 0) { + if (logger.isInfoEnabled()) { + logger.info("Waiting for pending replies: " + replyCounter.getCount()); + } + if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) { + if (logger.isWarnEnabled()) { + logger.warn("Shutdown timeout expired, but " + replyCounter.getCount() + + " pending replies still remain."); + } + } + } + } + private boolean isActive(BlockingQueueConsumer consumer) { boolean consumerActive; this.consumersLock.lock(); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java index 0ade526e70..57f94bbeea 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java @@ -25,6 +25,7 @@ * interface can have configuration verified during initialization. * * @author Gary Russell + * @author Jeongjun Min * @since 1.5 * */ @@ -44,8 +45,7 @@ public interface ListenerContainerAware { * @return the counter, or null. * @since 4.0 */ - @Nullable - default ActiveObjectCounter getPendingReplyCounter() { + default @Nullable ActiveObjectCounter getPendingReplyCounter() { return null; } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 89b7b2cdf7..292b96ae29 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -104,6 +104,7 @@ * @author Yansong Ren * @author Tim Bourquin * @author Jeonggi Kim + * @author Jeongjun Min */ public class SimpleMessageListenerContainerTests { diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 2d3e195783..9105b77fa5 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -36,4 +36,9 @@ There is no need to keep out-dated utilities and recommendation is to migrate to The Jackson 2 has been deprecated for removal in whole Spring portfolio. Respective new classes have been introduced to support Jackson 3. -See xref:amqp/message-converters.adoc[] for more information. \ No newline at end of file +See xref:amqp/message-converters.adoc[] for more information. + +[[x40-enhancements]] +=== Enhancements + +Defer `SimpleMessageListenerContainer` shutdown for pending `RabbitTemplate` replies. From 1dd2d72a56054953c69f6a122f9927b2d45a355f Mon Sep 17 00:00:00 2001 From: Jeongjun Min Date: Fri, 8 Aug 2025 02:11:06 +0900 Subject: [PATCH 5/6] Refine test and update whats-new This commit applies the final polishing suggestions from the review. - The test case is refactored to be more robust and efficient. It now verifies the warning log message via a spy instead of relying on unstable timing assertions. The shutdown timeout is also reduced to speed up the build. - The `whats-new.adoc` document is updated with the title and description suggested in the review. Signed-off-by: Jeongjun Min --- .../SimpleMessageListenerContainerTests.java | 16 +++++++++------- .../antora/modules/ROOT/pages/whats-new.adoc | 6 +++--- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 292b96ae29..15eaa7ba90 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -736,26 +736,28 @@ void testShutdownWithPendingReplies() { template.setReplyAddress(container.getQueueNames()[0]); - long shutdownTimeout = 2000L; + long shutdownTimeout = 500L; container.setShutdownTimeout(shutdownTimeout); - ActiveObjectCounter replyCounter = - (ActiveObjectCounter) ReflectionTestUtils.getField(template, "pendingRepliesCounter"); + ActiveObjectCounter replyCounter = template.getPendingReplyCounter(); assertThat(replyCounter).isNotNull(); Object pending = new Object(); replyCounter.add(pending); assertThat(replyCounter.getCount()).isEqualTo(1); + Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class)); + new DirectFieldAccessor(container).setPropertyValue("logger", logger); + container.start(); - long startTime = System.currentTimeMillis(); container.stop(); - long stopDuration = System.currentTimeMillis() - startTime; - replyCounter.release(pending); + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> + verify(logger).warn("Shutdown timeout expired, but 1 pending replies still remain.") + ); - assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500); + replyCounter.release(pending); } private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 9105b77fa5..69dbc72f70 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -38,7 +38,7 @@ Respective new classes have been introduced to support Jackson 3. See xref:amqp/message-converters.adoc[] for more information. -[[x40-enhancements]] -=== Enhancements +[[x40-smlc-changes]] +=== MessageListenerContainer Changes -Defer `SimpleMessageListenerContainer` shutdown for pending `RabbitTemplate` replies. +The `SimpleMessageListenerContainer` now awaits at most `shutdownTimeout` for pending replies from the provided `RabbitTemplate` listener on its shutdown. From 00487ae87a32d6a70b49e52f2d946654ef4ceefd Mon Sep 17 00:00:00 2001 From: Jeongjun Min Date: Fri, 8 Aug 2025 02:34:01 +0900 Subject: [PATCH 6/6] Refine shutdown deferral test for robustness Improve test robustness by increasing the await timeout and remove unnecessary cleanup code per review feedback. Signed-off-by: Jeongjun Min --- .../rabbit/listener/SimpleMessageListenerContainerTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 15eaa7ba90..07630ccd5e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -753,11 +753,9 @@ void testShutdownWithPendingReplies() { container.stop(); - await().atMost(Duration.ofMillis(500)).untilAsserted(() -> + await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> verify(logger).warn("Shutdown timeout expired, but 1 pending replies still remain.") ); - - replyCounter.release(pending); } private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,