Skip to content

Commit e210c97

Browse files
committed
GH-3055: Fix DirectMLContainer for taskScheduler.shutdown()
Fixes: #3055 Issue link: #3057 The internal `DirectMessageListenerContainer.taskScheduler` is not destroyed when application context is closed. The current `doStop()` implementation is only called from `Lifecycle.stop()`. However, the application context calls the `SmartLifecycle.stop(Runnable)`. That one, in turn, in the `AbstractMessageListenerContainer` calls a `shutdown()`, which does not clean up `taskScheduler` in the `DirectMessageListenerContainer` implementation. In fact, this `shutdown()` is called from other volatile places in the `DirectMessageListenerContainer`, where assumption is that `taskScheduler` is active. Therefore, we cannot move `doStop()` extension into the `actualShutDown()` implementation. * Extract `cleanUpTaskScheduler()` method in the `DirectMessageListenerContainer` and call it from existing `doStop()`, from overridden `stop(Runnable)` and `destroy()` **Cherry-pick to `3.2.x` & `3.1.x`**
1 parent f8a3744 commit e210c97

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,14 +430,24 @@ protected void doStart() {
430430
}
431431

432432
@Override
433-
protected void doStop() {
434-
super.doStop();
433+
public void stop(Runnable callback) {
434+
super.stop(callback);
435+
cleanUpTaskScheduler();
436+
}
437+
438+
private void cleanUpTaskScheduler() {
435439
if (!this.taskSchedulerSet && this.taskScheduler != null) {
436440
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
437441
this.taskScheduler = null;
438442
}
439443
}
440444

445+
@Override
446+
protected void doStop() {
447+
super.doStop();
448+
cleanUpTaskScheduler();
449+
}
450+
441451
protected void actualStart() {
442452
this.aborted = false;
443453
this.hasStopped = false;
@@ -983,6 +993,12 @@ protected void consumerRemoved(SimpleConsumer consumer) {
983993
// default empty
984994
}
985995

996+
@Override
997+
public void destroy() {
998+
super.destroy();
999+
cleanUpTaskScheduler();
1000+
}
1001+
9861002
/**
9871003
* The consumer object.
9881004
*/

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Map;
2020
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.ScheduledExecutorService;
2122
import java.util.concurrent.TimeUnit;
2223

2324
import com.rabbitmq.client.AMQP.BasicProperties;
@@ -139,4 +140,26 @@ private void consumersCorrectlyCancelledOnShutdown(AbstractMessageListenerContai
139140
}
140141
}
141142

143+
@Test
144+
void directMessageListenerContainerShutdownsItsSchedulerOnStopWithCallback() {
145+
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
146+
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
147+
container.setConnectionFactory(cf);
148+
container.setQueueNames("test.shutdown");
149+
container.setMessageListener(m -> {
150+
});
151+
152+
container.start();
153+
154+
ScheduledExecutorService scheduledExecutorService =
155+
TestUtils.getPropertyValue(container, "taskScheduler.scheduledExecutor", ScheduledExecutorService.class);
156+
157+
container.stop(() -> {
158+
});
159+
160+
cf.destroy();
161+
162+
assertThat(scheduledExecutorService.isShutdown()).isTrue();
163+
}
164+
142165
}

0 commit comments

Comments
 (0)