Skip to content

Commit 3c24774

Browse files
committed
GH-3031: Defer SMLC shutdown for pending replies
This commit introduces a mechanism to delay the shutdown of a SimpleMessageListenerContainer if there are pending replies for request/reply operations. A new functional interface, `PendingReplyProvider`, is introduced and can be set on the container. `RabbitTemplate` now exposes a `getPendingReplyCount()` method to serve as this provider. When the provider is set, the container will wait up to the configured `shutdownTimeout` for the pending reply count to drop to zero before proceeding with the consumer cancellation. Signed-off-by: Jeongjun Min <[email protected]>
1 parent 5bcd01f commit 3c24774

File tree

4 files changed

+130
-0
lines changed

4 files changed

+130
-0
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,17 @@ public int getUnconfirmedCount() {
960960
.sum();
961961
}
962962

963+
/**
964+
* Return the number of pending replies in flight.
965+
* Used to defer shutdown of a listener container if pending replies are present.
966+
* @return the number of pending replies.
967+
* @since 4.0
968+
* @see org.springframework.amqp.rabbit.listener.api.PendingReplyProvider
969+
*/
970+
public int getPendingReplyCount() {
971+
return this.replyHolder.size();
972+
}
973+
963974
/**
964975
* When using receive methods with a non-zero timeout, a
965976
* {@link com.rabbitmq.client.Consumer} is created to receive the message. Use this

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
5757
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
5858
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
59+
import org.springframework.amqp.rabbit.listener.api.PendingReplyProvider;
5960
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
6061
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
6162
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
@@ -130,6 +131,10 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
130131

131132
private long batchReceiveTimeout;
132133

134+
private long pendingReplyCheckInterval = 200L;
135+
136+
private @Nullable PendingReplyProvider pendingReplyProvider;
137+
133138
private @Nullable Set<BlockingQueueConsumer> consumers;
134139

135140
private @Nullable Integer declarationRetries;
@@ -357,6 +362,28 @@ public void setBatchReceiveTimeout(long batchReceiveTimeout) {
357362
this.batchReceiveTimeout = batchReceiveTimeout;
358363
}
359364

365+
/**
366+
* Set the interval for checking for pending replies during shutdown.
367+
* Default is 200ms.
368+
* @param pendingReplyCheckInterval the interval in milliseconds.
369+
* @since 4.0
370+
*/
371+
public void setPendingReplyCheckInterval(long pendingReplyCheckInterval) {
372+
this.pendingReplyCheckInterval = pendingReplyCheckInterval;
373+
}
374+
375+
/**
376+
* Set a provider for the number of pending replies.
377+
* When set, the container will wait for pending replies during shutdown,
378+
* up to the {@link #setShutdownTimeout(long) shutdownTimeout}.
379+
* @param pendingReplyProvider the pending reply provider.
380+
* @since 4.0
381+
* @see org.springframework.amqp.rabbit.core.RabbitTemplate#getPendingReplyCount()
382+
*/
383+
public void setPendingReplyProvider(PendingReplyProvider pendingReplyProvider) {
384+
this.pendingReplyProvider = pendingReplyProvider;
385+
}
386+
360387
/**
361388
* This property has several functions.
362389
* <p>
@@ -652,6 +679,8 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process
652679

653680
@Override
654681
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
682+
waitForPendingReplies();
683+
655684
Thread thread = this.containerStoppingForAbort.get();
656685
if (thread != null && !thread.equals(Thread.currentThread())) {
657686
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
@@ -732,6 +761,39 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
732761
}
733762
}
734763

764+
/**
765+
* Wait for pending replies if a pending reply provider is configured.
766+
*/
767+
private void waitForPendingReplies() {
768+
PendingReplyProvider provider = this.pendingReplyProvider;
769+
if (provider != null && getShutdownTimeout() > 0) {
770+
long deadline = System.currentTimeMillis() + getShutdownTimeout();
771+
try {
772+
while (isRunning() && System.currentTimeMillis() < deadline) {
773+
int pendingCount = provider.getPendingReplyCount();
774+
if (pendingCount <= 0) {
775+
if (logger.isInfoEnabled()) {
776+
logger.info("No pending replies detected, proceeding with shutdown.");
777+
}
778+
return;
779+
}
780+
if (logger.isInfoEnabled()) {
781+
logger.info("Waiting for " + pendingCount + " pending replies before final shutdown...");
782+
}
783+
Thread.sleep(this.pendingReplyCheckInterval);
784+
}
785+
int remaining = provider.getPendingReplyCount();
786+
if (remaining > 0) {
787+
logger.warn("Shutdown timeout expired, but " + remaining + " pending replies still remain.");
788+
}
789+
}
790+
catch (InterruptedException e) {
791+
Thread.currentThread().interrupt();
792+
logger.warn("Interrupted while waiting for pending replies.");
793+
}
794+
}
795+
}
796+
735797
private void runCallbackIfNotNull(@Nullable Runnable callback) {
736798
if (callback != null) {
737799
callback.run();
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.springframework.amqp.rabbit.listener.api;
2+
3+
/**
4+
* A functional interface to provide the number of pending replies,
5+
* used to delay listener container shutdown.
6+
*
7+
* @author Jeongjun Min
8+
* @since 4.0
9+
* @see org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPendingReplyProvider(PendingReplyProvider)
10+
*/
11+
@FunctionalInterface
12+
public interface PendingReplyProvider {
13+
14+
/**
15+
* Return the number of pending replies.
16+
* @return the number of pending replies.
17+
*/
18+
int getPendingReplyCount();
19+
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Objects;
3131
import java.util.Set;
32+
import java.util.concurrent.CompletableFuture;
3233
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.Executor;
3435
import java.util.concurrent.ExecutorService;
@@ -62,6 +63,7 @@
6263
import org.springframework.amqp.rabbit.connection.Connection;
6364
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
6465
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
66+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6567
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
6668
import org.springframework.amqp.utils.test.TestUtils;
6769
import org.springframework.aop.support.AopUtils;
@@ -799,4 +801,40 @@ public void execute(Runnable task) {
799801

800802
}
801803

804+
@Test
805+
@SuppressWarnings("unchecked")
806+
void testShutdownWithPendingReplies() {
807+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
808+
Connection connection = mock(Connection.class);
809+
Channel channel = mock(Channel.class);
810+
given(connectionFactory.createConnection()).willReturn(connection);
811+
given(connection.createChannel(false)).willReturn(channel);
812+
given(channel.isOpen()).willReturn(true);
813+
814+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
815+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
816+
container.setQueueNames("foo");
817+
container.setMessageListener(mock(MessageListener.class));
818+
819+
long shutdownTimeout = 2000L;
820+
long checkInterval = 500L;
821+
container.setShutdownTimeout(shutdownTimeout);
822+
container.setPendingReplyCheckInterval(checkInterval);
823+
container.setPendingReplyProvider(template::getPendingReplyCount);
824+
825+
Map<String, Object> replyHolder = (Map<String, Object>) ReflectionTestUtils.getField(template, "replyHolder");
826+
assertThat(replyHolder).isNotNull();
827+
replyHolder.put("foo", new CompletableFuture<Message>());
828+
829+
assertThat(template.getPendingReplyCount()).isEqualTo(1);
830+
831+
container.start();
832+
833+
long startTime = System.currentTimeMillis();
834+
container.stop();
835+
long stopDuration = System.currentTimeMillis() - startTime;
836+
837+
assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500);
838+
assertThat(template.getPendingReplyCount()).isEqualTo(1);
839+
}
802840
}

0 commit comments

Comments
 (0)