Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,17 @@ public int getUnconfirmedCount() {
.sum();
}

/**
* Return the number of pending replies in flight.
* Used to defer shutdown of a listener container if pending replies are present.
* @return the number of pending replies.
* @since 4.0
* @see org.springframework.amqp.rabbit.listener.api.PendingReplyProvider
*/
public int getPendingReplyCount() {
return this.replyHolder.size();
}

/**
* When using receive methods with a non-zero timeout, a
* {@link com.rabbitmq.client.Consumer} is created to receive the message. Use this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.PendingReplyProvider;
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
Expand Down Expand Up @@ -130,6 +131,10 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta

private long batchReceiveTimeout;

private long pendingReplyCheckInterval = 200L;

private @Nullable PendingReplyProvider pendingReplyProvider;

private @Nullable Set<BlockingQueueConsumer> consumers;

private @Nullable Integer declarationRetries;
Expand Down Expand Up @@ -357,6 +362,28 @@ public void setBatchReceiveTimeout(long batchReceiveTimeout) {
this.batchReceiveTimeout = batchReceiveTimeout;
}

/**
* Set the interval for checking for pending replies during shutdown.
* Default is 200ms.
* @param pendingReplyCheckInterval the interval in milliseconds.
* @since 4.0
*/
public void setPendingReplyCheckInterval(long pendingReplyCheckInterval) {
this.pendingReplyCheckInterval = pendingReplyCheckInterval;
}

/**
* Set a provider for the number of pending replies.
* When set, the container will wait for pending replies during shutdown,
* up to the {@link #setShutdownTimeout(long) shutdownTimeout}.
* @param pendingReplyProvider the pending reply provider.
* @since 4.0
* @see org.springframework.amqp.rabbit.core.RabbitTemplate#getPendingReplyCount()
*/
public void setPendingReplyProvider(PendingReplyProvider pendingReplyProvider) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this extra abstraction.
We already have a ListenerContainerAware implemented on the RabbitTemplate.
So, that getPendingReplyCount() could be moved into that contract.

this.pendingReplyProvider = pendingReplyProvider;
}

/**
* This property has several functions.
* <p>
Expand Down Expand Up @@ -652,6 +679,8 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process

@Override
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
waitForPendingReplies();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a right place for such a logic.
I believe it has to be a part of that awaitShutdown in the end of this shutdownAndWaitOrCallback() method.


Thread thread = this.containerStoppingForAbort.get();
if (thread != null && !thread.equals(Thread.currentThread())) {
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
Expand Down Expand Up @@ -732,6 +761,39 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
}
}

/**
* Wait for pending replies if a pending reply provider is configured.
*/
private void waitForPendingReplies() {
PendingReplyProvider provider = this.pendingReplyProvider;
if (provider != null && getShutdownTimeout() > 0) {
long deadline = System.currentTimeMillis() + getShutdownTimeout();
try {
while (isRunning() && System.currentTimeMillis() < deadline) {
int pendingCount = provider.getPendingReplyCount();
if (pendingCount <= 0) {
if (logger.isInfoEnabled()) {
logger.info("No pending replies detected, proceeding with shutdown.");
}
return;
}
if (logger.isInfoEnabled()) {
logger.info("Waiting for " + pendingCount + " pending replies before final shutdown...");
}
Thread.sleep(this.pendingReplyCheckInterval);
}
int remaining = provider.getPendingReplyCount();
if (remaining > 0) {
logger.warn("Shutdown timeout expired, but " + remaining + " pending replies still remain.");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for pending replies.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need so much logic here.
Currently, we utilize an ActiveObjectCounter for the consumers to be cancelled in this listener container.
I believe we can use it in the RabbitTemplate as well for those pending replies to be fulfilled.
Then in that awaitShutdown hook we could take this ActiveObjectCounter from the ListenerContainerAware and wait on it as we do for consumers.

}
}
}

private void runCallbackIfNotNull(@Nullable Runnable callback) {
if (callback != null) {
callback.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2025-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.listener.api;

/**
* A functional interface to provide the number of pending replies,
* used to delay listener container shutdown.
*
* @author Jeongjun Min
* @since 4.0
* @see org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPendingReplyProvider(PendingReplyProvider)
*/
@FunctionalInterface
public interface PendingReplyProvider {

/**
* Return the number of pending replies.
* @return the number of pending replies.
*/
int getPendingReplyCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.aop.support.AopUtils;
Expand Down Expand Up @@ -716,6 +718,43 @@ void testWithConsumerStartWhenNotActive() {
assertThat(start.getCount()).isEqualTo(0L);
}

@Test
@SuppressWarnings("unchecked")
void testShutdownWithPendingReplies() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
given(connectionFactory.createConnection()).willReturn(connection);
given(connection.createChannel(false)).willReturn(channel);
given(channel.isOpen()).willReturn(true);

RabbitTemplate template = new RabbitTemplate(connectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No foo/bar language, please, in the project.

container.setMessageListener(mock(MessageListener.class));

long shutdownTimeout = 2000L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to wait for "never reply" so long.
We deliberately aware that reply never comes back to us in this testing scenario.
So, what is the point to block this test for those 2 seconds?
Now imaging that every single 3000 tests in the project is going to block for similar time.
That would be a disaster for development feedback loop.

I think we should make like 500 millis at most.

long checkInterval = 500L;
container.setShutdownTimeout(shutdownTimeout);
container.setPendingReplyCheckInterval(checkInterval);
container.setPendingReplyProvider(template::getPendingReplyCount);

Map<String, Object> replyHolder = (Map<String, Object>) ReflectionTestUtils.getField(template, "replyHolder");
assertThat(replyHolder).isNotNull();
replyHolder.put("foo", new CompletableFuture<Message>());

assertThat(template.getPendingReplyCount()).isEqualTo(1);

container.start();

long startTime = System.currentTimeMillis();
container.stop();
long stopDuration = System.currentTimeMillis() - startTime;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timing is always problematic for testing.

Let's see if we can spy on the logger of the SimpleMessageListenerContainer and await().untilAsserted() for the new warn log message you have introduced in this change!


assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500);
assertThat(template.getPendingReplyCount()).isEqualTo(1);
}

private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
final boolean cancel, final CountDownLatch latch) {
return invocation -> {
Expand Down