@@ -545,7 +545,7 @@ public void testAllowPartialResults() throws InterruptedException {
545545
546546 SearchTransportService transportService = new SearchTransportService (null , null , null );
547547 Map <String , Transport .Connection > lookup = new HashMap <>();
548- Map <ShardId , Boolean > seenShard = new ConcurrentHashMap <>();
548+ Map <ShardId , AtomicInteger > seenShard = new ConcurrentHashMap <>();
549549 lookup .put (primaryNode .getId (), new MockConnection (primaryNode ));
550550 lookup .put (replicaNode .getId (), new MockConnection (replicaNode ));
551551 Map <String , AliasFilter > aliasFilters = Collections .singletonMap ("_na_" , AliasFilter .EMPTY );
@@ -581,17 +581,18 @@ protected void executePhaseOnShard(
581581 Transport .Connection connection ,
582582 SearchActionListener <TestSearchPhaseResult > listener
583583 ) {
584- seenShard .computeIfAbsent (shardIt .shardId (), (i ) -> {
584+ AtomicInteger retries = seenShard .computeIfAbsent (shardIt .shardId (), (i ) -> {
585585 numRequests .incrementAndGet (); // only count this once per shard copy
586- return Boolean . TRUE ;
586+ return new AtomicInteger ( 0 ) ;
587587 });
588+ int numRetries = retries .incrementAndGet ();
588589 new Thread (() -> {
589590 TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult (
590591 new ShardSearchContextId (UUIDs .randomBase64UUID (), contextIdGenerator .incrementAndGet ()),
591592 connection .getNode ()
592593 );
593594 try {
594- if (shardIt .remaining () > 0 ) {
595+ if (numRetries < shardIt .size () ) {
595596 numFailReplicas .incrementAndGet ();
596597 listener .onFailure (new RuntimeException ());
597598 } else {
@@ -643,10 +644,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException {
643644 );
644645 // Skip all the shards
645646 searchShardIterator .skip (true );
646- searchShardIterator .reset ();
647647 searchShardIterators .add (searchShardIterator );
648648 }
649- List <SearchShardIterator > shardsIter = searchShardIterators ;
650649 Map <String , Transport .Connection > lookup = Map .of (primaryNode .getId (), new MockConnection (primaryNode ));
651650
652651 CountDownLatch latch = new CountDownLatch (1 );
@@ -665,11 +664,11 @@ public void testSkipUnavailableSearchShards() throws InterruptedException {
665664 null ,
666665 request ,
667666 responseListener ,
668- shardsIter ,
667+ searchShardIterators ,
669668 new TransportSearchAction .SearchTimeProvider (0 , 0 , () -> 0 ),
670669 ClusterState .EMPTY_STATE ,
671670 null ,
672- new ArraySearchPhaseResults <>(shardsIter .size ()),
671+ new ArraySearchPhaseResults <>(searchShardIterators .size ()),
673672 request .getMaxConcurrentShardRequests (),
674673 SearchResponse .Clusters .EMPTY
675674 ) {
@@ -702,7 +701,7 @@ public void run() {
702701 assertNotNull (searchResponse .get ());
703702 assertThat (searchResponse .get ().getSkippedShards (), equalTo (numUnavailableSkippedShards ));
704703 assertThat (searchResponse .get ().getFailedShards (), equalTo (0 ));
705- assertThat (searchResponse .get ().getSuccessfulShards (), equalTo (shardsIter .size ()));
704+ assertThat (searchResponse .get ().getSuccessfulShards (), equalTo (searchShardIterators .size ()));
706705 }
707706
708707 static List <SearchShardIterator > getShardsIter (
@@ -728,7 +727,6 @@ static List<SearchShardIterator> getShardsIter(
728727 for (int i = 0 ; i < numShards ; i ++) {
729728 ArrayList <ShardRouting > started = new ArrayList <>();
730729 ArrayList <ShardRouting > initializing = new ArrayList <>();
731- ArrayList <ShardRouting > unassigned = new ArrayList <>();
732730
733731 ShardRouting routing = ShardRouting .newUnassigned (
734732 new ShardId (index , i ),
@@ -758,8 +756,6 @@ static List<SearchShardIterator> getShardsIter(
758756 } else {
759757 initializing .add (routing );
760758 }
761- } else {
762- unassigned .add (routing ); // unused yet
763759 }
764760 }
765761 Collections .shuffle (started , random ());
0 commit comments