Skip to content

Commit c5a3eb8

Browse files
authored
Query hot indices first (#122928) (#128950)
(cherry picked from commit 734dd07)
1 parent d7fb0e2 commit c5a3eb8

File tree

2 files changed

+95
-11
lines changed

2 files changed

+95
-11
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.search.ShardSearchFailure;
1919
import org.elasticsearch.action.support.TransportActions;
2020
import org.elasticsearch.cluster.node.DiscoveryNode;
21+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.breaker.CircuitBreakingException;
2324
import org.elasticsearch.common.util.Maps;
@@ -38,10 +39,12 @@
3839

3940
import java.util.ArrayList;
4041
import java.util.Collections;
42+
import java.util.Comparator;
4143
import java.util.HashMap;
4244
import java.util.HashSet;
4345
import java.util.IdentityHashMap;
4446
import java.util.Iterator;
47+
import java.util.LinkedHashMap;
4548
import java.util.List;
4649
import java.util.Map;
4750
import java.util.Queue;
@@ -60,6 +63,19 @@
6063
*/
6164
abstract class DataNodeRequestSender {
6265

66+
/**
67+
* Query order according to the
68+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
69+
*/
70+
private static final List<String> NODE_QUERY_ORDER = List.of(
71+
DiscoveryNodeRole.SEARCH_ROLE.roleName(),
72+
DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE.roleName(),
73+
DiscoveryNodeRole.DATA_HOT_NODE_ROLE.roleName(),
74+
DiscoveryNodeRole.DATA_WARM_NODE_ROLE.roleName(),
75+
DiscoveryNodeRole.DATA_COLD_NODE_ROLE.roleName(),
76+
DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()
77+
);
78+
6379
private final ClusterService clusterService;
6480
private final TransportService transportService;
6581
private final Executor esqlExecutor;
@@ -126,12 +142,39 @@ final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTa
126142
)
127143
)
128144
) {
129-
pendingShardIds.addAll(targetShards.shards.keySet());
145+
pendingShardIds.addAll(order(targetShards));
130146
trySendingRequestsForPendingShards(targetShards, computeListener);
131147
}
132148
}, listener::onFailure));
133149
}
134150

