Skip to content

Commit afc2051

Browse files
Collect node thread pool usage for shard balancing (#131480)
Adds a new transport action to collect usage stats from the data nodes. ClusterInfoService uses the action to periodically pull thread pool usage information from the data nodes to the master node. Also removes NodeUsageStatsForThreadPoolsCollector as a plugin interface and replaces it with a single class implementation. We no longer need a serverless collector implementation; it can all be done in stateful. Closes ES-12316
1 parent 4856d35 commit afc2051

File tree

15 files changed

+407
-135
lines changed

15 files changed

+407
-135
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionRequest;
1515
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
16+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
1617
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
1718
import org.elasticsearch.action.support.ActionFilter;
1819
import org.elasticsearch.action.support.ActionFilters;
@@ -21,6 +22,7 @@
2122
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2223
import org.elasticsearch.cluster.routing.RoutingTable;
2324
import org.elasticsearch.cluster.routing.ShardRouting;
25+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
2426
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2527
import org.elasticsearch.cluster.service.ClusterService;
2628
import org.elasticsearch.common.Strings;
@@ -39,16 +41,19 @@
3941
import org.elasticsearch.test.ESIntegTestCase;
4042
import org.elasticsearch.test.InternalTestCluster;
4143
import org.elasticsearch.test.transport.MockTransportService;
44+
import org.elasticsearch.threadpool.ThreadPool;
4245
import org.elasticsearch.transport.TransportService;
4346
import org.hamcrest.Matchers;
4447

4548
import java.util.ArrayList;
4649
import java.util.Arrays;
4750
import java.util.Collection;
51+
import java.util.Collections;
4852
import java.util.List;
4953
import java.util.Locale;
5054
import java.util.Map;
5155
import java.util.Set;
56+
import java.util.concurrent.CountDownLatch;
5257
import java.util.concurrent.atomic.AtomicBoolean;
5358

5459
import static java.util.Collections.emptySet;
@@ -334,4 +339,65 @@ public void testClusterInfoServiceInformationClearOnError() {
334339
);
335340
}
336341
}
342+
343+
public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
344+
var settings = Settings.builder()
345+
.put(
346+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
347+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
348+
)
349+
.build();
350+
var masterName = internalCluster().startMasterOnlyNode(settings);
351+
var dataNodeName = internalCluster().startDataOnlyNode(settings);
352+
ensureStableCluster(2);
353+
assertEquals(internalCluster().getMasterName(), masterName);
354+
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
355+
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
356+
357+
// Track when the data node receives a poll from the master for the write thread pool's stats.
358+
final MockTransportService dataNodeMockTransportService = MockTransportService.getInstance(dataNodeName);
359+
final CountDownLatch nodeThreadPoolStatsPolledByMaster = new CountDownLatch(1);
360+
dataNodeMockTransportService.addRequestHandlingBehavior(
361+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
362+
(handler, request, channel, task) -> {
363+
handler.messageReceived(request, channel, task);
364+
365+
if (nodeThreadPoolStatsPolledByMaster.getCount() > 0) {
366+
logger.info("---> Data node received a request for thread pool stats");
367+
}
368+
nodeThreadPoolStatsPolledByMaster.countDown();
369+
}
370+
);
371+
372+
// Do some writes to create some write thread pool activity.
373+
final String indexName = randomIdentifier();
374+
for (int i = 0; i < randomIntBetween(1, 1000); i++) {
375+
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
376+
}
377+
378+
// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
379+
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
380+
InternalClusterInfoService.class,
381+
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
382+
);
383+
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
384+
385+
// Verify that the data node received a request for thread pool stats.
386+
safeAwait(nodeThreadPoolStatsPolledByMaster);
387+
388+
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
389+
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
390+
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
391+
var dataNodeId = getNodeId(dataNodeName);
392+
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
393+
assertNotNull(nodeUsageStatsForThreadPool);
394+
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
395+
396+
assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
397+
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
398+
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
399+
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
400+
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
401+
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
402+
}
337403
}

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
2626
import org.elasticsearch.cluster.InternalClusterInfoService;
2727
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
28-
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
2928
import org.elasticsearch.cluster.metadata.IndexMetadata;
3029
import org.elasticsearch.cluster.metadata.ProjectId;
3130
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -92,7 +91,6 @@
9291
import java.util.Arrays;
9392
import java.util.Collection;
9493
import java.util.Collections;
95-
import java.util.HashMap;
9694
import java.util.List;
9795
import java.util.Locale;
9896
import java.util.Map;
@@ -135,11 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
135133

