1212import org .apache .logging .log4j .Logger ;
1313import org .apache .lucene .util .FixedBitSet ;
1414import org .elasticsearch .action .ActionListener ;
15+ import org .elasticsearch .action .support .SubscribableListener ;
1516import org .elasticsearch .common .util .Maps ;
1617import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
1718import org .elasticsearch .common .util .concurrent .CountDown ;
@@ -76,7 +77,7 @@ final class CanMatchPreFilterSearchPhase {
7677 private int numPossibleMatches ;
7778 private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider ;
7879
79- CanMatchPreFilterSearchPhase (
80+ private CanMatchPreFilterSearchPhase (
8081 Logger logger ,
8182 SearchTransportService searchTransportService ,
8283 BiFunction <String , String , Transport .Connection > nodeIdToConnection ,
@@ -123,6 +124,57 @@ final class CanMatchPreFilterSearchPhase {
123124 this .shardItIndexMap = shardItIndexMap ;
124125 }
125126
127+ public static SubscribableListener <List <SearchShardIterator >> execute (
128+ Logger logger ,
129+ SearchTransportService searchTransportService ,
130+ BiFunction <String , String , Transport .Connection > nodeIdToConnection ,
131+ Map <String , AliasFilter > aliasFilter ,
132+ Map <String , Float > concreteIndexBoosts ,
133+ Executor executor ,
134+ SearchRequest request ,
135+ List <SearchShardIterator > shardsIts ,
136+ TransportSearchAction .SearchTimeProvider timeProvider ,
137+ SearchTask task ,
138+ boolean requireAtLeastOneMatch ,
139+ CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
140+ ) {
141+ if (shardsIts .isEmpty ()) {
142+ return SubscribableListener .newSucceeded (List .of ());
143+ }
144+ final SubscribableListener <List <SearchShardIterator >> listener = new SubscribableListener <>();
145+ // Note that the search is failed when this task is rejected by the executor
146+ executor .execute (new AbstractRunnable () {
147+ @ Override
148+ public void onFailure (Exception e ) {
149+ if (logger .isDebugEnabled ()) {
150+ logger .debug (() -> format ("Failed to execute [%s] while running [can_match] phase" , request ), e );
151+ }
152+ listener .onFailure (new SearchPhaseExecutionException ("can_match" , "start" , e , ShardSearchFailure .EMPTY_ARRAY ));
153+ }
154+
155+ @ Override
156+ protected void doRun () {
157+ assert assertSearchCoordinationThread ();
158+ new CanMatchPreFilterSearchPhase (
159+ logger ,
160+ searchTransportService ,
161+ nodeIdToConnection ,
162+ aliasFilter ,
163+ concreteIndexBoosts ,
164+ executor ,
165+ request ,
166+ shardsIts ,
167+ timeProvider ,
168+ task ,
169+ requireAtLeastOneMatch ,
170+ coordinatorRewriteContextProvider ,
171+ listener
172+ ).runCoordinatorRewritePhase ();
173+ }
174+ });
175+ return listener ;
176+ }
177+
126178 private static boolean assertSearchCoordinationThread () {
127179 return ThreadPool .assertCurrentThreadPool (ThreadPool .Names .SEARCH_COORDINATION );
128180 }
@@ -165,7 +217,7 @@ private void runCoordinatorRewritePhase() {
165217 }
166218 }
167219 if (matchedShardLevelRequests .isEmpty ()) {
168- finishPhase ( );
220+ listener . onResponse ( getIterator ( shardsIts ) );
169221 } else {
170222 // verify missing shards only for the shards that we hit for the query
171223 checkNoMissingShards (matchedShardLevelRequests );
@@ -176,18 +228,14 @@ private void runCoordinatorRewritePhase() {
176228 private void consumeResult (boolean canMatch , ShardSearchRequest request ) {
177229 CanMatchShardResponse result = new CanMatchShardResponse (canMatch , null );
178230 result .setShardIndex (request .shardRequestIndex ());
179- consumeResult (result , () -> {} );
231+ consumeResult (result );
180232 }
181233
182- private void consumeResult (CanMatchShardResponse result , Runnable next ) {
183- try {
184- final boolean canMatch = result .canMatch ();
185- final MinAndMax <?> minAndMax = result .estimatedMinAndMax ();
186- if (canMatch || minAndMax != null ) {
187- consumeResult (result .getShardIndex (), canMatch , minAndMax );
188- }
189- } finally {
190- next .run ();
234+ private void consumeResult (CanMatchShardResponse result ) {
235+ final boolean canMatch = result .canMatch ();
236+ final MinAndMax <?> minAndMax = result .estimatedMinAndMax ();
237+ if (canMatch || minAndMax != null ) {
238+ consumeResult (result .getShardIndex (), canMatch , minAndMax );
191239 }
192240 }
193241
@@ -226,7 +274,7 @@ private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchSha
226274 * If there are failures during a round, there will be a follow-up round
227275 * to retry on other available shard copies.
228276 */
229- class Round extends AbstractRunnable {
277+ private class Round extends AbstractRunnable {
230278 private final List <SearchShardIterator > shards ;
231279 private final CountDown countDown ;
232280 private final AtomicReferenceArray <Exception > failedResponses ;
@@ -296,11 +344,10 @@ public void onFailure(Exception e) {
296344
297345 private void onOperation (int idx , CanMatchShardResponse response ) {
298346 failedResponses .set (idx , null );
299- consumeResult (response , () -> {
300- if (countDown .countDown ()) {
301- finishRound ();
302- }
303- });
347+ consumeResult (response );
348+ if (countDown .countDown ()) {
349+ finishRound ();
350+ }
304351 }
305352
306353 private void onOperationFailed (int idx , Exception e ) {
@@ -322,7 +369,7 @@ private void finishRound() {
322369 }
323370 }
324371 if (remainingShards .isEmpty ()) {
325- finishPhase ( );
372+ listener . onResponse ( getIterator ( shardsIts ) );
326373 } else {
327374 // trigger another round, forcing execution
328375 executor .execute (new Round (remainingShards ) {
@@ -339,7 +386,7 @@ public void onFailure(Exception e) {
339386 if (logger .isDebugEnabled ()) {
340387 logger .debug (() -> format ("Failed to execute [%s] while running [can_match] phase" , request ), e );
341388 }
342- onPhaseFailure ( " round" , e );
389+ listener . onFailure ( new SearchPhaseExecutionException ( "can_match" , " round" , e , ShardSearchFailure . EMPTY_ARRAY ) );
343390 }
344391 }
345392
@@ -363,13 +410,9 @@ private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<
363410 );
364411 }
365412
366- private void finishPhase () {
367- listener .onResponse (getIterator (shardsIts ));
368- }
369-
370413 private static final float DEFAULT_INDEX_BOOST = 1.0f ;
371414
372- public CanMatchNodeRequest .Shard buildShardLevelRequest (SearchShardIterator shardIt ) {
415+ private CanMatchNodeRequest .Shard buildShardLevelRequest (SearchShardIterator shardIt ) {
373416 AliasFilter filter = aliasFilter .get (shardIt .shardId ().getIndex ().getUUID ());
374417 assert filter != null ;
375418 float indexBoost = concreteIndexBoosts .getOrDefault (shardIt .shardId ().getIndex ().getUUID (), DEFAULT_INDEX_BOOST );
@@ -386,33 +429,6 @@ public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shar
386429 );
387430 }
388431
389- public void start () {
390- if (shardsIts .isEmpty ()) {
391- finishPhase ();
392- return ;
393- }
394- // Note that the search is failed when this task is rejected by the executor
395- executor .execute (new AbstractRunnable () {
396- @ Override
397- public void onFailure (Exception e ) {
398- if (logger .isDebugEnabled ()) {
399- logger .debug (() -> format ("Failed to execute [%s] while running [can_match] phase" , request ), e );
400- }
401- onPhaseFailure ("start" , e );
402- }
403-
404- @ Override
405- protected void doRun () {
406- assert assertSearchCoordinationThread ();
407- runCoordinatorRewritePhase ();
408- }
409- });
410- }
411-
412- private void onPhaseFailure (String msg , Exception cause ) {
413- listener .onFailure (new SearchPhaseExecutionException ("can_match" , msg , cause , ShardSearchFailure .EMPTY_ARRAY ));
414- }
415-
416432 private synchronized List <SearchShardIterator > getIterator (List <SearchShardIterator > shardsIts ) {
417433 // TODO: pick the local shard when possible
418434 if (requireAtLeastOneMatch && numPossibleMatches == 0 ) {
0 commit comments