Skip to content

Commit 4056a79

Browse files
authored
Resolve main indices before lookup (#134115)
Lookup indices must be resolved for every remote cluster that is going to be queried. Today list of remotes is derived from the index expression. With CPS/flat expressions remotes should be derived from the resolved main indices. This change moves lookup index resolution after main index resolution so that it will be possible to use main index resolution when deriving lookup indices remotes.
1 parent 45842f8 commit 4056a79

File tree

1 file changed

+22
-9
lines changed
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session

1 file changed

+22
-9
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.ArrayList;
7979
import java.util.Collection;
8080
import java.util.HashMap;
81+
import java.util.Iterator;
8182
import java.util.List;
8283
import java.util.Map;
8384
import java.util.Set;
@@ -372,22 +373,34 @@ public void analyzedPlan(
372373
return;
373374
}
374375

375-
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
376+
var preAnalysis = preAnalyzer.preAnalyze(parsed);
376377
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
377378

378-
var listener = SubscribableListener. //
379+
SubscribableListener. //
379380
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
380381
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
381-
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
382-
// first resolve the lookup indices, then the main indices
383-
for (var index : preAnalysis.lookupIndices) {
384-
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
385-
}
386-
listener.<PreAnalysisResult>andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
387-
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l))
382+
.<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
383+
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
384+
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices.iterator(), r, executionInfo, l))
385+
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
388386
.addListener(logicalPlanListener);
389387
}
390388

389+
private void preAnalyzeLookupIndices(
390+
Iterator<IndexPattern> lookupIndices,
391+
PreAnalysisResult preAnalysisResult,
392+
EsqlExecutionInfo executionInfo,
393+
ActionListener<PreAnalysisResult> listener
394+
) {
395+
if (lookupIndices.hasNext()) {
396+
preAnalyzeLookupIndex(lookupIndices.next(), preAnalysisResult, executionInfo, listener.delegateFailureAndWrap((l, r) -> {
397+
preAnalyzeLookupIndices(lookupIndices, r, executionInfo, l);
398+
}));
399+
} else {
400+
listener.onResponse(preAnalysisResult);
401+
}
402+
}
403+
391404
private void preAnalyzeLookupIndex(
392405
IndexPattern lookupIndexPattern,
393406
PreAnalysisResult result,

0 commit comments

Comments
 (0)