136134
@Override
137135
protected Collection<Class<? extends Plugin>> getPlugins() {
138-
return pluginList(
139-
InternalSettingsPlugin.class,
140-
BogusEstimatedHeapUsagePlugin.class,
141-
BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class
142-
);
136+
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class);
143137
}
144138

145139
public void testLockTryingToDelete() throws Exception {
@@ -334,8 +328,7 @@ public void testNodeWriteLoadsArePresent() {
334328
ClusterInfoServiceUtils.refresh(clusterInfoService);
335329
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();
336330

337-
/** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation
338-
* generates random usage values */
331+
/** Verify that each node has usage stats reported. */
339332
ClusterState state = getInstanceFromNode(ClusterService.class).state();
340333
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
341334
for (DiscoveryNode node : state.nodes()) {
@@ -348,7 +341,7 @@ public void testNodeWriteLoadsArePresent() {
348341
assertNotNull(writeThreadPoolStats);
349342
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
350343
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
351-
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
344+
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
352345
}
353346
} finally {
354347
updateClusterSettings(
@@ -993,61 +986,4 @@ public ClusterService getClusterService() {
993986
return clusterService.get();
994987
}
995988
}
996-
997-
/**
998-
* A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random
999-
* {@link NodeUsageStatsForThreadPools} for each node in the cluster.
1000-
* <p>
1001-
* Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the
1002-
* plugin system can pick it up and use it for the test set-up.
1003-
*/
1004-
public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector {
1005-
1006-
private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin;
1007-
1008-
public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) {
1009-
this.plugin = plugin;
1010-
}
1011-
1012-
@Override
1013-
public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
1014-
ActionListener.completeWith(
1015-
listener,
1016-
() -> plugin.getClusterService()
1017-
.state()
1018-
.nodes()
1019-
.stream()
1020-
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId())))
1021-
);
1022-
}
1023-
1024-
private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
1025-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
1026-
randomNonNegativeInt(),
1027-
randomFloat(),
1028-
randomNonNegativeLong()
1029-
);
1030-
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
1031-
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
1032-
return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools);
1033-
}
1034-
}
1035-
1036-
/**
1037-
* Make a plugin to gain access to the {@link ClusterService} instance.
1038-
*/
1039-
public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin {
1040-
1041-
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
1042-
1043-
@Override
1044-
public Collection<?> createComponents(PluginServices services) {
1045-
clusterService.set(services.clusterService());
1046-
return List.of();
1047-
}
1048-
1049-
public ClusterService getClusterService() {
1050-
return clusterService.get();
1051-
}
1052-
}
1053989
}

server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector

Lines changed: 0 additions & 10 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ static TransportVersion def(int id) {
354354
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
355355
public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00);
356356
public static final TransportVersion ML_INFERENCE_AI21_COMPLETION_ADDED = def(9_134_0_00);
357+
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
357358

