Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.Delivery;
Expand Down Expand Up @@ -189,6 +190,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private final AtomicInteger activeTemplateCallbacks = new AtomicInteger();

private final ActiveObjectCounter<Object> pendingRepliesCounter = new ActiveObjectCounter<>();

private final ConcurrentMap<Channel, RabbitTemplate> publisherConfirmChannels = new ConcurrentHashMap<>();

private final Map<Object, PendingReply> replyHolder = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -2071,6 +2074,7 @@ private DirectReplyToMessageListenerContainer createReplyToContainer(ConnectionF
messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
}
saveAndSetProperties(message, pendingReply, messageTag);
this.pendingRepliesCounter.add(pendingReply);
this.replyHolder.put(messageTag, pendingReply);
if (noCorrelation) {
this.replyHolder.put(channel, pendingReply);
Expand Down Expand Up @@ -2150,11 +2154,19 @@ private void saveAndSetProperties(final Message message, final PendingReply pend

/**
* Subclasses can implement this to be notified that a reply has timed out.
* The default implementation also releases the counter for pending replies.
* Subclasses should call {@code super.replyTimedOut(correlationId)} if they
* override this method and wish to maintain this behavior.
* @param correlationId the correlationId
* @since 2.1.2
*/
protected void replyTimedOut(@Nullable String correlationId) {
// NOSONAR
if (correlationId != null) {
Object pending = this.replyHolder.get(correlationId);
if (pending != null) {
this.pendingRepliesCounter.release(pending);
}
}
}

/**
Expand Down Expand Up @@ -2666,6 +2678,11 @@ public String getUUID() {
return this.uuid;
}

@Override
public ActiveObjectCounter<Object> getPendingReplyCounter() {
return this.pendingRepliesCounter;
}

@Override
public void onMessage(Message message, @Nullable Channel channel) {
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -2696,6 +2713,7 @@ public void onMessage(Message message, @Nullable Channel channel) {
else {
restoreProperties(message, pendingReply);
pendingReply.reply(message);
this.pendingRepliesCounter.release(pendingReply);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,24 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Runnable awaitShutdown = () -> {
logger.info("Waiting for workers to finish.");
try {
ActiveObjectCounter<Object> replyCounter = null;
Object listener = getMessageListener();
if (listener instanceof ListenerContainerAware) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just do this if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) {
Then use that pattern matching variable in the block.

replyCounter = ((ListenerContainerAware) listener).getPendingReplyCounter();
}

if (replyCounter != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be inside of the previous if.
Plus I believe this still has to be conditional on the replyCounter.getCount() > 0.
No reason to log anything and wait if there is really nothing to wait for.

And we probably can just extract this into a separate method to this code easier to read.

if (logger.isInfoEnabled()) {
logger.info("Waiting for pending replies: " + replyCounter.getCount());
}
if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
if (logger.isWarnEnabled()) {
logger.warn("Shutdown timeout expired, but " + replyCounter.getCount()
+ " pending replies still remain.");
}
}
}

boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
if (finished) {
logger.info("Successfully waited for workers to finish.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ public interface ListenerContainerAware {
@Nullable
Collection<String> expectedQueueNames();

/**
* Return a counter for pending replies, if any.
* @return the counter, or null.
* @since 4.0
*/
@Nullable
default ActiveObjectCounter<Object> getPendingReplyCounter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This annotation has to be next to the type, not method itself.
Therefore this style is preferred:

default @Nullable ActiveObjectCounter<Object>

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.DirectFieldAccessor;
Expand Down Expand Up @@ -716,6 +718,45 @@ void testWithConsumerStartWhenNotActive() {
assertThat(start.getCount()).isEqualTo(0L);
}

@Test
@SuppressWarnings("unchecked")
void testShutdownWithPendingReplies() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
given(connectionFactory.createConnection()).willReturn(connection);
given(connection.createChannel(false)).willReturn(channel);
given(channel.isOpen()).willReturn(true);

RabbitTemplate template = new RabbitTemplate(connectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("shutdown.test.queue");
container.setMessageListener(template);

template.setReplyAddress(container.getQueueNames()[0]);

long shutdownTimeout = 2000L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to wait for "never reply" so long.
We deliberately aware that reply never comes back to us in this testing scenario.
So, what is the point to block this test for those 2 seconds?
Now imaging that every single 3000 tests in the project is going to block for similar time.
That would be a disaster for development feedback loop.

I think we should make like 500 millis at most.

container.setShutdownTimeout(shutdownTimeout);

ActiveObjectCounter<Object> replyCounter =
(ActiveObjectCounter<Object>) ReflectionTestUtils.getField(template, "pendingRepliesCounter");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to use a reflection here since RabbitTemplate provides for us already getPendingReplyCounter().

assertThat(replyCounter).isNotNull();

Object pending = new Object();
replyCounter.add(pending);
assertThat(replyCounter.getCount()).isEqualTo(1);

container.start();

long startTime = System.currentTimeMillis();
container.stop();
long stopDuration = System.currentTimeMillis() - startTime;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timing is always problematic for testing.

Let's see if we can spy on the logger of the SimpleMessageListenerContainer and await().untilAsserted() for the new warn log message you have introduced in this change!


replyCounter.release(pending);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this call makes any difference or sense for the test logic.
Therefore let's remove altogether to avoid extra reading noise!


assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500);
}

private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
final boolean cancel, final CountDownLatch latch) {
return invocation -> {
Expand Down