Skip to content

Commit 9ab03a2

Browse files
vikinghawkacogoluegnes
authored andcommitted
Fix for ConcurrentModificationException
(cherry picked from commit 925fa70)
1 parent eb59fba commit 9ab03a2

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.*;
1919
import com.rabbitmq.client.impl.AMQCommand;
2020
import com.rabbitmq.client.impl.recovery.Utils.IoTimeoutExceptionRunnable;
21+
import com.rabbitmq.utility.Utility;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

@@ -39,11 +40,11 @@ public class AutorecoveringChannel implements RecoverableChannel {
3940

4041
private volatile RecoveryAwareChannelN delegate;
4142
private volatile AutorecoveringConnection connection;
42-
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
43-
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<RecoveryListener>();
44-
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
45-
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
46-
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());
43+
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<>();
44+
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<>();
45+
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<>();
46+
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<>();
47+
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<>());
4748
private int prefetchCountConsumer;
4849
private int prefetchCountGlobal;
4950
private boolean usesPublisherConfirms;
@@ -100,7 +101,7 @@ private void executeAndClean(IoTimeoutExceptionRunnable callback) throws IOExcep
100101
try {
101102
callback.run();
102103
} finally {
103-
for (String consumerTag : consumerTags) {
104+
for (String consumerTag : Utility.copy(consumerTags)) {
104105
this.deleteRecordedConsumer(consumerTag);
105106
}
106107
this.connection.unregisterChannel(this);

src/main/java/com/rabbitmq/utility/Utility.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import java.util.ArrayList;
2121
import java.util.Collections;
2222
import java.util.LinkedHashMap;
23+
import java.util.LinkedHashSet;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Set;
2527

2628
/**
2729
* Catch-all holder class for static helper methods.
@@ -91,6 +93,21 @@ public static String makeStackTrace(Throwable throwable) {
9193
return text;
9294
}
9395

96+
/**
97+
* Synchronizes on the set and then returns a copy of the set that is safe to iterate over. Useful when wanting to do thread-safe iteration over
98+
* a Set wrapped in {@link Collections#synchronizedSet(Set)}.
99+
*
100+
* @param set
101+
* The set, which may not be {@code null}
102+
* @return LinkedHashSet copy of the list
103+
*/
104+
public static <E> Set<E> copy(final Set<E> set) {
105+
// No Sonar: this very list instance can be synchronized in other places of its owning class
106+
synchronized (set) { //NOSONAR
107+
return new LinkedHashSet<>(set);
108+
}
109+
}
110+
94111
/**
95112
* Synchronizes on the list and then returns a copy of the list that is safe to iterate over. Useful when wanting to do thread-safe iteration over
96113
* a List wrapped in {@link Collections#synchronizedList(List)}.
@@ -102,7 +119,7 @@ public static String makeStackTrace(Throwable throwable) {
102119
public static <E> List<E> copy(final List<E> list) {
103120
// No Sonar: this very list instance can be synchronized in other places of its owning class
104121
synchronized (list) { //NOSONAR
105-
return new ArrayList<E>(list);
122+
return new ArrayList<>(list);
106123
}
107124
}
108125

@@ -117,7 +134,7 @@ public static <E> List<E> copy(final List<E> list) {
117134
public static <K, V> Map<K, V> copy(final Map<K, V> map) {
118135
// No Sonar: this very map instance can be synchronized in other places of its owning class
119136
synchronized (map) { //NOSONAR
120-
return new LinkedHashMap<K, V>(map);
137+
return new LinkedHashMap<>(map);
121138
}
122139
}
123140
}

0 commit comments

Comments
 (0)