3333import org .elasticsearch .search .SearchPhaseResult ;
3434import org .elasticsearch .search .SearchShardTarget ;
3535import org .elasticsearch .search .builder .PointInTimeBuilder ;
36- import org .elasticsearch .search .builder .SearchSourceBuilder ;
3736import org .elasticsearch .search .internal .AliasFilter ;
3837import org .elasticsearch .search .internal .SearchContext ;
3938import org .elasticsearch .search .internal .ShardSearchContextId ;
@@ -87,18 +86,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
8786 private final SetOnce <AtomicArray <ShardSearchFailure >> shardFailures = new SetOnce <>();
8887 private final Object shardFailuresMutex = new Object ();
8988 private final AtomicBoolean hasShardResponse = new AtomicBoolean (false );
90- private final AtomicInteger successfulOps = new AtomicInteger () ;
89+ private final AtomicInteger successfulOps ;
9190 private final SearchTimeProvider timeProvider ;
9291 private final SearchResponse .Clusters clusters ;
9392
94- protected final List <SearchShardIterator > toSkipShardsIts ;
9593 protected final List <SearchShardIterator > shardsIts ;
9694 private final SearchShardIterator [] shardIterators ;
9795 private final AtomicInteger outstandingShards ;
9896 private final int maxConcurrentRequestsPerNode ;
9997 private final Map <String , PendingExecutions > pendingExecutionsPerNode = new ConcurrentHashMap <>();
10098 private final boolean throttleConcurrentRequests ;
10199 private final AtomicBoolean requestCancelled = new AtomicBoolean ();
100+ private final int skippedCount ;
102101
103102 // protected for tests
104103 protected final SubscribableListener <Void > doneFuture = new SubscribableListener <>();
@@ -124,18 +123,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
124123 ) {
125124 super (name );
126125 this .namedWriteableRegistry = namedWriteableRegistry ;
127- final List <SearchShardIterator > toSkipIterators = new ArrayList <>();
128126 final List <SearchShardIterator > iterators = new ArrayList <>();
127+ int skipped = 0 ;
129128 for (final SearchShardIterator iterator : shardsIts ) {
130129 if (iterator .skip ()) {
131- toSkipIterators . add ( iterator ) ;
130+ skipped ++ ;
132131 } else {
133132 iterators .add (iterator );
134133 }
135134 }
136- this .toSkipShardsIts = toSkipIterators ;
135+ this .skippedCount = skipped ;
137136 this .shardsIts = iterators ;
138- outstandingShards = new AtomicInteger (shardsIts .size ());
137+ outstandingShards = new AtomicInteger (iterators .size ());
138+ successfulOps = new AtomicInteger (skipped );
139139 this .shardIterators = iterators .toArray (new SearchShardIterator [0 ]);
140140 // we later compute the shard index based on the natural order of the shards
141141 // that participate in the search request. This means that this number is
@@ -166,11 +166,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
166166 protected void notifyListShards (
167167 SearchProgressListener progressListener ,
168168 SearchResponse .Clusters clusters ,
169- SearchSourceBuilder sourceBuilder
169+ SearchRequest searchRequest ,
170+ List <SearchShardIterator > allIterators
170171 ) {
172+ final List <SearchShard > skipped = new ArrayList <>(allIterators .size () - shardsIts .size ());
173+ for (SearchShardIterator iter : allIterators ) {
174+ if (iter .skip ()) {
175+ skipped .add (new SearchShard (iter .getClusterAlias (), iter .shardId ()));
176+ }
177+ }
178+ var sourceBuilder = searchRequest .source ();
171179 progressListener .notifyListShards (
172180 SearchProgressListener .buildSearchShardsFromIter (this .shardsIts ),
173- SearchProgressListener . buildSearchShardsFromIter ( toSkipShardsIts ) ,
181+ skipped ,
174182 clusters ,
175183 sourceBuilder == null || sourceBuilder .size () > 0 ,
176184 timeProvider
@@ -219,44 +227,37 @@ public final void start() {
219227
220228 @ Override
221229 protected final void run () {
222- for ( final SearchShardIterator iterator : toSkipShardsIts ) {
223- assert iterator . skip ();
224- skipShard ( iterator ) ;
230+ if ( outstandingShards . get () == 0 ) {
231+ onPhaseDone ();
232+ return ;
225233 }
226234 final Map <SearchShardIterator , Integer > shardIndexMap = Maps .newHashMapWithExpectedSize (shardIterators .length );
227235 for (int i = 0 ; i < shardIterators .length ; i ++) {
228236 shardIndexMap .put (shardIterators [i ], i );
229237 }
230- if (shardsIts .size () > 0 ) {
231- doCheckNoMissingShards (getName (), request , shardsIts );
232- Version version = request .minCompatibleShardNode ();
233- if (version != null && Version .CURRENT .minimumCompatibilityVersion ().equals (version ) == false ) {
234- if (checkMinimumVersion (shardsIts ) == false ) {
235- throw new VersionMismatchException (
236- "One of the shards is incompatible with the required minimum version [{}]" ,
237- request .minCompatibleShardNode ()
238- );
239- }
240- }
241- for (int i = 0 ; i < shardsIts .size (); i ++) {
242- final SearchShardIterator shardRoutings = shardsIts .get (i );
243- assert shardRoutings .skip () == false ;
244- assert shardIndexMap .containsKey (shardRoutings );
245- int shardIndex = shardIndexMap .get (shardRoutings );
246- final SearchShardTarget routing = shardRoutings .nextOrNull ();
247- if (routing == null ) {
248- failOnUnavailable (shardIndex , shardRoutings );
249- } else {
250- performPhaseOnShard (shardIndex , shardRoutings , routing );
251- }
238+ doCheckNoMissingShards (getName (), request , shardsIts );
239+ Version version = request .minCompatibleShardNode ();
240+ if (version != null && Version .CURRENT .minimumCompatibilityVersion ().equals (version ) == false ) {
241+ if (checkMinimumVersion (shardsIts ) == false ) {
242+ throw new VersionMismatchException (
243+ "One of the shards is incompatible with the required minimum version [{}]" ,
244+ request .minCompatibleShardNode ()
245+ );
252246 }
253247 }
254- }
248+ for (int i = 0 ; i < shardsIts .size (); i ++) {
249+ final SearchShardIterator shardRoutings = shardsIts .get (i );
250+ assert shardRoutings .skip () == false ;
251+ assert shardIndexMap .containsKey (shardRoutings );
252+ int shardIndex = shardIndexMap .get (shardRoutings );
253+ final SearchShardTarget routing = shardRoutings .nextOrNull ();
254+ if (routing == null ) {
255+ failOnUnavailable (shardIndex , shardRoutings );
256+ } else {
257+ performPhaseOnShard (shardIndex , shardRoutings , routing );
258+ }
255259
256- void skipShard (SearchShardIterator iterator ) {
257- successfulOps .incrementAndGet ();
258- assert iterator .skip ();
259- successfulShardExecution ();
260+ }
260261 }
261262
262263 private boolean checkMinimumVersion (List <SearchShardIterator > shardsIts ) {
@@ -274,32 +275,6 @@ private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
274275 return true ;
275276 }
276277
277- private static boolean assertExecuteOnStartThread () {
278- // Ensure that the current code has the following stacktrace:
279- // AbstractSearchAsyncAction#start -> AbstractSearchAsyncAction#executePhase -> AbstractSearchAsyncAction#performPhaseOnShard
280- final StackTraceElement [] stackTraceElements = Thread .currentThread ().getStackTrace ();
281- assert stackTraceElements .length >= 6 : stackTraceElements ;
282- int index = 0 ;
283- assert stackTraceElements [index ++].getMethodName ().equals ("getStackTrace" );
284- assert stackTraceElements [index ++].getMethodName ().equals ("assertExecuteOnStartThread" );
285- assert stackTraceElements [index ++].getMethodName ().equals ("failOnUnavailable" );
286- if (stackTraceElements [index ].getMethodName ().equals ("performPhaseOnShard" )) {
287- assert stackTraceElements [index ].getClassName ().endsWith ("CanMatchPreFilterSearchPhase" );
288- index ++;
289- }
290- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" );
291- assert stackTraceElements [index ++].getMethodName ().equals ("run" );
292-
293- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" );
294- assert stackTraceElements [index ++].getMethodName ().equals ("executePhase" );
295-
296- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" );
297- assert stackTraceElements [index ++].getMethodName ().equals ("start" );
298-
299- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" ) == false ;
300- return true ;
301- }
302-
303278 private void performPhaseOnShard (final int shardIndex , final SearchShardIterator shardIt , final SearchShardTarget shard ) {
304279 if (throttleConcurrentRequests ) {
305280 var pendingExecutions = pendingExecutionsPerNode .computeIfAbsent (
@@ -318,7 +293,7 @@ private void doPerformPhaseOnShard(int shardIndex, SearchShardIterator shardIt,
318293 public void innerOnResponse (Result result ) {
319294 try {
320295 releasable .close ();
321- onShardResult (result , shardIt );
296+ onShardResult (result );
322297 } catch (Exception exc ) {
323298 onShardFailure (shardIndex , shard , shardIt , exc );
324299 }
@@ -341,7 +316,6 @@ public void onFailure(Exception e) {
341316 }
342317
343318 private void failOnUnavailable (int shardIndex , SearchShardIterator shardIt ) {
344- assert assertExecuteOnStartThread ();
345319 SearchShardTarget unassignedShard = new SearchShardTarget (null , shardIt .shardId (), shardIt .getClusterAlias ());
346320 onShardFailure (shardIndex , unassignedShard , shardIt , new NoShardAvailableActionException (shardIt .shardId ()));
347321 }
@@ -398,7 +372,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
398372 "Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})" ,
399373 discrepancy ,
400374 successfulOps .get (),
401- toSkipShardsIts . size () ,
375+ skippedCount ,
402376 getNumShards (),
403377 currentPhase
404378 );
@@ -537,9 +511,8 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
537511 /**
538512 * Executed once for every successful shard level request.
539513 * @param result the result returned form the shard
540- * @param shardIt the shard iterator
541514 */
542- protected void onShardResult (Result result , SearchShardIterator shardIt ) {
515+ protected void onShardResult (Result result ) {
543516 assert result .getShardIndex () != -1 : "shard index is not set" ;
544517 assert result .getSearchShardTarget () != null : "search shard target must not be null" ;
545518 hasShardResponse .set (true );
@@ -637,7 +610,7 @@ private SearchResponse buildSearchResponse(
637610 scrollId ,
638611 getNumShards (),
639612 numSuccess ,
640- toSkipShardsIts . size () ,
613+ skippedCount ,
641614 buildTookInMillis (),
642615 failures ,
643616 clusters ,
@@ -729,7 +702,7 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
729702 /**
730703 * Executed once all shard results have been received and processed
731704 * @see #onShardFailure(int, SearchShardTarget, Exception)
732- * @see #onShardResult(SearchPhaseResult, SearchShardIterator )
705+ * @see #onShardResult(SearchPhaseResult)
733706 */
734707 private void onPhaseDone () { // as a tribute to @kimchy aka. finishHim()
735708 executeNextPhase (getName (), this ::getNextPhase );
0 commit comments