Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.Delivery;
Expand Down Expand Up @@ -158,6 +159,7 @@
* @author Alexey Platonov
* @author Leonardo Ferreira
* @author Ngoc Nhan
* @author Jeongjun Min
*
* @since 1.0
*/
Expand Down Expand Up @@ -189,6 +191,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private final AtomicInteger activeTemplateCallbacks = new AtomicInteger();

private final ActiveObjectCounter<Object> pendingRepliesCounter = new ActiveObjectCounter<>();

private final ConcurrentMap<Channel, RabbitTemplate> publisherConfirmChannels = new ConcurrentHashMap<>();

private final Map<Object, PendingReply> replyHolder = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -2071,6 +2075,7 @@ private DirectReplyToMessageListenerContainer createReplyToContainer(ConnectionF
messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
}
saveAndSetProperties(message, pendingReply, messageTag);
this.pendingRepliesCounter.add(pendingReply);
this.replyHolder.put(messageTag, pendingReply);
if (noCorrelation) {
this.replyHolder.put(channel, pendingReply);
Expand Down Expand Up @@ -2150,11 +2155,19 @@ private void saveAndSetProperties(final Message message, final PendingReply pend

/**
* Subclasses can implement this to be notified that a reply has timed out.
* The default implementation also releases the counter for pending replies.
* Subclasses should call {@code super.replyTimedOut(correlationId)} if they
* override this method and wish to maintain this behavior.
* @param correlationId the correlationId
* @since 2.1.2
*/
protected void replyTimedOut(@Nullable String correlationId) {
// NOSONAR
if (correlationId != null) {
Object pending = this.replyHolder.get(correlationId);
if (pending != null) {
this.pendingRepliesCounter.release(pending);
}
}
}

/**
Expand Down Expand Up @@ -2666,6 +2679,11 @@ public String getUUID() {
return this.uuid;
}

@Override
public ActiveObjectCounter<Object> getPendingReplyCounter() {
return this.pendingRepliesCounter;
}

@Override
public void onMessage(Message message, @Nullable Channel channel) {
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -2696,6 +2714,7 @@ public void onMessage(Message message, @Nullable Channel channel) {
else {
restoreProperties(message, pendingReply);
pendingReply.reply(message);
this.pendingRepliesCounter.release(pendingReply);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
* @author Jeonggi Kim
* @author Java4ye
* @author Thomas Badie
* @author Jeongjun Min
*
* @since 1.0
*/
Expand Down Expand Up @@ -692,6 +693,10 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Runnable awaitShutdown = () -> {
logger.info("Waiting for workers to finish.");
try {
if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) {
awaitPendingReplies(listenerContainerAware);
}

boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
if (finished) {
logger.info("Successfully waited for workers to finish.");
Expand Down Expand Up @@ -738,6 +743,22 @@ private void runCallbackIfNotNull(@Nullable Runnable callback) {
}
}

private void awaitPendingReplies(ListenerContainerAware listener) throws InterruptedException {
ActiveObjectCounter<Object> replyCounter = listener.getPendingReplyCounter();

if (replyCounter != null && replyCounter.getCount() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Waiting for pending replies: " + replyCounter.getCount());
}
if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
if (logger.isWarnEnabled()) {
logger.warn("Shutdown timeout expired, but " + replyCounter.getCount()
+ " pending replies still remain.");
}
}
}
}

private boolean isActive(BlockingQueueConsumer consumer) {
boolean consumerActive;
this.consumersLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* interface can have configuration verified during initialization.
*
* @author Gary Russell
* @author Jeongjun Min
* @since 1.5
*
*/
Expand All @@ -39,4 +40,12 @@ public interface ListenerContainerAware {
@Nullable
Collection<String> expectedQueueNames();

/**
* Return a counter for pending replies, if any.
* @return the counter, or null.
* @since 4.0
*/
default @Nullable ActiveObjectCounter<Object> getPendingReplyCounter() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.DirectFieldAccessor;
Expand Down Expand Up @@ -102,6 +104,7 @@
* @author Yansong Ren
* @author Tim Bourquin
* @author Jeonggi Kim
* @author Jeongjun Min
*/
public class SimpleMessageListenerContainerTests {

Expand Down Expand Up @@ -716,6 +719,47 @@ void testWithConsumerStartWhenNotActive() {
assertThat(start.getCount()).isEqualTo(0L);
}

@Test
@SuppressWarnings("unchecked")
void testShutdownWithPendingReplies() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
given(connectionFactory.createConnection()).willReturn(connection);
given(connection.createChannel(false)).willReturn(channel);
given(channel.isOpen()).willReturn(true);

RabbitTemplate template = new RabbitTemplate(connectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("shutdown.test.queue");
container.setMessageListener(template);

template.setReplyAddress(container.getQueueNames()[0]);

long shutdownTimeout = 500L;
container.setShutdownTimeout(shutdownTimeout);

ActiveObjectCounter<Object> replyCounter = template.getPendingReplyCounter();
assertThat(replyCounter).isNotNull();

Object pending = new Object();
replyCounter.add(pending);
assertThat(replyCounter.getCount()).isEqualTo(1);

Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class));
new DirectFieldAccessor(container).setPropertyValue("logger", logger);

container.start();

container.stop();

await().atMost(Duration.ofMillis(500)).untilAsserted(() ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those max 30 seconds by default is fine with us in our tests.
It is really not going to be blocked that long.
Might really maximum those 500 millis.
However, if we have it as exact our given time, there is no guarantee that CPU resources will be available for us in time to satisfy this expectation.

verify(logger).warn("Shutdown timeout expired, but 1 pending replies still remain.")
);

replyCounter.release(pending);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this call makes any difference or sense for the test logic.
Therefore let's remove altogether to avoid extra reading noise!

}

private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
final boolean cancel, final CountDownLatch latch) {
return invocation -> {
Expand Down
7 changes: 6 additions & 1 deletion src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ There is no need to keep out-dated utilities and recommendation is to migrate to
The Jackson 2 has been deprecated for removal in whole Spring portfolio.
Respective new classes have been introduced to support Jackson 3.

See xref:amqp/message-converters.adoc[] for more information.
See xref:amqp/message-converters.adoc[] for more information.

[[x40-smlc-changes]]
=== MessageListenerContainer Changes

The `SimpleMessageListenerContainer` now awaits at most `shutdownTimeout` for pending replies from the provided `RabbitTemplate` listener on its shutdown.