Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
Expand Down Expand Up @@ -541,7 +542,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
null,
new InferenceService(mock(Client.class)),
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY),
TEST_PLANNER_SETTINGS
TEST_PLANNER_SETTINGS,
new CrossProjectModeDecider(Settings.EMPTY)
);

private static ClusterService createMockClusterService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Map.of());
}

/**
 * Builds a valid resolution for an index pattern that matched nothing:
 * an {@link EsIndex} with the given name but no mappings, no concrete
 * indices, and no index modes.
 */
public static IndexResolution empty(String indexPattern) {
    var emptyIndex = new EsIndex(indexPattern, Map.of(), Map.of(), Set.of());
    return valid(emptyIndex);
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Set.of(), Map.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.compute.data.BlockFactoryProvider;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.xpack.esql.inference.InferenceService;
Expand All @@ -28,5 +29,6 @@ public record TransportActionServices(
UsageService usageService,
InferenceService inferenceService,
BlockFactoryProvider blockFactoryProvider,
PlannerSettings plannerSettings
PlannerSettings plannerSettings,
CrossProjectModeDecider crossProjectModeDecider
) {}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -170,7 +171,8 @@ public TransportEsqlQueryAction(
usageService,
new InferenceService(client),
blockFactoryProvider,
new PlannerSettings(clusterService)
new PlannerSettings(clusterService),
new CrossProjectModeDecider(clusterService.getSettings())
);

this.computeService = new ComputeService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,33 +325,29 @@ public static void initCrossClusterState(
return;
}
try {
// TODO it is not safe to concat multiple index patterns in case any of them contains exclusion.
// This is going to be resolved in #136804
String[] indexExpressions = indexPatterns.stream()
.map(indexPattern -> Strings.splitStringByCommaToArray(indexPattern.indexPattern()))
.reduce((a, b) -> {
String[] combined = new String[a.length + b.length];
System.arraycopy(a, 0, combined, 0, a.length);
System.arraycopy(b, 0, combined, a.length, b.length);
return combined;
})
.get();
var groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, indexExpressions, false);
for (IndexPattern indexPattern : indexPatterns) {
var groupedIndices = indicesGrouper.groupIndices(
IndicesOptions.DEFAULT,
Strings.splitStringByCommaToArray(indexPattern.indexPattern()),
false
);

executionInfo.clusterInfoInitializing(true);
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
// so that the CCS telemetry handler can recognize that this error is CCS-related
try {
for (var entry : groupedIndices.entrySet()) {
final String clusterAlias = entry.getKey();
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
executionInfo.clusterInfoInitializing(true);
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
// so that the CCS telemetry handler can recognize that this error is CCS-related
try {
groupedIndices.forEach((clusterAlias, indices) -> {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
var indexExpr = Strings.arrayToCommaDelimitedString(indices.indices());
if (v != null) {
indexExpr = v.getIndexExpression() + "," + indexExpr;
}
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
});
});
} finally {
executionInfo.clusterInfoInitializing(false);
}
} finally {
executionInfo.clusterInfoInitializing(false);
}

if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.data.Block;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -134,6 +136,7 @@ public interface PlanRunner {
private final RemoteClusterService remoteClusterService;
private final BlockFactory blockFactory;
private final ByteSizeValue intermediateLocalRelationMaxSize;
private final CrossProjectModeDecider crossProjectModeDecider;
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The crossProjectModeDecider field is added and initialized but never used in this class. If this field is being added for future use, consider adding a TODO comment explaining the intended usage. Otherwise, if it's not needed yet, it should be removed to avoid dead code.

Suggested change
private final CrossProjectModeDecider crossProjectModeDecider;

Copilot uses AI. Check for mistakes.
private final String clusterName;

private boolean explainMode;
Expand Down Expand Up @@ -168,6 +171,7 @@ public EsqlSession(
this.remoteClusterService = services.transportService().getRemoteClusterService();
this.blockFactory = services.blockFactoryProvider().blockFactory();
this.intermediateLocalRelationMaxSize = services.plannerSettings().intermediateLocalRelationMaxSize();
this.crossProjectModeDecider = services.crossProjectModeDecider();
this.clusterName = services.clusterService().getClusterName().value();
}

Expand Down Expand Up @@ -544,27 +548,16 @@ private void resolveIndicesAndAnalyze(
PreAnalysisResult result,
ActionListener<Versioned<LogicalPlan>> logicalPlanListener
) {
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);

SubscribableListener.<PreAnalysisResult>newForked(
// The main index pattern dictates on which nodes the query can be executed, so we use the minimum transport version from this
// field
// caps request.
l -> preAnalyzeMainIndices(preAnalysis.indexes().entrySet().iterator(), preAnalysis, executionInfo, result, requestFilter, l)
).andThenApply(r -> {
if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses
&& executionInfo.isCrossClusterSearch()
&& executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
LOGGER.debug("No more clusters to search, ending analysis stage");
throw new NoClustersToSearchException();
}
return r;
})
SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
.andThenApply(r -> {
if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses
&& executionInfo.isCrossClusterSearch()
&& executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
LOGGER.debug("No more clusters to search, ending analysis stage");
throw new NoClustersToSearchException();
}
return r;
})
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
.<PreAnalysisResult>andThen((l, r) -> {
enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
Expand All @@ -587,13 +580,7 @@ private void preAnalyzeLookupIndices(
EsqlExecutionInfo executionInfo,
ActionListener<PreAnalysisResult> listener
) {
if (lookupIndices.hasNext()) {
preAnalyzeLookupIndex(lookupIndices.next(), preAnalysisResult, executionInfo, listener.delegateFailureAndWrap((l, r) -> {
preAnalyzeLookupIndices(lookupIndices, r, executionInfo, l);
}));
} else {
listener.onResponse(preAnalysisResult);
}
forAll(lookupIndices, preAnalysisResult, (lookupIndex, r, l) -> preAnalyzeLookupIndex(lookupIndex, r, executionInfo, l), listener);
}

private void preAnalyzeLookupIndex(
Expand Down Expand Up @@ -805,29 +792,26 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
* indices.
*/
private void preAnalyzeMainIndices(
Iterator<Map.Entry<IndexPattern, IndexMode>> indexPatterns,
PreAnalyzer.PreAnalysis preAnalysis,
EsqlExecutionInfo executionInfo,
PreAnalysisResult result,
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
if (indexPatterns.hasNext()) {
var index = indexPatterns.next();
preAnalyzeMainIndices(
index.getKey(),
index.getValue(),
preAnalysis,
executionInfo,
result,
requestFilter,
listener.delegateFailureAndWrap((l, r) -> {
preAnalyzeMainIndices(indexPatterns, preAnalysis, executionInfo, r, requestFilter, l);
})
);
} else {
listener.onResponse(result);
}
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);
// The main index pattern dictates on which nodes the query can be executed,
// so we use the minimum transport version from this field caps request.
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
listener
);
}

private void preAnalyzeMainIndices(
Expand All @@ -844,12 +828,9 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
// TODO: This is not yet index specific, but that will not matter as soon as #136804 is dealt with
if (executionInfo.clusterAliases().isEmpty()) {
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(
result.withIndices(indexPattern, IndexResolution.valid(new EsIndex(indexPattern.indexPattern(), Map.of(), Map.of())))
);
listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern())));
} else {
indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion(
indexPattern.indexPattern(),
Expand Down Expand Up @@ -994,6 +975,19 @@ private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan, PhysicalPl
return plan;
}

/**
 * Sequentially applies an asynchronous {@code consumer} to every element of
 * {@code iterator}, threading an accumulated result through each step.
 * <p>
 * For each element the consumer receives the element, the result produced by
 * the previous step, and a listener it must complete to continue the chain.
 * When the iterator is exhausted, {@code listener} is completed with the final
 * result. A failure in any step short-circuits the chain via
 * {@link ActionListener#delegateFailureAndWrap}.
 * <p>
 * Generalized over the result type {@code R} (previously hard-coded to
 * {@code PreAnalysisResult}); existing call sites are unaffected by inference.
 *
 * @param iterator elements to process, one at a time, in order
 * @param result   the initial accumulated result
 * @param consumer async step: (element, accumulated result, continuation listener)
 * @param listener completed with the final result, or the first failure
 */
private static <T, R> void forAll(
    Iterator<T> iterator,
    R result,
    TriConsumer<T, R, ActionListener<R>> consumer,
    ActionListener<R> listener
) {
    if (iterator.hasNext()) {
        // Recurse only after the current step completes, so steps run strictly sequentially.
        consumer.apply(iterator.next(), result, listener.delegateFailureAndWrap((l, r) -> forAll(iterator, r, consumer, l)));
    } else {
        listener.onResponse(result);
    }
}
Comment on lines +978 to +989
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The forAll helper method lacks documentation. Add a JavaDoc comment explaining that this is a recursive async iterator pattern that processes elements sequentially while threading a result object through each iteration. Include parameter descriptions, especially noting that the consumer receives the current element, accumulated result, and a listener for continuation.

Copilot uses AI. Check for mistakes.

public record PreAnalysisResult(
Set<String> fieldNames,
Set<String> wildcardJoinIndices,
Expand Down
Loading