Skip to content

Commit 87ebe39

Browse files
Refactor to use RefCountingListener
1 parent 6377745 commit 87ebe39

File tree

1 file changed

+13
-30
lines changed

1 file changed

+13
-30
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.support.CountDownActionListener;
1818
import org.elasticsearch.action.support.IndicesOptions;
1919
import org.elasticsearch.action.support.PlainActionFuture;
20+
import org.elasticsearch.action.support.RefCountingListener;
2021
import org.elasticsearch.action.support.RefCountingRunnable;
2122
import org.elasticsearch.client.internal.RemoteClusterClient;
2223
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -29,7 +30,6 @@
2930
import org.elasticsearch.common.settings.Setting;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
32-
import org.elasticsearch.common.util.concurrent.CountDown;
3333
import org.elasticsearch.common.util.concurrent.EsExecutors;
3434
import org.elasticsearch.core.IOUtils;
3535
import org.elasticsearch.core.TimeValue;
@@ -568,43 +568,26 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
568568
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
569569
);
570570
}
571+
final var connectionsMap = new HashMap<String, RemoteClusterConnection>();
571572
for (String cluster : clusters) {
572-
if (this.remoteClusters.containsKey(cluster) == false) {
573+
final var connection = this.remoteClusters.get(cluster);
574+
if (connection == null) {
573575
listener.onFailure(new NoSuchRemoteClusterException(cluster));
574576
return;
575577
}
578+
connectionsMap.put(cluster, connection);
576579
}
577580

578581
final Map<String, Function<String, DiscoveryNode>> clusterMap = new HashMap<>();
579-
CountDown countDown = new CountDown(clusters.size());
580-
Function<String, DiscoveryNode> nullFunction = s -> null;
581-
for (final String cluster : clusters) {
582-
RemoteClusterConnection connection = this.remoteClusters.get(cluster);
583-
// Ensure the connection is not null, it could have been removed since the containsKey() call above.
584-
if (connection == null) {
585-
if (countDown.fastForward()) {
586-
listener.onFailure(new NoSuchRemoteClusterException(cluster));
587-
}
588-
break;
589-
}
590-
connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {
591-
@Override
592-
public void onResponse(Function<String, DiscoveryNode> nodeLookup) {
593-
synchronized (clusterMap) {
594-
clusterMap.put(cluster, nodeLookup);
595-
}
596-
if (countDown.countDown()) {
597-
listener.onResponse((clusterAlias, nodeId) -> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId));
598-
}
599-
}
600-
601-
@Override
602-
public void onFailure(Exception e) {
603-
if (countDown.fastForward()) { // we need to check if it's true since we could have multiple failures
604-
listener.onFailure(e);
605-
}
582+
final var finalListener = listener.<Void>safeMap(
583+
ignored -> (clusterAlias, nodeId) -> clusterMap.getOrDefault(clusterAlias, s -> null).apply(nodeId)
584+
);
585+
try (var refs = new RefCountingListener(finalListener)) {
586+
connectionsMap.forEach((cluster, connection) -> connection.collectNodes(refs.acquire(nodeLookup -> {
587+
synchronized (clusterMap) {
588+
clusterMap.put(cluster, nodeLookup);
606589
}
607-
});
590+
})));
608591
}
609592
}
610593

0 commit comments

Comments
 (0)