Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
Expand Down Expand Up @@ -372,66 +371,22 @@ public void analyzedPlan(
return;
}

CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction = (l) -> {
handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, l.indices.failures());
Analyzer analyzer = new Analyzer(
new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution),
verifier
);
LogicalPlan plan = analyzer.analyze(parsed);
plan.setAnalyzed();
return plan;
};

PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());

EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);

var listener = SubscribableListener.<EnrichResolution>newForked(
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
)
var listener = SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
// first resolve the lookup indices, then the main indices
for (var index : preAnalysis.lookupIndices) {
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
}
listener.<PreAnalysisResult>andThen((l, result) -> {
// resolve the main indices
preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
}).<PreAnalysisResult>andThen((l, result) -> {
// TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
// invalid index resolution to updateExecutionInfo
// If we run out of clusters to search due to unavailability we can stop the analysis right here
if (result.indices.isValid() && allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
l.onResponse(result);
}).<PreAnalysisResult>andThen((l, result) -> {
// first attempt (maybe the only one) at analyzing the plan
analyzeAndMaybeRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l);
}).<PreAnalysisResult>andThen((l, result) -> {
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";

// here the requestFilter is set to null, performing the pre-analysis after the first step failed
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
}).<LogicalPlan>andThen((l, result) -> {
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
LogicalPlan plan;
try {
// the order here is tricky - if the cluster has been filtered and later became unavailable,
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
plan = analyzeAction.apply(result);
} catch (Exception e) {
l.onFailure(e);
return;
}
LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
l.onResponse(plan);
}).addListener(logicalPlanListener);
listener.<PreAnalysisResult>andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l))
.addListener(logicalPlanListener);
}

private void preAnalyzeLookupIndex(
Expand Down Expand Up @@ -699,75 +654,64 @@ private void preAnalyzeMainIndices(
}
}

/**
* Check if there are any clusters to search.
*
* @return true if there are no clusters to search, false otherwise
*/
private boolean allCCSClustersSkipped(
private void analyzeWithRetry(
LogicalPlan parsed,
QueryBuilder requestFilter,
PreAnalyzer.PreAnalysis preAnalysis,
EsqlExecutionInfo executionInfo,
PreAnalysisResult result,
ActionListener<LogicalPlan> logicalPlanListener
ActionListener<LogicalPlan> listener
) {
IndexResolution indexResolution = result.indices;
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
// to let the LogicalPlanActionListener decide how to proceed
LOGGER.debug("No more clusters to search, ending analysis stage");
logicalPlanListener.onFailure(new NoClustersToSearchException());
return true;
if (result.indices.isValid()) {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
// to let the LogicalPlanActionListener decide how to proceed
LOGGER.debug("No more clusters to search, ending analysis stage");
listener.onFailure(new NoClustersToSearchException());
return;
}
}

return false;
}

private static void analyzeAndMaybeRetry(
CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction,
QueryBuilder requestFilter,
PreAnalysisResult result,
EsqlExecutionInfo executionInfo,
ActionListener<LogicalPlan> logicalPlanListener,
ActionListener<PreAnalysisResult> l
) {
LogicalPlan plan = null;
var filterPresentMessage = requestFilter == null ? "without" : "with";
var attemptMessage = requestFilter == null ? "the only" : "first";
LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);
var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
LOGGER.debug("Analyzing the plan ({})", description);

try {
if (result.indices.isValid() || requestFilter != null) {
// We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
// when the resolution result is not valid for a different reason.
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
}
plan = analyzeAction.apply(result);
} catch (Exception e) {
if (e instanceof VerificationException ve) {
LOGGER.debug(
"Analyzing the plan ({} attempt, {} filter) failed with {}",
attemptMessage,
filterPresentMessage,
ve.getDetailedMessage()
);
if (requestFilter == null) {
// if the initial request didn't have a filter, then just pass the exception back to the user
logicalPlanListener.onFailure(ve);
} else {
// interested only in a VerificationException, but this time we are taking out the index filter
// to try and make the index resolution work without any index filtering. In the next step... to be continued
l.onResponse(result);
}
LogicalPlan plan = analyzedPlan(parsed, result, executionInfo);
LOGGER.debug("Analyzed plan ({}):\n{}", description, plan);
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
listener.onResponse(plan);
} catch (VerificationException ve) {
LOGGER.debug("Analyzing the plan ({}) failed with {}", description, ve.getDetailedMessage());
if (requestFilter == null) {
// if the initial request didn't have a filter, then just pass the exception back to the user
listener.onFailure(ve);
} else {
// if the query failed with any other type of exception, then just pass the exception back to the user
logicalPlanListener.onFailure(e);
// retrying and make the index resolution work without any index filtering.
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> {
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
try {
// the order here is tricky - if the cluster has been filtered and later became unavailable,
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
l.onResponse(plan);
} catch (Exception e) {
l.onFailure(e);
}
}));
}
return;
} catch (Exception e) {
listener.onFailure(e);
}
LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
logicalPlanListener.onResponse(plan);
}

private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
Expand All @@ -792,6 +736,17 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
}

private LogicalPlan analyzedPlan(LogicalPlan parsed, PreAnalysisResult r, EsqlExecutionInfo executionInfo) throws Exception {
handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, r.indices.failures());
Analyzer analyzer = new Analyzer(
new AnalyzerContext(configuration, functionRegistry, r.indices, r.lookupIndices, r.enrichResolution, r.inferenceResolution),
verifier
);
LogicalPlan plan = analyzer.analyze(parsed);
plan.setAnalyzed();
return plan;
}

public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
if (logicalPlan.preOptimized() == false) {
throw new IllegalStateException("Expected pre-optimized plan");
Expand Down