From 1083d4ad1829fe073e5ecf363ec41891344c27b3 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 15 Jul 2025 10:19:07 -0600 Subject: [PATCH 1/2] Refactor and simplify missing index & unavailability handling (#131252) (cherry picked from commit 366bc0068e6bfdfe055f4eb6e220ec891e2b9eb2) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java --- .../xpack/esql/session/EsqlCCSUtils.java | 30 +++++++++--------- .../xpack/esql/session/EsqlSession.java | 26 ++++------------ .../xpack/esql/session/EsqlCCSUtilsTests.java | 31 +++++++++---------- 3 files changed, 35 insertions(+), 52 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 2295abc2da551..7ea218ed6d61c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -18,7 +18,6 @@ import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; @@ -37,11 +36,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; public class EsqlCCSUtils { @@ -177,7 +176,11 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu } } - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { + static void 
updateExecutionInfoWithUnavailableClusters( + EsqlExecutionInfo execInfo, + Map> failures + ) { + Map unavailable = determineUnavailableRemoteClusters(failures); for (Map.Entry entry : unavailable.entrySet()) { String clusterAlias = entry.getKey(); boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); @@ -196,14 +199,16 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf static void updateExecutionInfoWithClustersWithNoMatchingIndices( EsqlExecutionInfo executionInfo, IndexResolution indexResolution, - Set unavailableClusters, - QueryBuilder filter + boolean usedFilter ) { - final Set clustersWithNoMatchingIndices = new HashSet<>(executionInfo.clusterAliases()); + // Get the clusters which are still running, and we will check whether they have any matching indices. + // NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters. + final Set clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING) + .map(Cluster::getClusterAlias) + .collect(Collectors.toSet()); for (String indexName : indexResolution.resolvedIndices()) { clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName)); } - clustersWithNoMatchingIndices.removeAll(unavailableClusters); /* * Rules enforced at planning time around non-matching indices * 1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere @@ -216,24 +221,20 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( * Mark it as SKIPPED with 0 shards searched and took=0. 
*/ for (String c : clustersWithNoMatchingIndices) { - if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) { - // if cluster was already in a terminal state, we don't need to check it again - continue; - } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); if (concreteIndexRequested(executionInfo.getCluster(c).getIndexExpression())) { String error = Strings.format( "Unknown index [%s]", (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression) ); - if (executionInfo.isSkipUnavailable(c) == false || filter != null) { + if (executionInfo.isSkipUnavailable(c) == false || usedFilter) { if (fatalErrorMessage == null) { fatalErrorMessage = error; } else { fatalErrorMessage += "; " + error; } } - if (filter == null) { + if (usedFilter == false) { // We check for filter since the filter may be the reason why the index is missing, and then we don't want to mark yet markClusterWithFinalStateAndNoShards( executionInfo, @@ -269,8 +270,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( // Filter-less version, mainly for testing where we don't need filter support static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { - var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()).keySet(); - updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, unavailableClusters, null); + updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, false); } // visible for testing diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 0f0c59cbc6990..ea395d8d5e210 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -426,14 +426,8 @@ public void analyzedPlan( try { // the order here is tricky - if the cluster has been filtered and later became unavailable, // do we want to declare it successful or skipped? For now, unavailability takes precedence. - var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()); - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( - executionInfo, - result.indices, - unavailableClusters.keySet(), - null - ); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false); plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); @@ -532,11 +526,9 @@ private boolean allCCSClustersSkipped( ActionListener logicalPlanListener ) { IndexResolution indexResolution = result.indices; - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters( - executionInfo, - EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()) - ); - if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); + if (executionInfo.isCrossClusterSearch() + && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception // to let the LogicalPlanActionListener decide how to proceed LOGGER.debug("No more clusters to search, ending analysis stage"); @@ -564,13 +556,7 @@ private static void analyzeAndMaybeRetry( if (result.indices.isValid() || requestFilter != null) { 
// We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report // when the resolution result is not valid for a different reason. - var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()).keySet(); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( - executionInfo, - result.indices, - unavailableClusters, - requestFilter - ); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null); } plan = analyzeAction.apply(result); } catch (Exception e) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index c474fd128b229..c6a8184231f39 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -153,7 +153,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure); + var unvailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure), REMOTE2_ALIAS, List.of(failure)); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS))); @@ -185,7 +185,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to 
connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure)) + () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, List.of(failure))) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -338,8 +338,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + var failures = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -349,9 +349,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + // remote1 is in the failures Map (passed to IndexResolution.valid); its status is expected to be SKIPPED + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); @@ -381,8 +380,8 @@
public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + var failures = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); @@ -390,9 +389,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + // skipped since remote1 is in the failures Map + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); @@ -430,8 +428,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); - IndexResolution indexResolution = 
IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + var failures = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -441,9 +439,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + // skipped since remote1 is in the failures Map + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); From 9529e7e261822c5a4709d00be251d33c06a1b19f Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 15 Jul 2025 10:52:13 -0600 Subject: [PATCH 2/2] add getClusterStates --- .../xpack/esql/action/EsqlExecutionInfo.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 021520b5b2bc2..7b5e84522e2dd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; 
import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Stream; /** * Holds execution metadata about ES|QL queries for cross-cluster searches in order to display @@ -311,6 +312,15 @@ public int getClusterStateCount(Cluster.Status status) { return (int) clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status).count(); } + /** + * @param status the status you want to access + * @return a stream of clusters with that status + */ + public Stream getClusterStates(Cluster.Status status) { + assert clusterInfo.isEmpty() == false : "ClusterMap in EsqlExecutionInfo must not be empty"; + return clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status); + } + @Override public String toString() { return "EsqlExecutionInfo{"