Skip to content

Commit 31f150c

Browse files
Collect node thread pool usage for shard balancing
Adds a new transport action to collect usage stats from the data nodes. ClusterInfoService uses the action to pull thread pool usage information from the data nodes to the master node periodically. Also removes NodeUsageStatsForThreadPoolsCollector as an interface/plugin and replaces it with a single class implementation. Closes ES-12316
1 parent 560ffb9 commit 31f150c

File tree

14 files changed

+408
-133
lines changed

14 files changed

+408
-133
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionRequest;
1515
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
16+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
1617
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
1718
import org.elasticsearch.action.support.ActionFilter;
1819
import org.elasticsearch.action.support.ActionFilters;
@@ -21,6 +22,7 @@
2122
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2223
import org.elasticsearch.cluster.routing.RoutingTable;
2324
import org.elasticsearch.cluster.routing.ShardRouting;
25+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
2426
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2527
import org.elasticsearch.cluster.service.ClusterService;
2628
import org.elasticsearch.common.Strings;
@@ -39,21 +41,25 @@
3941
import org.elasticsearch.test.ESIntegTestCase;
4042
import org.elasticsearch.test.InternalTestCluster;
4143
import org.elasticsearch.test.transport.MockTransportService;
44+
import org.elasticsearch.threadpool.ThreadPool;
4245
import org.elasticsearch.transport.TransportService;
4346
import org.hamcrest.Matchers;
4447

4548
import java.util.ArrayList;
4649
import java.util.Arrays;
4750
import java.util.Collection;
51+
import java.util.Collections;
4852
import java.util.List;
4953
import java.util.Locale;
5054
import java.util.Map;
5155
import java.util.Set;
56+
import java.util.concurrent.CountDownLatch;
5257
import java.util.concurrent.atomic.AtomicBoolean;
5358

5459
import static java.util.Collections.emptySet;
5560
import static java.util.Collections.singletonList;
5661
import static java.util.Collections.unmodifiableSet;
62+
import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING;
5763
import static org.elasticsearch.common.util.set.Sets.newHashSet;
5864
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5965
import static org.hamcrest.CoreMatchers.equalTo;
@@ -202,7 +208,7 @@ public void testClusterInfoServiceInformationClearOnError() {
202208
internalCluster().startNodes(
203209
2,
204210
// manually control publishing
205-
Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()
211+
Settings.builder().put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()
206212
);
207213
prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get();
208214
ensureGreen("test");
@@ -334,4 +340,65 @@ public void testClusterInfoServiceInformationClearOnError() {
334340
);
335341
}
336342
}
343+
344+
public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
345+
var settings = Settings.builder()
346+
.put(
347+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
348+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
349+
)
350+
.build();
351+
var masterName = internalCluster().startMasterOnlyNode(settings);
352+
var dataNodeName = internalCluster().startDataOnlyNode(settings);
353+
ensureStableCluster(2);
354+
assertEquals(internalCluster().getMasterName(), masterName);
355+
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
356+
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
357+
358+
// Track when the data node receives a poll from the master for the write thread pool's stats.
359+
final MockTransportService dataNodeMockTransportService = MockTransportService.getInstance(dataNodeName);
360+
final CountDownLatch nodeThreadPoolStatsPolledByMaster = new CountDownLatch(1);
361+
dataNodeMockTransportService.addRequestHandlingBehavior(
362+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
363+
(handler, request, channel, task) -> {
364+
handler.messageReceived(request, channel, task);
365+
366+
if (nodeThreadPoolStatsPolledByMaster.getCount() > 0) {
367+
logger.info("---> Data node received a request for thread pool stats");
368+
}
369+
nodeThreadPoolStatsPolledByMaster.countDown();
370+
}
371+
);
372+
373+
// Do some writes to create some write thread pool activity.
374+
final String indexName = randomIdentifier();
375+
for (int i = 0; i < randomIntBetween(1, 1000); i++) {
376+
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
377+
}
378+
379+
// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
380+
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
381+
InternalClusterInfoService.class,
382+
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
383+
);
384+
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
385+
386+
// Verify that the data node received a request for thread pool stats.
387+
safeAwait(nodeThreadPoolStatsPolledByMaster);
388+
389+
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
390+
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
391+
assertThat(usageStatsForThreadPools.size(), equalTo(2)); // master and data node
392+
var dataNodeId = getNodeId(dataNodeName);
393+
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
394+
assertNotNull(nodeUsageStatsForThreadPool);
395+
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
396+
397+
assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
398+
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
399+
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
400+
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
401+
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
402+
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
403+
}
337404
}

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
2626
import org.elasticsearch.cluster.InternalClusterInfoService;
2727
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
28-
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
2928
import org.elasticsearch.cluster.metadata.IndexMetadata;
3029
import org.elasticsearch.cluster.node.DiscoveryNode;
3130
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -91,7 +90,6 @@
9190
import java.util.Arrays;
9291
import java.util.Collection;
9392
import java.util.Collections;
94-
import java.util.HashMap;
9593
import java.util.List;
9694
import java.util.Locale;
9795
import java.util.Map;
@@ -133,11 +131,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
133131

