Skip to content

Commit 410d877

Browse files
authored
Wire cps check (#137446)
1 parent 42607fc commit 410d877

File tree

6 files changed

+76
-76
lines changed

6 files changed

+76
-76
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.elasticsearch.license.XPackLicenseState;
5757
import org.elasticsearch.search.SearchService;
5858
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
59+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
5960
import org.elasticsearch.tasks.TaskCancelledException;
6061
import org.elasticsearch.test.ESTestCase;
6162
import org.elasticsearch.test.TransportVersionUtils;
@@ -541,7 +542,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
541542
null,
542543
new InferenceService(mock(Client.class)),
543544
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY),
544-
TEST_PLANNER_SETTINGS
545+
TEST_PLANNER_SETTINGS,
546+
new CrossProjectModeDecider(Settings.EMPTY)
545547
);
546548

547549
private static ClusterService createMockClusterService() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ public static IndexResolution valid(EsIndex index) {
3636
return valid(index, index.concreteIndices(), Map.of());
3737
}
3838

39+
public static IndexResolution empty(String indexPattern) {
40+
return valid(new EsIndex(indexPattern, Map.of(), Map.of(), Set.of()));
41+
}
42+
3943
public static IndexResolution invalid(String invalid) {
4044
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
4145
return new IndexResolution(null, invalid, Set.of(), Map.of());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.compute.data.BlockFactoryProvider;
1414
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1515
import org.elasticsearch.search.SearchService;
16+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
1617
import org.elasticsearch.transport.TransportService;
1718
import org.elasticsearch.usage.UsageService;
1819
import org.elasticsearch.xpack.esql.inference.InferenceService;
@@ -28,5 +29,6 @@ public record TransportActionServices(
2829
UsageService usageService,
2930
InferenceService inferenceService,
3031
BlockFactoryProvider blockFactoryProvider,
31-
PlannerSettings plannerSettings
32+
PlannerSettings plannerSettings,
33+
CrossProjectModeDecider crossProjectModeDecider
3234
) {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.core.Nullable;
2727
import org.elasticsearch.injection.guice.Inject;
2828
import org.elasticsearch.search.SearchService;
29+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
2930
import org.elasticsearch.tasks.CancellableTask;
3031
import org.elasticsearch.tasks.Task;
3132
import org.elasticsearch.tasks.TaskId;
@@ -170,7 +171,8 @@ public TransportEsqlQueryAction(
170171
usageService,
171172
new InferenceService(client),
172173
blockFactoryProvider,
173-
new PlannerSettings(clusterService)
174+
new PlannerSettings(clusterService),
175+
new CrossProjectModeDecider(clusterService.getSettings())
174176
);
175177

176178
this.computeService = new ComputeService(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -325,33 +325,29 @@ public static void initCrossClusterState(
325325
return;
326326
}
327327
try {
328-
// TODO it is not safe to concat multiple index patterns in case any of them contains exclusion.
329-
// This is going to be resolved in #136804
330-
String[] indexExpressions = indexPatterns.stream()
331-
.map(indexPattern -> Strings.splitStringByCommaToArray(indexPattern.indexPattern()))
332-
.reduce((a, b) -> {
333-
String[] combined = new String[a.length + b.length];
334-
System.arraycopy(a, 0, combined, 0, a.length);
335-
System.arraycopy(b, 0, combined, a.length, b.length);
336-
return combined;
337-
})
338-
.get();
339-
var groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, indexExpressions, false);
328+
for (IndexPattern indexPattern : indexPatterns) {
329+
var groupedIndices = indicesGrouper.groupIndices(
330+
IndicesOptions.DEFAULT,
331+
Strings.splitStringByCommaToArray(indexPattern.indexPattern()),
332+
false
333+
);
340334

341-
executionInfo.clusterInfoInitializing(true);
342-
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
343-
// so that the CCS telemetry handler can recognize that this error is CCS-related
344-
try {
345-
for (var entry : groupedIndices.entrySet()) {
346-
final String clusterAlias = entry.getKey();
347-
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
348-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
349-
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
350-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
335+
executionInfo.clusterInfoInitializing(true);
336+
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
337+
// so that the CCS telemetry handler can recognize that this error is CCS-related
338+
try {
339+
groupedIndices.forEach((clusterAlias, indices) -> {
340+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
341+
var indexExpr = Strings.arrayToCommaDelimitedString(indices.indices());
342+
if (v != null) {
343+
indexExpr = v.getIndexExpression() + "," + indexExpr;
344+
}
345+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
346+
});
351347
});
348+
} finally {
349+
executionInfo.clusterInfoInitializing(false);
352350
}
353-
} finally {
354-
executionInfo.clusterInfoInitializing(false);
355351
}
356352

357353
if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 43 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1515
import org.elasticsearch.action.search.ShardSearchFailure;
1616
import org.elasticsearch.action.support.SubscribableListener;
17+
import org.elasticsearch.common.TriConsumer;
1718
import org.elasticsearch.common.collect.Iterators;
1819
import org.elasticsearch.common.unit.ByteSizeValue;
1920
import org.elasticsearch.compute.data.Block;
@@ -33,6 +34,7 @@
3334
import org.elasticsearch.logging.LogManager;
3435
import org.elasticsearch.logging.Logger;
3536
import org.elasticsearch.search.SearchShardTarget;
37+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
3638
import org.elasticsearch.threadpool.ThreadPool;
3739
import org.elasticsearch.transport.RemoteClusterAware;
3840
import org.elasticsearch.transport.RemoteClusterService;
@@ -134,6 +136,7 @@ public interface PlanRunner {
134136
private final RemoteClusterService remoteClusterService;
135137
private final BlockFactory blockFactory;
136138
private final ByteSizeValue intermediateLocalRelationMaxSize;
139+
private final CrossProjectModeDecider crossProjectModeDecider;
137140
private final String clusterName;
138141

139142
private boolean explainMode;
@@ -168,6 +171,7 @@ public EsqlSession(
168171
this.remoteClusterService = services.transportService().getRemoteClusterService();
169172
this.blockFactory = services.blockFactoryProvider().blockFactory();
170173
this.intermediateLocalRelationMaxSize = services.plannerSettings().intermediateLocalRelationMaxSize();
174+
this.crossProjectModeDecider = services.crossProjectModeDecider();
171175
this.clusterName = services.clusterService().getClusterName().value();
172176
}
173177

@@ -544,27 +548,16 @@ private void resolveIndicesAndAnalyze(
544548
PreAnalysisResult result,
545549
ActionListener<Versioned<LogicalPlan>> logicalPlanListener
546550
) {
547-
EsqlCCSUtils.initCrossClusterState(
548-
indicesExpressionGrouper,
549-
verifier.licenseState(),
550-
preAnalysis.indexes().keySet(),
551-
executionInfo
552-
);
553-
554-
SubscribableListener.<PreAnalysisResult>newForked(
555-
// The main index pattern dictates on which nodes the query can be executed, so we use the minimum transport version from this
556-
// field
557-
// caps request.
558-
l -> preAnalyzeMainIndices(preAnalysis.indexes().entrySet().iterator(), preAnalysis, executionInfo, result, requestFilter, l)
559-
).andThenApply(r -> {
560-
if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses
561-
&& executionInfo.isCrossClusterSearch()
562-
&& executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
563-
LOGGER.debug("No more clusters to search, ending analysis stage");
564-
throw new NoClustersToSearchException();
565-
}
566-
return r;
567-
})
551+
SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
552+
.andThenApply(r -> {
553+
if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses
554+
&& executionInfo.isCrossClusterSearch()
555+
&& executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
556+
LOGGER.debug("No more clusters to search, ending analysis stage");
557+
throw new NoClustersToSearchException();
558+
}
559+
return r;
560+
})
568561
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
569562
.<PreAnalysisResult>andThen((l, r) -> {
570563
enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
@@ -587,13 +580,7 @@ private void preAnalyzeLookupIndices(
587580
EsqlExecutionInfo executionInfo,
588581
ActionListener<PreAnalysisResult> listener
589582
) {
590-
if (lookupIndices.hasNext()) {
591-
preAnalyzeLookupIndex(lookupIndices.next(), preAnalysisResult, executionInfo, listener.delegateFailureAndWrap((l, r) -> {
592-
preAnalyzeLookupIndices(lookupIndices, r, executionInfo, l);
593-
}));
594-
} else {
595-
listener.onResponse(preAnalysisResult);
596-
}
583+
forAll(lookupIndices, preAnalysisResult, (lookupIndex, r, l) -> preAnalyzeLookupIndex(lookupIndex, r, executionInfo, l), listener);
597584
}
598585

599586
private void preAnalyzeLookupIndex(
@@ -805,29 +792,26 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
805792
* indices.
806793
*/
807794
private void preAnalyzeMainIndices(
808-
Iterator<Map.Entry<IndexPattern, IndexMode>> indexPatterns,
809795
PreAnalyzer.PreAnalysis preAnalysis,
810796
EsqlExecutionInfo executionInfo,
811797
PreAnalysisResult result,
812798
QueryBuilder requestFilter,
813799
ActionListener<PreAnalysisResult> listener
814800
) {
815-
if (indexPatterns.hasNext()) {
816-
var index = indexPatterns.next();
817-
preAnalyzeMainIndices(
818-
index.getKey(),
819-
index.getValue(),
820-
preAnalysis,
821-
executionInfo,
822-
result,
823-
requestFilter,
824-
listener.delegateFailureAndWrap((l, r) -> {
825-
preAnalyzeMainIndices(indexPatterns, preAnalysis, executionInfo, r, requestFilter, l);
826-
})
827-
);
828-
} else {
829-
listener.onResponse(result);
830-
}
801+
EsqlCCSUtils.initCrossClusterState(
802+
indicesExpressionGrouper,
803+
verifier.licenseState(),
804+
preAnalysis.indexes().keySet(),
805+
executionInfo
806+
);
807+
// The main index pattern dictates on which nodes the query can be executed,
808+
// so we use the minimum transport version from this field caps request.
809+
forAll(
810+
preAnalysis.indexes().entrySet().iterator(),
811+
result,
812+
(entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
813+
listener
814+
);
831815
}
832816

833817
private void preAnalyzeMainIndices(
@@ -844,12 +828,9 @@ private void preAnalyzeMainIndices(
844828
ThreadPool.Names.SEARCH_COORDINATION,
845829
ThreadPool.Names.SYSTEM_READ
846830
);
847-
// TODO: This is not yet index specific, but that will not matter as soon as #136804 is dealt with
848831
if (executionInfo.clusterAliases().isEmpty()) {
849832
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
850-
listener.onResponse(
851-
result.withIndices(indexPattern, IndexResolution.valid(new EsIndex(indexPattern.indexPattern(), Map.of(), Map.of())))
852-
);
833+
listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern())));
853834
} else {
854835
indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion(
855836
indexPattern.indexPattern(),
@@ -994,6 +975,19 @@ private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan, PhysicalPl
994975
return plan;
995976
}
996977

978+
private static <T> void forAll(
979+
Iterator<T> iterator,
980+
PreAnalysisResult result,
981+
TriConsumer<T, PreAnalysisResult, ActionListener<PreAnalysisResult>> consumer,
982+
ActionListener<PreAnalysisResult> listener
983+
) {
984+
if (iterator.hasNext()) {
985+
consumer.apply(iterator.next(), result, listener.delegateFailureAndWrap((l, r) -> forAll(iterator, r, consumer, l)));
986+
} else {
987+
listener.onResponse(result);
988+
}
989+
}
990+
997991
public record PreAnalysisResult(
998992
Set<String> fieldNames,
999993
Set<String> wildcardJoinIndices,

0 commit comments

Comments
 (0)