Skip to content

Commit 0bed3de

Browse files
committed
replace analyzeAction with a method
1 parent 237805a commit 0bed3de

File tree

1 file changed

+17
-19
lines changed
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session

1 file changed

+17
-19
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.compute.data.Page;
2121
import org.elasticsearch.compute.operator.DriverCompletionInfo;
2222
import org.elasticsearch.compute.operator.FailureCollector;
23-
import org.elasticsearch.core.CheckedFunction;
2423
import org.elasticsearch.core.Releasables;
2524
import org.elasticsearch.index.IndexMode;
2625
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
@@ -384,33 +383,21 @@ public void analyzedPlan(
384383
return;
385384
}
386385

387-
CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction = (l) -> {
388-
handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, l.indices.failures());
389-
Analyzer analyzer = new Analyzer(
390-
new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution),
391-
verifier
392-
);
393-
LogicalPlan plan = analyzer.analyze(parsed);
394-
plan.setAnalyzed();
395-
return plan;
396-
};
397-
398386
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
399387
var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());
400388

401389
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
402390

403-
var listener = SubscribableListener.<EnrichResolution>newForked(
404-
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
405-
)
391+
var listener = SubscribableListener. //
392+
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
406393
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
407394
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
408395
// first resolve the lookup indices, then the main indices
409396
for (var index : preAnalysis.lookupIndices) {
410397
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
411398
}
412399
listener.<PreAnalysisResult>andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
413-
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(analyzeAction, requestFilter, preAnalysis, executionInfo, result, l))
400+
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l))
414401
.addListener(logicalPlanListener);
415402
}
416403

@@ -680,7 +667,7 @@ private void preAnalyzeMainIndices(
680667
}
681668

682669
private void analyzeWithRetry(
683-
CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction,
670+
LogicalPlan parsed,
684671
QueryBuilder requestFilter,
685672
PreAnalyzer.PreAnalysis preAnalysis,
686673
EsqlExecutionInfo executionInfo,
@@ -708,7 +695,7 @@ private void analyzeWithRetry(
708695
// when the resolution result is not valid for a different reason.
709696
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
710697
}
711-
LogicalPlan plan = analyzeAction.apply(result);
698+
LogicalPlan plan = analyzedPlan(parsed, result, executionInfo);
712699
LOGGER.debug("Analyzed plan ({}):\n{}", description, plan);
713700
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
714701
listener.onResponse(plan);
@@ -726,7 +713,7 @@ private void analyzeWithRetry(
726713
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
727714
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
728715
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
729-
LogicalPlan plan = analyzeAction.apply(r);
716+
LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
730717
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
731718
l.onResponse(plan);
732719
} catch (Exception e) {
@@ -761,6 +748,17 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
761748
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
762749
}
763750

751+
private LogicalPlan analyzedPlan(LogicalPlan parsed, PreAnalysisResult r, EsqlExecutionInfo executionInfo) throws Exception {
752+
handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, r.indices.failures());
753+
Analyzer analyzer = new Analyzer(
754+
new AnalyzerContext(configuration, functionRegistry, r.indices, r.lookupIndices, r.enrichResolution, r.inferenceResolution),
755+
verifier
756+
);
757+
LogicalPlan plan = analyzer.analyze(parsed);
758+
plan.setAnalyzed();
759+
return plan;
760+
}
761+
764762
private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
765763
if (logicalPlan.preOptimized() == false) {
766764
throw new IllegalStateException("Expected pre-optimized plan");

0 commit comments

Comments
 (0)