3333import org .elasticsearch .search .SearchPhaseResult ;
3434import org .elasticsearch .search .SearchShardTarget ;
3535import org .elasticsearch .search .builder .PointInTimeBuilder ;
36- import org .elasticsearch .search .builder .SearchSourceBuilder ;
3736import org .elasticsearch .search .internal .AliasFilter ;
3837import org .elasticsearch .search .internal .SearchContext ;
3938import org .elasticsearch .search .internal .ShardSearchContextId ;
@@ -88,18 +87,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
8887 private final SetOnce <AtomicArray <ShardSearchFailure >> shardFailures = new SetOnce <>();
8988 private final Object shardFailuresMutex = new Object ();
9089 private final AtomicBoolean hasShardResponse = new AtomicBoolean (false );
91- private final AtomicInteger successfulOps = new AtomicInteger () ;
90+ private final AtomicInteger successfulOps ;
9291 private final SearchTimeProvider timeProvider ;
9392 private final SearchResponse .Clusters clusters ;
9493
95- protected final List <SearchShardIterator > toSkipShardsIts ;
9694 protected final List <SearchShardIterator > shardsIts ;
9795 private final SearchShardIterator [] shardIterators ;
9896 private final AtomicInteger outstandingShards ;
9997 private final int maxConcurrentRequestsPerNode ;
10098 private final Map <String , PendingExecutions > pendingExecutionsPerNode = new ConcurrentHashMap <>();
10199 private final boolean throttleConcurrentRequests ;
102100 private final AtomicBoolean requestCancelled = new AtomicBoolean ();
101+ private final int skippedCount ;
103102
104103 // protected for tests
105104 protected final List <Releasable > releasables = new ArrayList <>();
@@ -125,18 +124,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
125124 ) {
126125 super (name );
127126 this .namedWriteableRegistry = namedWriteableRegistry ;
128- final List <SearchShardIterator > toSkipIterators = new ArrayList <>();
129127 final List <SearchShardIterator > iterators = new ArrayList <>();
128+ int skipped = 0 ;
130129 for (final SearchShardIterator iterator : shardsIts ) {
131130 if (iterator .skip ()) {
132- toSkipIterators . add ( iterator ) ;
131+ skipped ++ ;
133132 } else {
134133 iterators .add (iterator );
135134 }
136135 }
137- this .toSkipShardsIts = toSkipIterators ;
136+ this .skippedCount = skipped ;
138137 this .shardsIts = iterators ;
139- outstandingShards = new AtomicInteger (shardsIts .size ());
138+ outstandingShards = new AtomicInteger (iterators .size ());
139+ successfulOps = new AtomicInteger (skipped );
140140 this .shardIterators = iterators .toArray (new SearchShardIterator [0 ]);
141141 // we later compute the shard index based on the natural order of the shards
142142 // that participate in the search request. This means that this number is
@@ -167,11 +167,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
167167 protected void notifyListShards (
168168 SearchProgressListener progressListener ,
169169 SearchResponse .Clusters clusters ,
170- SearchSourceBuilder sourceBuilder
170+ SearchRequest searchRequest ,
171+ List <SearchShardIterator > allIterators
171172 ) {
173+ final List <SearchShard > skipped = new ArrayList <>(allIterators .size () - shardsIts .size ());
174+ for (SearchShardIterator iter : allIterators ) {
175+ if (iter .skip ()) {
176+ skipped .add (new SearchShard (iter .getClusterAlias (), iter .shardId ()));
177+ }
178+ }
179+ var sourceBuilder = searchRequest .source ();
172180 progressListener .notifyListShards (
173181 SearchProgressListener .buildSearchShardsFromIter (this .shardsIts ),
174- SearchProgressListener . buildSearchShardsFromIter ( toSkipShardsIts ) ,
182+ skipped ,
175183 clusters ,
176184 sourceBuilder == null || sourceBuilder .size () > 0 ,
177185 timeProvider
@@ -215,37 +223,29 @@ public final void start() {
215223
216224 @ Override
217225 protected final void run () {
218- for ( final SearchShardIterator iterator : toSkipShardsIts ) {
219- assert iterator . skip ();
220- skipShard ( iterator ) ;
226+ if ( outstandingShards . get () == 0 ) {
227+ onPhaseDone ();
228+ return ;
221229 }
222230 final Map <SearchShardIterator , Integer > shardIndexMap = Maps .newHashMapWithExpectedSize (shardIterators .length );
223231 for (int i = 0 ; i < shardIterators .length ; i ++) {
224232 shardIndexMap .put (shardIterators [i ], i );
225233 }
226- if (shardsIts .size () > 0 ) {
227- doCheckNoMissingShards (getName (), request , shardsIts );
228- for (int i = 0 ; i < shardsIts .size (); i ++) {
229- final SearchShardIterator shardRoutings = shardsIts .get (i );
230- assert shardRoutings .skip () == false ;
231- assert shardIndexMap .containsKey (shardRoutings );
232- int shardIndex = shardIndexMap .get (shardRoutings );
233- final SearchShardTarget routing = shardRoutings .nextOrNull ();
234- if (routing == null ) {
235- failOnUnavailable (shardIndex , shardRoutings );
236- } else {
237- performPhaseOnShard (shardIndex , shardRoutings , routing );
238- }
234+ doCheckNoMissingShards (getName (), request , shardsIts );
235+ for (int i = 0 ; i < shardsIts .size (); i ++) {
236+ final SearchShardIterator shardRoutings = shardsIts .get (i );
237+ assert shardRoutings .skip () == false ;
238+ assert shardIndexMap .containsKey (shardRoutings );
239+ int shardIndex = shardIndexMap .get (shardRoutings );
240+ final SearchShardTarget routing = shardRoutings .nextOrNull ();
241+ if (routing == null ) {
242+ failOnUnavailable (shardIndex , shardRoutings );
243+ } else {
244+ performPhaseOnShard (shardIndex , shardRoutings , routing );
239245 }
240246 }
241247 }
242248
243- void skipShard (SearchShardIterator iterator ) {
244- successfulOps .incrementAndGet ();
245- assert iterator .skip ();
246- successfulShardExecution ();
247- }
248-
249249 private void performPhaseOnShard (final int shardIndex , final SearchShardIterator shardIt , final SearchShardTarget shard ) {
250250 if (throttleConcurrentRequests ) {
251251 var pendingExecutions = pendingExecutionsPerNode .computeIfAbsent (
@@ -343,7 +343,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
343343 "Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})" ,
344344 discrepancy ,
345345 successfulOps .get (),
346- toSkipShardsIts . size () ,
346+ skippedCount ,
347347 getNumShards (),
348348 currentPhase
349349 );
@@ -585,7 +585,7 @@ private SearchResponse buildSearchResponse(
585585 scrollId ,
586586 getNumShards (),
587587 numSuccess ,
588- toSkipShardsIts . size () ,
588+ skippedCount ,
589589 buildTookInMillis (),
590590 failures ,
591591 clusters ,
0 commit comments