Skip to content

Commit bb08ce2

Browse files
timbqartembilan
authored andcommitted
GH-1561: Alwways run stop callback
Fixes #1563 * GH-1561 run callback when container is stopping for abort too * GH-1561 add author information **Cherry-pick to `2.4.x`**
1 parent 632a959 commit bb08ce2

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -79,6 +79,7 @@
7979
* @author Alex Panchenko
8080
* @author Mat Jaggard
8181
* @author Yansong Ren
82+
* @author Tim Bourquin
8283
*
8384
* @since 1.0
8485
*/
@@ -622,6 +623,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
622623
Thread thread = this.containerStoppingForAbort.get();
623624
if (thread != null && !thread.equals(Thread.currentThread())) {
624625
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
626+
runCallbackIfNotNull(callback);
625627
return;
626628
}
627629

@@ -641,6 +643,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
641643
}
642644
else {
643645
logger.info("Shutdown ignored - container is already stopped");
646+
runCallbackIfNotNull(callback);
644647
return;
645648
}
646649
}
@@ -674,9 +677,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
674677
this.cancellationLock.deactivate();
675678
}
676679

677-
if (callback != null) {
678-
callback.run();
679-
}
680+
runCallbackIfNotNull(callback);
680681
};
681682
if (callback == null) {
682683
awaitShutdown.run();
@@ -686,6 +687,12 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
686687
}
687688
}
688689

690+
private void runCallbackIfNotNull(@Nullable Runnable callback) {
691+
if (callback != null) {
692+
callback.run();
693+
}
694+
}
695+
689696
private boolean isActive(BlockingQueueConsumer consumer) {
690697
boolean consumerActive;
691698
synchronized (this.consumersMonitor) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -108,6 +108,7 @@
108108
* @author Artem Bilan
109109
* @author Mohammad Hewedy
110110
* @author Yansong Ren
111+
* @author Tim Bourquin
111112
*/
112113
public class SimpleMessageListenerContainerTests {
113114

@@ -431,6 +432,29 @@ protected void setUpMockCancel(Channel channel, final List<Consumer> consumers)
431432
}).given(channel).basicCancel(anyString());
432433
}
433434

435+
@Test
436+
public void testCallbackIsRunOnStopAlsoWhenNoConsumerIsActive() throws InterruptedException {
437+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
438+
439+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
440+
441+
final CountDownLatch countDownLatch = new CountDownLatch(1);
442+
container.stop(countDownLatch::countDown);
443+
assertThat(countDownLatch.await(100, TimeUnit.MILLISECONDS)).isTrue();
444+
}
445+
446+
@Test
447+
public void testCallbackIsRunOnStopAlsoWhenContainerIsStoppingForAbort() throws InterruptedException {
448+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
449+
450+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
451+
ReflectionTestUtils.setField(container, "containerStoppingForAbort", new AtomicReference<>(new Thread()));
452+
453+
final CountDownLatch countDownLatch = new CountDownLatch(1);
454+
container.stop(countDownLatch::countDown);
455+
assertThat(countDownLatch.await(100, TimeUnit.MILLISECONDS)).isTrue();
456+
}
457+
434458
@Test
435459
public void testWithConnectionPerListenerThread() throws Exception {
436460
com.rabbitmq.client.ConnectionFactory mockConnectionFactory =

0 commit comments

Comments
 (0)