Skip to content

Commit 6dc4457

Browse files
committed
GH-3166: SMLC: synchronize on processorThreadsToInterrupt
Fixes: #3166 The `processorThreadsToInterrupt` is iterated in the `shutdownAndWaitOrCallback()`, and apparently one of the processors has finished successfully removing itself from the `processorThreadsToInterrupt` list. This leads to the `ConcurrentModificationException` on the mentioned above iteration. * Fix `SimpleMessageListenerContainer` making the `processorThreadsToInterrupt` as a `Collections.synchronizedList()` * Wrap `processorThreadsToInterrupt` iteration in the `shutdownAndWaitOrCallback()` with a `synchronized (this.processorThreadsToInterrupt)` **Auto-cherry-pick to `3.2.x`**
1 parent e1081de commit 6dc4457

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22+
import java.util.Collections;
2223
import java.util.HashSet;
2324
import java.util.Iterator;
2425
import java.util.List;
@@ -115,7 +116,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
115116

116117
private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<>();
117118

118-
private final List<Thread> processorThreadsToInterrupt = new ArrayList<>();
119+
private final List<Thread> processorThreadsToInterrupt = Collections.synchronizedList(new ArrayList<>());
119120

120121
private long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
121122

@@ -706,7 +707,9 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
706707
else {
707708
logger.info("Workers not finished.");
708709
if (isForceCloseChannel() || this.stopNow.get()) {
709-
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
710+
synchronized (this.processorThreadsToInterrupt) {
711+
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
712+
}
710713
canceledConsumers.forEach(consumer -> {
711714
if (logger.isWarnEnabled()) {
712715
logger.warn("Closing channel for unresponsive consumer: " + consumer);

0 commit comments

Comments
 (0)