Skip to content

Commit 237805a

Browse files
committed
inline trailing steps
1 parent 371f2e4 commit 237805a

File tree

1 file changed

+25
-37
lines changed
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session

1 file changed

+25
-37
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -409,34 +409,9 @@ public void analyzedPlan(
409409
for (var index : preAnalysis.lookupIndices) {
410410
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
411411
}
412-
listener.<PreAnalysisResult>andThen((l, result) -> {
413-
// resolve the main indices
414-
preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
415-
}).<PreAnalysisResult>andThen((l, result) -> {
416-
// first attempt (maybe the only one) at analyzing the plan
417-
analyzeWithRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l);
418-
}).<PreAnalysisResult>andThen((l, result) -> {
419-
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
420-
421-
// here the requestFilter is set to null, performing the pre-analysis after the first step failed
422-
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
423-
}).<LogicalPlan>andThen((l, result) -> {
424-
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
425-
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
426-
LogicalPlan plan;
427-
try {
428-
// the order here is tricky - if the cluster has been filtered and later became unavailable,
429-
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
430-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
431-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
432-
plan = analyzeAction.apply(result);
433-
} catch (Exception e) {
434-
l.onFailure(e);
435-
return;
436-
}
437-
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
438-
l.onResponse(plan);
439-
}).addListener(logicalPlanListener);
412+
listener.<PreAnalysisResult>andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
413+
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(analyzeAction, requestFilter, preAnalysis, executionInfo, result, l))
414+
.addListener(logicalPlanListener);
440415
}
441416

442417
private void preAnalyzeLookupIndex(
@@ -704,13 +679,13 @@ private void preAnalyzeMainIndices(
704679
}
705680
}
706681

707-
private static void analyzeWithRetry(
682+
private void analyzeWithRetry(
708683
CheckedFunction<PreAnalysisResult, LogicalPlan, Exception> analyzeAction,
709684
QueryBuilder requestFilter,
710-
PreAnalysisResult result,
685+
PreAnalyzer.PreAnalysis preAnalysis,
711686
EsqlExecutionInfo executionInfo,
712-
ActionListener<LogicalPlan> logicalPlanListener,
713-
ActionListener<PreAnalysisResult> stepListener
687+
PreAnalysisResult result,
688+
ActionListener<LogicalPlan> listener
714689
) {
715690
if (result.indices.isValid()) {
716691
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
@@ -719,7 +694,7 @@ private static void analyzeWithRetry(
719694
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
720695
// to let the LogicalPlanActionListener decide how to proceed
721696
LOGGER.debug("No more clusters to search, ending analysis stage");
722-
logicalPlanListener.onFailure(new NoClustersToSearchException());
697+
listener.onFailure(new NoClustersToSearchException());
723698
return;
724699
}
725700
}
@@ -736,18 +711,31 @@ private static void analyzeWithRetry(
736711
LogicalPlan plan = analyzeAction.apply(result);
737712
LOGGER.debug("Analyzed plan ({}):\n{}", description, plan);
738713
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
739-
logicalPlanListener.onResponse(plan);
714+
listener.onResponse(plan);
740715
} catch (VerificationException ve) {
741716
LOGGER.debug("Analyzing the plan ({}) failed with {}", description, ve.getDetailedMessage());
742717
if (requestFilter == null) {
743718
// if the initial request didn't have a filter, then just pass the exception back to the user
744-
logicalPlanListener.onFailure(ve);
719+
listener.onFailure(ve);
745720
} else {
746721
// retrying and make the index resolution work without any index filtering.
747-
stepListener.onResponse(result);
722+
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> {
723+
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
724+
try {
725+
// the order here is tricky - if the cluster has been filtered and later became unavailable,
726+
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
727+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
728+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
729+
LogicalPlan plan = analyzeAction.apply(r);
730+
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
731+
l.onResponse(plan);
732+
} catch (Exception e) {
733+
l.onFailure(e);
734+
}
735+
}));
748736
}
749737
} catch (Exception e) {
750-
logicalPlanListener.onFailure(e);
738+
listener.onFailure(e);
751739
}
752740
}
753741

0 commit comments

Comments
 (0)