diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index a4a8ed63a1a03..3e1ca1fb75019 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -40,7 +40,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toSet; public class EsqlCCSUtils { @@ -153,30 +155,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn } } - static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { - StringBuilder sb = new StringBuilder(); - for (String clusterAlias : executionInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - // Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out. 
- if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) { - if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); - } else { - String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); - for (String index : indexExpression.split(",")) { - sb.append(clusterAlias).append(':').append(index).append(','); - } - } - } - } - - if (sb.length() > 0) { - return sb.substring(0, sb.length() - 1); - } else { - return ""; - } - } - static void updateExecutionInfoWithUnavailableClusters( EsqlExecutionInfo execInfo, Map> failures @@ -206,7 +184,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( // NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters. final Set clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING) .map(Cluster::getClusterAlias) - .collect(Collectors.toSet()); + .collect(toSet()); for (String indexName : indexResolution.resolvedIndices()) { clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName)); } @@ -414,4 +392,22 @@ public static String inClusterName(String clusterAlias) { return "in remote cluster [" + clusterAlias + "]"; } } + + public static Set getRemotesOf(Set concreteIndices) { + return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet()); + } + + /** + * Given input like index=lookup-1 and remotes=[r1,r2,r3] this constructs output like `r1:lookup-1,r2:lookup-1,r3:lookup-1` + * This is needed to require that the lookup index is present on every remote in order to correctly execute a query with a join. 
+ */ + public static String qualifyWithRunningRemotes(String index, Set remotes, EsqlExecutionInfo executionInfo) { + if (remotes.isEmpty()) { + return index; + } + return remotes.stream().filter(remote -> { + var cluster = executionInfo.getCluster(remote); + return cluster == null || cluster.getStatus() == Cluster.Status.RUNNING; + }).map(remote -> RemoteClusterAware.buildRemoteIndexName(remote, index)).collect(joining(",")); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 555fab6df0c47..d9c0092ddb550 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -78,16 +78,18 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toSet; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.getRemotesOf; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.qualifyWithRunningRemotes; public class EsqlSession { @@ -372,58 +374,54 @@ public void analyzedPlan( return; } - PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); + var preAnalysis = preAnalyzer.preAnalyze(parsed); EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo); - var listener = SubscribableListener. // + SubscribableListener. 
// newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l)) .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution)) - .andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l)); - // first resolve the lookup indices, then the main indices - for (var index : preAnalysis.lookupIndices) { - listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l)); - } - listener.andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l)) - .andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l)) + .andThen((l, r) -> resolveInferences(parsed, r, l)) + .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l)) + .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices.iterator(), r, executionInfo, l)) + .andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l)) .addListener(logicalPlanListener); } + private void preAnalyzeLookupIndices( + Iterator lookupIndices, + PreAnalysisResult preAnalysisResult, + EsqlExecutionInfo executionInfo, + ActionListener listener + ) { + if (lookupIndices.hasNext()) { + preAnalyzeLookupIndex(lookupIndices.next(), preAnalysisResult, executionInfo, listener.delegateFailureAndWrap((l, r) -> { + preAnalyzeLookupIndices(lookupIndices, r, executionInfo, l); + })); + } else { + listener.onResponse(preAnalysisResult); + } + } + private void preAnalyzeLookupIndex( IndexPattern lookupIndexPattern, PreAnalysisResult result, EsqlExecutionInfo executionInfo, ActionListener listener ) { - String localPattern = lookupIndexPattern.indexPattern(); - assert RemoteClusterAware.isRemoteIndexName(localPattern) == false - : "Lookup index name should not include remote, but got: " + localPattern; + String lookupJoinPattern = 
lookupIndexPattern.indexPattern(); + assert RemoteClusterAware.isRemoteIndexName(lookupJoinPattern) == false + : "Lookup index name should not include remote, but got: " + lookupJoinPattern; assert ThreadPool.assertCurrentThreadPool( ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION, ThreadPool.Names.SYSTEM_READ ); - Set fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames; - - String patternWithRemotes; - - if (executionInfo.getClusters().isEmpty()) { - patternWithRemotes = localPattern; - } else { - // convert index -> cluster1:index,cluster2:index, etc.for each running cluster - patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING) - .map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern)) - .collect(Collectors.joining(",")); - } - if (patternWithRemotes.isEmpty()) { - return; - } - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( - patternWithRemotes, - fieldNames, + qualifyWithRunningRemotes(lookupJoinPattern, getRemotesOf(result.indices.resolvedIndices()), executionInfo), + result.wildcardJoinIndices().contains(lookupJoinPattern) ? 
IndexResolver.ALL_FIELDS : result.fieldNames, null, false, - listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) + listener.map(indexResolution -> receiveLookupIndexResolution(result, lookupJoinPattern, executionInfo, indexResolution)) ); } @@ -621,51 +619,29 @@ private void preAnalyzeMainIndices( ThreadPool.Names.SYSTEM_READ ); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one - List indices = preAnalysis.indices; - if (indices.size() > 1) { - // Note: JOINs are not supported but we detect them when - listener.onFailure(new MappingException("Queries with multiple indices are not supported")); - } else if (indices.size() == 1) { - IndexPattern table = indices.getFirst(); - - // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search - // based only on available clusters (which could now be an empty list) - String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); - if (indexExpressionToResolve.isEmpty()) { - // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution - listener.onResponse( - result.withIndexResolution(IndexResolution.valid(new EsIndex(table.indexPattern(), Map.of(), Map.of()))) - ); - } else { - boolean includeAllDimensions = false; - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - if (preAnalysis.indexMode == IndexMode.TIME_SERIES) { - includeAllDimensions = true; - // TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message. 
- var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); - if (requestFilter != null) { - requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter); - } else { - requestFilter = indexModeFilter; - } - } - indexResolver.resolveAsMergedMapping( - indexExpressionToResolve, - result.fieldNames, + switch (preAnalysis.indices.size()) { + case 0 -> listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]"))); + case 1 -> indexResolver.resolveAsMergedMapping( + preAnalysis.indices.getFirst().indexPattern(), + result.fieldNames, + merge( requestFilter, - includeAllDimensions, - listener.delegateFailure((l, indexResolution) -> { - l.onResponse(result.withIndexResolution(indexResolution)); - }) - ); - } + preAnalysis.indexMode == IndexMode.TIME_SERIES + ? new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()) + : null + ), + preAnalysis.indexMode == IndexMode.TIME_SERIES, + listener.delegateFailure((l, indexResolution) -> l.onResponse(result.withIndexResolution(indexResolution))) + ); + default -> listener.onFailure(new MappingException("Queries with multiple indices are not supported")); + } + } + + private static QueryBuilder merge(QueryBuilder q1, QueryBuilder q2) { + if (q1 != null && q2 != null) { + return new BoolQueryBuilder().filter(q1).filter(q2); } else { - try { - // occurs when dealing with local relations (row a = 1) - listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]"))); - } catch (Exception ex) { - listener.onFailure(ex); - } + return q1 != null ? 
q1 : q2; } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index a12d26f48b608..0788190ab2ea3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -37,7 +37,6 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -49,6 +48,7 @@ import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.initCrossClusterState; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -59,64 +59,54 @@ public class EsqlCCSUtilsTests extends ESTestCase { private final String REMOTE1_ALIAS = "remote1"; private final String REMOTE2_ALIAS = "remote2"; - public void testCreateIndexExpressionFromAvailableClusters() { + public void testQualifyWithRunningRemotes() { // no clusters marked as skipped { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); - executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); - executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new 
EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true)); - String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); - List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); - assertThat(list.size(), equalTo(5)); assertThat( - new HashSet<>(list), - equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*")) + asSet(EsqlCCSUtils.qualifyWithRunningRemotes("data", executionInfo.clusterAliases(), executionInfo)), + containsInAnyOrder("data", "remote1:data", "remote2:data") ); } // one cluster marked as skipped, so not present in revised index expression { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); - executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true)); executionInfo.swapCluster( REMOTE2_ALIAS, - (k, v) -> new EsqlExecutionInfo.Cluster( - REMOTE2_ALIAS, - "mylogs1,mylogs2,logs*", - true, - EsqlExecutionInfo.Cluster.Status.SKIPPED - ) + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); - String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); - List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); - assertThat(list.size(), equalTo(3)); - assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); + assertThat( + asSet(EsqlCCSUtils.qualifyWithRunningRemotes("data", 
executionInfo.clusterAliases(), executionInfo)), + containsInAnyOrder("data", "remote1:data") + ); } // two clusters marked as skipped, so only local cluster present in revised index expression { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false)); executionInfo.swapCluster( REMOTE1_ALIAS, - (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); executionInfo.swapCluster( REMOTE2_ALIAS, - (k, v) -> new EsqlExecutionInfo.Cluster( - REMOTE2_ALIAS, - "mylogs1,mylogs2,logs*", - true, - EsqlExecutionInfo.Cluster.Status.SKIPPED - ) + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); - assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + assertThat( + asSet(EsqlCCSUtils.qualifyWithRunningRemotes("data", executionInfo.clusterAliases(), executionInfo)), + containsInAnyOrder("data") + ); } // only remotes present and all marked as skipped, so in revised index expression should be empty string @@ -124,22 +114,21 @@ public void testCreateIndexExpressionFromAvailableClusters() { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster( REMOTE1_ALIAS, - (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); executionInfo.swapCluster( REMOTE2_ALIAS, - (k, v) -> new EsqlExecutionInfo.Cluster( - REMOTE2_ALIAS, - "mylogs1,mylogs2,logs*", - true, - 
EsqlExecutionInfo.Cluster.Status.SKIPPED - ) + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); - assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); + assertThat(asSet(EsqlCCSUtils.qualifyWithRunningRemotes("data", executionInfo.clusterAliases(), executionInfo)), empty()); } } + private static Set asSet(String expr) { + return Set.of(Strings.splitStringByCommaToArray(expr)); + } + public void testUpdateExecutionInfoWithUnavailableClusters() { // skip_unavailable=true clusters are unavailable, both marked as SKIPPED