|
51 | 51 | import java.util.concurrent.atomic.AtomicBoolean; |
52 | 52 | import java.util.concurrent.atomic.AtomicInteger; |
53 | 53 | import java.util.function.Function; |
54 | | -import java.util.stream.Collectors; |
55 | 54 |
|
| 55 | +import static java.util.stream.Collectors.toMap; |
56 | 56 | import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest; |
57 | 57 | import static org.hamcrest.Matchers.contains; |
58 | 58 | import static org.hamcrest.Matchers.containsString; |
@@ -402,6 +402,32 @@ public void testRetryMovedShard() { |
402 | 402 | assertThat(attempt.get(), equalTo(3)); |
403 | 403 | } |
404 | 404 |
|
| 405 | + public void testRetryMultipleMovedShards() { |
| 406 | + var attempt = new AtomicInteger(0); |
| 407 | + var response = safeGet( |
| 408 | + sendRequests( |
| 409 | + randomBoolean(), |
| 410 | + -1, |
| 411 | + List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node3)), |
| 412 | + shardIds -> shardIds.stream().collect(toMap(Function.identity(), shardId -> List.of(randomFrom(node1, node2, node3)))), |
| 413 | + (node, shardIds, aliasFilters, listener) -> runWithDelay( |
| 414 | + () -> listener.onResponse( |
| 415 | + attempt.incrementAndGet() <= 6 |
| 416 | + ? new DataNodeComputeResponse( |
| 417 | + List.of(), |
| 418 | + shardIds.stream().collect(toMap(Function.identity(), ShardNotFoundException::new)) |
| 419 | + ) |
| 420 | + : new DataNodeComputeResponse(List.of(), Map.of()) |
| 421 | + ) |
| 422 | + ) |
| 423 | + ) |
| 424 | + ); |
| 425 | + assertThat(response.totalShards, equalTo(3)); |
| 426 | + assertThat(response.successfulShards, equalTo(3)); |
| 427 | + assertThat(response.skippedShards, equalTo(0)); |
| 428 | + assertThat(response.failedShards, equalTo(0)); |
| 429 | + } |
| 430 | + |
405 | 431 | public void testDoesNotRetryMovedShardIndefinitely() { |
406 | 432 | var attempt = new AtomicInteger(0); |
407 | 433 | var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1)), shardIds -> { |
@@ -463,28 +489,28 @@ public void testRetryUnassignedShardWithoutPartialResults() { |
463 | 489 |
|
464 | 490 | ); |
465 | 491 | expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet); |
| 492 | + assertThat(attempt.get(), equalTo(1)); |
466 | 493 | } |
467 | 494 |
|
468 | 495 | public void testRetryUnassignedShardWithPartialResults() { |
469 | | - var response = safeGet( |
470 | | - sendRequests( |
471 | | - true, |
472 | | - -1, |
473 | | - List.of(targetShard(shard1, node1), targetShard(shard2, node2)), |
474 | | - shardIds -> Map.of(shard1, List.of()), |
475 | | - (node, shardIds, aliasFilters, listener) -> runWithDelay( |
476 | | - () -> listener.onResponse( |
477 | | - Objects.equals(shardIds, List.of(shard2)) |
478 | | - ? new DataNodeComputeResponse(List.of(), Map.of()) |
479 | | - : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) |
480 | | - ) |
| 496 | + var attempt = new AtomicInteger(0); |
| 497 | + var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> { |
| 498 | + attempt.incrementAndGet(); |
| 499 | + return Map.of(shard1, List.of()); |
| 500 | + }, |
| 501 | + (node, shardIds, aliasFilters, listener) -> runWithDelay( |
| 502 | + () -> listener.onResponse( |
| 503 | + Objects.equals(shardIds, List.of(shard2)) |
| 504 | + ? new DataNodeComputeResponse(List.of(), Map.of()) |
| 505 | + : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) |
481 | 506 | ) |
482 | 507 | ) |
483 | | - ); |
| 508 | + )); |
484 | 509 | assertThat(response.totalShards, equalTo(2)); |
485 | 510 | assertThat(response.successfulShards, equalTo(1)); |
486 | 511 | assertThat(response.skippedShards, equalTo(0)); |
487 | 512 | assertThat(response.failedShards, equalTo(1)); |
| 513 | + assertThat(attempt.get(), equalTo(1)); |
488 | 514 | } |
489 | 515 |
|
490 | 516 | static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { |
@@ -553,11 +579,7 @@ PlainActionFuture<ComputeResponse> sendRequests( |
553 | 579 | void searchShards(Set<String> concreteIndices, ActionListener<TargetShards> listener) { |
554 | 580 | runWithDelay( |
555 | 581 | () -> listener.onResponse( |
556 | | - new TargetShards( |
557 | | - shards.stream().collect(Collectors.toMap(TargetShard::shardId, Function.identity())), |
558 | | - shards.size(), |
559 | | - 0 |
560 | | - ) |
| 582 | + new TargetShards(shards.stream().collect(toMap(TargetShard::shardId, Function.identity())), shards.size(), 0) |
561 | 583 | ) |
562 | 584 | ); |
563 | 585 | } |
|
0 commit comments