Skip to content

Commit c0e0bf7

Browse files
Fix race condition in RemoteClusterService.collectNodes()
It is possible for a linked remote to get unlinked in between the containsKey() and get() calls in collectNodes(). This change adds a test that produces the NullPointerException and adds a fix.
1 parent f1e42c3 commit c0e0bf7

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,13 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
580580
Function<String, DiscoveryNode> nullFunction = s -> null;
581581
for (final String cluster : clusters) {
582582
RemoteClusterConnection connection = this.remoteClusters.get(cluster);
583+
// Ensure the connection is not null, it could have been removed since the containsKey() call above.
584+
if (connection == null) {
585+
if (countDown.fastForward()) {
586+
listener.onFailure(new NoSuchRemoteClusterException(cluster));
587+
}
588+
break;
589+
}
583590
connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {
584591
@Override
585592
public void onResponse(Function<String, DiscoveryNode> nodeLookup) {

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.Level;
1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.LatchedActionListener;
1415
import org.elasticsearch.action.OriginalIndices;
1516
import org.elasticsearch.action.support.ActionTestUtils;
1617
import org.elasticsearch.action.support.IndicesOptions;
@@ -1060,6 +1061,79 @@ public void onFailure(Exception e) {
10601061
}
10611062
}
10621063

1064+
public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
1065+
final List<DiscoveryNode> knownNodes_c1 = new CopyOnWriteArrayList<>();
1066+
1067+
try (
1068+
var c1N1 = startTransport(
1069+
"cluster_1_node_1",
1070+
knownNodes_c1,
1071+
VersionInformation.CURRENT,
1072+
TransportVersion.current(),
1073+
Settings.EMPTY
1074+
);
1075+
var transportService = MockTransportService.createNewService(
1076+
Settings.EMPTY,
1077+
VersionInformation.CURRENT,
1078+
TransportVersion.current(),
1079+
threadPool,
1080+
null
1081+
)
1082+
) {
1083+
final var c1N1Node = c1N1.getLocalNode();
1084+
knownNodes_c1.add(c1N1Node);
1085+
final var seedList = List.of(c1N1Node.getAddress().toString());
1086+
transportService.start();
1087+
transportService.acceptIncomingRequests();
1088+
1089+
try (RemoteClusterService service = new RemoteClusterService(createSettings("cluster_1", seedList), transportService)) {
1090+
service.initializeRemoteClusters();
1091+
assertTrue(service.isCrossClusterSearchEnabled());
1092+
final var numTasks = between(3, 5);
1093+
final var taskLatch = new CountDownLatch(numTasks);
1094+
1095+
ESTestCase.startInParallel(numTasks, threadNumber -> {
1096+
if (threadNumber == 0) {
1097+
taskLatch.countDown();
1098+
boolean isLinked = true;
1099+
while (taskLatch.getCount() != 0) {
1100+
final var latch = new CountDownLatch(1);
1101+
service.updateRemoteCluster(
1102+
"cluster_1",
1103+
createSettings("cluster_1", isLinked ? Collections.emptyList() : seedList),
1104+
new LatchedActionListener<>(ActionListener.noop(), latch)
1105+
);
1106+
safeAwait(latch);
1107+
isLinked = isLinked == false;
1108+
}
1109+
return;
1110+
}
1111+
1112+
// Verify collectNodes() always invokes the listener, even if the node is concurrently being unlinked.
1113+
try {
1114+
for (int i = 0; i < 1000; ++i) {
1115+
final var latch = new CountDownLatch(1);
1116+
service.collectNodes(Set.of("cluster_1"), new LatchedActionListener<>(new ActionListener<>() {
1117+
@Override
1118+
public void onResponse(BiFunction<String, String, DiscoveryNode> func) {
1119+
assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId()));
1120+
}
1121+
1122+
@Override
1123+
public void onFailure(Exception e) {
1124+
assertNotNull(e);
1125+
}
1126+
}, latch));
1127+
safeAwait(latch);
1128+
}
1129+
} finally {
1130+
taskLatch.countDown();
1131+
}
1132+
});
1133+
}
1134+
}
1135+
}
1136+
10631137
public void testRemoteClusterSkipIfDisconnectedSetting() {
10641138
{
10651139
Settings settings = Settings.builder()

0 commit comments

Comments
 (0)