Skip to content

Commit 2ddccfd

Browse files
idegtiarenko authored and jfreden committed
Query hot indices first (#122928)
1 parent 5c6d5e1 commit 2ddccfd

File tree

2 files changed

+96
-11
lines changed

2 files changed

+96
-11
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 46 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.search.SearchShardsResponse;
1818
import org.elasticsearch.action.support.TransportActions;
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2021
import org.elasticsearch.common.breaker.CircuitBreakingException;
2122
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2223
import org.elasticsearch.compute.operator.DriverProfile;
@@ -36,9 +37,11 @@
3637

3738
import java.util.ArrayList;
3839
import java.util.Collections;
40+
import java.util.Comparator;
3941
import java.util.HashMap;
4042
import java.util.IdentityHashMap;
4143
import java.util.Iterator;
44+
import java.util.LinkedHashMap;
4245
import java.util.List;
4346
import java.util.Map;
4447
import java.util.Queue;
@@ -54,6 +57,20 @@
5457
* and executing these computes on the data nodes.
5558
*/
5659
abstract class DataNodeRequestSender {
60+
61+
/**
62+
* Node role names listed in the order in which their shards should be queried (lowest index first), following the
63+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a> overview.
64+
*/
65+
private static final List<String> NODE_QUERY_ORDER = List.of(
66+
DiscoveryNodeRole.SEARCH_ROLE.roleName(),
67+
DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE.roleName(),
68+
DiscoveryNodeRole.DATA_HOT_NODE_ROLE.roleName(),
69+
DiscoveryNodeRole.DATA_WARM_NODE_ROLE.roleName(),
70+
DiscoveryNodeRole.DATA_COLD_NODE_ROLE.roleName(),
71+
DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()
72+
);
73+
5774
private final TransportService transportService;
5875
private final Executor esqlExecutor;
5976
private final CancellableTask rootTask;
@@ -106,12 +123,39 @@ final void startComputeOnDataNodes(
106123
nodePermits.putIfAbsent(node, new Semaphore(1));
107124
}
108125
}
109-
pendingShardIds.addAll(targetShards.shards.keySet());
126+
pendingShardIds.addAll(order(targetShards));
110127
trySendingRequestsForPendingShards(targetShards, computeListener);
111128
}
112129
}, listener::onFailure));
113130
}
114131

132+
private static List<ShardId> order(TargetShards targetShards) {
133+
var computedNodeOrder = new HashMap<DiscoveryNode, Integer>();
134+
var ordered = new ArrayList<>(targetShards.shards.keySet());
135+
ordered.sort(Comparator.comparingInt(shardId -> nodesOrder(targetShards.getShard(shardId).remainingNodes, computedNodeOrder)));
136+
return ordered;
137+
}
138+
139+
private static int nodesOrder(List<DiscoveryNode> nodes, Map<DiscoveryNode, Integer> computedNodeOrder) {
140+
if (nodes.isEmpty()) {
141+
return Integer.MAX_VALUE;
142+
}
143+
var order = 0;
144+
for (var node : nodes) {
145+
order = Math.max(order, computedNodeOrder.computeIfAbsent(node, DataNodeRequestSender::nodeOrder));
146+
}
147+
return order;
148+
}
149+
150+
private static int nodeOrder(DiscoveryNode node) {
151+
for (int i = 0; i < NODE_QUERY_ORDER.size(); i++) {
152+
if (node.hasRole(NODE_QUERY_ORDER.get(i))) {
153+
return i;
154+
}
155+
}
156+
return Integer.MAX_VALUE;
157+
}
158+
115159
private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
116160
changed.set(true);
117161
final ActionListener<Void> listener = computeListener.acquireAvoid();
@@ -279,7 +323,7 @@ private record ShardFailure(boolean fatal, Exception failure) {}
279323
*/
280324
private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
281325
assert sendingLock.isHeldByCurrentThread();
282-
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
326+
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new LinkedHashMap<>();
283327
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
284328

285329
while (shardsIt.hasNext()) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 50 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -8,13 +8,15 @@
88
package org.elasticsearch.xpack.esql.plugin;
99

1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.TransportVersion;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.NoShardAvailableActionException;
1314
import org.elasticsearch.action.OriginalIndices;
1415
import org.elasticsearch.action.search.SearchRequest;
1516
import org.elasticsearch.action.support.PlainActionFuture;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
19+
import org.elasticsearch.cluster.node.VersionInformation;
1820
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
1921
import org.elasticsearch.common.breaker.CircuitBreakingException;
2022
import org.elasticsearch.common.settings.Settings;
@@ -29,6 +31,7 @@
2931
import org.elasticsearch.tasks.CancellableTask;
3032
import org.elasticsearch.tasks.Task;
3133
import org.elasticsearch.tasks.TaskId;
34+
import org.elasticsearch.test.transport.MockTransportService;
3235
import org.elasticsearch.threadpool.FixedExecutorBuilder;
3336
import org.elasticsearch.threadpool.TestThreadPool;
3437
import org.elasticsearch.transport.TransportService;
@@ -52,26 +55,29 @@
5255
import java.util.function.Function;
5356
import java.util.stream.Collectors;
5457

