Skip to content

Commit badcb8a

Browse files
committed
GH-1414: ConcurrentMLC Fix ConcurrentModification
Resolves #1414 - synchronize all access to `this.containers` - don't return an indirect reference to the field in `getContainers()` **cherry-pick to all supported (1.3.x will require a back port)**
1 parent 7f1268d commit badcb8a

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public enum AckMode {
105105

106106
private final ContainerProperties containerProperties;
107107

108-
private final Object lifecycleMonitor = new Object();
108+
protected final Object lifecycleMonitor = new Object(); // NOSONAR
109109

110110
private String beanName;
111111

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -92,16 +92,20 @@ public void setConcurrency(int concurrency) {
9292
* this container.
9393
*/
9494
public List<KafkaMessageListenerContainer<K, V>> getContainers() {
95-
return Collections.unmodifiableList(this.containers);
95+
synchronized (this.lifecycleMonitor) {
96+
return Collections.unmodifiableList(new ArrayList<>(this.containers));
97+
}
9698
}
9799

98100
@Override
99101
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
100-
Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();
101-
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
102-
metrics.putAll(container.metrics());
102+
synchronized (this.lifecycleMonitor) {
103+
Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();
104+
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
105+
metrics.putAll(container.metrics());
106+
}
107+
return Collections.unmodifiableMap(metrics);
103108
}
104-
return Collections.unmodifiableMap(metrics);
105109
}
106110

107111
/*

0 commit comments

Comments
 (0)