Skip to content

Commit 80db8b1

Browse files
committed
AMQP-785: SMLC Lifecycle fixes
JIRA: https://jira.spring.io/browse/AMQP-785 Fixes: #689 - Only stop the container on one thread - Ignore concurrent stops - Interrupt consumer threads that are attempting to declare queues - In `restart()` don't start a new consumer if the container is stopping - Defer publishing consumer failure events until container is stopped - Add a RecoveryListener if needed to ensure channels are never recovered - Fix event publishing for `Error` - it is fatal __backport to 1.7.x will require work__ Clear the declaring flag when exiting `start()` with exception. Release the `activeObjectCounter` when interrupted while declaring. Polishing stopped container lifecycle Since restarted consumer is not be aware about stopped container, it can restart properly when RabbitMQ comes back on-line independently of the container state * Add `active` flag to the `ActiveObjectCounter` and `deactivate()` and `isActive()` hooks * Use `ActiveObjectCounter.deactivate()` in the container shutdown * Use `ActiveObjectCounter.isActive()` in the `BlockingQueueConsumer.cancelled()` * Use `BlockingQueueConsumer.cancelled()` in its `start()` toi check container activity before performing network job * Check `isActive()` state in the `AbstractMessageListenerContainer.shutdown()` * Remove `SimpleMessageListenerContainer.containerStopping` in favor of `isActive()` hook Conflicts: build.gradle spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ActiveObjectCounter.java spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java Resolved.
1 parent a08c07d commit 80db8b1

File tree

5 files changed

+156
-23
lines changed

5 files changed

+156
-23
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ subprojects { subproject ->
103103
rabbitmqHttpClientVersion = '1.1.1.RELEASE'
104104
slf4jVersion = "1.7.25"
105105

106-
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.3.11.RELEASE'
106+
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.3.13.RELEASE'
107107

108-
springRetryVersion = '1.2.0.RELEASE'
108+
springRetryVersion = '1.2.1.RELEASE'
109109
}
110110

