Skip to content

Commit c6236c3

Browse files
authored
Fix potential concurrent modification exception in DiscoveryNodeFilters (opensearch-project#19701)
While I haven't seen concurrency issues in DiscoveryNodeFilters, the included unit test (which fails without cloning the original filters) shows that there is potential for an exception. While in here, I also made some reads from volatile variables in FilterAllocationDecider more atomic to avoid potential null-pointer exceptions. Signed-off-by: Michael Froh <msfroh@apache.org>
1 parent 420f2e0 commit c6236c3

File tree

4 files changed

+77
-2
lines changed

4 files changed

+77
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
- Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets ([#19607](https://github.com/opensearch-project/OpenSearch/pull/19607))
3434
- Fix issue with updating core with a patch number other than 0 ([#19377](https://github.com/opensearch-project/OpenSearch/pull/19377))
3535
- [Java Agent] Allow JRT protocol URLs in protection domain extraction ([#19683](https://github.com/opensearch-project/OpenSearch/pull/19683))
36+
- Fix potential concurrent modification exception when updating allocation filters ([#19701])(https://github.com/opensearch-project/OpenSearch/pull/19701))
3637

3738
### Dependencies
3839
- Update to Gradle 9.1 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575))

server/src/main/java/org/opensearch/cluster/node/DiscoveryNodeFilters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static DiscoveryNodeFilters buildOrUpdateFromKeyValue(
9797
updated = new DiscoveryNodeFilters(opType, new HashMap<>());
9898
} else {
9999
assert opType == original.opType : "operation type should match with node filter parameter";
100-
updated = new DiscoveryNodeFilters(original.opType, original.filters);
100+
updated = new DiscoveryNodeFilters(original.opType, new HashMap<>(original.filters));
101101
}
102102
for (Map.Entry<String, String> entry : filters.entrySet()) {
103103
String[] values = Strings.tokenizeToStringArray(entry.getValue(), ",");

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
193193
@Override
194194
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
195195
Decision decision = shouldClusterFilter(node.node(), allocation);
196-
return decision != null && decision == Decision.NO ? decision : Decision.ALWAYS;
196+
return decision == Decision.NO ? decision : Decision.ALWAYS;
197197
}
198198

199199
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
@@ -258,6 +258,13 @@ private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, Ro
258258
}
259259

260260
private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
261+
// Copy values to local variables so we're not null-checking on volatile fields.
262+
// The value of a volatile field could change from non-null to null between the
263+
// check and its usage.
264+
DiscoveryNodeFilters clusterRequireFilters = this.clusterRequireFilters;
265+
DiscoveryNodeFilters clusterIncludeFilters = this.clusterIncludeFilters;
266+
DiscoveryNodeFilters clusterExcludeFilters = this.clusterExcludeFilters;
267+
261268
if (clusterRequireFilters != null) {
262269
if (clusterRequireFilters.match(node) == false) {
263270
return allocation.decision(

server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeFiltersTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@
4747
import java.util.HashMap;
4848
import java.util.List;
4949
import java.util.Map;
50+
import java.util.concurrent.ExecutionException;
51+
import java.util.concurrent.ExecutorService;
52+
import java.util.concurrent.Executors;
53+
import java.util.concurrent.Future;
54+
import java.util.concurrent.atomic.AtomicBoolean;
55+
import java.util.concurrent.atomic.AtomicReference;
5056

5157
import static java.util.Collections.emptyMap;
5258
import static java.util.Collections.emptySet;
@@ -354,6 +360,67 @@ public void testOpTypeMismatch() {
354360
}
355361
}
356362

363+
public void testConcurrentModification() {
364+
AtomicReference<DiscoveryNodeFilters> filters = new AtomicReference<>();
365+
AtomicBoolean keepRunning = new AtomicBoolean(true);
366+
int count = 200; // I can pretty reliably reproduce failures with 200 iterations, but not 150
367+
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
368+
// Thread 1: repeatedly update the filters
369+
Future<?> t1 = executor.submit(() -> {
370+
try {
371+
for (int i = 0; i < count && keepRunning.get(); i++) {
372+
Settings.Builder settingsBuilder = Settings.builder().put("xxx._id", "id" + i);
373+
if (i % 2 == 0) {
374+
settingsBuilder.put("xxx._name", "");
375+
} else {
376+
settingsBuilder.put("xxx._name", "name" + i);
377+
}
378+
DiscoveryNodeFilters newFilters = buildOrUpdateFromSettings(filters.get(), OR, "xxx.", settingsBuilder.build());
379+
filters.set(newFilters);
380+
try {
381+
Thread.sleep(1);
382+
} catch (InterruptedException e) {
383+
throw new RuntimeException(e);
384+
}
385+
}
386+
} finally {
387+
keepRunning.set(false);
388+
}
389+
});
390+
391+
// Thread 2: repeatedly read and match nodes against the filters
392+
Future<?> t2 = executor.submit(() -> {
393+
try {
394+
for (int i = 0; i < count && keepRunning.get(); i++) {
395+
DiscoveryNodeFilters currentFilters = filters.get();
396+
if (currentFilters != null) {
397+
DiscoveryNode node = new DiscoveryNode(
398+
"name" + i,
399+
"id" + i,
400+
buildNewFakeTransportAddress(),
401+
emptyMap(),
402+
emptySet(),
403+
Version.CURRENT
404+
);
405+
currentFilters.match(node);
406+
}
407+
try {
408+
Thread.sleep(1);
409+
} catch (InterruptedException e) {
410+
throw new RuntimeException(e);
411+
}
412+
}
413+
} finally {
414+
keepRunning.set(false);
415+
}
416+
});
417+
t2.get();
418+
t1.get();
419+
} catch (ExecutionException | InterruptedException e) {
420+
throw new RuntimeException(e);
421+
}
422+
}
423+
357424
private Settings shuffleSettings(Settings source) {
358425
Settings.Builder settings = Settings.builder();
359426
List<String> keys = new ArrayList<>(source.keySet());

0 commit comments

Comments
 (0)