1414import org .elasticsearch .action .fieldcaps .FieldCapabilitiesFailure ;
1515import org .elasticsearch .action .search .ShardSearchFailure ;
1616import org .elasticsearch .action .support .SubscribableListener ;
17+ import org .elasticsearch .common .TriConsumer ;
1718import org .elasticsearch .common .collect .Iterators ;
1819import org .elasticsearch .common .unit .ByteSizeValue ;
1920import org .elasticsearch .compute .data .Block ;
@@ -547,28 +548,24 @@ private void resolveIndicesAndAnalyze(
547548 PreAnalysisResult result ,
548549 ActionListener <Versioned <LogicalPlan >> logicalPlanListener
549550 ) {
550- EsqlCCSUtils .initCrossClusterState (
551- indicesExpressionGrouper ,
552- verifier .licenseState (),
553- preAnalysis .indexes ().keySet (),
554- executionInfo
555- );
556-
557- SubscribableListener .<PreAnalysisResult >newForked (
558- // The main index pattern dictates on which nodes the query can be executed, so we use the minimum transport version from this
559- // field
560- // caps request.
561- l -> preAnalyzeMainIndices (preAnalysis .indexes ().entrySet ().iterator (), preAnalysis , executionInfo , result , requestFilter , l )
562- ).andThenApply (r -> {
563- if (r .indexResolution .isEmpty () == false // Rule out ROW case with no FROM clauses
564- && executionInfo .isCrossClusterSearch ()
565- && executionInfo .getRunningClusterAliases ().findAny ().isEmpty ()) {
566- LOGGER .debug ("No more clusters to search, ending analysis stage" );
567- throw new NoClustersToSearchException ();
568- }
569- return r ;
570- })
571- .<PreAnalysisResult >andThen ((l , r ) -> preAnalyzeLookupIndices (preAnalysis .lookupIndices ().iterator (), r , executionInfo , l ))
551+ SubscribableListener .<PreAnalysisResult >newForked (l -> preAnalyzeMainIndices (preAnalysis , executionInfo , result , requestFilter , l ))
552+ .andThenApply (r -> {
553+ if (r .indexResolution .isEmpty () == false // Rule out ROW case with no FROM clauses
554+ && executionInfo .isCrossClusterSearch ()
555+ && executionInfo .getRunningClusterAliases ().findAny ().isEmpty ()) {
556+ LOGGER .debug ("No more clusters to search, ending analysis stage" );
557+ throw new NoClustersToSearchException ();
558+ }
559+ return r ;
560+ })
561+ .<PreAnalysisResult >andThen (
562+ (l , r ) -> forAll (
563+ preAnalysis .lookupIndices ().iterator (),
564+ r ,
565+ (lookupIndex , innerR , innerL ) -> preAnalyzeLookupIndex (lookupIndex , innerR , executionInfo , innerL ),
566+ l
567+ )
568+ )
572569 .<PreAnalysisResult >andThen ((l , r ) -> {
573570 enrichPolicyResolver .resolvePolicies (preAnalysis .enriches (), executionInfo , l .map (r ::withEnrichResolution ));
574571 })
@@ -590,13 +587,7 @@ private void preAnalyzeLookupIndices(
590587 EsqlExecutionInfo executionInfo ,
591588 ActionListener <PreAnalysisResult > listener
592589 ) {
593- if (lookupIndices .hasNext ()) {
594- preAnalyzeLookupIndex (lookupIndices .next (), preAnalysisResult , executionInfo , listener .delegateFailureAndWrap ((l , r ) -> {
595- preAnalyzeLookupIndices (lookupIndices , r , executionInfo , l );
596- }));
597- } else {
598- listener .onResponse (preAnalysisResult );
599- }
590+ forAll (lookupIndices , preAnalysisResult , (lookupIndex , r , l ) -> preAnalyzeLookupIndex (lookupIndex , r , executionInfo , l ), listener );
600591 }
601592
602593 private void preAnalyzeLookupIndex (
@@ -808,29 +799,26 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
808799 * indices.
809800 */
810801 private void preAnalyzeMainIndices (
811- Iterator <Map .Entry <IndexPattern , IndexMode >> indexPatterns ,
812802 PreAnalyzer .PreAnalysis preAnalysis ,
813803 EsqlExecutionInfo executionInfo ,
814804 PreAnalysisResult result ,
815805 QueryBuilder requestFilter ,
816806 ActionListener <PreAnalysisResult > listener
817807 ) {
818- if (indexPatterns .hasNext ()) {
819- var index = indexPatterns .next ();
820- preAnalyzeMainIndices (
821- index .getKey (),
822- index .getValue (),
823- preAnalysis ,
824- executionInfo ,
825- result ,
826- requestFilter ,
827- listener .delegateFailureAndWrap ((l , r ) -> {
828- preAnalyzeMainIndices (indexPatterns , preAnalysis , executionInfo , r , requestFilter , l );
829- })
830- );
831- } else {
832- listener .onResponse (result );
833- }
808+ EsqlCCSUtils .initCrossClusterState (
809+ indicesExpressionGrouper ,
810+ verifier .licenseState (),
811+ preAnalysis .indexes ().keySet (),
812+ executionInfo
813+ );
814+ // The main index pattern dictates on which nodes the query can be executed,
815+ // so we use the minimum transport version from this field caps request.
816+ forAll (
817+ preAnalysis .indexes ().entrySet ().iterator (),
818+ result ,
819+ (entry , r , l ) -> preAnalyzeMainIndices (entry .getKey (), entry .getValue (), preAnalysis , executionInfo , r , requestFilter , l ),
820+ listener
821+ );
834822 }
835823
836824 private void preAnalyzeMainIndices (
@@ -997,6 +985,19 @@ private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan, PhysicalPl
997985 return plan ;
998986 }
999987
988+ private static <T > void forAll (
989+ Iterator <T > iterator ,
990+ PreAnalysisResult result ,
991+ TriConsumer <T , PreAnalysisResult , ActionListener <PreAnalysisResult >> consumer ,
992+ ActionListener <PreAnalysisResult > listener
993+ ) {
994+ if (iterator .hasNext ()) {
995+ consumer .apply (iterator .next (), result , listener .delegateFailureAndWrap ((l , r ) -> forAll (iterator , r , consumer , l )));
996+ } else {
997+ listener .onResponse (result );
998+ }
999+ }
1000+
10001001 public record PreAnalysisResult (
10011002 Set <String > fieldNames ,
10021003 Set <String > wildcardJoinIndices ,
0 commit comments