diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 68810e0686813..1caef55748eff 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -56,6 +56,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; @@ -541,7 +542,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { null, new InferenceService(mock(Client.class)), new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY), - TEST_PLANNER_SETTINGS + TEST_PLANNER_SETTINGS, + new CrossProjectModeDecider(Settings.EMPTY) ); private static ClusterService createMockClusterService() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java index 4d31f48da77de..c2e8663150339 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java @@ -36,6 +36,10 @@ public static IndexResolution valid(EsIndex index) { return valid(index, index.concreteIndices(), Map.of()); } + public static IndexResolution empty(String indexPattern) { + return valid(new EsIndex(indexPattern, Map.of(), Map.of(), Set.of())); + } + public static IndexResolution invalid(String invalid) { Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid"); return new IndexResolution(null, invalid, Set.of(), Map.of()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java index 66defcf0ab1e3..391a5543ad922 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java @@ -13,6 +13,7 @@ import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.transport.TransportService; import org.elasticsearch.usage.UsageService; import org.elasticsearch.xpack.esql.inference.InferenceService; @@ -28,5 +29,6 @@ public record TransportActionServices( UsageService usageService, InferenceService inferenceService, BlockFactoryProvider blockFactoryProvider, - PlannerSettings plannerSettings + PlannerSettings plannerSettings, + CrossProjectModeDecider crossProjectModeDecider ) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index f8d24720231e5..46904ef3d9a6b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -170,7 +171,8 @@ public TransportEsqlQueryAction( usageService, new InferenceService(client), blockFactoryProvider, - new PlannerSettings(clusterService) + new PlannerSettings(clusterService), + new CrossProjectModeDecider(clusterService.getSettings()) ); this.computeService = new ComputeService( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index a97b13a61a71e..a9a5b354fd029 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -325,33 +325,29 @@ public static void initCrossClusterState( return; } try { - // TODO it is not safe to concat multiple index patterns in case any of them contains exclusion. - // This is going to be resolved in #136804 - String[] indexExpressions = indexPatterns.stream() - .map(indexPattern -> Strings.splitStringByCommaToArray(indexPattern.indexPattern())) - .reduce((a, b) -> { - String[] combined = new String[a.length + b.length]; - System.arraycopy(a, 0, combined, 0, a.length); - System.arraycopy(b, 0, combined, a.length, b.length); - return combined; - }) - .get(); - var groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, indexExpressions, false); + for (IndexPattern indexPattern : indexPatterns) { + var groupedIndices = indicesGrouper.groupIndices( + IndicesOptions.DEFAULT, + Strings.splitStringByCommaToArray(indexPattern.indexPattern()), + false + ); - executionInfo.clusterInfoInitializing(true); - // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error - // so that the CCS telemetry handler can recognize that this error is CCS-related - try { - for (var entry : groupedIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); + executionInfo.clusterInfoInitializing(true); + // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error + // so that the CCS telemetry handler can recognize that this error is CCS-related + try { + groupedIndices.forEach((clusterAlias, indices) -> { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + var indexExpr = Strings.arrayToCommaDelimitedString(indices.indices()); + if (v != null) { + indexExpr = v.getIndexExpression() + "," + indexExpr; + } + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); + }); }); + } finally { + executionInfo.clusterInfoInitializing(false); } - } finally { - executionInfo.clusterInfoInitializing(false); } if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 3fca6e7ee31ce..17a679e206073 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; @@ -33,6 +34,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; @@ -134,6 +136,7 @@ public interface PlanRunner { private final RemoteClusterService remoteClusterService; private final BlockFactory blockFactory; private final ByteSizeValue intermediateLocalRelationMaxSize; + private final CrossProjectModeDecider crossProjectModeDecider; private final String clusterName; private boolean explainMode; @@ -168,6 +171,7 @@ public EsqlSession( this.remoteClusterService = services.transportService().getRemoteClusterService(); this.blockFactory = services.blockFactoryProvider().blockFactory(); this.intermediateLocalRelationMaxSize = services.plannerSettings().intermediateLocalRelationMaxSize(); + this.crossProjectModeDecider = services.crossProjectModeDecider(); this.clusterName = services.clusterService().getClusterName().value(); } @@ -544,27 +548,16 @@ private void resolveIndicesAndAnalyze( PreAnalysisResult result, ActionListener> logicalPlanListener ) { - EsqlCCSUtils.initCrossClusterState( - indicesExpressionGrouper, - verifier.licenseState(), - preAnalysis.indexes().keySet(), - executionInfo - ); - - SubscribableListener.newForked( - // The main index pattern dictates on which nodes the query can be executed, so we use the minimum transport version from this - // field - // caps request. - l -> preAnalyzeMainIndices(preAnalysis.indexes().entrySet().iterator(), preAnalysis, executionInfo, result, requestFilter, l) - ).andThenApply(r -> { - if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses - && executionInfo.isCrossClusterSearch() - && executionInfo.getRunningClusterAliases().findAny().isEmpty()) { - LOGGER.debug("No more clusters to search, ending analysis stage"); - throw new NoClustersToSearchException(); - } - return r; - }) + SubscribableListener.newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l)) + .andThenApply(r -> { + if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses + && executionInfo.isCrossClusterSearch() + && executionInfo.getRunningClusterAliases().findAny().isEmpty()) { + LOGGER.debug("No more clusters to search, ending analysis stage"); + throw new NoClustersToSearchException(); + } + return r; + }) .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l)) .andThen((l, r) -> { enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); @@ -587,13 +580,7 @@ private void preAnalyzeLookupIndices( EsqlExecutionInfo executionInfo, ActionListener listener ) { - if (lookupIndices.hasNext()) { - preAnalyzeLookupIndex(lookupIndices.next(), preAnalysisResult, executionInfo, listener.delegateFailureAndWrap((l, r) -> { - preAnalyzeLookupIndices(lookupIndices, r, executionInfo, l); - })); - } else { - listener.onResponse(preAnalysisResult); - } + forAll(lookupIndices, preAnalysisResult, (lookupIndex, r, l) -> preAnalyzeLookupIndex(lookupIndex, r, executionInfo, l), listener); } private void preAnalyzeLookupIndex( @@ -805,29 +792,26 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) { * indices. */ private void preAnalyzeMainIndices( - Iterator> indexPatterns, PreAnalyzer.PreAnalysis preAnalysis, EsqlExecutionInfo executionInfo, PreAnalysisResult result, QueryBuilder requestFilter, ActionListener listener ) { - if (indexPatterns.hasNext()) { - var index = indexPatterns.next(); - preAnalyzeMainIndices( - index.getKey(), - index.getValue(), - preAnalysis, - executionInfo, - result, - requestFilter, - listener.delegateFailureAndWrap((l, r) -> { - preAnalyzeMainIndices(indexPatterns, preAnalysis, executionInfo, r, requestFilter, l); - }) - ); - } else { - listener.onResponse(result); - } + EsqlCCSUtils.initCrossClusterState( + indicesExpressionGrouper, + verifier.licenseState(), + preAnalysis.indexes().keySet(), + executionInfo + ); + // The main index pattern dictates on which nodes the query can be executed, + // so we use the minimum transport version from this field caps request. + forAll( + preAnalysis.indexes().entrySet().iterator(), + result, + (entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l), + listener + ); } private void preAnalyzeMainIndices( @@ -844,12 +828,9 @@ private void preAnalyzeMainIndices( ThreadPool.Names.SEARCH_COORDINATION, ThreadPool.Names.SYSTEM_READ ); - // TODO: This is not yet index specific, but that will not matter as soon as #136804 is dealt with if (executionInfo.clusterAliases().isEmpty()) { // return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index) - listener.onResponse( - result.withIndices(indexPattern, IndexResolution.valid(new EsIndex(indexPattern.indexPattern(), Map.of(), Map.of()))) - ); + listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern()))); } else { indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion( indexPattern.indexPattern(), @@ -994,6 +975,19 @@ private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan, PhysicalPl return plan; } + private static void forAll( + Iterator iterator, + PreAnalysisResult result, + TriConsumer> consumer, + ActionListener listener + ) { + if (iterator.hasNext()) { + consumer.apply(iterator.next(), result, listener.delegateFailureAndWrap((l, r) -> forAll(iterator, r, consumer, l))); + } else { + listener.onResponse(result); + } + } + public record PreAnalysisResult( Set fieldNames, Set wildcardJoinIndices,