@@ -230,9 +230,6 @@ protected final void run() {
             onPhaseDone();
             return;
         }
-        if (shardsIts.isEmpty()) {
-            return;
-        }
         final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
         for (int i = 0; i < shardIterators.length; i++) {
             shardIndexMap.put(shardIterators[i], i);
@@ -426,11 +423,15 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar
             performPhaseOnShard(shardIndex, shardIt, nextShard);
         } else {
             // count down outstanding shards, we're done with this shard as there's no more copies to try
-            final int outstanding = outstandingShards.decrementAndGet();
-            assert outstanding >= 0 : "outstanding: " + outstanding;
-            if (outstanding == 0) {
-                onPhaseDone();
-            }
+            finishOneShard();
+        }
+    }
+
+    private void finishOneShard() {
+        final int outstanding = outstandingShards.decrementAndGet();
+        assert outstanding >= 0 : "outstanding: " + outstanding;
+        if (outstanding == 0) {
+            onPhaseDone();
         }
     }
 
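For readers skimming the diff, here is a minimal self-contained sketch of the countdown pattern the new `finishOneShard()` helper centralizes. `PhaseCountdown` and the println body of `onPhaseDone()` are illustrative stand-ins, not the enclosing class's real API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the enclosing async-search phase; only the
// countdown state is reproduced here.
class PhaseCountdown {
    private final AtomicInteger outstandingShards;

    PhaseCountdown(int totalShards) {
        this.outstandingShards = new AtomicInteger(totalShards);
    }

    // Called exactly once per shard, whether it succeeded or exhausted all of
    // its copies. decrementAndGet() is atomic, so exactly one caller observes
    // zero and triggers phase completion, regardless of thread interleaving.
    void finishOneShard() {
        final int outstanding = outstandingShards.decrementAndGet();
        assert outstanding >= 0 : "outstanding: " + outstanding;
        if (outstanding == 0) {
            onPhaseDone();
        }
    }

    void onPhaseDone() {
        System.out.println("all shards accounted for; moving to the next phase");
    }
}
```

Extracting the helper gives the failure path here and the success path in `onShardResult` a single shared exit condition, so the two call sites cannot drift apart.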
@@ -501,31 +502,16 @@ protected void onShardResult(Result result) {
             logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
         }
         // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
-        // so its ok concurrency wise to miss potentially the shard failures being created because of another failure
+        // so it's ok concurrency wise to miss potentially the shard failures being created because of another failure
         // in the #addShardFailure, because by definition, it will happen on *another* shardIndex
         AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
         if (shardFailures != null) {
             shardFailures.set(result.getShardIndex(), null);
         }
-        results.consumeResult(result, this::onShardResultConsumed);
-    }
-
-    private void onShardResultConsumed() {
-        successfulOps.incrementAndGet();
-        // we need to increment successful ops first before we compare the exit condition otherwise if we
-        // are fast we could concurrently update totalOps but then preempt one of the threads which can
-        // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
-        // increment all the "future" shards to update the total ops since we some may work and some may not...
-        // and when that happens, we break on total ops, so we must maintain them
-        successfulShardExecution();
-    }
-
-    private void successfulShardExecution() {
-        final int outstanding = outstandingShards.decrementAndGet();
-        assert outstanding >= 0 : "outstanding: " + outstanding;
-        if (outstanding == 0) {
-            onPhaseDone();
-        }
+        results.consumeResult(result, () -> {
+            successfulOps.incrementAndGet();
+            finishOneShard();
+        });
     }
 
     /**
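And a hypothetical driver for the sketch above, mirroring the ordering of the inlined `consumeResult` callback: the success counter is bumped before the shard is counted down, so whichever thread fires `onPhaseDone()` sees every completed shard already reflected in `successfulOps`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo reusing PhaseCountdown from the sketch above: three
// shards, two succeed, one fails after running out of copies to retry.
class PhaseCountdownDemo {
    private static final AtomicInteger successfulOps = new AtomicInteger();

    public static void main(String[] args) {
        PhaseCountdown countdown = new PhaseCountdown(3);
        onShardSuccess(countdown);
        onShardSuccess(countdown);
        countdown.finishOneShard(); // failed shard: count it down without a success
        System.out.println("successful ops: " + successfulOps.get()); // prints 2
    }

    // Mirrors the inlined lambda: increment successfulOps first, then count down.
    private static void onShardSuccess(PhaseCountdown countdown) {
        successfulOps.incrementAndGet();
        countdown.finishOneShard();
    }
}
```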