4040import java .util .HashMap ;
4141import java .util .List ;
4242import java .util .Map ;
43+ import java .util .Objects ;
4344import java .util .Queue ;
4445import java .util .Set ;
4546import java .util .concurrent .Executor ;
@@ -85,7 +86,7 @@ public void setThreadPool() {
8586 }
8687
8788 @ After
88- public void shutdownThreadPool () throws Exception {
89+ public void shutdownThreadPool () {
8990 terminate (threadPool );
9091 }
9192
@@ -109,8 +110,7 @@ public void testOnePass() {
109110 Queue <NodeRequest > sent = ConcurrentCollections .newQueue ();
110111 var future = sendRequests (targetShards , randomBoolean (), (node , shardIds , aliasFilters , listener ) -> {
111112 sent .add (new NodeRequest (node , shardIds , aliasFilters ));
112- var resp = new DataNodeComputeResponse (List .of (), Map .of ());
113- runWithDelay (() -> listener .onResponse (resp ));
113+ runWithDelay (() -> listener .onResponse (new DataNodeComputeResponse (List .of (), Map .of ())));
114114 });
115115 safeGet (future );
116116 assertThat (sent .size (), equalTo (2 ));
@@ -123,8 +123,7 @@ public void testMissingShards() {
123123 var future = sendRequests (targetShards , false , (node , shardIds , aliasFilters , listener ) -> {
124124 fail ("expect no data-node request is sent when target shards are missing" );
125125 });
126- var error = expectThrows (NoShardAvailableActionException .class , future ::actionGet );
127- assertThat (error .getMessage (), containsString ("no shard copies found" ));
126+ expectThrows (NoShardAvailableActionException .class , containsString ("no shard copies found" ), future ::actionGet );
128127 }
129128 {
130129 var targetShards = List .of (targetShard (shard1 , node1 ), targetShard (shard3 ), targetShard (shard4 , node2 , node3 ));
@@ -244,6 +243,34 @@ public void testAllowPartialResults() {
244243 assertThat (resp .successfulShards , equalTo (1 ));
245244 }
246245
246+ public void testNonFatalErrorIsRetriedOnAnotherShard () {
247+ var targetShards = List .of (targetShard (shard1 , node1 , node2 ));
248+ Queue <NodeRequest > sent = ConcurrentCollections .newQueue ();
249+ var response = safeGet (sendRequests (targetShards , false , (node , shardIds , aliasFilters , listener ) -> {
250+ sent .add (new NodeRequest (node , shardIds , aliasFilters ));
251+ if (Objects .equals (node1 , node )) {
252+ runWithDelay (() -> listener .onFailure (new RuntimeException ("test request level non fatal failure" ), false ));
253+ } else {
254+ runWithDelay (() -> listener .onResponse (new DataNodeComputeResponse (List .of (), Map .of ())));
255+ }
256+ }));
257+ assertThat (response .totalShards , equalTo (1 ));
258+ assertThat (response .successfulShards , equalTo (1 ));
259+ assertThat (response .failedShards , equalTo (0 ));
260+ assertThat (sent .size (), equalTo (2 ));
261+ }
262+
263+ public void testNonFatalFailedOnAllNodes () {
264+ var targetShards = List .of (targetShard (shard1 , node1 , node2 ));
265+ Queue <NodeRequest > sent = ConcurrentCollections .newQueue ();
266+ var future = sendRequests (targetShards , false , (node , shardIds , aliasFilters , listener ) -> {
267+ sent .add (new NodeRequest (node , shardIds , aliasFilters ));
268+ runWithDelay (() -> listener .onFailure (new RuntimeException ("test request level non fatal failure" ), false ));
269+ });
270+ expectThrows (RuntimeException .class , equalTo ("test request level non fatal failure" ), future ::actionGet );
271+ assertThat (sent .size (), equalTo (2 ));
272+ }
273+
247274 static DataNodeRequestSender .TargetShard targetShard (ShardId shardId , DiscoveryNode ... nodes ) {
248275 return new DataNodeRequestSender .TargetShard (shardId , new ArrayList <>(Arrays .asList (nodes )), null );
249276 }
0 commit comments