358359
/*
359360
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
3939
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
4040
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
41+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
4142
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
4243
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
4344
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
@@ -629,6 +630,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
629630
ActionRegistry actions = new ActionRegistry();
630631

631632
actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class);
633+
actions.register(TransportNodeUsageStatsForThreadPoolsAction.TYPE, TransportNodeUsageStatsForThreadPoolsAction.class);
632634
actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class);
633635
actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class);
634636
actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.node.usage;
11+
12+
import org.elasticsearch.action.FailedNodeException;
13+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
14+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
15+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
16+
import org.elasticsearch.cluster.ClusterName;
17+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.common.io.stream.StreamInput;
20+
import org.elasticsearch.common.io.stream.StreamOutput;
21+
import org.elasticsearch.transport.AbstractTransportRequest;
22+
23+
import java.io.IOException;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* Defines the request/response types for {@link TransportNodeUsageStatsForThreadPoolsAction}.
30+
*/
31+
public class NodeUsageStatsForThreadPoolsAction {
32+
/**
33+
* The sender request type that will be resolved to send individual {@link NodeRequest} requests to every node in the cluster.
34+
*/
35+
public static class Request extends BaseNodesRequest {
36+
/**
37+
* @param nodeIds The list of nodes to which to send individual requests and collect responses from. If the list is null, all nodes
38+
* in the cluster will be sent a request.
39+
*/
40+
public Request(String[] nodeIds) {
41+
super(nodeIds);
42+
}
43+
}
44+
45+
/**
46+
* Request sent to and received by a cluster node. There are no parameters needed in the node-specific request.
47+
*/
48+
public static class NodeRequest extends AbstractTransportRequest {
49+
public NodeRequest(StreamInput in) throws IOException {
50+
super(in);
51+
}
52+
53+
public NodeRequest() {}
54+
}
55+
56+
/**
57+
* A collection of {@link NodeUsageStatsForThreadPools} responses from all the cluster nodes.
58+
*/
59+
public static class Response extends BaseNodesResponse<NodeUsageStatsForThreadPoolsAction.NodeResponse> {
60+
61+
protected Response(StreamInput in) throws IOException {
62+
super(in);
63+
}
64+
65+
public Response(
66+
ClusterName clusterName,
67+
List<NodeUsageStatsForThreadPoolsAction.NodeResponse> nodeResponses,
68+
List<FailedNodeException> nodeFailures
69+
) {
70+
super(clusterName, nodeResponses, nodeFailures);
71+
}
72+
73+
/**
74+
* Combines the responses from each node that was called into a single map (by node ID) for the final {@link Response}.
75+
*/
76+
public Map<String, NodeUsageStatsForThreadPools> getAllNodeUsageStatsForThreadPools() {
77+
Map<String, NodeUsageStatsForThreadPools> allNodeUsageStatsForThreadPools = new HashMap<>();
78+
for (NodeUsageStatsForThreadPoolsAction.NodeResponse nodeResponse : getNodes()) {
79+
allNodeUsageStatsForThreadPools.put(
80+
nodeResponse.getNodeUsageStatsForThreadPools().nodeId(),
81+
nodeResponse.getNodeUsageStatsForThreadPools()
82+
);
83+
}
84+
return allNodeUsageStatsForThreadPools;
85+
}
86+
87+
@Override
88+
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodeResponses) throws IOException {
89+
out.writeCollection(nodeResponses);
90+
}
91+
92+
@Override
93+
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
94+
return in.readCollectionAsList(NodeUsageStatsForThreadPoolsAction.NodeResponse::new);
95+
}
96+
97+
@Override
98+
public String toString() {
99+
return "NodeUsageStatsForThreadPoolsAction.Response{" + getNodes() + "}";
100+
}
101+
}
102+
103+
/**
104+
* A {@link NodeUsageStatsForThreadPools} response from a single cluster node.
105+
*/
106+
public static class NodeResponse extends BaseNodeResponse {
107+
private final NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools;
108+
109+
protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
110+
super(in, node);
111+
this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
112+
}
113+
114+
public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) {
115+
super(node);
116+
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
117+
}
118+
119+
public NodeResponse(StreamInput in) throws IOException {
120+
super(in);
121+
this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
122+
}
123+
124+
public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() {
125+
return nodeUsageStatsForThreadPools;
126+
}
127+
128+
@Override
129+
public void writeTo(StreamOutput out) throws IOException {
130+
super.writeTo(out);
131+
nodeUsageStatsForThreadPools.writeTo(out);
132+
}
133+
134+
@Override
135+
public String toString() {
136+
return "NodeUsageStatsForThreadPoolsAction.NodeResponse{"
137+
+ "nodeId="
138+
+ getNode().getId()
139+
+ ", nodeUsageStatsForThreadPools="
140+
+ nodeUsageStatsForThreadPools
141+
+ "}";
142+
}
143+
}
144+
145+
}

0 commit comments

Comments
 (0)