@@ -341,6 +341,13 @@ void executeRequest(
341341 ActionListener <SearchResponse > listener ,
342342 Function <ActionListener <SearchResponse >, SearchPhaseProvider > searchPhaseProvider
343343 ) {
344+ final SearchSourceBuilder source = original .source ();
345+ final boolean isExplain = source != null && source .explain () != null && source .explain ();
346+ if (shouldOpenPIT (source )) {
347+ executeOpenPit (task , original , listener , searchPhaseProvider , source );
348+ return ;
349+ }
350+
344351 final long relativeStartNanos = System .nanoTime ();
345352 final SearchTimeProvider timeProvider = new SearchTimeProvider (
346353 original .getOrCreateAbsoluteStartMillis (),
@@ -370,187 +377,180 @@ void executeRequest(
370377 );
371378 frozenIndexCheck (resolvedIndices );
372379 }
373-
374- ActionListener <SearchRequest > rewriteListener = listener .delegateFailureAndWrap ((delegate , rewritten ) -> {
375- if (ccsCheckCompatibility ) {
376- checkCCSVersionCompatibility (rewritten );
377- }
378-
379- if (resolvedIndices .getRemoteClusterIndices ().isEmpty ()) {
380- executeLocalSearch (
381- task ,
382- timeProvider ,
383- rewritten ,
384- resolvedIndices ,
385- projectState ,
386- SearchResponse .Clusters .EMPTY ,
387- searchPhaseProvider .apply (delegate )
388- );
389- } else {
390- if (delegate instanceof TelemetryListener tl ) {
391- tl .setRemotes (resolvedIndices .getRemoteClusterIndices ().size ());
392- if (task .isAsync ()) {
393- tl .setFeature (CCSUsageTelemetry .ASYNC_FEATURE );
394- }
395- if (original .pointInTimeBuilder () != null ) {
396- tl .setFeature (CCSUsageTelemetry .PIT_FEATURE );
397- }
398- tl .setClient (task );
399- // Check if any of the index patterns are wildcard patterns
400- var localIndices = resolvedIndices .getLocalIndices ();
401- if (localIndices != null && Arrays .stream (localIndices .indices ()).anyMatch (Regex ::isSimpleMatchPattern )) {
402- tl .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
403- }
404- if (resolvedIndices .getRemoteClusterIndices ()
405- .values ()
406- .stream ()
407- .anyMatch (indices -> Arrays .stream (indices .indices ()).anyMatch (Regex ::isSimpleMatchPattern ))) {
408- tl .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
409- }
380+ Rewriteable .rewriteAndFetch (
381+ original ,
382+ searchService .getRewriteContext (timeProvider ::absoluteStartMillis , resolvedIndices , original .pointInTimeBuilder (), isExplain ),
383+ listener .delegateFailureAndWrap ((delegate , rewritten ) -> {
384+ if (ccsCheckCompatibility ) {
385+ checkCCSVersionCompatibility (rewritten );
410386 }
411- final TaskId parentTaskId = task .taskInfo (clusterService .localNode ().getId (), false ).taskId ();
412- if (shouldMinimizeRoundtrips (rewritten )) {
413- if (delegate instanceof TelemetryListener tl ) {
414- tl .setFeature (CCSUsageTelemetry .MRT_FEATURE );
415- }
416- final AggregationReduceContext .Builder aggregationReduceContextBuilder = rewritten .source () != null
417- && rewritten .source ().aggregations () != null
418- ? searchService .aggReduceContextBuilder (task ::isCancelled , rewritten .source ().aggregations ())
419- : null ;
420- SearchResponse .Clusters clusters = new SearchResponse .Clusters (
421- resolvedIndices .getLocalIndices (),
422- resolvedIndices .getRemoteClusterIndices (),
423- true ,
424- remoteClusterService ::isSkipUnavailable
425- );
426- if (resolvedIndices .getLocalIndices () == null ) {
427- // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
428- task .getProgressListener ()
429- .notifyListShards (Collections .emptyList (), Collections .emptyList (), clusters , false , timeProvider );
430- }
431- ccsRemoteReduce (
387+
388+ if (resolvedIndices .getRemoteClusterIndices ().isEmpty ()) {
389+ executeLocalSearch (
432390 task ,
433- parentTaskId ,
391+ timeProvider ,
434392 rewritten ,
435393 resolvedIndices ,
436- clusters ,
437- timeProvider ,
438- aggregationReduceContextBuilder ,
439- remoteClusterService ,
440- threadPool ,
441- delegate ,
442- (r , l ) -> executeLocalSearch (
443- task ,
444- timeProvider ,
445- r ,
446- resolvedIndices ,
447- projectState ,
448- clusters ,
449- searchPhaseProvider .apply (l )
450- )
394+ projectState ,
395+ SearchResponse .Clusters .EMPTY ,
396+ searchPhaseProvider .apply (delegate )
451397 );
452398 } else {
453- final SearchContextId searchContext = resolvedIndices .getSearchContextId ();
454- SearchResponse .Clusters clusters = new SearchResponse .Clusters (
455- resolvedIndices .getLocalIndices (),
456- resolvedIndices .getRemoteClusterIndices (),
457- false ,
458- remoteClusterService ::isSkipUnavailable
459- );
399+ executeSearchWithRemotes (task , searchPhaseProvider , delegate , rewritten , resolvedIndices , timeProvider , projectState );
400+ }
401+ })
402+ );
403+ }
460404
461- // TODO: pass parentTaskId
462- collectSearchShards (
463- rewritten .indicesOptions (),
464- rewritten .preference (),
465- rewritten .routing (),
466- rewritten .source () != null ? rewritten .source ().query () : null ,
467- Objects .requireNonNullElse (rewritten .allowPartialSearchResults (), searchService .defaultAllowPartialSearchResults ()),
468- searchContext ,
469- resolvedIndices .getRemoteClusterIndices (),
470- clusters ,
471- timeProvider ,
472- transportService ,
473- delegate .delegateFailureAndWrap ((finalDelegate , searchShardsResponses ) -> {
474- final BiFunction <String , String , DiscoveryNode > clusterNodeLookup = getRemoteClusterNodeLookup (
475- searchShardsResponses
476- );
477- final Map <String , AliasFilter > remoteAliasFilters ;
478- final List <SearchShardIterator > remoteShardIterators ;
479- if (searchContext != null ) {
480- remoteAliasFilters = searchContext .aliasFilter ();
481- remoteShardIterators = getRemoteShardsIteratorFromPointInTime (
482- searchShardsResponses ,
483- searchContext ,
484- rewritten .pointInTimeBuilder ().getKeepAlive (),
485- resolvedIndices .getRemoteClusterIndices ()
486- );
487- } else {
488- remoteAliasFilters = new HashMap <>();
489- for (SearchShardsResponse searchShardsResponse : searchShardsResponses .values ()) {
490- remoteAliasFilters .putAll (searchShardsResponse .getAliasFilters ());
491- }
492- remoteShardIterators = getRemoteShardsIterator (
493- searchShardsResponses ,
494- resolvedIndices .getRemoteClusterIndices (),
495- remoteAliasFilters
496- );
497- }
498- executeSearch (
499- task ,
500- timeProvider ,
501- rewritten ,
502- resolvedIndices ,
503- remoteShardIterators ,
504- clusterNodeLookup ,
505- projectState ,
506- remoteAliasFilters ,
507- clusters ,
508- searchPhaseProvider .apply (finalDelegate )
509- );
510- })
511- );
405+ private void executeOpenPit (
406+ SearchTask task ,
407+ SearchRequest original ,
408+ ActionListener <SearchResponse > listener ,
409+ Function <ActionListener <SearchResponse >, SearchPhaseProvider > searchPhaseProvider ,
410+ SearchSourceBuilder source
411+ ) {
412+ // disabling shard reordering for request
413+ original .setPreFilterShardSize (Integer .MAX_VALUE );
414+ openPIT (client , original , searchService .getDefaultKeepAliveInMillis (), listener .delegateFailureAndWrap ((delegate , resp ) -> {
415+ // We set the keep alive to -1 to indicate that we don't need the pit id in the response.
416+ // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
417+ source .pointInTimeBuilder (new PointInTimeBuilder (resp .getPointInTimeId ()).setKeepAlive (TimeValue .MINUS_ONE ));
418+ var pitListener = new SearchResponseActionListener (delegate ) {
419+ @ Override
420+ public void onResponse (SearchResponse response ) {
421+ // we need to close the PIT first so we delay the release of the response to after the closing
422+ response .incRef ();
423+ closePIT (client , original .source ().pointInTimeBuilder (), () -> ActionListener .respondAndRelease (delegate , response ));
424+ }
425+
426+ @ Override
427+ public void onFailure (Exception e ) {
428+ closePIT (client , original .source ().pointInTimeBuilder (), () -> delegate .onFailure (e ));
512429 }
430+ };
431+ executeRequest (task , original , pitListener , searchPhaseProvider );
432+ }));
433+ }
434+
435+ private void executeSearchWithRemotes (
436+ SearchTask task ,
437+ Function <ActionListener <SearchResponse >, SearchPhaseProvider > searchPhaseProvider ,
438+ ActionListener <SearchResponse > delegate ,
439+ SearchRequest rewritten ,
440+ ResolvedIndices resolvedIndices ,
441+ SearchTimeProvider timeProvider ,
442+ ProjectState projectState
443+ ) {
444+ if (delegate instanceof TelemetryListener tl ) {
445+ tl .setRemotes (resolvedIndices .getRemoteClusterIndices ().size ());
446+ if (task .isAsync ()) {
447+ tl .setFeature (CCSUsageTelemetry .ASYNC_FEATURE );
513448 }
514- });
449+ if (rewritten .pointInTimeBuilder () != null ) {
450+ tl .setFeature (CCSUsageTelemetry .PIT_FEATURE );
451+ }
452+ tl .setClient (task );
453+ // Check if any of the index patterns are wildcard patterns
454+ var localIndices = resolvedIndices .getLocalIndices ();
455+ if (localIndices != null && Arrays .stream (localIndices .indices ()).anyMatch (Regex ::isSimpleMatchPattern )) {
456+ tl .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
457+ }
458+ if (resolvedIndices .getRemoteClusterIndices ()
459+ .values ()
460+ .stream ()
461+ .anyMatch (indices -> Arrays .stream (indices .indices ()).anyMatch (Regex ::isSimpleMatchPattern ))) {
462+ tl .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
463+ }
464+ }
465+ if (shouldMinimizeRoundtrips (rewritten )) {
466+ if (delegate instanceof TelemetryListener tl ) {
467+ tl .setFeature (CCSUsageTelemetry .MRT_FEATURE );
468+ }
469+ final AggregationReduceContext .Builder aggregationReduceContextBuilder = rewritten .source () != null
470+ && rewritten .source ().aggregations () != null
471+ ? searchService .aggReduceContextBuilder (task ::isCancelled , rewritten .source ().aggregations ())
472+ : null ;
473+ SearchResponse .Clusters clusters = new SearchResponse .Clusters (
474+ resolvedIndices .getLocalIndices (),
475+ resolvedIndices .getRemoteClusterIndices (),
476+ true ,
477+ remoteClusterService ::isSkipUnavailable
478+ );
479+ if (resolvedIndices .getLocalIndices () == null ) {
480+ // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local
481+ // shards)
482+ task .getProgressListener ()
483+ .notifyListShards (Collections .emptyList (), Collections .emptyList (), clusters , false , timeProvider );
484+ }
485+ ccsRemoteReduce (
486+ task ,
487+ task .taskInfo (clusterService .localNode ().getId (), false ).taskId (),
488+ rewritten ,
489+ resolvedIndices ,
490+ clusters ,
491+ timeProvider ,
492+ aggregationReduceContextBuilder ,
493+ remoteClusterService ,
494+ threadPool ,
495+ delegate ,
496+ (r , l ) -> executeLocalSearch (task , timeProvider , r , resolvedIndices , projectState , clusters , searchPhaseProvider .apply (l ))
497+ );
498+ } else {
499+ final SearchContextId searchContext = resolvedIndices .getSearchContextId ();
500+ SearchResponse .Clusters clusters = new SearchResponse .Clusters (
501+ resolvedIndices .getLocalIndices (),
502+ resolvedIndices .getRemoteClusterIndices (),
503+ false ,
504+ remoteClusterService ::isSkipUnavailable
505+ );
515506
516- final SearchSourceBuilder source = original .source ();
517- final boolean isExplain = source != null && source .explain () != null && source .explain ();
518- if (shouldOpenPIT (source )) {
519- // disabling shard reordering for request
520- original .setPreFilterShardSize (Integer .MAX_VALUE );
521- openPIT (client , original , searchService .getDefaultKeepAliveInMillis (), listener .delegateFailureAndWrap ((delegate , resp ) -> {
522- // We set the keep alive to -1 to indicate that we don't need the pit id in the response.
523- // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
524- source .pointInTimeBuilder (new PointInTimeBuilder (resp .getPointInTimeId ()).setKeepAlive (TimeValue .MINUS_ONE ));
525- var pitListener = new SearchResponseActionListener (delegate ) {
526- @ Override
527- public void onResponse (SearchResponse response ) {
528- // we need to close the PIT first so we delay the release of the response to after the closing
529- response .incRef ();
530- closePIT (
531- client ,
532- original .source ().pointInTimeBuilder (),
533- () -> ActionListener .respondAndRelease (delegate , response )
507+ // TODO: pass parentTaskId
508+ collectSearchShards (
509+ rewritten .indicesOptions (),
510+ rewritten .preference (),
511+ rewritten .routing (),
512+ rewritten .source () != null ? rewritten .source ().query () : null ,
513+ Objects .requireNonNullElse (rewritten .allowPartialSearchResults (), searchService .defaultAllowPartialSearchResults ()),
514+ searchContext ,
515+ resolvedIndices .getRemoteClusterIndices (),
516+ clusters ,
517+ timeProvider ,
518+ transportService ,
519+ delegate .delegateFailureAndWrap ((finalDelegate , searchShardsResponses ) -> {
520+ final Map <String , AliasFilter > remoteAliasFilters ;
521+ final List <SearchShardIterator > remoteShardIterators ;
522+ if (searchContext != null ) {
523+ remoteAliasFilters = searchContext .aliasFilter ();
524+ remoteShardIterators = getRemoteShardsIteratorFromPointInTime (
525+ searchShardsResponses ,
526+ searchContext ,
527+ rewritten .pointInTimeBuilder ().getKeepAlive (),
528+ resolvedIndices .getRemoteClusterIndices ()
529+ );
530+ } else {
531+ remoteAliasFilters = new HashMap <>();
532+ for (SearchShardsResponse searchShardsResponse : searchShardsResponses .values ()) {
533+ remoteAliasFilters .putAll (searchShardsResponse .getAliasFilters ());
534+ }
535+ remoteShardIterators = getRemoteShardsIterator (
536+ searchShardsResponses ,
537+ resolvedIndices .getRemoteClusterIndices (),
538+ remoteAliasFilters
534539 );
535540 }
536-
537- @ Override
538- public void onFailure (Exception e ) {
539- closePIT (client , original .source ().pointInTimeBuilder (), () -> delegate .onFailure (e ));
540- }
541- };
542- executeRequest (task , original , pitListener , searchPhaseProvider );
543- }));
544- } else {
545- Rewriteable .rewriteAndFetch (
546- original ,
547- searchService .getRewriteContext (
548- timeProvider ::absoluteStartMillis ,
549- resolvedIndices ,
550- original .pointInTimeBuilder (),
551- isExplain
552- ),
553- rewriteListener
541+ executeSearch (
542+ task ,
543+ timeProvider ,
544+ rewritten ,
545+ resolvedIndices ,
546+ remoteShardIterators ,
547+ getRemoteClusterNodeLookup (searchShardsResponses ),
548+ projectState ,
549+ remoteAliasFilters ,
550+ clusters ,
551+ searchPhaseProvider .apply (finalDelegate )
552+ );
553+ })
554554 );
555555 }
556556 }
0 commit comments