111111
eclipse {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,12 +547,17 @@ public void initialize() {
547547
* Stop the shared Connection, call {@link #doShutdown()}, and close this container.
548548
*/
549549
public void shutdown() {
550-
logger.debug("Shutting down Rabbit listener container");
551550
synchronized (this.lifecycleMonitor) {
551+
if (!isActive()) {
552+
logger.info("Shutdown ignored - container is not active already");
553+
return;
554+
}
552555
this.active = false;
553556
this.lifecycleMonitor.notifyAll();
554557
}
555558

559+
logger.debug("Shutting down Rabbit listener container");
560+
556561
// Shut down the invokers.
557562
try {
558563
doShutdown();
@@ -602,6 +607,9 @@ public final boolean isActive() {
602607
*/
603608
@Override
604609
public void start() {
610+
if (isRunning()) {
611+
return;
612+
}
605613
if (!this.initialized) {
606614
synchronized (this.lifecycleMonitor) {
607615
if (!this.initialized) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ActiveObjectCounter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,19 @@
2424
import java.util.concurrent.TimeUnit;
2525

2626
/**
27+
* A mechanism to keep track of active objects.
28+
* @param <T> the object type.
29+
*
2730
* @author Dave Syer
31+
* @author Artem Bilan
2832
*
2933
*/
3034
public class ActiveObjectCounter<T> {
3135

3236
private final ConcurrentMap<T, CountDownLatch> locks = new ConcurrentHashMap<T, CountDownLatch>();
3337

38+
private volatile boolean active = true;
39+
3440
public void add(T object) {
3541
CountDownLatch lock = new CountDownLatch(1);
3642
this.locks.putIfAbsent(object, lock);
@@ -71,6 +77,15 @@ public int getCount() {
7177

7278
public void reset() {
7379
this.locks.clear();
80+
this.active = true;
81+
}
82+
83+
public void deactivate() {
84+
this.active = false;
85+
}
86+
87+
public boolean isActive() {
88+
return this.active;
7489
}
7590

7691
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@
6464
import com.rabbitmq.client.Channel;
6565
import com.rabbitmq.client.DefaultConsumer;
6666
import com.rabbitmq.client.Envelope;
67+
import com.rabbitmq.client.Recoverable;
68+
import com.rabbitmq.client.RecoveryListener;
6769
import com.rabbitmq.client.ShutdownSignalException;
70+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
6871
import com.rabbitmq.utility.Utility;
6972

7073
/**
@@ -79,7 +82,7 @@
7982
* @author Alex Panchenko
8083
* @author Johno Crawford
8184
*/
82-
public class BlockingQueueConsumer {
85+
public class BlockingQueueConsumer implements RecoveryListener {
8386

8487
private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
8588

@@ -150,6 +153,10 @@ public class BlockingQueueConsumer {
150153

151154
private volatile boolean normalCancel;
152155

156+
volatile Thread thread;
157+
158+
volatile boolean declaring;
159+
153160
/**
154161
* Create a consumer. The consumer must not attempt to use
155162
* the connection factory or communicate with the broker
@@ -424,7 +431,8 @@ protected boolean hasDelivery() {
424431

425432
protected boolean cancelled() {
426433
return this.cancelled.get() || (this.abortStarted > 0 &&
427-
this.abortStarted + this.shutdownTimeout > System.currentTimeMillis());
434+
this.abortStarted + this.shutdownTimeout > System.currentTimeMillis())
435+
|| !this.activeObjectCounter.isActive();
428436
}
429437

430438
/**
@@ -559,10 +567,14 @@ public void start() throws AmqpException {
559567
if (logger.isDebugEnabled()) {
560568
logger.debug("Starting consumer " + this);
561569
}
570+
571+
this.thread = Thread.currentThread();
572+
562573
try {
563574
this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
564575
this.transactional);
565576
this.channel = this.resourceHolder.getChannel();
577+
addRecoveryListener();
566578
}
567579
catch (AmqpAuthenticationException e) {
568580
throw new FatalListenerStartupException("Authentication failure", e);
@@ -573,7 +585,11 @@ public void start() throws AmqpException {
573585

574586
// mirrored queue might be being moved
575587
int passiveDeclareRetries = this.declarationRetries;
588+
this.declaring = true;
576589
do {
590+
if (cancelled()) {
591+
break;
592+
}
577593
try {
578594
attemptPassiveDeclarations();
579595
if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
@@ -589,7 +605,10 @@ public void start() throws AmqpException {
589605
Thread.sleep(this.failedDeclarationRetryInterval);
590606
}
591607
catch (InterruptedException e1) {
608+
this.declaring = false;
592609
Thread.currentThread().interrupt();
610+
this.activeObjectCounter.release(this);
611+
throw RabbitExceptionTranslator.convertRabbitAccessException(e1);
593612
}
594613
}
595614
}
@@ -602,15 +621,17 @@ else if (e.getFailedQueues().size() < this.queues.length) {
602621
this.lastRetryDeclaration = System.currentTimeMillis();
603622
}
604623
else {
624+
this.declaring = false;
605625
this.activeObjectCounter.release(this);
606626
throw new QueuesNotAvailableException("Cannot prepare queue for listener. "
607627
+ "Either the queue doesn't exist or the broker will not allow us to use it.", e);
608628
}
609629
}
610630
}
611-
while (passiveDeclareRetries-- > 0);
631+
while (passiveDeclareRetries-- > 0 && !cancelled());
632+
this.declaring = false;
612633

613-
if (!this.acknowledgeMode.isAutoAck()) {
634+
if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
614635
// Set basicQos before calling basicConsume (otherwise if we are not acking the broker
615636
// will send blocks of 100 messages)
616637
try {
@@ -624,9 +645,11 @@ else if (e.getFailedQueues().size() < this.queues.length) {
624645

625646

626647
try {
627-
for (String queueName : this.queues) {
628-
if (!this.missingQueues.contains(queueName)) {
629-
consumeFromQueue(queueName);
648+
if (!cancelled()) {
649+
for (String queueName : this.queues) {
650+
if (!this.missingQueues.contains(queueName)) {
651+
consumeFromQueue(queueName);
652+
}
630653
}
631654
}
632655
}
@@ -635,6 +658,19 @@ else if (e.getFailedQueues().size() < this.queues.length) {
635658
}
636659
}
637660

661+
/**
662+
* Add a listener if necessary so we can immediately close an autorecovered
663+
* channel if necessary since the async consumer will no longer exist.
664+
*/
665+
private void addRecoveryListener() {
666+
if (this.channel instanceof ChannelProxy) {
667+
if (((ChannelProxy) this.channel).getTargetChannel() instanceof AutorecoveringChannel) {
668+
((AutorecoveringChannel) ((ChannelProxy) this.channel).getTargetChannel())
669+
.addRecoveryListener(this);
670+
}
671+
}
672+
}
673+
638674
private void consumeFromQueue(String queue) throws IOException {
639675
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
640676
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal, this.exclusive,
@@ -810,6 +846,28 @@ public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
810846

811847
}
812848

849+
@Override
850+
public void handleRecovery(Recoverable recoverable) {
851+
// should never get here
852+
handleRecoveryStarted(recoverable);
853+
}
854+
855+
@Override
856+
public void handleRecoveryStarted(Recoverable recoverable) {
857+
if (logger.isDebugEnabled()) {
858+
logger.debug("Closing an autorecovered channel: " + recoverable);
859+
}
860+
try {
861+
((Channel) recoverable).close();
862+
}
863+
catch (IOException e) {
864+
logger.debug("Error closing an autorecovered channel");
865+
}
866+
catch (TimeoutException e) {
867+
logger.debug("Error closing an autorecovered channel");
868+
}
869+
}
870+
813871
@Override
814872
public String toString() {
815873
return "Consumer@" + ObjectUtils.getIdentityHexString(this) + ": "

0 commit comments

Comments
 (0)