Skip to content

Commit d6ad02c

Browse files
authored
Merge branch 'main' into fix/128221
2 parents ddac429 + 0695b15 commit d6ad02c

File tree

8 files changed

+138
-55
lines changed

8 files changed

+138
-55
lines changed

docs/changelog/131937.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131937
2+
summary: Fix race condition in `RemoteClusterService.collectNodes()`
3+
area: Distributed
4+
type: bug
5+
issues: []

gradle/verification-metadata.xml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,24 +2150,24 @@
21502150
<sha256 value="5c8551990307a032336d98ddaed549a39a689f07d4d4c6b950601bf22b3d6a1b" origin="Generated by Gradle"/>
21512151
</artifact>
21522152
</component>
2153-
<component group="org.apache.arrow" name="arrow-format" version="16.1.0">
2154-
<artifact name="arrow-format-16.1.0.jar">
2155-
<sha256 value="ad97e0fc72e193b1de3cbce4818d1ff16e81673fd523d001e8d2774bde40ee6c" origin="Generated by Gradle"/>
2153+
<component group="org.apache.arrow" name="arrow-format" version="18.3.0">
2154+
<artifact name="arrow-format-18.3.0.jar">
2155+
<sha256 value="728297d7757d192053ce071cc59e76c8a03ff4f5e430177fa97e96b29dd2de1b" origin="Generated by Gradle"/>
21562156
</artifact>
21572157
</component>
2158-
<component group="org.apache.arrow" name="arrow-memory-core" version="16.1.0">
2159-
<artifact name="arrow-memory-core-16.1.0.jar">
2160-
<sha256 value="da7af1a1a899bd5a1b6c71284243b9f3c0e1098f0cb10cd7be4b8b455ced79dd" origin="Generated by Gradle"/>
2158+
<component group="org.apache.arrow" name="arrow-memory-core" version="18.3.0">
2159+
<artifact name="arrow-memory-core-18.3.0.jar">
2160+
<sha256 value="32ed0719bf2ba42becc3a88c95722851cf1bb88b37d06c95654a1152bed6ef2e" origin="Generated by Gradle"/>
21612161
</artifact>
21622162
</component>
2163-
<component group="org.apache.arrow" name="arrow-memory-unsafe" version="16.1.0">
2164-
<artifact name="arrow-memory-unsafe-16.1.0.jar">
2165-
<sha256 value="6534eded25f2c30593416a294c1047f0b017baa9906d98f6f3270737b076c745" origin="Generated by Gradle"/>
2163+
<component group="org.apache.arrow" name="arrow-memory-unsafe" version="18.3.0">
2164+
<artifact name="arrow-memory-unsafe-18.3.0.jar">
2165+
<sha256 value="e9652f322d7be306c400ea0d775b3114a11b48d2afe23e8b8566f13c469d5483" origin="Generated by Gradle"/>
21662166
</artifact>
21672167
</component>
2168-
<component group="org.apache.arrow" name="arrow-vector" version="16.1.0">
2169-
<artifact name="arrow-vector-16.1.0.jar">
2170-
<sha256 value="c5837b3aa24dfd93759f57bc5759b9a8fbb5bf3912d55994d70cabb904436aab" origin="Generated by Gradle"/>
2168+
<component group="org.apache.arrow" name="arrow-vector" version="18.3.0">
2169+
<artifact name="arrow-vector-18.3.0.jar">
2170+
<sha256 value="b37eda92daccaffc12abf5ed425db1d0bdb3edea150a6ca856f1cf4292442299" origin="Generated by Gradle"/>
21712171
</artifact>
21722172
</component>
21732173
<component group="org.apache.avro" name="avro" version="1.7.4">

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,9 @@ default void copyBlob(
308308
* <p>
309309
* This operation, together with {@link #compareAndSetRegister}, must have linearizable semantics: a collection of such operations must
310310
* act as if they operate serially, with each operation taking place at some instant in between its invocation and its completion.
311+
* <p>
312+
* If the listener completes exceptionally then the write operation should be considered as continuing to run and may therefore appear
313+
* to occur at some later point in time.
311314
*
312315
* @param purpose The purpose of the operation
313316
* @param key key of the value to update
@@ -330,6 +333,9 @@ void compareAndExchangeRegister(
330333
* <p>
331334
* This operation, together with {@link #compareAndExchangeRegister}, must have linearizable semantics: a collection of such operations
332335
* must act as if they operate serially, with each operation taking place at some instant in between its invocation and its completion.
336+
* <p>
337+
* If the listener completes exceptionally then the write operation should be considered as continuing to run and may therefore appear
338+
* to occur at some later point in time.
333339
*
334340
* @param purpose The purpose of the operation
335341
* @param key key of the value to update
@@ -361,7 +367,10 @@ default void compareAndSetRegister(
361367
* This operation has read-after-write consistency with respect to writes performed using {@link #compareAndExchangeRegister} and
362368
* {@link #compareAndSetRegister}, but does not guarantee full linearizability. In particular, a {@code getRegister} performed during
363369
* one of these write operations may return either the old or the new value, and a caller may therefore observe the old value
364-
* <i>after</i> observing the new value, as long as both such read operations take place before the write operation completes.
370+
* <i>after</i> observing the new value, as long as both such read operations take place before the success of the write operation.
371+
* <p>
372+
* Write operations which complete exceptionally may behave as if they continue to run, thus yielding old or new values for an extended
373+
* period of time. If multiple writes fail then {@code getRegister} may return any of the written values.
365374
*
366375
* @param purpose The purpose of the operation
367376
* @param key key of the value to get

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.support.CountDownActionListener;
1818
import org.elasticsearch.action.support.IndicesOptions;
1919
import org.elasticsearch.action.support.PlainActionFuture;
20+
import org.elasticsearch.action.support.RefCountingListener;
2021
import org.elasticsearch.action.support.RefCountingRunnable;
2122
import org.elasticsearch.client.internal.RemoteClusterClient;
2223
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -29,7 +30,6 @@
2930
import org.elasticsearch.common.settings.Setting;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
32-
import org.elasticsearch.common.util.concurrent.CountDown;
3333
import org.elasticsearch.common.util.concurrent.EsExecutors;
3434
import org.elasticsearch.core.IOUtils;
3535
import org.elasticsearch.core.TimeValue;
@@ -567,36 +567,26 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
567567
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
568568
);
569569
}
570+
final var connectionsMap = new HashMap<String, RemoteClusterConnection>();
570571
for (String cluster : clusters) {
571-
if (this.remoteClusters.containsKey(cluster) == false) {
572+
final var connection = this.remoteClusters.get(cluster);
573+
if (connection == null) {
572574
listener.onFailure(new NoSuchRemoteClusterException(cluster));
573575
return;
574576
}
577+
connectionsMap.put(cluster, connection);
575578
}
576579

577580
final Map<String, Function<String, DiscoveryNode>> clusterMap = new HashMap<>();
578-
CountDown countDown = new CountDown(clusters.size());
579-
Function<String, DiscoveryNode> nullFunction = s -> null;
580-
for (final String cluster : clusters) {
581-
RemoteClusterConnection connection = this.remoteClusters.get(cluster);
582-
connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {
583-
@Override
584-
public void onResponse(Function<String, DiscoveryNode> nodeLookup) {
585-
synchronized (clusterMap) {
586-
clusterMap.put(cluster, nodeLookup);
587-
}
588-
if (countDown.countDown()) {
589-
listener.onResponse((clusterAlias, nodeId) -> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId));
590-
}
591-
}
592-
593-
@Override
594-
public void onFailure(Exception e) {
595-
if (countDown.fastForward()) { // we need to check if it's true since we could have multiple failures
596-
listener.onFailure(e);
597-
}
581+
final var finalListener = listener.<Void>safeMap(
582+
ignored -> (clusterAlias, nodeId) -> clusterMap.getOrDefault(clusterAlias, s -> null).apply(nodeId)
583+
);
584+
try (var refs = new RefCountingListener(finalListener)) {
585+
connectionsMap.forEach((cluster, connection) -> connection.collectNodes(refs.acquire(nodeLookup -> {
586+
synchronized (clusterMap) {
587+
clusterMap.put(cluster, nodeLookup);
598588
}
599-
});
589+
})));
600590
}
601591
}
602592

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
package org.elasticsearch.transport;
1010

1111
import org.apache.logging.log4j.Level;
12+
import org.apache.lucene.store.AlreadyClosedException;
1213
import org.elasticsearch.TransportVersion;
1314
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.LatchedActionListener;
1416
import org.elasticsearch.action.OriginalIndices;
1517
import org.elasticsearch.action.support.ActionTestUtils;
1618
import org.elasticsearch.action.support.IndicesOptions;
@@ -1060,6 +1062,85 @@ public void onFailure(Exception e) {
10601062
}
10611063
}
10621064

1065+
public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
1066+
final List<DiscoveryNode> knownNodes_c1 = new CopyOnWriteArrayList<>();
1067+
1068+
try (
1069+
var c1N1 = startTransport(
1070+
"cluster_1_node_1",
1071+
knownNodes_c1,
1072+
VersionInformation.CURRENT,
1073+
TransportVersion.current(),
1074+
Settings.EMPTY
1075+
);
1076+
var transportService = MockTransportService.createNewService(
1077+
Settings.EMPTY,
1078+
VersionInformation.CURRENT,
1079+
TransportVersion.current(),
1080+
threadPool,
1081+
null
1082+
)
1083+
) {
1084+
final var c1N1Node = c1N1.getLocalNode();
1085+
knownNodes_c1.add(c1N1Node);
1086+
final var seedList = List.of(c1N1Node.getAddress().toString());
1087+
transportService.start();
1088+
transportService.acceptIncomingRequests();
1089+
1090+
try (RemoteClusterService service = new RemoteClusterService(createSettings("cluster_1", seedList), transportService)) {
1091+
service.initializeRemoteClusters();
1092+
assertTrue(service.isCrossClusterSearchEnabled());
1093+
final var numTasks = between(3, 5);
1094+
final var taskLatch = new CountDownLatch(numTasks);
1095+
1096+
ESTestCase.startInParallel(numTasks, threadNumber -> {
1097+
if (threadNumber == 0) {
1098+
taskLatch.countDown();
1099+
boolean isLinked = true;
1100+
while (taskLatch.getCount() != 0) {
1101+
final var future = new PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus>();
1102+
final var settings = createSettings("cluster_1", isLinked ? Collections.emptyList() : seedList);
1103+
service.updateRemoteCluster("cluster_1", settings, future);
1104+
safeGet(future);
1105+
isLinked = isLinked == false;
1106+
}
1107+
return;
1108+
}
1109+
1110+
// Verify collectNodes() always invokes the listener, even if the node is concurrently being unlinked.
1111+
try {
1112+
for (int i = 0; i < 10; ++i) {
1113+
final var latch = new CountDownLatch(1);
1114+
final var exRef = new AtomicReference<Exception>();
1115+
service.collectNodes(Set.of("cluster_1"), new LatchedActionListener<>(new ActionListener<>() {
1116+
@Override
1117+
public void onResponse(BiFunction<String, String, DiscoveryNode> func) {
1118+
assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId()));
1119+
}
1120+
1121+
@Override
1122+
public void onFailure(Exception e) {
1123+
exRef.set(e);
1124+
}
1125+
}, latch));
1126+
safeAwait(latch);
1127+
if (exRef.get() != null) {
1128+
assertThat(
1129+
exRef.get(),
1130+
either(instanceOf(TransportException.class)).or(instanceOf(NoSuchRemoteClusterException.class))
1131+
.or(instanceOf(AlreadyClosedException.class))
1132+
.or(instanceOf(NoSeedNodeLeftException.class))
1133+
);
1134+
}
1135+
}
1136+
} finally {
1137+
taskLatch.countDown();
1138+
}
1139+
});
1140+
}
1141+
}
1142+
}
1143+
10631144
public void testRemoteClusterSkipIfDisconnectedSetting() {
10641145
{
10651146
Settings settings = Settings.builder()

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,6 @@ private void initGiantTextField(int docs) throws IOException {
894894
}
895895
}
896896
}
897-
898897
logger.info("loading many documents with one big text field - docs per bulk {}", docsPerBulk);
899898

900899
int fieldSize = Math.toIntExact(ByteSizeValue.ofMb(5).getBytes());
@@ -1064,6 +1063,15 @@ private void bulk(String name, String bulk) throws IOException {
10641063
);
10651064
Response response = client().performRequest(request);
10661065
assertThat(entityAsMap(response), matchesMap().entry("errors", false).extraOk());
1066+
1067+
/*
1068+
* Flush after each bulk to clear the test-time seenSequenceNumbers Map in
1069+
* TranslogWriter. Without this the server will OOM from time to time keeping
1070+
* stuff around to run assertions on.
1071+
*/
1072+
request = new Request("POST", "/" + name + "/_flush");
1073+
response = client().performRequest(request);
1074+
assertThat(entityAsMap(response), matchesMap().entry("_shards", matchesMap().extraOk().entry("failed", 0)).extraOk());
10671075
}
10681076

