diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 4f6886edc5fbc..7fb279f18b1dc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -23,7 +23,6 @@ public final class EnrichResolution { private final Map resolvedPolicies = ConcurrentCollections.newConcurrentMap(); private final Map errors = ConcurrentCollections.newConcurrentMap(); - private final Map unavailableClusters = ConcurrentCollections.newConcurrentMap(); public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) { return resolvedPolicies.get(new Key(policyName, mode)); @@ -52,14 +51,6 @@ public void addError(String policyName, Enrich.Mode mode, String reason) { errors.putIfAbsent(new Key(policyName, mode), reason); } - public void addUnavailableCluster(String clusterAlias, Exception e) { - unavailableClusters.put(clusterAlias, e); - } - - public Map getUnavailableClusters() { - return unavailableClusters; - } - private record Key(String policyName, Enrich.Mode mode) { } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index b4583cb676f98..9449e6fe48228 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -10,7 +10,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.RefCountingListener; @@ -37,6 +36,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.enrich.EnrichMetadata; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; @@ -49,7 +49,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -59,6 +58,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards; + /** * Resolves enrich policies across clusters in several steps: * 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}. @@ -98,21 +99,22 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { /** * Resolves a set of enrich policies * - * @param targetClusters the target clusters * @param unresolvedPolicies the unresolved policies + * @param executionInfo the execution info * @param listener notified with the enrich resolution */ public void resolvePolicies( - Collection targetClusters, Collection unresolvedPolicies, + EsqlExecutionInfo executionInfo, ActionListener listener ) { - if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) { + if (unresolvedPolicies.isEmpty()) { listener.onResponse(new EnrichResolution()); return; } - final Set remoteClusters = new HashSet<>(targetClusters); - final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + + final Set remoteClusters = new HashSet<>(executionInfo.getClusters().keySet()); + final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); @@ -121,7 +123,14 @@ public void resolvePolicies( for (Map.Entry entry : lookupResponses.entrySet()) { String clusterAlias = entry.getKey(); if (entry.getValue().connectionError != null) { - enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); + assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false + : "Should never have a connection error for the local cluster"; + markClusterWithFinalStateAndNoShards( + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.SKIPPED, + entry.getValue().connectionError + ); // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy remoteClusters.remove(clusterAlias); } else { @@ -445,11 +454,4 @@ protected void getRemoteConnection(String cluster, ActionListener> groupIndicesPerCluster(Set remoteClusterNames, String[] indices) { - return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices) - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices()))); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 6d2f10d390c57..cd3ecb2508b9c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; @@ -19,7 +18,6 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.IndexModeFieldMapper; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -85,7 +83,6 @@ import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -365,16 +362,10 @@ public void analyzedPlan( final List indices = preAnalysis.indices; EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState()); - - final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( - configuredClusters, - indices.stream() - .flatMap(index -> Arrays.stream(Strings.commaDelimitedListToStringArray(index.indexPattern()))) - .toArray(String[]::new) - ).keySet(); + initializeClusterData(indices, executionInfo); var listener = SubscribableListener.newForked( - l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l) + l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l) ) .andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)) .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); @@ -400,12 +391,6 @@ public void analyzedPlan( }).andThen((l, result) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; - // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices - // resolving one more time (the first attempt failed and the query had a filter) - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> null); - } - // here the requestFilter is set to null, performing the pre-analysis after the first step failed preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l); }).andThen((l, result) -> { @@ -435,6 +420,26 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, // TODO: Verify that the resolved index actually has indexMode: "lookup" } + private void initializeClusterData(List indices, EsqlExecutionInfo executionInfo) { + if (indices.isEmpty()) { + return; + } + assert indices.size() == 1 : "Only single index pattern is supported"; + Map clusterIndices = indicesExpressionGrouper.groupIndices( + configuredClusters, + IndicesOptions.DEFAULT, + indices.getFirst().indexPattern() + ); + for (Map.Entry entry : clusterIndices.entrySet()) { + final String clusterAlias = entry.getKey(); + String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); + }); + } + } + private void preAnalyzeMainIndices( PreAnalyzer.PreAnalysis preAnalysis, EsqlExecutionInfo executionInfo, @@ -448,38 +453,8 @@ private void preAnalyzeMainIndices( // Note: JOINs are not supported but we detect them when listener.onFailure(new MappingException("Queries with multiple indices are not supported")); } else if (indices.size() == 1) { - // known to be unavailable from the enrich policy API call - Map unavailableClusters = result.enrichResolution.getUnavailableClusters(); IndexPattern table = indices.getFirst(); - Map clusterIndices = indicesExpressionGrouper.groupIndices( - configuredClusters, - IndicesOptions.DEFAULT, - table.indexPattern() - ); - for (Map.Entry entry : clusterIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - if (unavailableClusters.containsKey(k)) { - return new EsqlExecutionInfo.Cluster( - clusterAlias, - indexExpr, - executionInfo.isSkipUnavailable(clusterAlias), - EsqlExecutionInfo.Cluster.Status.SKIPPED, - 0, - 0, - 0, - 0, - List.of(new ShardSearchFailure(unavailableClusters.get(k))), - new TimeValue(0) - ); - } else { - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); - } - }); - } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 455a02ca8f2e5..b09195319c535 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.EnrichMetadata; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.session.IndexResolver; @@ -429,6 +430,10 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { PlainActionFuture future = new PlainActionFuture<>(); + EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true); + for (String cluster : clusters) { + esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*")); + } if (randomBoolean()) { unresolvedPolicies = new ArrayList<>(unresolvedPolicies); for (Enrich.Mode mode : Enrich.Mode.values()) { @@ -442,7 +447,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection