Skip to content

Commit 6485e97

Browse files
idegtiarenkojoshua-adams-1
authored andcommitted
Simplify EsqlSession (elastic#132848)
1 parent b7922ff commit 6485e97

File tree

1 file changed

+59
-104
lines changed
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session

1 file changed

+59
-104
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 59 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.compute.data.Page;
2121
import org.elasticsearch.compute.operator.DriverCompletionInfo;
2222
import org.elasticsearch.compute.operator.FailureCollector;
23-
import org.elasticsearch.core.CheckedFunction;
2423
import org.elasticsearch.core.Releasables;
2524
import org.elasticsearch.index.IndexMode;
2625
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
@@ -372,66 +371,22 @@ public void analyzedPlan(
372371
return;
373372
}
374373

375-
CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction = (l) -> {
376-
handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, l.indices.failures());
377-
Analyzer analyzer = new Analyzer(
378-
new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution),
379-
verifier
380-
);
381-
LogicalPlan plan = analyzer.analyze(parsed);
382-
plan.setAnalyzed();
383-
return plan;
384-
};
385-
386374
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
387375
var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());
388376

389377
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
390378

391-
var listener = SubscribableListener.<EnrichResolution>newForked(
392-
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
393-
)
379+
var listener = SubscribableListener. //
380+
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
394381
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
395382
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
396383
// first resolve the lookup indices, then the main indices
397384
for (var index : preAnalysis.lookupIndices) {
398385
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
399386
}
400-
listener.<PreAnalysisResult>andThen((l, result) -> {
401-
// resolve the main indices
402-
preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
403-
}).<PreAnalysisResult>andThen((l, result) -> {
404-
// TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
405-
// invalid index resolution to updateExecutionInfo
406-
// If we run out of clusters to search due to unavailability we can stop the analysis right here
407-
if (result.indices.isValid() && allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
408-
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
409-
l.onResponse(result);
410-
}).<PreAnalysisResult>andThen((l, result) -> {
411-
// first attempt (maybe the only one) at analyzing the plan
412-
analyzeAndMaybeRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l);
413-
}).<PreAnalysisResult>andThen((l, result) -> {
414-
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
415-
416-
// here the requestFilter is set to null, performing the pre-analysis after the first step failed
417-
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
418-
}).<LogicalPlan>andThen((l, result) -> {
419-
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
420-
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
421-
LogicalPlan plan;
422-
try {
423-
// the order here is tricky - if the cluster has been filtered and later became unavailable,
424-
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
425-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
426-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
427-
plan = analyzeAction.apply(result);
428-
} catch (Exception e) {
429-
l.onFailure(e);
430-
return;
431-
}
432-
LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
433-
l.onResponse(plan);
434-
}).addListener(logicalPlanListener);
387+
listener.<PreAnalysisResult>andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
388+
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l))
389+
.addListener(logicalPlanListener);
435390
}
436391

437392
private void preAnalyzeLookupIndex(
@@ -699,75 +654,64 @@ private void preAnalyzeMainIndices(
699654
}
700655
}
701656

