Skip to content

Commit 30efbf7

Browse files
committed
GH-3005: Fix SimpleMLC.killOrRestart for closed AC
Fixes: #3005 Issue link: #3005 The `SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.killOrRestart()` is also called during application context shutdown. At this moment we cannot emit events into an application context. Otherwise, it fails with: ``` Exception in thread "rabbitListenerExecutor1" org.springframework.beans.factory.BeanCreationNotAllowedException: Error creating bean with name 'refreshEventListener': Singleton bean creation not allowed while singletons of this factory are in destruction (Do not request a bean from a BeanFactory in a destroy method implementation!) ``` * Introduce `ObservableListenerContainer.isApplicationContextClosed()` and call it as additional condition in the `SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.killOrRestart()` before trying to emit `AsyncConsumerStoppedEvent` **Auto-cherry-pick to `3.2.x`** The fix for `3.1.x` requires a slightly different approach via `ContextClosedEvent`
1 parent b55010f commit 30efbf7

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ObservableListenerContainer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.beans.factory.DisposableBean;
2727
import org.springframework.context.ApplicationContext;
2828
import org.springframework.context.ApplicationContextAware;
29+
import org.springframework.context.ConfigurableApplicationContext;
2930
import org.springframework.util.ClassUtils;
3031

3132
/**
@@ -121,6 +122,12 @@ protected void checkObservation() {
121122
}
122123
}
123124

125+
126+
protected boolean isApplicationContextClosed() {
127+
return this.applicationContext instanceof ConfigurableApplicationContext configurableCtx
128+
&& configurableCtx.isClosed();
129+
}
130+
124131
@Override
125132
public void setBeanName(String beanName) {
126133
this.beanName = beanName;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
6565
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
6666
import org.springframework.amqp.support.ConsumerTagStrategy;
67+
import org.springframework.context.ApplicationEventPublisher;
6768
import org.springframework.core.log.LogMessage;
6869
import org.springframework.jmx.export.annotation.ManagedMetric;
6970
import org.springframework.jmx.support.MetricType;
@@ -807,7 +808,6 @@ protected void adjustConsumers(int deltaArg) {
807808
}
808809
}
809810

810-
811811
/**
812812
* Start up to delta consumers, limited by {@link #setMaxConcurrentConsumers(int)}.
813813
* @param delta the consumers to add.
@@ -875,7 +875,6 @@ private void considerAddingAConsumer() {
875875
}
876876
}
877877

878-
879878
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
880879
this.consumersLock.lock();
881880
try {
@@ -1272,7 +1271,6 @@ private final class AsyncMessageProcessingConsumer implements Runnable {
12721271

12731272
private boolean failedExclusive;
12741273

1275-
12761274
AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
12771275
this.consumer = consumer;
12781276
this.start = new CountDownLatch(1);
@@ -1556,8 +1554,9 @@ private void killOrRestart(boolean aborted) {
15561554
try {
15571555
this.consumer.stop();
15581556
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
1559-
if (getApplicationEventPublisher() != null) {
1560-
getApplicationEventPublisher().publishEvent(
1557+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
1558+
if (applicationEventPublisher != null && !isApplicationContextClosed()) {
1559+
applicationEventPublisher.publishEvent(
15611560
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
15621561
}
15631562
}

0 commit comments

Comments
 (0)