Skip to content

Commit 11d2cf6

Browse files
Fix concurrent modification exception in ComponentRegistry (#4951) (#4954)
* Fix concurrent modification exception in ComponentRegistry * Reduce number of threads and iterations Co-authored-by: jack-berg <[email protected]> Co-authored-by: Jack Berg <[email protected]>
1 parent 5c77655 commit 11d2cf6

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

sdk/common/src/main/java/io/opentelemetry/sdk/internal/ComponentRegistry.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package io.opentelemetry.sdk.internal;
77

88
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.api.internal.GuardedBy;
910
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
11+
import java.util.ArrayList;
1012
import java.util.Collection;
1113
import java.util.Collections;
1214
import java.util.IdentityHashMap;
@@ -44,6 +46,9 @@ public final class ComponentRegistry<V> {
4446
private final Map<String, Map<String, Map<String, V>>> componentByNameVersionAndSchema =
4547
new ConcurrentHashMap<>();
4648

49+
private final Object lock = new Object();
50+
51+
@GuardedBy("lock")
4752
private final Set<V> allComponents = Collections.newSetFromMap(new IdentityHashMap<>());
4853

4954
private final Function<InstrumentationScopeInfo, V> factory;
@@ -109,7 +114,9 @@ public V get(
109114

110115
private V buildComponent(InstrumentationScopeInfo instrumentationScopeInfo) {
111116
V component = factory.apply(instrumentationScopeInfo);
112-
allComponents.add(component);
117+
synchronized (lock) {
118+
allComponents.add(component);
119+
}
113120
return component;
114121
}
115122

@@ -119,6 +126,8 @@ private V buildComponent(InstrumentationScopeInfo instrumentationScopeInfo) {
119126
* @return a {@code Collection} view of the registered components.
120127
*/
121128
public Collection<V> getComponents() {
122-
return Collections.unmodifiableCollection(allComponents);
129+
synchronized (lock) {
130+
return Collections.unmodifiableCollection(new ArrayList<>(allComponents));
131+
}
123132
}
124133
}

sdk/common/src/test/java/io/opentelemetry/sdk/internal/ComponentRegistryTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,18 @@
55

66
package io.opentelemetry.sdk.internal;
77

8+
import static java.util.stream.Collectors.joining;
89
import static org.assertj.core.api.Assertions.assertThat;
910

1011
import io.opentelemetry.api.common.Attributes;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.Random;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.Future;
19+
import java.util.stream.IntStream;
1120
import org.junit.jupiter.api.Test;
1221

1322
class ComponentRegistryTest {
@@ -52,5 +61,36 @@ void get_DifferentInstance() {
5261
.isNotSameAs(registry.get(NAME, null, null, Attributes.empty()));
5362
}
5463

64+
@Test
65+
@SuppressWarnings("ReturnValueIgnored")
66+
void getComponents_HighConcurrency() throws ExecutionException, InterruptedException {
67+
List<Future<?>> futures = new ArrayList<>();
68+
Random random = new Random();
69+
int concurrency = 2;
70+
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
71+
72+
try {
73+
for (int i = 0; i < 100; i++) {
74+
futures.add(
75+
executor.submit(
76+
() -> {
77+
String name =
78+
IntStream.range(0, 20)
79+
.mapToObj(unused -> String.valueOf((char) random.nextInt(26)))
80+
.collect(joining());
81+
registry.get(name, null, null, Attributes.empty());
82+
}));
83+
futures.add(
84+
executor.submit(() -> registry.getComponents().forEach(TestComponent::hashCode)));
85+
}
86+
87+
for (Future<?> future : futures) {
88+
future.get();
89+
}
90+
} finally {
91+
executor.shutdown();
92+
}
93+
}
94+
5595
private static final class TestComponent {}
5696
}

0 commit comments

Comments
 (0)