58+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_COLD_NODE_ROLE;
59+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE;
60+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_HOT_NODE_ROLE;
61+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_WARM_NODE_ROLE;
5562
import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest;
63+
import static org.hamcrest.Matchers.anyOf;
5664
import static org.hamcrest.Matchers.containsString;
5765
import static org.hamcrest.Matchers.equalTo;
5866
import static org.hamcrest.Matchers.hasSize;
5967
import static org.hamcrest.Matchers.in;
6068
import static org.hamcrest.Matchers.not;
61-
import static org.mockito.Mockito.mock;
62-
import static org.mockito.Mockito.when;
6369

6470
public class DataNodeRequestSenderTests extends ComputeTestCase {
6571

6672
private TestThreadPool threadPool;
6773
private Executor executor = null;
6874
private static final String ESQL_TEST_EXECUTOR = "esql_test_executor";
6975

70-
private final DiscoveryNode node1 = DiscoveryNodeUtils.create("node-1");
71-
private final DiscoveryNode node2 = DiscoveryNodeUtils.create("node-2");
72-
private final DiscoveryNode node3 = DiscoveryNodeUtils.create("node-3");
73-
private final DiscoveryNode node4 = DiscoveryNodeUtils.create("node-4");
74-
private final DiscoveryNode node5 = DiscoveryNodeUtils.create("node-5");
76+
private final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node-1").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
77+
private final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
78+
private final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
79+
private final DiscoveryNode node4 = DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
80+
private final DiscoveryNode node5 = DiscoveryNodeUtils.builder("node-5").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
7581
private final ShardId shard1 = new ShardId("index", "n/a", 1);
7682
private final ShardId shard2 = new ShardId("index", "n/a", 2);
7783
private final ShardId shard3 = new ShardId("index", "n/a", 3);
@@ -371,6 +377,37 @@ public void testSkipRemovesPriorNonFatalErrors() {
371377
assertThat(response.failedShards, equalTo(0));
372378
}
373379

380+
public void testQueryHotShardsFirst() {
381+
var targetShards = shuffledList(
382+
List.of(
383+
targetShard(shard1, node1),
384+
targetShard(shard2, DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_WARM_NODE_ROLE)).build()),
385+
targetShard(shard3, DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_COLD_NODE_ROLE)).build()),
386+
targetShard(shard4, DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_FROZEN_NODE_ROLE)).build())
387+
)
388+
);
389+
var sent = Collections.synchronizedList(new ArrayList<String>());
390+
safeGet(sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> {
391+
sent.add(node.getId());
392+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
393+
}));
394+
assertThat(sent, equalTo(List.of("node-1", "node-2", "node-3", "node-4")));
395+
}
396+
397+
public void testQueryHotShardsFirstWhenIlmMovesShard() {
398+
var warmNode2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_WARM_NODE_ROLE)).build();
399+
var targetShards = shuffledList(
400+
List.of(targetShard(shard1, node1), targetShard(shard2, shuffledList(List.of(node2, warmNode2)).toArray(DiscoveryNode[]::new)))
401+
);
402+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
403+
safeGet(sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> {
404+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
405+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
406+
}));
407+
assertThat(groupRequests(sent, 1), equalTo(Map.of(node1, List.of(shard1))));
408+
assertThat(groupRequests(sent, 1), anyOf(equalTo(Map.of(node2, List.of(shard2))), equalTo(Map.of(warmNode2, List.of(shard2)))));
409+
}
410+
374411
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
375412
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
376413
}
@@ -399,8 +436,12 @@ PlainActionFuture<ComputeResponse> sendRequests(
399436
Sender sender
400437
) {
401438
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
402-
TransportService transportService = mock(TransportService.class);
403-
when(transportService.getThreadPool()).thenReturn(threadPool);
439+
TransportService transportService = MockTransportService.createNewService(
440+
Settings.EMPTY,
441+
VersionInformation.CURRENT,
442+
TransportVersion.current(),
443+
threadPool
444+
);
404445
CancellableTask task = new CancellableTask(
405446
randomNonNegativeLong(),
406447
"type",

0 commit comments

Comments
 (0)