702-
/**
703-
* Check if there are any clusters to search.
704-
*
705-
* @return true if there are no clusters to search, false otherwise
706-
*/
707-
private boolean allCCSClustersSkipped(
657+
private void analyzeWithRetry(
658+
LogicalPlan parsed,
659+
QueryBuilder requestFilter,
660+
PreAnalyzer.PreAnalysis preAnalysis,
708661
EsqlExecutionInfo executionInfo,
709662
PreAnalysisResult result,
710-
ActionListener<LogicalPlan> logicalPlanListener
663+
ActionListener<LogicalPlan> listener
711664
) {
712-
IndexResolution indexResolution = result.indices;
713-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
714-
if (executionInfo.isCrossClusterSearch()
715-
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
716-
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
717-
// to let the LogicalPlanActionListener decide how to proceed
718-
LOGGER.debug("No more clusters to search, ending analysis stage");
719-
logicalPlanListener.onFailure(new NoClustersToSearchException());
720-
return true;
665+
if (result.indices.isValid()) {
666+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
667+
if (executionInfo.isCrossClusterSearch()
668+
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
669+
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
670+
// to let the LogicalPlanActionListener decide how to proceed
671+
LOGGER.debug("No more clusters to search, ending analysis stage");
672+
listener.onFailure(new NoClustersToSearchException());
673+
return;
674+
}
721675
}
722676

723-
return false;
724-
}
725-
726-
private static void analyzeAndMaybeRetry(
727-
CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction,
728-
QueryBuilder requestFilter,
729-
PreAnalysisResult result,
730-
EsqlExecutionInfo executionInfo,
731-
ActionListener<LogicalPlan> logicalPlanListener,
732-
ActionListener<PreAnalysisResult> l
733-
) {
734-
LogicalPlan plan = null;
735-
var filterPresentMessage = requestFilter == null ? "without" : "with";
736-
var attemptMessage = requestFilter == null ? "the only" : "first";
737-
LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);
677+
var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
678+
LOGGER.debug("Analyzing the plan ({})", description);
738679

739680
try {
740681
if (result.indices.isValid() || requestFilter != null) {
741682
// We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
742683
// when the resolution result is not valid for a different reason.
743684
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
744685
}
745-
plan = analyzeAction.apply(result);
746-
} catch (Exception e) {
747-
if (e instanceof VerificationException ve) {
748-
LOGGER.debug(
749-
"Analyzing the plan ({} attempt, {} filter) failed with {}",
750-
attemptMessage,
751-
filterPresentMessage,
752-
ve.getDetailedMessage()
753-
);
754-
if (requestFilter == null) {
755-
// if the initial request didn't have a filter, then just pass the exception back to the user
756-
logicalPlanListener.onFailure(ve);
757-
} else {
758-
// interested only in a VerificationException, but this time we are taking out the index filter
759-
// to try and make the index resolution work without any index filtering. In the next step... to be continued
760-
l.onResponse(result);
761-
}
686+
LogicalPlan plan = analyzedPlan(parsed, result, executionInfo);
687+
LOGGER.debug("Analyzed plan ({}):\n{}", description, plan);
688+
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
689+
listener.onResponse(plan);
690+
} catch (VerificationException ve) {
691+
LOGGER.debug("Analyzing the plan ({}) failed with {}", description, ve.getDetailedMessage());
692+
if (requestFilter == null) {
693+
// if the initial request didn't have a filter, then just pass the exception back to the user
694+
listener.onFailure(ve);
762695
} else {
763-
// if the query failed with any other type of exception, then just pass the exception back to the user
764-
logicalPlanListener.onFailure(e);
696+
// retrying and make the index resolution work without any index filtering.
697+
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> {
698+
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
699+
try {
700+
// the order here is tricky - if the cluster has been filtered and later became unavailable,
701+
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
702+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
703+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
704+
LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
705+
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
706+
l.onResponse(plan);
707+
} catch (Exception e) {
708+
l.onFailure(e);
709+
}
710+
}));
765711
}
766-
return;
712+
} catch (Exception e) {
713+
listener.onFailure(e);
767714
}
768-
LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
769-
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
770-
logicalPlanListener.onResponse(plan);
771715
}
772716

773717
private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
@@ -792,6 +736,17 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
792736
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
793737
}
794738

739+
private LogicalPlan analyzedPlan(LogicalPlan parsed, PreAnalysisResult r, EsqlExecutionInfo executionInfo) throws Exception {
740+
handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, r.indices.failures());
741+
Analyzer analyzer = new Analyzer(
742+
new AnalyzerContext(configuration, functionRegistry, r.indices, r.lookupIndices, r.enrichResolution, r.inferenceResolution),
743+
verifier
744+
);
745+
LogicalPlan plan = analyzer.analyze(parsed);
746+
plan.setAnalyzed();
747+
return plan;
748+
}
749+
795750
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
796751
if (logicalPlan.preOptimized() == false) {
797752
throw new IllegalStateException("Expected pre-optimized plan");

0 commit comments

Comments
 (0)