134132
@Override
135133
protected Collection<Class<? extends Plugin>> getPlugins() {
136-
return pluginList(
137-
InternalSettingsPlugin.class,
138-
BogusEstimatedHeapUsagePlugin.class,
139-
BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class
140-
);
134+
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class);
141135
}
142136

143137
public void testLockTryingToDelete() throws Exception {
@@ -332,8 +326,7 @@ public void testNodeWriteLoadsArePresent() {
332326
ClusterInfoServiceUtils.refresh(clusterInfoService);
333327
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();
334328

335-
/** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation
336-
* generates random usage values */
329+
/** Verify that each node has usage stats reported. */
337330
ClusterState state = getInstanceFromNode(ClusterService.class).state();
338331
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
339332
for (DiscoveryNode node : state.nodes()) {
@@ -346,7 +339,7 @@ public void testNodeWriteLoadsArePresent() {
346339
assertNotNull(writeThreadPoolStats);
347340
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
348341
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
349-
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
342+
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
350343
}
351344
} finally {
352345
updateClusterSettings(
@@ -935,61 +928,4 @@ public ClusterService getClusterService() {
935928
return clusterService.get();
936929
}
937930
}
938-
939-
/**
940-
* A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random
941-
* {@link NodeUsageStatsForThreadPools} for each node in the cluster.
942-
* <p>
943-
* Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the
944-
* plugin system can pick it up and use it for the test set-up.
945-
*/
946-
public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector {
947-
948-
private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin;
949-
950-
public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) {
951-
this.plugin = plugin;
952-
}
953-
954-
@Override
955-
public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
956-
ActionListener.completeWith(
957-
listener,
958-
() -> plugin.getClusterService()
959-
.state()
960-
.nodes()
961-
.stream()
962-
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId())))
963-
);
964-
}
965-
966-
private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
967-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
968-
randomNonNegativeInt(),
969-
randomFloat(),
970-
randomNonNegativeLong()
971-
);
972-
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
973-
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
974-
return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools);
975-
}
976-
}
977-
978-
/**
979-
* Make a plugin to gain access to the {@link ClusterService} instance.
980-
*/
981-
public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin {
982-
983-
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
984-
985-
@Override
986-
public Collection<?> createComponents(PluginServices services) {
987-
clusterService.set(services.clusterService());
988-
return List.of();
989-
}
990-
991-
public ClusterService getClusterService() {
992-
return clusterService.get();
993-
}
994-
}
995931
}

server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector

Lines changed: 0 additions & 10 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ static TransportVersion def(int id) {
342342
public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
343343
public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00);
344344
public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
345+
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_124_0_00);
345346

346347
/*
347348
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
3939
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
4040
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
41+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
4142
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
4243
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
4344
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
@@ -629,6 +630,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
629630
ActionRegistry actions = new ActionRegistry();
630631

631632
actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class);
633+
actions.register(TransportNodeUsageStatsForThreadPoolsAction.TYPE, TransportNodeUsageStatsForThreadPoolsAction.class);
632634
actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class);
633635
actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class);
634636
actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);

0 commit comments

Comments
 (0)