Skip to content

Commit e1081de

Browse files
committed
GH-2920: SMLC: Interrupt ML on shutdown
Fixes: #2920 When the `SimpleMessageListenerContainer.shutdownTimeout` period is over, there is no reason to keep any related threads active, sometimes blocking the whole application stop. * Introduce a `List SimpleMessageListenerContainer.processorThreadsToInterrupt` to keep track of the scheduled `AsyncMessageProcessingConsumer`. * Interrupt those threads when `this.cancellationLock.await()` is not successful in the `SimpleMessageListenerContainer.shutdownAndWaitOrCallback()` **Auto-cherry-pick to `3.2.x`**
1 parent 8baaa76 commit e1081de

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
115115

116116
private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<>();
117117

118+
private final List<Thread> processorThreadsToInterrupt = new ArrayList<>();
119+
118120
private long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
119121

120122
private long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL;
@@ -704,6 +706,7 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
704706
else {
705707
logger.info("Workers not finished.");
706708
if (isForceCloseChannel() || this.stopNow.get()) {
709+
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
707710
canceledConsumers.forEach(consumer -> {
708711
if (logger.isWarnEnabled()) {
709712
logger.warn("Closing channel for unresponsive consumer: " + consumer);
@@ -1349,6 +1352,7 @@ public void run() { // NOSONAR - line count
13491352

13501353
try {
13511354
initialize();
1355+
SimpleMessageListenerContainer.this.processorThreadsToInterrupt.add(Thread.currentThread());
13521356
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
13531357
mainLoop();
13541358
}
@@ -1433,6 +1437,7 @@ public void run() { // NOSONAR - line count
14331437
}
14341438
}
14351439
finally {
1440+
SimpleMessageListenerContainer.this.processorThreadsToInterrupt.remove(Thread.currentThread());
14361441
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
14371442
if (getTransactionManager() != null) {
14381443
ConsumerChannelRegistry.unRegisterConsumerChannel();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.rabbitmq.client.AMQP.BasicProperties;
4141
import com.rabbitmq.client.Channel;
4242
import com.rabbitmq.client.Consumer;
43+
import com.rabbitmq.client.DefaultConsumer;
4344
import com.rabbitmq.client.Envelope;
4445
import com.rabbitmq.client.PossibleAuthenticationFailureException;
4546
import org.aopalliance.intercept.MethodInterceptor;
@@ -217,7 +218,7 @@ public void testTxSizeAcks() throws Exception {
217218
container.setMessageListener(messages::add);
218219
container.start();
219220
BasicProperties props = new BasicProperties();
220-
byte[] payload = "baz" .getBytes();
221+
byte[] payload = "baz".getBytes();
221222
Envelope envelope = new Envelope(1L, false, "foo", "bar");
222223
consumer.get().handleDelivery("1", envelope, props, payload);
223224
envelope = new Envelope(2L, false, "foo", "bar");
@@ -272,7 +273,7 @@ public void testTxSizeAcksWIthShortSet() throws Exception {
272273
container.afterPropertiesSet();
273274
container.start();
274275
BasicProperties props = new BasicProperties();
275-
byte[] payload = "baz" .getBytes();
276+
byte[] payload = "baz".getBytes();
276277
Envelope envelope = new Envelope(1L, false, "foo", "bar");
277278
consumer.get().handleDelivery(consumerTag, envelope, props, payload);
278279
envelope = new Envelope(2L, false, "foo", "bar");
@@ -756,6 +757,50 @@ void testShutdownWithPendingReplies() {
756757
assertThat(replyCounter.getCount()).isEqualTo(1);
757758
}
758759

760+
@Test
761+
@SuppressWarnings("unchecked")
762+
void listenerIsInterruptedOnUnsuccessfulShutdown() throws Exception {
763+
ConnectionFactory connectionFactory = mock();
764+
Connection connection = mock();
765+
Channel channel = mock();
766+
given(connectionFactory.createConnection()).willReturn(connection);
767+
given(connection.createChannel(false)).willReturn(channel);
768+
willAnswer(invocation -> "1")
769+
.given(channel)
770+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
771+
any(Consumer.class));
772+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
773+
container.setQueueNames("some_queue");
774+
775+
CountDownLatch handlingLatch = new CountDownLatch(1);
776+
CountDownLatch interruptedLatch = new CountDownLatch(1);
777+
container.setMessageListener(message -> {
778+
handlingLatch.countDown();
779+
try {
780+
Thread.sleep(2000L);
781+
}
782+
catch (InterruptedException e) {
783+
interruptedLatch.countDown();
784+
}
785+
});
786+
container.setShutdownTimeout(200L);
787+
container.start();
788+
789+
Set<BlockingQueueConsumer> consumers = TestUtils.getPropertyValue(container, "consumers", Set.class);
790+
BlockingQueueConsumer blockingQueueConsumer = consumers.iterator().next();
791+
792+
Map<String, DefaultConsumer> internalConsumers =
793+
TestUtils.getPropertyValue(blockingQueueConsumer, "consumers", Map.class);
794+
internalConsumers.values().iterator().next()
795+
.handleDelivery("1", new Envelope(1, false, "", ""), new BasicProperties(), new byte[] {1});
796+
797+
assertThat(handlingLatch.await(10, TimeUnit.SECONDS)).isTrue();
798+
799+
container.stop();
800+
801+
assertThat(interruptedLatch.await(10, TimeUnit.SECONDS)).isTrue();
802+
}
803+
759804
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
760805
final boolean cancel, final CountDownLatch latch) {
761806
return invocation -> {

0 commit comments

Comments
 (0)