Skip to content

Commit 684894a

Browse files
authored
SOLR-18146: Fix race in CircuitBreakerRegistry (#4189)
Global CB loading is now only done by a single thread, and skipped by whichever threads "lose" that race on startup.
1 parent c481b0a commit 684894a

File tree

5 files changed

+187
-13
lines changed

5 files changed

+187
-13
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
title: Fix race conditions in "global" CircuitBreaker registration
2+
type: fixed
3+
authors:
4+
- name: Jason Gerlowski
5+
links:
6+
- name: SOLR-18146
7+
url: https://issues.apache.org/jira/browse/SOLR-18146

solr/core/src/java/org/apache/solr/util/circuitbreaker/CircuitBreakerRegistry.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Set;
3030
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.CopyOnWriteArrayList;
3132
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.regex.Matcher;
3334
import java.util.regex.Pattern;
@@ -50,9 +51,6 @@
5051
*/
5152
public class CircuitBreakerRegistry implements Closeable {
5253
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
53-
private final Map<SolrRequestType, List<CircuitBreaker>> circuitBreakerMap = new HashMap<>();
54-
private static final Map<SolrRequestType, List<CircuitBreaker>> globalCircuitBreakerMap =
55-
new ConcurrentHashMap<>();
5654
private static final Pattern SYSPROP_REGEX =
5755
Pattern.compile("solr.circuitbreaker\\.(update|query)\\.(cpu|mem|loadavg)");
5856
public static final String SYSPROP_PREFIX = "solr.circuitbreaker.";
@@ -64,6 +62,12 @@ public class CircuitBreakerRegistry implements Closeable {
6462
public static final String SYSPROP_QUERY_LOADAVG = SYSPROP_PREFIX + "query.loadavg";
6563
public static final String SYSPROP_WARN_ONLY_SUFFIX = ".warnonly";
6664

65+
private static boolean globalsInitialized = false;
66+
private static final Map<SolrRequestType, List<CircuitBreaker>> globalCircuitBreakerMap =
67+
new ConcurrentHashMap<>();
68+
69+
private final Map<SolrRequestType, List<CircuitBreaker>> circuitBreakerMap = new HashMap<>();
70+
6771
public CircuitBreakerRegistry(CoreContainer coreContainer) {
6872
initGlobal(coreContainer);
6973
}
@@ -118,7 +122,8 @@ private static String buildCircuitBreakerKey(String metricType, String threshold
118122
return metricType + ":" + System.getProperty(thresholdProp) + ":" + warnOnly;
119123
}
120124

121-
private static void initGlobal(CoreContainer coreContainer) {
125+
private static synchronized void initGlobal(CoreContainer coreContainer) {
126+
if (globalsInitialized) return;
122127
final var parsedBreakers = parseCircuitBreakersFromProperties(coreContainer);
123128
for (CircuitBreaker breaker : parsedBreakers) {
124129
registerGlobal(breaker);
@@ -129,6 +134,7 @@ private static void initGlobal(CoreContainer coreContainer) {
129134
breaker.getRequestTypes());
130135
}
131136
}
137+
globalsInitialized = true;
132138
}
133139

134140
/** List all registered circuit breakers for global context */
@@ -165,7 +171,7 @@ public static void registerGlobal(CircuitBreaker circuitBreaker) {
165171
.forEach(
166172
r -> {
167173
List<CircuitBreaker> list =
168-
globalCircuitBreakerMap.computeIfAbsent(r, k -> new ArrayList<>());
174+
globalCircuitBreakerMap.computeIfAbsent(r, k -> new CopyOnWriteArrayList<>());
169175
list.add(circuitBreaker);
170176
});
171177
}
@@ -235,14 +241,13 @@ public void close() throws IOException {
235241
}
236242
}
237243

238-
private static void closeGlobal() {
239-
synchronized (globalCircuitBreakerMap) {
240-
closeCircuitBreakers(
241-
globalCircuitBreakerMap.values().stream()
242-
.flatMap(List::stream)
243-
.collect(Collectors.toList()));
244-
globalCircuitBreakerMap.clear();
245-
}
244+
private static synchronized void closeGlobal() {
245+
closeCircuitBreakers(
246+
globalCircuitBreakerMap.values().stream()
247+
.flatMap(List::stream)
248+
.collect(Collectors.toList()));
249+
globalCircuitBreakerMap.clear();
250+
globalsInitialized = false;
246251
}
247252

248253
/**

solr/core/src/test/org/apache/solr/util/TestGlobalCircuitBreaker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ public static void setUpClass() throws Exception {
3434
System.setProperty("queryResultCache.enabled", "false");
3535
System.setProperty("documentCache.enabled", "true");
3636

37+
// Deregister existing CBs so that they're re-populated on the next core load.
38+
CircuitBreakerRegistry.deregisterGlobal();
39+
3740
// Set a global update breaker for a low CPU, which will trip during indexing
3841
System.setProperty(CircuitBreakerRegistry.SYSPROP_UPDATE_LOADAVG, "0.1");
3942

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.solr.util.circuitbreaker;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.Set;
24+
import java.util.concurrent.BrokenBarrierException;
25+
import java.util.concurrent.CyclicBarrier;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Future;
28+
import org.apache.solr.SolrTestCaseJ4;
29+
import org.apache.solr.common.util.ExecutorUtil;
30+
import org.apache.solr.common.util.SolrNamedThreadFactory;
31+
import org.junit.After;
32+
import org.junit.Test;
33+
34+
/**
35+
* Attempts to reproduce the race condition described in SOLR-18146
36+
*
37+
* <p>The race: {@link CircuitBreakerRegistry} relies on a static map of global (i.e.
38+
* non-core-specific) CircuitBreakers. If Solr was loading multiple cores on startup, these
39+
* concurrent core-loads could race when initializing this static state leading to
40+
* ConcurrentModificationExceptions, ArrayIndexOutOfBoundsExceptions, dropped circuit breaker
41+
* entries, etc.
42+
*
43+
* <p>These issues should be resolved by SOLR-18146, but the test remains to catch them if they
44+
* recur. Often useful to run in larger iterations with {@code -Ptests.iters=20}.
45+
*/
46+
public class TestGlobalCircuitBreakerRegistration extends SolrTestCaseJ4 {
47+
48+
@After
49+
public void cleanup() {
50+
CircuitBreakerRegistry.deregisterGlobal();
51+
}
52+
53+
@Test
54+
public void testConcurrentGlobalRegistrationDoesNotThrow() throws Exception {
55+
final int threadCount = 50;
56+
final int itersPerThread = 20;
57+
final int total = threadCount * itersPerThread;
58+
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
59+
final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
60+
61+
// Pre-create all circuit breakers before the barrier to make the race as "hot" as possible
62+
List<List<CircuitBreaker>> breakersPerThread = new ArrayList<>(threadCount);
63+
for (int t = 0; t < threadCount; t++) {
64+
List<CircuitBreaker> threadBreakers = new ArrayList<>(itersPerThread);
65+
for (int i = 0; i < itersPerThread; i++) {
66+
CircuitBreaker b =
67+
new CircuitBreaker() {
68+
@Override
69+
public boolean isTripped() {
70+
return false;
71+
}
72+
73+
@Override
74+
public String getErrorMessage() {
75+
return "test";
76+
}
77+
};
78+
b.setRequestTypes(List.of("query"));
79+
threadBreakers.add(b);
80+
}
81+
breakersPerThread.add(threadBreakers);
82+
}
83+
84+
ExecutorService executor =
85+
ExecutorUtil.newMDCAwareFixedThreadPool(
86+
threadCount, new SolrNamedThreadFactory("test-concurrent-cb-registration"));
87+
try {
88+
List<Future<?>> futures = new ArrayList<>();
89+
for (int t = 0; t < threadCount; t++) {
90+
final List<CircuitBreaker> myBreakers = breakersPerThread.get(t);
91+
futures.add(
92+
executor.submit(
93+
() -> {
94+
try {
95+
// Release all threads simultaneously to maximize contention on
96+
// the shared ArrayList for the QUERY key in globalCircuitBreakerMap.
97+
barrier.await();
98+
for (CircuitBreaker b : myBreakers) {
99+
CircuitBreakerRegistry.registerGlobal(b);
100+
}
101+
} catch (BrokenBarrierException | InterruptedException e) {
102+
Thread.currentThread().interrupt();
103+
} catch (Throwable ex) {
104+
// Captures ArrayIndexOutOfBoundsException (or similar) from
105+
// concurrent ArrayList.add() when the race is triggered.
106+
failures.add(ex);
107+
}
108+
}));
109+
}
110+
for (Future<?> f : futures) {
111+
f.get();
112+
}
113+
} finally {
114+
ExecutorUtil.shutdownAndAwaitTermination(executor);
115+
}
116+
117+
assertTrue(
118+
"Exceptions thrown during concurrent registerGlobal: " + failures, failures.isEmpty());
119+
120+
// Even without a thrown exception the backing ArrayList can be silently
121+
// corrupted: concurrent adds may produce null slots or overwrite each other,
122+
// losing registrations. Verify the invariant that all registered breakers are
123+
// non-null and that none were lost.
124+
Set<CircuitBreaker> registered = CircuitBreakerRegistry.listGlobal();
125+
assertFalse(
126+
"globalCircuitBreakerMap contains null entries — ArrayList corrupted by concurrent add()",
127+
registered.contains(null));
128+
assertEquals(
129+
"Expected "
130+
+ total
131+
+ " registered circuit breakers but found "
132+
+ registered.size()
133+
+ " — data was lost due to concurrent ArrayList.add() corruption",
134+
total,
135+
registered.size());
136+
}
137+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* Tests for {@link org.apache.solr.util.circuitbreaker.CircuitBreakerRegistry} and other CB
20+
* functionality.
21+
*/
22+
package org.apache.solr.util.circuitbreaker;

0 commit comments

Comments
 (0)