|
8 | 8 | package org.elasticsearch.xpack.esql.plugin; |
9 | 9 |
|
10 | 10 | import org.elasticsearch.ExceptionsHelper; |
| 11 | +import org.elasticsearch.TransportVersion; |
11 | 12 | import org.elasticsearch.action.ActionListener; |
12 | 13 | import org.elasticsearch.action.NoShardAvailableActionException; |
13 | 14 | import org.elasticsearch.action.OriginalIndices; |
14 | 15 | import org.elasticsearch.action.search.SearchRequest; |
15 | 16 | import org.elasticsearch.action.support.PlainActionFuture; |
16 | 17 | import org.elasticsearch.cluster.node.DiscoveryNode; |
17 | 18 | import org.elasticsearch.cluster.node.DiscoveryNodeUtils; |
| 19 | +import org.elasticsearch.cluster.node.VersionInformation; |
18 | 20 | import org.elasticsearch.common.breaker.CircuitBreaker.Durability; |
19 | 21 | import org.elasticsearch.common.breaker.CircuitBreakingException; |
20 | 22 | import org.elasticsearch.common.settings.Settings; |
|
28 | 30 | import org.elasticsearch.search.internal.AliasFilter; |
29 | 31 | import org.elasticsearch.tasks.CancellableTask; |
30 | 32 | import org.elasticsearch.tasks.TaskId; |
| 33 | +import org.elasticsearch.test.transport.MockTransportService; |
31 | 34 | import org.elasticsearch.threadpool.FixedExecutorBuilder; |
32 | 35 | import org.elasticsearch.threadpool.TestThreadPool; |
33 | 36 | import org.elasticsearch.transport.TransportService; |
|
53 | 56 | import java.util.function.Function; |
54 | 57 |
|
55 | 58 | import static java.util.stream.Collectors.toMap; |
| 59 | +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_COLD_NODE_ROLE; |
| 60 | +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE; |
| 61 | +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_HOT_NODE_ROLE; |
| 62 | +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_WARM_NODE_ROLE; |
56 | 63 | import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest; |
| 64 | +import static org.hamcrest.Matchers.anyOf; |
57 | 65 | import static org.hamcrest.Matchers.contains; |
58 | 66 | import static org.hamcrest.Matchers.containsString; |
59 | 67 | import static org.hamcrest.Matchers.empty; |
60 | 68 | import static org.hamcrest.Matchers.equalTo; |
61 | 69 | import static org.hamcrest.Matchers.hasSize; |
62 | 70 | import static org.hamcrest.Matchers.in; |
63 | 71 | import static org.hamcrest.Matchers.not; |
64 | | -import static org.mockito.Mockito.mock; |
65 | | -import static org.mockito.Mockito.when; |
66 | 72 |
|
67 | 73 | public class DataNodeRequestSenderTests extends ComputeTestCase { |
68 | 74 |
|
69 | 75 | private TestThreadPool threadPool; |
70 | 76 | private Executor executor = null; |
71 | 77 | private static final String ESQL_TEST_EXECUTOR = "esql_test_executor"; |
72 | 78 |
|
73 | | - private final DiscoveryNode node1 = DiscoveryNodeUtils.create("node-1"); |
74 | | - private final DiscoveryNode node2 = DiscoveryNodeUtils.create("node-2"); |
75 | | - private final DiscoveryNode node3 = DiscoveryNodeUtils.create("node-3"); |
76 | | - private final DiscoveryNode node4 = DiscoveryNodeUtils.create("node-4"); |
77 | | - private final DiscoveryNode node5 = DiscoveryNodeUtils.create("node-5"); |
| 79 | + private final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node-1").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); |
| 80 | + private final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); |
| 81 | + private final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); |
| 82 | + private final DiscoveryNode node4 = DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); |
| 83 | + private final DiscoveryNode node5 = DiscoveryNodeUtils.builder("node-5").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); |
78 | 84 | private final ShardId shard1 = new ShardId("index", "n/a", 1); |
79 | 85 | private final ShardId shard2 = new ShardId("index", "n/a", 2); |
80 | 86 | private final ShardId shard3 = new ShardId("index", "n/a", 3); |
@@ -378,6 +384,37 @@ public void testSkipRemovesPriorNonFatalErrors() { |
378 | 384 | assertThat(response.failedShards, equalTo(0)); |
379 | 385 | } |
380 | 386 |
|
| 387 | + public void testQueryHotShardsFirst() { |
| 388 | + var targetShards = shuffledList( |
| 389 | + List.of( |
| 390 | + targetShard(shard1, node1), |
| 391 | + targetShard(shard2, DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_WARM_NODE_ROLE)).build()), |
| 392 | + targetShard(shard3, DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_COLD_NODE_ROLE)).build()), |
| 393 | + targetShard(shard4, DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_FROZEN_NODE_ROLE)).build()) |
| 394 | + ) |
| 395 | + ); |
| 396 | + var sent = Collections.synchronizedList(new ArrayList<String>()); |
| 397 | + safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { |
| 398 | + sent.add(node.getId()); |
| 399 | + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); |
| 400 | + })); |
| 401 | + assertThat(sent, equalTo(List.of("node-1", "node-2", "node-3", "node-4"))); |
| 402 | + } |
| 403 | + |
| 404 | + public void testQueryHotShardsFirstWhenIlmMovesShard() { |
| 405 | + var warmNode2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_WARM_NODE_ROLE)).build(); |
| 406 | + var targetShards = shuffledList( |
| 407 | + List.of(targetShard(shard1, node1), targetShard(shard2, shuffledList(List.of(node2, warmNode2)).toArray(DiscoveryNode[]::new))) |
| 408 | + ); |
| 409 | + var sent = ConcurrentCollections.<NodeRequest>newQueue(); |
| 410 | + safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { |
| 411 | + sent.add(new NodeRequest(node, shardIds, aliasFilters)); |
| 412 | + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); |
| 413 | + })); |
| 414 | + assertThat(groupRequests(sent, 1), equalTo(Map.of(node1, List.of(shard1)))); |
| 415 | + assertThat(groupRequests(sent, 1), anyOf(equalTo(Map.of(node2, List.of(shard2))), equalTo(Map.of(warmNode2, List.of(shard2))))); |
| 416 | + } |
| 417 | + |
381 | 418 | public void testRetryMovedShard() { |
382 | 419 | var attempt = new AtomicInteger(0); |
383 | 420 | var response = safeGet( |
@@ -553,8 +590,12 @@ PlainActionFuture<ComputeResponse> sendRequests( |
553 | 590 | Sender sender |
554 | 591 | ) { |
555 | 592 | PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>(); |
556 | | - TransportService transportService = mock(TransportService.class); |
557 | | - when(transportService.getThreadPool()).thenReturn(threadPool); |
| 593 | + TransportService transportService = MockTransportService.createNewService( |
| 594 | + Settings.EMPTY, |
| 595 | + VersionInformation.CURRENT, |
| 596 | + TransportVersion.current(), |
| 597 | + threadPool |
| 598 | + ); |
558 | 599 | CancellableTask task = new CancellableTask( |
559 | 600 | randomNonNegativeLong(), |
560 | 601 | "type", |
|
0 commit comments