Skip to content

Commit 275868e

Browse files
authored
GH-3031: Defer SMLC shutdown for pending replies
Fixes: #3031 This commit introduces a mechanism to delay the shutdown of a SimpleMessageListenerContainer if there are pending replies for request/reply operations. A new functional interface, `PendingReplyProvider`, is introduced and can be set on the container. `RabbitTemplate` now exposes a `getPendingReplyCount()` method to serve as this provider. When the provider is set, the container will wait up to the configured `shutdownTimeout` for the pending reply count to drop to zero before proceeding with the consumer cancellation. * Fix checkstyle violations Refactor the shutdown deferral mechanism based on pull request feedback: - Replace the custom `PendingReplyProvider` with the existing `ListenerContainerAware` interface and `ActiveObjectCounter`. - Move the waiting logic into the `awaitShutdown` runnable for better consistency with the existing shutdown process. * Refactor shutdown deferral logic based on review feedback - Move @nullable to type declaration - Use pattern matching for ListenerContainerAware - Skip waiting when replyCounter is null or count is zero - Extract shutdown wait logic into separate method - Add author tag and update whats-new.adoc * Refine test and update whats-new This commit applies the final polishing suggestions from the review. - The test case is refactored to be more robust and efficient. It now verifies the warning log message via a spy instead of relying on unstable timing assertions. The shutdown timeout is also reduced to speed up the build. - The `whats-new.adoc` document is updated with the title and description suggested in the review. * Refine shutdown deferral test for robustness Improve test robustness by increasing the await timeout and remove unnecessary cleanup code per review feedback. Signed-off-by: Jeongjun Min <[email protected]>
1 parent 5bcd01f commit 275868e

File tree

5 files changed

+98
-2
lines changed

5 files changed

+98
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
8585
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
8686
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
87+
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
8788
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
8889
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
8990
import org.springframework.amqp.rabbit.support.Delivery;
@@ -158,6 +159,7 @@
158159
* @author Alexey Platonov
159160
* @author Leonardo Ferreira
160161
* @author Ngoc Nhan
162+
* @author Jeongjun Min
161163
*
162164
* @since 1.0
163165
*/
@@ -189,6 +191,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
189191

190192
private final AtomicInteger activeTemplateCallbacks = new AtomicInteger();
191193

194+
private final ActiveObjectCounter<Object> pendingRepliesCounter = new ActiveObjectCounter<>();
195+
192196
private final ConcurrentMap<Channel, RabbitTemplate> publisherConfirmChannels = new ConcurrentHashMap<>();
193197

