Skip to content

Commit 1bc4ae4

Browse files
committed
Revert "GH-1436: Async Container Stop"
This reverts commit 5686416.
1 parent 5686416 commit 1bc4ae4

File tree

1 file changed

+39
-61
lines changed

1 file changed

+39
-61
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 39 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,7 +77,6 @@
7777
* @author Gary Russell
7878
* @author Artem Bilan
7979
* @author Alex Panchenko
80-
* @author Mat Jaggard
8180
*
8281
* @since 1.0
8382
*/
@@ -606,80 +605,59 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process
606605

607606
@Override
608607
protected void doShutdown() {
609-
shutdownAndWaitOrCallback(null);
610-
}
611-
612-
@Override
613-
public void stop(Runnable callback) {
614-
shutdownAndWaitOrCallback(callback);
615-
}
616-
617-
private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
618608
Thread thread = this.containerStoppingForAbort.get();
619609
if (thread != null && !thread.equals(Thread.currentThread())) {
620610
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
621611
return;
622612
}
623613

624-
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
625-
synchronized (this.consumersMonitor) {
626-
if (this.consumers != null) {
627-
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
628-
while (consumerIterator.hasNext()) {
629-
BlockingQueueConsumer consumer = consumerIterator.next();
630-
consumer.basicCancel(true);
631-
canceledConsumers.add(consumer);
632-
consumerIterator.remove();
633-
if (consumer.declaring) {
634-
consumer.thread.interrupt();
614+
try {
615+
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
616+
synchronized (this.consumersMonitor) {
617+
if (this.consumers != null) {
618+
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
619+
while (consumerIterator.hasNext()) {
620+
BlockingQueueConsumer consumer = consumerIterator.next();
621+
consumer.basicCancel(true);
622+
canceledConsumers.add(consumer);
623+
consumerIterator.remove();
624+
if (consumer.declaring) {
625+
consumer.thread.interrupt();
626+
}
635627
}
636628
}
637-
}
638-
else {
639-
logger.info("Shutdown ignored - container is already stopped");
640-
return;
641-
}
642-
}
643-
644-
Runnable awaitShutdown = () -> {
645-
logger.info("Waiting for workers to finish.");
646-
try {
647-
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
648-
if (finished) {
649-
logger.info("Successfully waited for workers to finish.");
650-
}
651629
else {
652-
logger.info("Workers not finished.");
653-
if (isForceCloseChannel()) {
654-
canceledConsumers.forEach(consumer -> {
655-
if (logger.isWarnEnabled()) {
656-
logger.warn("Closing channel for unresponsive consumer: " + consumer);
657-
}
658-
consumer.stop();
659-
});
660-
}
630+
logger.info("Shutdown ignored - container is already stopped");
631+
return;
661632
}
662633
}
663-
catch (InterruptedException e) {
664-
Thread.currentThread().interrupt();
665-
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
666-
}
667-
668-
synchronized (this.consumersMonitor) {
669-
this.consumers = null;
670-
this.cancellationLock.deactivate();
634+
logger.info("Waiting for workers to finish.");
635+
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
636+
if (finished) {
637+
logger.info("Successfully waited for workers to finish.");
671638
}
672-
673-
if (callback != null) {
674-
callback.run();
639+
else {
640+
logger.info("Workers not finished.");
641+
if (isForceCloseChannel()) {
642+
canceledConsumers.forEach(consumer -> {
643+
if (logger.isWarnEnabled()) {
644+
logger.warn("Closing channel for unresponsive consumer: " + consumer);
645+
}
646+
consumer.stop();
647+
});
648+
}
675649
}
676-
};
677-
if (callback == null) {
678-
awaitShutdown.run();
679650
}
680-
else {
681-
getTaskExecutor().execute(awaitShutdown);
651+
catch (InterruptedException e) {
652+
Thread.currentThread().interrupt();
653+
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
654+
}
655+
656+
synchronized (this.consumersMonitor) {
657+
this.consumers = null;
658+
this.cancellationLock.deactivate();
682659
}
660+
683661
}
684662

685663
private boolean isActive(BlockingQueueConsumer consumer) {

0 commit comments

Comments
 (0)