diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 15c4ade757b36..af9b8e0e4dac1 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -20,7 +20,6 @@
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.FailureCollector;
-import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.IndexModeFieldMapper;
@@ -372,66 +371,22 @@ public void analyzedPlan(
             return;
         }
 
-        CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction = (l) -> {
-            handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, l.indices.failures());
-            Analyzer analyzer = new Analyzer(
-                new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution),
-                verifier
-            );
-            LogicalPlan plan = analyzer.analyze(parsed);
-            plan.setAnalyzed();
-            return plan;
-        };
-
         PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
         var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());
         EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
 
-        var listener = SubscribableListener.<EnrichResolution>newForked(
-            l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
-        )
+        var listener = SubscribableListener. //
+            <EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
             .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
             .andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
         // first resolve the lookup indices, then the main indices
         for (var index : preAnalysis.lookupIndices) {
            listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
         }
 
-        listener.andThen((l, result) -> {
-            // resolve the main indices
-            preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
-        }).andThen((l, result) -> {
-            // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
-            // invalid index resolution to updateExecutionInfo
-            // If we run out of clusters to search due to unavailability we can stop the analysis right here
-            if (result.indices.isValid() && allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
-            // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
-            l.onResponse(result);
-        }).andThen((l, result) -> {
-            // first attempt (maybe the only one) at analyzing the plan
-            analyzeAndMaybeRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l);
-        }).andThen((l, result) -> {
-            assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
-
-            // here the requestFilter is set to null, performing the pre-analysis after the first step failed
-            preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
-        }).andThen((l, result) -> {
-            assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
-            LOGGER.debug("Analyzing the plan (second attempt, without filter)");
-            LogicalPlan plan;
-            try {
-                // the order here is tricky - if the cluster has been filtered and later became unavailable,
-                // do we want to declare it successful or skipped? For now, unavailability takes precedence.
-                EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
-                EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
-                plan = analyzeAction.apply(result);
-            } catch (Exception e) {
-                l.onFailure(e);
-                return;
-            }
-            LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
-            l.onResponse(plan);
-        }).addListener(logicalPlanListener);
+        listener.andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
+            .andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l))
+            .addListener(logicalPlanListener);
     }
 
     private void preAnalyzeLookupIndex(
@@ -699,42 +654,28 @@ private void preAnalyzeMainIndices(
         }
     }
 
-    /**
-     * Check if there are any clusters to search.
-     *
-     * @return true if there are no clusters to search, false otherwise
-     */
-    private boolean allCCSClustersSkipped(
+    private void analyzeWithRetry(
+        LogicalPlan parsed,
+        QueryBuilder requestFilter,
+        PreAnalyzer.PreAnalysis preAnalysis,
         EsqlExecutionInfo executionInfo,
         PreAnalysisResult result,
-        ActionListener<LogicalPlan> logicalPlanListener
+        ActionListener<LogicalPlan> listener
     ) {
-        IndexResolution indexResolution = result.indices;
-        EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
-        if (executionInfo.isCrossClusterSearch()
-            && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
-            // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
-            // to let the LogicalPlanActionListener decide how to proceed
-            LOGGER.debug("No more clusters to search, ending analysis stage");
-            logicalPlanListener.onFailure(new NoClustersToSearchException());
-            return true;
+        if (result.indices.isValid()) {
+            EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
+            if (executionInfo.isCrossClusterSearch()
+                && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
+                // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
+                // to let the LogicalPlanActionListener decide how to proceed
+                LOGGER.debug("No more clusters to search, ending analysis stage");
+                listener.onFailure(new NoClustersToSearchException());
+                return;
+            }
         }
-        return false;
-    }
-
-    private static void analyzeAndMaybeRetry(
-        CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction,
-        QueryBuilder requestFilter,
-        PreAnalysisResult result,
-        EsqlExecutionInfo executionInfo,
-        ActionListener<LogicalPlan> logicalPlanListener,
-        ActionListener<PreAnalysisResult> l
-    ) {
-        LogicalPlan plan = null;
-        var filterPresentMessage = requestFilter == null ? "without" : "with";
-        var attemptMessage = requestFilter == null ? "the only" : "first";
-        LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);
+        var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
+        LOGGER.debug("Analyzing the plan ({})", description);
 
         try {
             if (result.indices.isValid() || requestFilter != null) {
@@ -742,32 +683,35 @@ private static void analyzeAndMaybeRetry(
                 // when the resolution result is not valid for a different reason.
                 EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
             }
-            plan = analyzeAction.apply(result);
-        } catch (Exception e) {
-            if (e instanceof VerificationException ve) {
-                LOGGER.debug(
-                    "Analyzing the plan ({} attempt, {} filter) failed with {}",
-                    attemptMessage,
-                    filterPresentMessage,
-                    ve.getDetailedMessage()
-                );
-                if (requestFilter == null) {
-                    // if the initial request didn't have a filter, then just pass the exception back to the user
-                    logicalPlanListener.onFailure(ve);
-                } else {
-                    // interested only in a VerificationException, but this time we are taking out the index filter
-                    // to try and make the index resolution work without any index filtering. In the next step... to be continued
-                    l.onResponse(result);
-                }
+            LogicalPlan plan = analyzedPlan(parsed, result, executionInfo);
+            LOGGER.debug("Analyzed plan ({}):\n{}", description, plan);
+            // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
+            listener.onResponse(plan);
+        } catch (VerificationException ve) {
+            LOGGER.debug("Analyzing the plan ({}) failed with {}", description, ve.getDetailedMessage());
+            if (requestFilter == null) {
+                // if the initial request didn't have a filter, then just pass the exception back to the user
+                listener.onFailure(ve);
             } else {
-                // if the query failed with any other type of exception, then just pass the exception back to the user
-                logicalPlanListener.onFailure(e);
+                // retrying and make the index resolution work without any index filtering.
+                preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> {
+                    LOGGER.debug("Analyzing the plan (second attempt, without filter)");
+                    try {
+                        // the order here is tricky - if the cluster has been filtered and later became unavailable,
+                        // do we want to declare it successful or skipped? For now, unavailability takes precedence.
+                        EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
+                        EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
+                        LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
+                        LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
+                        l.onResponse(plan);
+                    } catch (Exception e) {
+                        l.onFailure(e);
+                    }
+                }));
             }
-            return;
+        } catch (Exception e) {
+            listener.onFailure(e);
         }
-        LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
-        // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
-        logicalPlanListener.onResponse(plan);
     }
 
     private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
@@ -792,6 +736,17 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
         return EstimatesRowSize.estimateRowSize(0, physicalPlan);
     }
 
+    private LogicalPlan analyzedPlan(LogicalPlan parsed, PreAnalysisResult r, EsqlExecutionInfo executionInfo) throws Exception {
+        handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, r.indices.failures());
+        Analyzer analyzer = new Analyzer(
+            new AnalyzerContext(configuration, functionRegistry, r.indices, r.lookupIndices, r.enrichResolution, r.inferenceResolution),
+            verifier
+        );
+        LogicalPlan plan = analyzer.analyze(parsed);
+        plan.setAnalyzed();
+        return plan;
+    }
+
     public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
         if (logicalPlan.preOptimized() == false) {
             throw new IllegalStateException("Expected pre-optimized plan");
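
Side note, not part of the change above: the retry shape introduced by analyzeWithRetry is "analyze with the request filter; on a VerificationException, re-resolve the main indices without the filter and analyze once more; any other failure goes straight back to the caller". The sketch below mirrors only that control flow using plain java.util.concurrent.CompletableFuture instead of Elasticsearch's ActionListener/SubscribableListener types; it is an illustrative analogy, and every class and method name in it is hypothetical, standing in for the types used in the diff.

import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for PreAnalysisResult and LogicalPlan.
record Resolution(boolean filtered) {}
record Plan(String description) {}

public class RetryWithoutFilterSketch {

    // Stand-in for preAnalyzeMainIndices: resolve the indices, optionally applying the request filter.
    static CompletableFuture<Resolution> resolveIndices(boolean withFilter) {
        return CompletableFuture.supplyAsync(() -> new Resolution(withFilter));
    }

    // Stand-in for analyzedPlan: here we simply assume analysis fails whenever the filtered resolution is used.
    static Plan analyze(Resolution r) {
        if (r.filtered()) {
            throw new IllegalStateException("verification failed with filter");
        }
        return new Plan("analyzed without filter");
    }

    // Stand-in for analyzeWithRetry: retry without the filter only if a filter was present in the first attempt.
    static CompletableFuture<Plan> analyzeWithRetry(boolean requestFilter) {
        return resolveIndices(requestFilter)
            .thenApply(RetryWithoutFilterSketch::analyze)
            .exceptionallyCompose(e -> {
                if (requestFilter) {
                    // second attempt: drop the filter, re-resolve and analyze again
                    return resolveIndices(false).thenApply(RetryWithoutFilterSketch::analyze);
                }
                // no filter was involved, so the failure is passed straight back to the caller
                return CompletableFuture.failedFuture(e);
            });
    }

    public static void main(String[] args) {
        // prints "analyzed without filter": the filtered attempt fails and the unfiltered retry succeeds
        System.out.println(analyzeWithRetry(true).join().description());
    }
}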