99
1010import org .elasticsearch .action .ActionListener ;
1111import org .elasticsearch .action .OriginalIndices ;
12- import org .elasticsearch .action .search .ShardSearchFailure ;
1312import org .elasticsearch .action .support .IndicesOptions ;
1413import org .elasticsearch .action .support .SubscribableListener ;
1514import org .elasticsearch .common .Strings ;
1918import org .elasticsearch .compute .data .Page ;
2019import org .elasticsearch .compute .operator .DriverProfile ;
2120import org .elasticsearch .core .Releasables ;
22- import org .elasticsearch .core .TimeValue ;
2321import org .elasticsearch .index .IndexMode ;
2422import org .elasticsearch .index .query .QueryBuilder ;
2523import org .elasticsearch .indices .IndicesExpressionGrouper ;
7876import org .elasticsearch .xpack .esql .telemetry .PlanTelemetry ;
7977
8078import java .util .ArrayList ;
81- import java .util .Arrays ;
8279import java .util .HashMap ;
8380import java .util .Iterator ;
8481import java .util .List ;
@@ -319,16 +316,10 @@ public void analyzedPlan(
319316 final List <TableInfo > indices = preAnalysis .indices ;
320317
321318 EsqlCCSUtils .checkForCcsLicense (executionInfo , indices , indicesExpressionGrouper , configuredClusters , verifier .licenseState ());
322-
323- final Set <String > targetClusters = enrichPolicyResolver .groupIndicesPerCluster (
324- configuredClusters ,
325- indices .stream ()
326- .flatMap (t -> Arrays .stream (Strings .commaDelimitedListToStringArray (t .id ().indexPattern ())))
327- .toArray (String []::new )
328- ).keySet ();
319+ initializeClusterData (indices , executionInfo );
329320
330321 var listener = SubscribableListener .<EnrichResolution >newForked (
331- l -> enrichPolicyResolver .resolvePolicies (targetClusters , unresolvedPolicies , l )
322+ l -> enrichPolicyResolver .resolvePolicies (unresolvedPolicies , executionInfo , l )
332323 ).<PreAnalysisResult >andThen ((l , enrichResolution ) -> resolveFieldNames (parsed , enrichResolution , l ));
333324 // first resolve the lookup indices, then the main indices
334325 for (TableInfo lookupIndex : preAnalysis .lookupIndices ) {
@@ -352,12 +343,6 @@ public void analyzedPlan(
352343 }).<PreAnalysisResult >andThen ((l , result ) -> {
353344 assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request" ;
354345
355- // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
356- // resolving one more time (the first attempt failed and the query had a filter)
357- for (String clusterAlias : executionInfo .clusterAliases ()) {
358- executionInfo .swapCluster (clusterAlias , (k , v ) -> null );
359- }
360-
361346 // here the requestFilter is set to null, performing the pre-analysis after the first step failed
362347 preAnalyzeIndices (preAnalysis .indices , executionInfo , result , null , l );
363348 }).<LogicalPlan >andThen ((l , result ) -> {
@@ -388,6 +373,26 @@ private void preAnalyzeLookupIndex(TableInfo tableInfo, PreAnalysisResult result
388373 // TODO: Verify that the resolved index actually has indexMode: "lookup"
389374 }
390375
376+ private void initializeClusterData (List <TableInfo > indices , EsqlExecutionInfo executionInfo ) {
377+ if (indices .isEmpty ()) {
378+ return ;
379+ }
380+ assert indices .size () == 1 : "Only single index pattern is supported" ;
381+ Map <String , OriginalIndices > clusterIndices = indicesExpressionGrouper .groupIndices (
382+ configuredClusters ,
383+ IndicesOptions .DEFAULT ,
384+ indices .get (0 ).id ().indexPattern ()
385+ );
386+ for (Map .Entry <String , OriginalIndices > entry : clusterIndices .entrySet ()) {
387+ final String clusterAlias = entry .getKey ();
388+ String indexExpr = Strings .arrayToCommaDelimitedString (entry .getValue ().indices ());
389+ executionInfo .swapCluster (clusterAlias , (k , v ) -> {
390+ assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet" ;
391+ return new EsqlExecutionInfo .Cluster (clusterAlias , indexExpr , executionInfo .isSkipUnavailable (clusterAlias ));
392+ });
393+ }
394+ }
395+
391396 private void preAnalyzeIndices (
392397 List <TableInfo > indices ,
393398 EsqlExecutionInfo executionInfo ,
@@ -400,39 +405,9 @@ private void preAnalyzeIndices(
400405 // Note: JOINs are not supported but we detect them when
401406 listener .onFailure (new MappingException ("Queries with multiple indices are not supported" ));
402407 } else if (indices .size () == 1 ) {
403- // known to be unavailable from the enrich policy API call
404- Map <String , Exception > unavailableClusters = result .enrichResolution .getUnavailableClusters ();
405408 TableInfo tableInfo = indices .get (0 );
406409 IndexPattern table = tableInfo .id ();
407410
408- Map <String , OriginalIndices > clusterIndices = indicesExpressionGrouper .groupIndices (
409- configuredClusters ,
410- IndicesOptions .DEFAULT ,
411- table .indexPattern ()
412- );
413- for (Map .Entry <String , OriginalIndices > entry : clusterIndices .entrySet ()) {
414- final String clusterAlias = entry .getKey ();
415- String indexExpr = Strings .arrayToCommaDelimitedString (entry .getValue ().indices ());
416- executionInfo .swapCluster (clusterAlias , (k , v ) -> {
417- assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet" ;
418- if (unavailableClusters .containsKey (k )) {
419- return new EsqlExecutionInfo .Cluster (
420- clusterAlias ,
421- indexExpr ,
422- executionInfo .isSkipUnavailable (clusterAlias ),
423- EsqlExecutionInfo .Cluster .Status .SKIPPED ,
424- 0 ,
425- 0 ,
426- 0 ,
427- 0 ,
428- List .of (new ShardSearchFailure (unavailableClusters .get (k ))),
429- new TimeValue (0 )
430- );
431- } else {
432- return new EsqlExecutionInfo .Cluster (clusterAlias , indexExpr , executionInfo .isSkipUnavailable (clusterAlias ));
433- }
434- });
435- }
436411 // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
437412 // based only on available clusters (which could now be an empty list)
438413 String indexExpressionToResolve = EsqlCCSUtils .createIndexExpressionFromAvailableClusters (executionInfo );
0 commit comments