194198
private final Map<Object, PendingReply> replyHolder = new ConcurrentHashMap<>();
@@ -2071,6 +2075,7 @@ private DirectReplyToMessageListenerContainer createReplyToContainer(ConnectionF
20712075
messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
20722076
}
20732077
saveAndSetProperties(message, pendingReply, messageTag);
2078+
this.pendingRepliesCounter.add(pendingReply);
20742079
this.replyHolder.put(messageTag, pendingReply);
20752080
if (noCorrelation) {
20762081
this.replyHolder.put(channel, pendingReply);
@@ -2150,11 +2155,19 @@ private void saveAndSetProperties(final Message message, final PendingReply pend
21502155

21512156
/**
21522157
* Subclasses can implement this to be notified that a reply has timed out.
2158+
* The default implementation also releases the counter for pending replies.
2159+
* Subclasses should call {@code super.replyTimedOut(correlationId)} if they
2160+
* override this method and wish to maintain this behavior.
21532161
* @param correlationId the correlationId
21542162
* @since 2.1.2
21552163
*/
21562164
protected void replyTimedOut(@Nullable String correlationId) {
2157-
// NOSONAR
2165+
if (correlationId != null) {
2166+
Object pending = this.replyHolder.get(correlationId);
2167+
if (pending != null) {
2168+
this.pendingRepliesCounter.release(pending);
2169+
}
2170+
}
21582171
}
21592172

21602173
/**
@@ -2666,6 +2679,11 @@ public String getUUID() {
26662679
return this.uuid;
26672680
}
26682681

2682+
@Override
2683+
public ActiveObjectCounter<Object> getPendingReplyCounter() {
2684+
return this.pendingRepliesCounter;
2685+
}
2686+
26692687
@Override
26702688
public void onMessage(Message message, @Nullable Channel channel) {
26712689
if (logger.isTraceEnabled()) {
@@ -2696,6 +2714,7 @@ public void onMessage(Message message, @Nullable Channel channel) {
26962714
else {
26972715
restoreProperties(message, pendingReply);
26982716
pendingReply.reply(message);
2717+
this.pendingRepliesCounter.release(pendingReply);
26992718
}
27002719
}
27012720

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
* @author Jeonggi Kim
8888
* @author Java4ye
8989
* @author Thomas Badie
90+
* @author Jeongjun Min
9091
*
9192
* @since 1.0
9293
*/
@@ -692,6 +693,10 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
692693
Runnable awaitShutdown = () -> {
693694
logger.info("Waiting for workers to finish.");
694695
try {
696+
if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) {
697+
awaitPendingReplies(listenerContainerAware);
698+
}
699+
695700
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
696701
if (finished) {
697702
logger.info("Successfully waited for workers to finish.");
@@ -738,6 +743,22 @@ private void runCallbackIfNotNull(@Nullable Runnable callback) {
738743
}
739744
}
740745

746+
private void awaitPendingReplies(ListenerContainerAware listener) throws InterruptedException {
747+
ActiveObjectCounter<Object> replyCounter = listener.getPendingReplyCounter();
748+
749+
if (replyCounter != null && replyCounter.getCount() > 0) {
750+
if (logger.isInfoEnabled()) {
751+
logger.info("Waiting for pending replies: " + replyCounter.getCount());
752+
}
753+
if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
754+
if (logger.isWarnEnabled()) {
755+
logger.warn("Shutdown timeout expired, but " + replyCounter.getCount()
756+
+ " pending replies still remain.");
757+
}
758+
}
759+
}
760+
}
761+
741762
private boolean isActive(BlockingQueueConsumer consumer) {
742763
boolean consumerActive;
743764
this.consumersLock.lock();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* interface can have configuration verified during initialization.
2626
*
2727
* @author Gary Russell
28+
* @author Jeongjun Min
2829
* @since 1.5
2930
*
3031
*/
@@ -39,4 +40,12 @@ public interface ListenerContainerAware {
3940
@Nullable
4041
Collection<String> expectedQueueNames();
4142

43+
/**
44+
* Return a counter for pending replies, if any.
45+
* @return the counter, or null.
46+
* @since 4.0
47+
*/
48+
default @Nullable ActiveObjectCounter<Object> getPendingReplyCounter() {
49+
return null;
50+
}
4251
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@
6262
import org.springframework.amqp.rabbit.connection.Connection;
6363
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
6464
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
65+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6566
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
67+
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
6668
import org.springframework.amqp.utils.test.TestUtils;
6769
import org.springframework.aop.support.AopUtils;
6870
import org.springframework.beans.DirectFieldAccessor;
@@ -102,6 +104,7 @@
102104
* @author Yansong Ren
103105
* @author Tim Bourquin
104106
* @author Jeonggi Kim
107+
* @author Jeongjun Min
105108
*/
106109
public class SimpleMessageListenerContainerTests {
107110

@@ -716,6 +719,45 @@ void testWithConsumerStartWhenNotActive() {
716719
assertThat(start.getCount()).isEqualTo(0L);
717720
}
718721

722+
@Test
723+
@SuppressWarnings("unchecked")
724+
void testShutdownWithPendingReplies() {
725+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
726+
Connection connection = mock(Connection.class);
727+
Channel channel = mock(Channel.class);
728+
given(connectionFactory.createConnection()).willReturn(connection);
729+
given(connection.createChannel(false)).willReturn(channel);
730+
given(channel.isOpen()).willReturn(true);
731+
732+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
733+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
734+
container.setQueueNames("shutdown.test.queue");
735+
container.setMessageListener(template);
736+
737+
template.setReplyAddress(container.getQueueNames()[0]);
738+
739+
long shutdownTimeout = 500L;
740+
container.setShutdownTimeout(shutdownTimeout);
741+
742+
ActiveObjectCounter<Object> replyCounter = template.getPendingReplyCounter();
743+
assertThat(replyCounter).isNotNull();
744+
745+
Object pending = new Object();
746+
replyCounter.add(pending);
747+
assertThat(replyCounter.getCount()).isEqualTo(1);
748+
749+
Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class));
750+
new DirectFieldAccessor(container).setPropertyValue("logger", logger);
751+
752+
container.start();
753+
754+
container.stop();
755+
756+
await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
757+
verify(logger).warn("Shutdown timeout expired, but 1 pending replies still remain.")
758+
);
759+
}
760+
719761
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
720762
final boolean cancel, final CountDownLatch latch) {
721763
return invocation -> {

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,9 @@ There is no need to keep out-dated utilities and recommendation is to migrate to
3636
The Jackson 2 has been deprecated for removal in whole Spring portfolio.
3737
Respective new classes have been introduced to support Jackson 3.
3838

39-
See xref:amqp/message-converters.adoc[] for more information.
39+
See xref:amqp/message-converters.adoc[] for more information.
40+
41+
[[x40-smlc-changes]]
42+
=== MessageListenerContainer Changes
43+
44+
The `SimpleMessageListenerContainer` now awaits at most `shutdownTimeout` for pending replies from the provided `RabbitTemplate` listener on its shutdown.

0 commit comments

Comments
 (0)