Skip to content

Commit 53c5b1f

Browse files
committed
Query hot indices first
1 parent 1c368c7 commit 53c5b1f

File tree

2 files changed

+70
-19
lines changed

2 files changed

+70
-19
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.search.SearchShardsResponse;
1818
import org.elasticsearch.action.support.TransportActions;
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2021
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2122
import org.elasticsearch.compute.operator.DriverProfile;
2223
import org.elasticsearch.compute.operator.FailureCollector;
@@ -35,9 +36,11 @@
3536

3637
import java.util.ArrayList;
3738
import java.util.Collections;
39+
import java.util.Comparator;
3840
import java.util.HashMap;
3941
import java.util.IdentityHashMap;
4042
import java.util.Iterator;
43+
import java.util.LinkedHashMap;
4144
import java.util.List;
4245
import java.util.Map;
4346
import java.util.Queue;
@@ -52,6 +55,13 @@
5255
* and executing these computes on the data nodes.
5356
*/
5457
abstract class DataNodeRequestSender {
58+
59+
private static final String[] NODE_QUERY_ORDER = new String[] {
60+
DiscoveryNodeRole.DATA_HOT_NODE_ROLE.roleName(),
61+
DiscoveryNodeRole.DATA_WARM_NODE_ROLE.roleName(),
62+
DiscoveryNodeRole.DATA_COLD_NODE_ROLE.roleName(),
63+
DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName() };
64+
5565
private final TransportService transportService;
5666
private final Executor esqlExecutor;
5767
private final CancellableTask rootTask;
@@ -97,12 +107,35 @@ final void startComputeOnDataNodes(
97107
nodePermits.putIfAbsent(node, new Semaphore(1));
98108
}
99109
}
100-
pendingShardIds.addAll(targetShards.shards.keySet());
110+
pendingShardIds.addAll(order(targetShards));
101111
trySendingRequestsForPendingShards(targetShards, computeListener);
102112
}
103113
}, listener::onFailure));
104114
}
105115

116+
private static List<ShardId> order(TargetShards targetShards) {
117+
var ordered = new ArrayList<>(targetShards.shards.keySet());
118+
ordered.sort(Comparator.comparingInt(shardId -> nodeOrder(targetShards.getShard(shardId).remainingNodes)));
119+
return ordered;
120+
}
121+
122+
private static int nodeOrder(List<DiscoveryNode> nodes) {
123+
if (nodes.isEmpty()) {
124+
return Integer.MAX_VALUE;
125+
}
126+
// assumes all shard nodes have same roles
127+
var node = nodes.getFirst();
128+
if (node.hasRole(DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE.roleName()) || node.hasRole(DiscoveryNodeRole.DATA_ROLE.roleName())) {
129+
return 0;
130+
}
131+
for (int i = 0; i < NODE_QUERY_ORDER.length; i++) {
132+
if (node.hasRole(NODE_QUERY_ORDER[i])) {
133+
return i;
134+
}
135+
}
136+
return NODE_QUERY_ORDER.length;
137+
}
138+
106139
private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
107140
changed.set(true);
108141
final ActionListener<Void> listener = computeListener.acquireAvoid();
@@ -243,17 +276,11 @@ TargetShard getShard(ShardId shardId) {
243276
/**
244277
* (Remaining) allocated nodes of a given shard id and its alias filter
245278
*/
246-
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {
247-
248-
}
279+
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {}
249280

250-
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
281+
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {}
251282

252-
}
253-
254-
private record ShardFailure(boolean fatal, Exception failure) {
255-
256-
}
283+
private record ShardFailure(boolean fatal, Exception failure) {}
257284

258285
/**
259286
* Selects the next nodes to send requests to. Limits to at most one outstanding request per node.
@@ -262,7 +289,7 @@ private record ShardFailure(boolean fatal, Exception failure) {
262289
*/
263290
private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
264291
assert sendingLock.isHeldByCurrentThread();
265-
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
292+
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new LinkedHashMap<>();
266293
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
267294
while (shardsIt.hasNext()) {
268295
ShardId shardId = shardsIt.next();
@@ -288,16 +315,17 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
288315
}
289316
}
290317
final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
291-
for (var e : nodeToShardIds.entrySet()) {
292-
List<ShardId> shardIds = e.getValue();
318+
for (var entry : nodeToShardIds.entrySet()) {
319+
var node = entry.getKey();
320+
var shardIds = entry.getValue();
293321
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
294322
for (ShardId shardId : shardIds) {
295323
var aliasFilter = targetShards.getShard(shardId).aliasFilter;
296324
if (aliasFilter != null) {
297325
aliasFilters.put(shardId.getIndex(), aliasFilter);
298326
}
299327
}
300-
nodeRequests.add(new NodeRequest(e.getKey(), shardIds, aliasFilters));
328+
nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
301329
}
302330
return nodeRequests;
303331
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import java.util.function.Function;
4949
import java.util.stream.Collectors;
5050

51+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE;
52+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_HOT_NODE_ROLE;
5153
import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest;
5254
import static org.hamcrest.Matchers.containsString;
5355
import static org.hamcrest.Matchers.equalTo;
@@ -63,11 +65,12 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
6365
private Executor executor = null;
6466
private static final String ESQL_TEST_EXECUTOR = "esql_test_executor";
6567

66-
private final DiscoveryNode node1 = DiscoveryNodeUtils.create("node-1");
67-
private final DiscoveryNode node2 = DiscoveryNodeUtils.create("node-2");
68-
private final DiscoveryNode node3 = DiscoveryNodeUtils.create("node-3");
69-
private final DiscoveryNode node4 = DiscoveryNodeUtils.create("node-4");
70-
private final DiscoveryNode node5 = DiscoveryNodeUtils.create("node-5");
68+
private final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node-1").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
69+
private final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
70+
private final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
71+
private final DiscoveryNode node4 = DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
72+
private final DiscoveryNode node5 = DiscoveryNodeUtils.builder("node-5").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
73+
private final DiscoveryNode warmNode1 = DiscoveryNodeUtils.builder("node-1-warm").roles(Set.of(DATA_FROZEN_NODE_ROLE)).build();
7174
private final ShardId shard1 = new ShardId("index", "n/a", 1);
7275
private final ShardId shard2 = new ShardId("index", "n/a", 2);
7376
private final ShardId shard3 = new ShardId("index", "n/a", 3);
@@ -244,6 +247,26 @@ public void testAllowPartialResults() {
244247
assertThat(resp.successfulShards, equalTo(1));
245248
}
246249

250+
public void testQueryHotShardsFirst() {
251+
var targetShards = shuffledList(
252+
List.of(
253+
targetShard(shard1, node1),
254+
targetShard(shard2, node2),
255+
targetShard(shard3, node3),
256+
targetShard(shard4, node4),
257+
targetShard(shard5, warmNode1)
258+
)
259+
);
260+
var sent = Collections.synchronizedList(new ArrayList<NodeRequest>());
261+
var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
262+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
263+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
264+
});
265+
safeGet(future);
266+
assertThat(sent.size(), equalTo(5));
267+
assertThat(sent.getLast().node(), equalTo(warmNode1));
268+
}
269+
247270
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
248271
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
249272
}

0 commit comments

Comments
 (0)