151+
private static List<ShardId> order(TargetShards targetShards) {
152+
var computedNodeOrder = new HashMap<DiscoveryNode, Integer>();
153+
var ordered = new ArrayList<>(targetShards.shards.keySet());
154+
ordered.sort(Comparator.comparingInt(shardId -> nodesOrder(targetShards.getShard(shardId).remainingNodes, computedNodeOrder)));
155+
return ordered;
156+
}
157+
158+
private static int nodesOrder(List<DiscoveryNode> nodes, Map<DiscoveryNode, Integer> computedNodeOrder) {
159+
if (nodes.isEmpty()) {
160+
return Integer.MAX_VALUE;
161+
}
162+
var order = 0;
163+
for (var node : nodes) {
164+
order = Math.max(order, computedNodeOrder.computeIfAbsent(node, DataNodeRequestSender::nodeOrder));
165+
}
166+
return order;
167+
}
168+
169+
private static int nodeOrder(DiscoveryNode node) {
170+
for (int i = 0; i < NODE_QUERY_ORDER.size(); i++) {
171+
if (node.hasRole(NODE_QUERY_ORDER.get(i))) {
172+
return i;
173+
}
174+
}
175+
return Integer.MAX_VALUE;
176+
}
177+
135178
private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
136179
changed.set(true);
137180
final ActionListener<Void> listener = computeListener.acquireAvoid();
@@ -338,7 +381,7 @@ private static boolean isRetryableFailure(ShardFailure failure) {
338381
*/
339382
private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
340383
assert sendingLock.isHeldByCurrentThread();
341-
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
384+
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new LinkedHashMap<>();
342385
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
343386

344387
while (shardsIt.hasNext()) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
package org.elasticsearch.xpack.esql.plugin;
99

1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.TransportVersion;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.NoShardAvailableActionException;
1314
import org.elasticsearch.action.OriginalIndices;
1415
import org.elasticsearch.action.search.SearchRequest;
1516
import org.elasticsearch.action.support.PlainActionFuture;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
19+
import org.elasticsearch.cluster.node.VersionInformation;
1820
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
1921
import org.elasticsearch.common.breaker.CircuitBreakingException;
2022
import org.elasticsearch.common.settings.Settings;
@@ -28,6 +30,7 @@
2830
import org.elasticsearch.search.internal.AliasFilter;
2931
import org.elasticsearch.tasks.CancellableTask;
3032
import org.elasticsearch.tasks.TaskId;
33+
import org.elasticsearch.test.transport.MockTransportService;
3134
import org.elasticsearch.threadpool.FixedExecutorBuilder;
3235
import org.elasticsearch.threadpool.TestThreadPool;
3336
import org.elasticsearch.transport.TransportService;
@@ -53,28 +56,31 @@
5356
import java.util.function.Function;
5457

5558
import static java.util.stream.Collectors.toMap;
59+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_COLD_NODE_ROLE;
60+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE;
61+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_HOT_NODE_ROLE;
62+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_WARM_NODE_ROLE;
5663
import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest;
64+
import static org.hamcrest.Matchers.anyOf;
5765
import static org.hamcrest.Matchers.contains;
5866
import static org.hamcrest.Matchers.containsString;
5967
import static org.hamcrest.Matchers.empty;
6068
import static org.hamcrest.Matchers.equalTo;
6169
import static org.hamcrest.Matchers.hasSize;
6270
import static org.hamcrest.Matchers.in;
6371
import static org.hamcrest.Matchers.not;
64-
import static org.mockito.Mockito.mock;
65-
import static org.mockito.Mockito.when;
6672

6773
public class DataNodeRequestSenderTests extends ComputeTestCase {
6874

6975
private TestThreadPool threadPool;
7076
private Executor executor = null;
7177
private static final String ESQL_TEST_EXECUTOR = "esql_test_executor";
7278

73-
private final DiscoveryNode node1 = DiscoveryNodeUtils.create("node-1");
74-
private final DiscoveryNode node2 = DiscoveryNodeUtils.create("node-2");
75-
private final DiscoveryNode node3 = DiscoveryNodeUtils.create("node-3");
76-
private final DiscoveryNode node4 = DiscoveryNodeUtils.create("node-4");
77-
private final DiscoveryNode node5 = DiscoveryNodeUtils.create("node-5");
79+
private final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node-1").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
80+
private final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
81+
private final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
82+
private final DiscoveryNode node4 = DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
83+
private final DiscoveryNode node5 = DiscoveryNodeUtils.builder("node-5").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
7884
private final ShardId shard1 = new ShardId("index", "n/a", 1);
7985
private final ShardId shard2 = new ShardId("index", "n/a", 2);
8086
private final ShardId shard3 = new ShardId("index", "n/a", 3);
@@ -378,6 +384,37 @@ public void testSkipRemovesPriorNonFatalErrors() {
378384
assertThat(response.failedShards, equalTo(0));
379385
}
380386

387+
public void testQueryHotShardsFirst() {
388+
var targetShards = shuffledList(
389+
List.of(
390+
targetShard(shard1, node1),
391+
targetShard(shard2, DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_WARM_NODE_ROLE)).build()),
392+
targetShard(shard3, DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_COLD_NODE_ROLE)).build()),
393+
targetShard(shard4, DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_FROZEN_NODE_ROLE)).build())
394+
)
395+
);
396+
var sent = Collections.synchronizedList(new ArrayList<String>());
397+
safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> {
398+
sent.add(node.getId());
399+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
400+
}));
401+
assertThat(sent, equalTo(List.of("node-1", "node-2", "node-3", "node-4")));
402+
}
403+
404+
public void testQueryHotShardsFirstWhenIlmMovesShard() {
405+
var warmNode2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_WARM_NODE_ROLE)).build();
406+
var targetShards = shuffledList(
407+
List.of(targetShard(shard1, node1), targetShard(shard2, shuffledList(List.of(node2, warmNode2)).toArray(DiscoveryNode[]::new)))
408+
);
409+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
410+
safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> {
411+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
412+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
413+
}));
414+
assertThat(groupRequests(sent, 1), equalTo(Map.of(node1, List.of(shard1))));
415+
assertThat(groupRequests(sent, 1), anyOf(equalTo(Map.of(node2, List.of(shard2))), equalTo(Map.of(warmNode2, List.of(shard2)))));
416+
}
417+
381418
public void testRetryMovedShard() {
382419
var attempt = new AtomicInteger(0);
383420
var response = safeGet(
@@ -553,8 +590,12 @@ PlainActionFuture<ComputeResponse> sendRequests(
553590
Sender sender
554591
) {
555592
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
556-
TransportService transportService = mock(TransportService.class);
557-
when(transportService.getThreadPool()).thenReturn(threadPool);
593+
TransportService transportService = MockTransportService.createNewService(
594+
Settings.EMPTY,
595+
VersionInformation.CURRENT,
596+
TransportVersion.current(),
597+
threadPool
598+
);
558599
CancellableTask task = new CancellableTask(
559600
randomNonNegativeLong(),
560601
"type",

0 commit comments

Comments
 (0)