10691077
private void initIndex(String name, String bulk) throws IOException {

x-pack/plugin/esql/arrow/build.gradle

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ dependencies {
1212
compileOnly project(':x-pack:plugin:esql:compute')
1313
compileOnly project(':x-pack:plugin:esql-core')
1414
compileOnly project(':x-pack:plugin:mapper-version')
15-
implementation('org.apache.arrow:arrow-vector:16.1.0')
16-
implementation('org.apache.arrow:arrow-format:16.1.0')
17-
implementation('org.apache.arrow:arrow-memory-core:16.1.0')
15+
implementation('org.apache.arrow:arrow-vector:18.3.0')
16+
implementation('org.apache.arrow:arrow-format:18.3.0')
17+
implementation('org.apache.arrow:arrow-memory-core:18.3.0')
1818
implementation('org.checkerframework:checker-qual:3.42.0')
1919
implementation('com.google.flatbuffers:flatbuffers-java:23.5.26')
2020
// Needed for the json arrow serialization, and loaded even if we don't use it.
@@ -25,7 +25,7 @@ dependencies {
2525
runtimeOnly "org.slf4j:slf4j-nop:${versions.slf4j}"
2626

2727
testImplementation project(':test:framework')
28-
testImplementation('org.apache.arrow:arrow-memory-unsafe:16.1.0')
28+
testImplementation('org.apache.arrow:arrow-memory-unsafe:18.3.0')
2929
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}")
3030
}
3131

@@ -38,18 +38,8 @@ tasks.named("dependencyLicenses").configure {
3838
tasks.named("thirdPartyAudit").configure {
3939
ignoreViolations(
4040
// uses sun.misc.Unsafe. Only used in tests.
41-
'org.apache.arrow.memory.util.hash.SimpleHasher',
42-
'org.apache.arrow.memory.util.hash.MurmurHasher',
4341
'org.apache.arrow.memory.util.MemoryUtil',
4442
'org.apache.arrow.memory.util.MemoryUtil$1',
45-
'org.apache.arrow.vector.DecimalVector',
46-
'org.apache.arrow.vector.BaseFixedWidthVector',
47-
'org.apache.arrow.vector.util.DecimalUtility',
48-
'org.apache.arrow.vector.Decimal256Vector',
49-
'org.apache.arrow.vector.util.VectorAppender',
50-
'org.apache.arrow.memory.ArrowBuf',
51-
'org.apache.arrow.vector.BitVectorHelper',
52-
'org.apache.arrow.memory.util.ByteFunctionHelpers',
5343
)
5444
ignoreMissingClasses(
5545
'org.apache.commons.codec.binary.Hex'

x-pack/plugin/esql/qa/server/single-node/build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@ dependencies {
1616
javaRestTestImplementation project(xpackModule('esql'))
1717
yamlRestTestImplementation project(xpackModule('esql:qa:server'))
1818

19-
javaRestTestImplementation('org.apache.arrow:arrow-vector:16.1.0')
20-
javaRestTestImplementation('org.apache.arrow:arrow-format:16.1.0')
21-
javaRestTestImplementation('org.apache.arrow:arrow-memory-core:16.1.0')
19+
javaRestTestImplementation('org.apache.arrow:arrow-vector:18.3.0')
20+
javaRestTestImplementation('org.apache.arrow:arrow-format:18.3.0')
21+
javaRestTestImplementation('org.apache.arrow:arrow-memory-core:18.3.0')
2222
javaRestTestImplementation('org.checkerframework:checker-qual:3.42.0')
2323
javaRestTestImplementation('com.google.flatbuffers:flatbuffers-java:23.5.26')
2424
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
2525
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-core:${versions.jackson}")
2626
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}")
2727
javaRestTestImplementation("org.slf4j:slf4j-api:${versions.slf4j}")
2828
javaRestTestImplementation("org.slf4j:slf4j-nop:${versions.slf4j}")
29-
javaRestTestImplementation('org.apache.arrow:arrow-memory-unsafe:16.1.0')
29+
javaRestTestImplementation('org.apache.arrow:arrow-memory-unsafe:18.3.0')
3030

3131
clusterPlugins project(':plugins:mapper-size')
3232
clusterPlugins project(':plugins:mapper-murmur3')

0 commit comments

Comments
 (0)