Skip to content

Commit 05f2320

Browse files
garyrussellartembilan
authored andcommitted
GH-1414: ConcurrentMLC Fix ConcurrentModification
Resolves #1414 - synchronize all access to `this.containers` - don't return an indirect reference to the field in `getContainers()` **cherry-pick to all supported (1.3.x will require a back port)** (cherry picked from commit 8090a09)
1 parent f07925e commit 05f2320

File tree

2 files changed

+34
-22
lines changed

2 files changed

+34
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
7676

7777
private final ContainerProperties containerProperties;
7878

79-
private final Object lifecycleMonitor = new Object();
79+
protected final Object lifecycleMonitor = new Object(); // NOSONAR
8080

8181
private String beanName;
8282

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,38 +93,46 @@ public void setConcurrency(int concurrency) {
9393
* this container.
9494
*/
9595
public List<KafkaMessageListenerContainer<K, V>> getContainers() {
96-
return Collections.unmodifiableList(this.containers);
96+
synchronized (this.lifecycleMonitor) {
97+
return Collections.unmodifiableList(new ArrayList<>(this.containers));
98+
}
9799
}
98100

99101
@Override
100102
public Collection<TopicPartition> getAssignedPartitions() {
101-
return this.containers.stream()
102-
.map(KafkaMessageListenerContainer::getAssignedPartitions)
103-
.filter(Objects::nonNull)
104-
.flatMap(Collection::stream)
105-
.collect(Collectors.toList());
103+
synchronized (this.lifecycleMonitor) {
104+
return this.containers.stream()
105+
.map(KafkaMessageListenerContainer::getAssignedPartitions)
106+
.filter(Objects::nonNull)
107+
.flatMap(Collection::stream)
108+
.collect(Collectors.toList());
109+
}
106110
}
107111

108112
@Override
109113
public boolean isContainerPaused() {
110-
boolean paused = isPaused();
111-
if (paused) {
112-
for (AbstractMessageListenerContainer<K, V> container : this.containers) {
113-
if (!container.isContainerPaused()) {
114-
return false;
114+
synchronized (this.lifecycleMonitor) {
115+
boolean paused = isPaused();
116+
if (paused) {
117+
for (AbstractMessageListenerContainer<K, V> container : this.containers) {
118+
if (!container.isContainerPaused()) {
119+
return false;
120+
}
115121
}
116122
}
123+
return paused;
117124
}
118-
return paused;
119125
}
120126

121127
@Override
122128
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
123-
Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();
124-
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
125-
metrics.putAll(container.metrics());
129+
synchronized (this.lifecycleMonitor) {
130+
Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();
131+
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
132+
metrics.putAll(container.metrics());
133+
}
134+
return Collections.unmodifiableMap(metrics);
126135
}
127-
return Collections.unmodifiableMap(metrics);
128136
}
129137

130138
/*
@@ -231,14 +239,18 @@ protected void doStop(final Runnable callback) {
231239

232240
@Override
233241
public void pause() {
234-
super.pause();
235-
this.containers.forEach(AbstractMessageListenerContainer::pause);
242+
synchronized (this.lifecycleMonitor) {
243+
super.pause();
244+
this.containers.forEach(AbstractMessageListenerContainer::pause);
245+
}
236246
}
237247

238248
@Override
239249
public void resume() {
240-
super.resume();
241-
this.containers.forEach(AbstractMessageListenerContainer::resume);
250+
synchronized (this.lifecycleMonitor) {
251+
super.resume();
252+
this.containers.forEach(AbstractMessageListenerContainer::resume);
253+
}
242254
}
243255

244256
@Override

0 commit comments

Comments
 (0)