2929import org .elasticsearch .common .io .stream .StreamInput ;
3030import org .elasticsearch .common .io .stream .StreamOutput ;
3131import org .elasticsearch .common .io .stream .Writeable ;
32+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
3233import org .elasticsearch .common .util .concurrent .CountDown ;
3334import org .elasticsearch .common .util .concurrent .EsExecutors ;
3435import org .elasticsearch .common .util .concurrent .ListenableFuture ;
8283import java .util .function .IntUnaryOperator ;
8384
8485import static org .elasticsearch .action .search .SearchPhaseController .getTopDocsSize ;
86+ import static org .elasticsearch .search .sort .FieldSortBuilder .NAME ;
8587import static org .elasticsearch .search .sort .FieldSortBuilder .hasPrimaryFieldSort ;
8688
8789public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction <SearchPhaseResult > {
@@ -96,6 +98,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
9698 private volatile BottomSortValuesCollector bottomSortCollector ;
9799 private final Client client ;
98100 private final boolean batchQueryPhase ;
101+ private final SearchService searchService ;
99102
100103 SearchQueryThenFetchAsyncAction (
101104 Logger logger ,
@@ -114,7 +117,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
114117 SearchTask task ,
115118 SearchResponse .Clusters clusters ,
116119 Client client ,
117- boolean batchQueryPhase
120+ SearchService searchService
118121 ) {
119122 super (
120123 "query" ,
@@ -139,7 +142,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
139142 this .trackTotalHitsUpTo = request .resolveTrackTotalHitsUpTo ();
140143 this .progressListener = task .getProgressListener ();
141144 this .client = client ;
142- this .batchQueryPhase = batchQueryPhase ;
145+ this .batchQueryPhase = searchService .batchQueryPhase ();
146+ this .searchService = searchService ;
143147 // don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
144148 if (progressListener != SearchProgressListener .NOOP ) {
145149 notifyListShards (progressListener , clusters , request , shardsIts );
@@ -423,7 +427,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
423427 }
424428 AbstractSearchAsyncAction .doCheckNoMissingShards (getName (), request , shardsIts );
425429 final Map <CanMatchPreFilterSearchPhase .SendingTarget , NodeQueryRequest > perNodeQueries = new HashMap <>();
426- final String localNodeId = searchTransportService .transportService ().getLocalNode ().getId ();
430+ final var transportService = searchTransportService .transportService ();
431+ final String localNodeId = transportService .getLocalNode ().getId ();
427432 final int numberOfShardsTotal = shardsIts .size ();
428433 for (int i = 0 ; i < numberOfShardsTotal ; i ++) {
429434 final SearchShardIterator shardRoutings = shardsIts .get (i );
@@ -436,30 +441,82 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
436441 } else {
437442 final String nodeId = routing .getNodeId ();
438443 // local requests don't need batching as there's no network latency
439- if (localNodeId .equals (nodeId )) {
440- performPhaseOnShard (shardIndex , shardRoutings , routing );
441- } else {
442- var perNodeRequest = perNodeQueries .computeIfAbsent (
443- new CanMatchPreFilterSearchPhase .SendingTarget (routing .getClusterAlias (), nodeId ),
444- t -> new NodeQueryRequest (request , numberOfShardsTotal , timeProvider .absoluteStartMillis (), t .clusterAlias ())
445- );
446- final String indexUUID = routing .getShardId ().getIndex ().getUUID ();
447- perNodeRequest .shards .add (
448- new ShardToQuery (
449- concreteIndexBoosts .getOrDefault (indexUUID , DEFAULT_INDEX_BOOST ),
450- getOriginalIndices (shardIndex ).indices (),
451- shardIndex ,
452- routing .getShardId (),
453- shardRoutings .getSearchContextId ()
454- )
455- );
456- var filterForAlias = aliasFilter .getOrDefault (indexUUID , AliasFilter .EMPTY );
457- if (filterForAlias != AliasFilter .EMPTY ) {
458- perNodeRequest .aliasFilters .putIfAbsent (indexUUID , filterForAlias );
459- }
444+ var perNodeRequest = perNodeQueries .computeIfAbsent (
445+ new CanMatchPreFilterSearchPhase .SendingTarget (routing .getClusterAlias (), nodeId ),
446+ t -> new NodeQueryRequest (request , numberOfShardsTotal , timeProvider .absoluteStartMillis (), t .clusterAlias ())
447+ );
448+ final String indexUUID = routing .getShardId ().getIndex ().getUUID ();
449+ perNodeRequest .shards .add (
450+ new ShardToQuery (
451+ concreteIndexBoosts .getOrDefault (indexUUID , DEFAULT_INDEX_BOOST ),
452+ getOriginalIndices (shardIndex ).indices (),
453+ shardIndex ,
454+ routing .getShardId (),
455+ shardRoutings .getSearchContextId ()
456+ )
457+ );
458+ var filterForAlias = aliasFilter .getOrDefault (indexUUID , AliasFilter .EMPTY );
459+ if (filterForAlias != AliasFilter .EMPTY ) {
460+ perNodeRequest .aliasFilters .putIfAbsent (indexUUID , filterForAlias );
460461 }
461462 }
462463 }
464+ final var localTarget = new CanMatchPreFilterSearchPhase .SendingTarget (request .getLocalClusterAlias (), localNodeId );
465+ var localNodeRequest = perNodeQueries .remove (localTarget );
466+ if (localNodeRequest != null ) {
467+ transportService .getThreadPool ().executor (ThreadPool .Names .SEARCH_COORDINATION ).execute (new AbstractRunnable () {
468+ @ Override
469+ protected void doRun () {
470+ if (hasPrimaryFieldSort (request .source ())) {
471+ var pitBuilder = request .pointInTimeBuilder ();
472+ @ SuppressWarnings ("rawtypes" )
473+ final MinAndMax [] minAndMax = new MinAndMax [localNodeRequest .shards .size ()];
474+ for (int i = 0 ; i < minAndMax .length ; i ++) {
475+ var shardToQuery = localNodeRequest .shards .get (i );
476+ var shardId = shardToQuery .shardId ;
477+ var r = buildShardSearchRequest (
478+ shardId ,
479+ localNodeRequest .localClusterAlias ,
480+ shardToQuery .shardIndex ,
481+ shardToQuery .contextId ,
482+ new OriginalIndices (shardToQuery .originalIndices , request .indicesOptions ()),
483+ localNodeRequest .aliasFilters .getOrDefault (shardId .getIndex ().getUUID (), AliasFilter .EMPTY ),
484+ pitBuilder == null ? null : pitBuilder .getKeepAlive (),
485+ shardToQuery .boost ,
486+ request ,
487+ localNodeRequest .totalShards ,
488+ localNodeRequest .absoluteStartMillis ,
489+ false
490+ );
491+ minAndMax [i ] = searchService .canMatch (r ).estimatedMinAndMax ();
492+ }
493+ try {
494+ int [] indexes = CanMatchPreFilterSearchPhase .sortShards (
495+ localNodeRequest .shards ,
496+ minAndMax ,
497+ FieldSortBuilder .getPrimaryFieldSortOrNull (request .source ()).order ()
498+ );
499+ for (int i = 0 ; i < indexes .length ; i ++) {
500+ ShardToQuery shardToQuery = localNodeRequest .shards .get (i );
501+ shardToQuery = localNodeRequest .shards .set (i , shardToQuery );
502+ localNodeRequest .shards .set (i , shardToQuery );
503+ }
504+ } catch (Exception e ) {
505+ // ignored, field type conflicts will be dealt with in upstream logic
506+ // TODO: we should fail the query here, we're already seeing a field type conflict on the sort field,
507+ // no need to actually execute the queries and go through a lot of work before we inevitably have to
508+ // fail the search
509+ }
510+ }
511+ executeWithoutBatching (localTarget , localNodeRequest );
512+ }
513+
514+ @ Override
515+ public void onFailure (Exception e ) {
516+ SearchQueryThenFetchAsyncAction .this .onPhaseFailure (NAME , "" , e );
517+ }
518+ });
519+ }
463520 perNodeQueries .forEach ((routing , request ) -> {
464521 if (request .shards .size () == 1 ) {
465522 executeAsSingleRequest (routing , request .shards .getFirst ());
@@ -477,8 +534,12 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
477534 executeWithoutBatching (routing , request );
478535 return ;
479536 }
480- searchTransportService .transportService ()
481- .sendChildRequest (connection , NODE_SEARCH_ACTION_NAME , request , task , new TransportResponseHandler <NodeQueryResponse >() {
537+ transportService .sendChildRequest (
538+ connection ,
539+ NODE_SEARCH_ACTION_NAME ,
540+ request ,
541+ task ,
542+ new TransportResponseHandler <NodeQueryResponse >() {
482543 @ Override
483544 public NodeQueryResponse read (StreamInput in ) throws IOException {
484545 return new NodeQueryResponse (in );
@@ -531,7 +592,8 @@ public void handleException(TransportException e) {
531592 onPhaseFailure (getName (), "" , cause );
532593 }
533594 }
534- });
595+ }
596+ );
535597 });
536598 }
537599
0 commit comments