Skip to content
Merged
5 changes: 5 additions & 0 deletions docs/changelog/131937.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131937
summary: Fix race condition in `RemoteClusterService.collectNodes()`
area: Distributed
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,13 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
Function<String, DiscoveryNode> nullFunction = s -> null;
for (final String cluster : clusters) {
RemoteClusterConnection connection = this.remoteClusters.get(cluster);
// Ensure the connection is not null, it could have been removed since the containsKey() call above.
if (connection == null) {
if (countDown.fastForward()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of using fastForward like this - it means the listener completes early, potentially while other collectNodes calls are still running, which can cause duplicated work and other confusions. The same concern applies to the fastForward call that already existed.

Instead, could we adjust this all to use a RefCountingListener? That would (a) delay the completion properly and (b) collect multiple failures (up to 10) as well as (c) being more idiomatic of modern Elasticsearch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks David, much cleaner!

listener.onFailure(new NoSuchRemoteClusterException(cluster));
}
break;
}
connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {
@Override
public void onResponse(Function<String, DiscoveryNode> nodeLookup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Level;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -1060,6 +1061,79 @@ public void onFailure(Exception e) {
}
}

public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
final List<DiscoveryNode> knownNodes_c1 = new CopyOnWriteArrayList<>();

try (
var c1N1 = startTransport(
"cluster_1_node_1",
knownNodes_c1,
VersionInformation.CURRENT,
TransportVersion.current(),
Settings.EMPTY
);
var transportService = MockTransportService.createNewService(
Settings.EMPTY,
VersionInformation.CURRENT,
TransportVersion.current(),
threadPool,
null
)
) {
final var c1N1Node = c1N1.getLocalNode();
knownNodes_c1.add(c1N1Node);
final var seedList = List.of(c1N1Node.getAddress().toString());
transportService.start();
transportService.acceptIncomingRequests();

try (RemoteClusterService service = new RemoteClusterService(createSettings("cluster_1", seedList), transportService)) {
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
final var numTasks = between(3, 5);
final var taskLatch = new CountDownLatch(numTasks);

ESTestCase.startInParallel(numTasks, threadNumber -> {
if (threadNumber == 0) {
taskLatch.countDown();
boolean isLinked = true;
while (taskLatch.getCount() != 0) {
final var latch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
createSettings("cluster_1", isLinked ? Collections.emptyList() : seedList),
new LatchedActionListener<>(ActionListener.noop(), latch)
);
safeAwait(latch);
isLinked = isLinked == false;
}
return;
}

// Verify collectNodes() always invokes the listener, even if the node is concurrently being unlinked.
try {
for (int i = 0; i < 1000; ++i) {
final var latch = new CountDownLatch(1);
service.collectNodes(Set.of("cluster_1"), new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(BiFunction<String, String, DiscoveryNode> func) {
assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId()));
}

@Override
public void onFailure(Exception e) {
assertNotNull(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can verify the exception is NoSuchRemoteClusterException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put the test in a loop and we can hit a few different exceptions here. I refactored to test them as done in testCollectNodes() above.

}
}, latch));
safeAwait(latch);
}
} finally {
taskLatch.countDown();
}
});
}
}
}

public void testRemoteClusterSkipIfDisconnectedSetting() {
{
Settings settings = Settings.builder()
Expand Down
Loading