Skip to content

Commit d8bd6b6

Browse files
authored
Flat index resolution (#138557)
1 parent f506922 commit d8bd6b6

File tree

8 files changed

+210
-144
lines changed

8 files changed

+210
-144
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,19 @@ public Map<String, Cluster> getClusters() {
275275
return clusterInfo;
276276
}
277277

278+
/**
279+
* This creates an initial Cluster object with indexExpression and skipUnavailable.
280+
*/
281+
public void initCluster(String clusterAlias, String indexExpression) {
282+
swapCluster(clusterAlias, (ca, previous) -> {
283+
var expr = indexExpression;
284+
if (previous != null) {
285+
expr = previous.getIndexExpression() + "," + indexExpression;
286+
}
287+
return new Cluster(clusterAlias, expr, shouldSkipOnFailure(clusterAlias));
288+
});
289+
}
290+
278291
/**
279292
* Utility to swap a Cluster object. Guidelines for the remapping function:
280293
* <ul>

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -447,34 +447,22 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
447447
}
448448
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
449449
String indexName = EnrichPolicy.getBaseName(policyName);
450-
indexResolver.resolveIndices(
451-
indexName,
452-
IndexResolver.ALL_FIELDS,
453-
null,
454-
false,
455-
// Disable aggregate_metric_double and dense_vector until we get version checks in planning
456-
false,
457-
false,
458-
refs.acquire(indexResult -> {
459-
if (indexResult.isValid() && indexResult.get().concreteQualifiedIndices().size() == 1) {
460-
EsIndex esIndex = indexResult.get();
461-
var concreteIndices = Map.of(
462-
request.clusterAlias,
463-
Iterables.get(esIndex.concreteQualifiedIndices(), 0)
464-
);
465-
var resolved = new ResolvedEnrichPolicy(
466-
p.getMatchField(),
467-
p.getType(),
468-
p.getEnrichFields(),
469-
concreteIndices,
470-
esIndex.mapping()
471-
);
472-
resolvedPolices.put(policyName, resolved);
473-
} else {
474-
failures.put(policyName, indexResult.toString());
475-
}
476-
})
477-
);
450+
indexResolver.resolveIndices(indexName, IndexResolver.ALL_FIELDS, refs.acquire(indexResult -> {
451+
if (indexResult.isValid() && indexResult.get().concreteQualifiedIndices().size() == 1) {
452+
EsIndex esIndex = indexResult.get();
453+
var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteQualifiedIndices(), 0));
454+
var resolved = new ResolvedEnrichPolicy(
455+
p.getMatchField(),
456+
p.getType(),
457+
p.getEnrichFields(),
458+
concreteIndices,
459+
esIndex.mapping()
460+
);
461+
resolvedPolices.put(policyName, resolved);
462+
} else {
463+
failures.put(policyName, indexResult.toString());
464+
}
465+
}));
478466
}
479467
}
480468
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/QueryBuilderResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private static QueryRewriteContext queryRewriteContext(TransportActionServices s
6363
ClusterState clusterState = services.clusterService().state();
6464
ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
6565
indexNames.toArray(String[]::new),
66-
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
66+
IndexResolver.DEFAULT_OPTIONS,
6767
services.projectResolver().getProjectMetadata(clusterState),
6868
services.indexNameExpressionResolver(),
6969
services.transportService().getRemoteClusterService(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
2929
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster;
3030
import org.elasticsearch.xpack.esql.analysis.Analyzer;
31+
import org.elasticsearch.xpack.esql.index.EsIndex;
3132
import org.elasticsearch.xpack.esql.index.IndexResolution;
3233
import org.elasticsearch.xpack.esql.plan.IndexPattern;
3334
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -337,13 +338,7 @@ public static void initCrossClusterState(
337338
// so that the CCS telemetry handler can recognize that this error is CCS-related
338339
try {
339340
groupedIndices.forEach((clusterAlias, indices) -> {
340-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
341-
var indexExpr = Strings.arrayToCommaDelimitedString(indices.indices());
342-
if (v != null) {
343-
indexExpr = v.getIndexExpression() + "," + indexExpr;
344-
}
345-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
346-
});
341+
executionInfo.initCluster(clusterAlias, Strings.arrayToCommaDelimitedString(indices.indices()));
347342
});
348343
} finally {
349344
executionInfo.clusterInfoInitializing(false);
@@ -362,6 +357,12 @@ public static void initCrossClusterState(
362357
}
363358
}
364359

360+
public static void initCrossClusterState(EsIndex esIndex, EsqlExecutionInfo executionInfo) {
361+
esIndex.originalIndices().forEach((clusterAlias, indices) -> {
362+
executionInfo.initCluster(clusterAlias, Strings.collectionToCommaDelimitedString(indices));
363+
});
364+
}
365+
365366
/**
366367
* Mark cluster with a final status (success or failure).
367368
* Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -605,11 +605,6 @@ private void preAnalyzeLookupIndex(
605605
indexResolver.resolveIndices(
606606
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern),
607607
result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
608-
null,
609-
false,
610-
// Disable aggregate_metric_double and dense_vector until we get version checks in planning
611-
false,
612-
false,
613608
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
614609
);
615610
}
@@ -810,20 +805,34 @@ private void preAnalyzeMainIndices(
810805
QueryBuilder requestFilter,
811806
ActionListener<PreAnalysisResult> listener
812807
) {
813-
EsqlCCSUtils.initCrossClusterState(
814-
indicesExpressionGrouper,
815-
verifier.licenseState(),
816-
preAnalysis.indexes().keySet(),
817-
executionInfo
818-
);
819-
// The main index pattern dictates on which nodes the query can be executed,
820-
// so we use the minimum transport version from this field caps request.
821-
forAll(
822-
preAnalysis.indexes().entrySet().iterator(),
823-
result,
824-
(entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
825-
listener
808+
assert ThreadPool.assertCurrentThreadPool(
809+
ThreadPool.Names.SEARCH,
810+
ThreadPool.Names.SEARCH_COORDINATION,
811+
ThreadPool.Names.SYSTEM_READ
826812
);
813+
if (crossProjectModeDecider.crossProjectEnabled() == false) {
814+
EsqlCCSUtils.initCrossClusterState(
815+
indicesExpressionGrouper,
816+
verifier.licenseState(),
817+
preAnalysis.indexes().keySet(),
818+
executionInfo
819+
);
820+
// The main index pattern dictates on which nodes the query can be executed,
821+
// so we use the minimum transport version from this field caps request.
822+
forAll(
823+
preAnalysis.indexes().entrySet().iterator(),
824+
result,
825+
(e, r, l) -> preAnalyzeMainIndices(e.getKey(), e.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
826+
listener
827+
);
828+
} else {
829+
forAll(
830+
preAnalysis.indexes().entrySet().iterator(),
831+
result,
832+
(e, r, l) -> preAnalyzeFlatMainIndices(e.getKey(), e.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
833+
listener
834+
);
835+
}
827836
}
828837

829838
private void preAnalyzeMainIndices(
@@ -835,28 +844,14 @@ private void preAnalyzeMainIndices(
835844
QueryBuilder requestFilter,
836845
ActionListener<PreAnalysisResult> listener
837846
) {
838-
assert ThreadPool.assertCurrentThreadPool(
839-
ThreadPool.Names.SEARCH,
840-
ThreadPool.Names.SEARCH_COORDINATION,
841-
ThreadPool.Names.SYSTEM_READ
842-
);
843847
if (executionInfo.clusterAliases().isEmpty()) {
844848
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
845849
listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern())));
846850
} else {
847851
indexResolver.resolveIndicesVersioned(
848852
indexPattern.indexPattern(),
849853
result.fieldNames,
850-
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
851-
switch (indexMode) {
852-
case IndexMode.TIME_SERIES -> {
853-
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
854-
yield requestFilter != null
855-
? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
856-
: indexModeFilter;
857-
}
858-
default -> requestFilter;
859-
},
854+
createQueryFilter(indexMode, requestFilter),
860855
indexMode == IndexMode.TIME_SERIES,
861856
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
862857
preAnalysis.useDenseVectorWhenNotSupported(),
@@ -872,6 +867,43 @@ private void preAnalyzeMainIndices(
872867
}
873868
}
874869

870+
private void preAnalyzeFlatMainIndices(
871+
IndexPattern indexPattern,
872+
IndexMode indexMode,
873+
PreAnalyzer.PreAnalysis preAnalysis,
874+
EsqlExecutionInfo executionInfo,
875+
PreAnalysisResult result,
876+
QueryBuilder requestFilter,
877+
ActionListener<PreAnalysisResult> listener
878+
) {
879+
indexResolver.resolveFlatWorldIndicesVersioned(
880+
indexPattern.indexPattern(),
881+
result.fieldNames,
882+
createQueryFilter(indexMode, requestFilter),
883+
indexMode == IndexMode.TIME_SERIES,
884+
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
885+
preAnalysis.useDenseVectorWhenNotSupported(),
886+
listener.delegateFailureAndWrap((l, indexResolution) -> {
887+
EsqlCCSUtils.initCrossClusterState(indexResolution.inner().get(), executionInfo);
888+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
889+
l.onResponse(
890+
result.withIndices(indexPattern, indexResolution.inner()).withMinimumTransportVersion(indexResolution.minimumVersion())
891+
);
892+
})
893+
);
894+
}
895+
896+
private static QueryBuilder createQueryFilter(IndexMode indexMode, QueryBuilder requestFilter) {
897+
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
898+
return switch (indexMode) {
899+
case IndexMode.TIME_SERIES -> {
900+
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
901+
yield requestFilter != null ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) : indexModeFilter;
902+
}
903+
default -> requestFilter;
904+
};
905+
}
906+
875907
private void analyzeWithRetry(
876908
LogicalPlan parsed,
877909
Configuration configuration,

0 commit comments

Comments
 (0)