Skip to content

Commit 203827a

Browse files
committed
GH-3005: Fix SimpleMLC.killOrRestart for closed AC
Fixes: #3005 Issue link: #3005 The `SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.killOrRestart()` is also called during application context shutdown. At this moment we cannot emit events into an application context. Otherwise, it fails with: ``` Exception in thread "rabbitListenerExecutor1" org.springframework.beans.factory.BeanCreationNotAllowedException: Error creating bean with name 'refreshEventListener': Singleton bean creation not allowed while singletons of this factory are in destruction (Do not request a bean from a BeanFactory in a destroy method implementation!) ``` * Introduce `ObservableListenerContainer.isApplicationContextClosed()` and call it as additional condition in the `SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.killOrRestart()` before trying to emit `AsyncConsumerStoppedEvent` # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ObservableListenerContainer.java
1 parent bba3d41 commit 203827a

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ObservableListenerContainer.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,19 +24,24 @@
2424
import org.springframework.beans.factory.DisposableBean;
2525
import org.springframework.context.ApplicationContext;
2626
import org.springframework.context.ApplicationContextAware;
27+
import org.springframework.context.ApplicationListener;
28+
import org.springframework.context.ConfigurableApplicationContext;
29+
import org.springframework.context.event.ContextClosedEvent;
2730
import org.springframework.lang.Nullable;
2831
import org.springframework.util.ClassUtils;
2932

3033
/**
3134
* @author Gary Russell
35+
* @author Artem Bilan
36+
*
3237
* @since 3.0.5
3338
*
3439
*/
3540
public abstract class ObservableListenerContainer extends RabbitAccessor
3641
implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean {
3742

3843
private static final boolean MICROMETER_PRESENT = ClassUtils.isPresent(
39-
"io.micrometer.core.instrument.MeterRegistry", AbstractMessageListenerContainer.class.getClassLoader());
44+
"io.micrometer.core.instrument.MeterRegistry", AbstractMessageListenerContainer.class.getClassLoader());
4045

4146
private ApplicationContext applicationContext;
4247

@@ -52,6 +57,8 @@ public abstract class ObservableListenerContainer extends RabbitAccessor
5257

5358
private String listenerId;
5459

60+
private volatile boolean contextClosed;
61+
5562
@Nullable
5663
protected final ApplicationContext getApplicationContext() {
5764
return this.applicationContext;
@@ -60,6 +67,9 @@ protected final ApplicationContext getApplicationContext() {
6067
@Override
6168
public final void setApplicationContext(ApplicationContext applicationContext) {
6269
this.applicationContext = applicationContext;
70+
if (applicationContext instanceof ConfigurableApplicationContext configurableApplicationContext) {
71+
configurableApplicationContext.addApplicationListener(new ApplicationClosedListener());
72+
}
6373
}
6474

6575
protected MicrometerHolder getMicrometerHolder() {
@@ -119,6 +129,10 @@ protected void checkObservation() {
119129
}
120130
}
121131

132+
protected boolean isApplicationContextClosed() {
133+
return this.contextClosed;
134+
}
135+
122136
@Override
123137
public void setBeanName(String beanName) {
124138
this.beanName = beanName;
@@ -152,4 +166,15 @@ public void destroy() {
152166
}
153167
}
154168

169+
private final class ApplicationClosedListener implements ApplicationListener<ContextClosedEvent> {
170+
171+
@Override
172+
public void onApplicationEvent(ContextClosedEvent event) {
173+
if (event.getApplicationContext().equals(ObservableListenerContainer.this.applicationContext)) {
174+
ObservableListenerContainer.this.contextClosed = true;
175+
}
176+
}
177+
178+
}
179+
155180
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
5858
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
5959
import org.springframework.amqp.support.ConsumerTagStrategy;
60+
import org.springframework.context.ApplicationEventPublisher;
6061
import org.springframework.core.log.LogMessage;
6162
import org.springframework.jmx.export.annotation.ManagedMetric;
6263
import org.springframework.jmx.support.MetricType;
@@ -797,7 +798,6 @@ protected void adjustConsumers(int deltaArg) {
797798
}
798799
}
799800

800-
801801
/**
802802
* Start up to delta consumers, limited by {@link #setMaxConcurrentConsumers(int)}.
803803
* @param delta the consumers to add.
@@ -862,7 +862,6 @@ private void considerAddingAConsumer() {
862862
}
863863
}
864864

865-
866865
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
867866
this.consumersLock.lock();
868867
try {
@@ -1262,7 +1261,6 @@ private final class AsyncMessageProcessingConsumer implements Runnable {
12621261

12631262
private boolean failedExclusive;
12641263

1265-
12661264
AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
12671265
this.consumer = consumer;
12681266
this.start = new CountDownLatch(1);
@@ -1527,8 +1525,9 @@ private void killOrRestart(boolean aborted) {
15271525
try {
15281526
this.consumer.stop();
15291527
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
1530-
if (getApplicationEventPublisher() != null) {
1531-
getApplicationEventPublisher().publishEvent(
1528+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
1529+
if (applicationEventPublisher != null && !isApplicationContextClosed()) {
1530+
applicationEventPublisher.publishEvent(
15321531
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
15331532
}
15341533
}

0 commit comments

Comments
 (0)