Skip to content

Commit 7403ca9

Browse files
Always fetch node usage stats for write load decider (#136212)
The NodeUsageStats action will now be called unconditionally every 30 seconds, regardless of whether the write load decider is enabled. It is a lightweight action that fetches stats that are collected on the node regardless. This makes it easier to see the effect of enabling the write load decider, because it provides a picture of node usage from before enablement, not just after. ES-12631
1 parent f74406d commit 7403ca9

File tree

2 files changed

+36
-64
lines changed

2 files changed

+36
-64
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -305,46 +305,27 @@ public void testHeapUsageEstimateIsPresent() {
305305
}
306306

307307
public void testNodeWriteLoadsArePresent() {
308-
// Disable write load decider to begin with
309-
setWriteLoadDeciderEnablement(WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED);
310-
311308
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
309+
310+
// Force a ClusterInfo refresh to run collection of the node thread pool usage stats.
312311
ClusterInfoServiceUtils.refresh(clusterInfoService);
313312
Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolStats = clusterInfoService.getClusterInfo()
314313
.getNodeUsageStatsForThreadPools();
315314
assertNotNull(nodeThreadPoolStats);
316-
/** Not collecting stats yet because allocation write load stats collection is disabled by default.
317-
* see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */
318-
assertTrue(nodeThreadPoolStats.isEmpty());
319-
320-
// Enable collection for node write loads.
321-
setWriteLoadDeciderEnablement(
322-
randomBoolean()
323-
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
324-
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
325-
);
326-
try {
327-
// Force a ClusterInfo refresh to run collection of the node thread pool usage stats.
328-
ClusterInfoServiceUtils.refresh(clusterInfoService);
329-
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();
330315

331-
/** Verify that each node has usage stats reported. */
332-
ClusterState state = getInstanceFromNode(ClusterService.class).state();
333-
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
334-
for (DiscoveryNode node : state.nodes()) {
335-
assertTrue(nodeThreadPoolStats.containsKey(node.getId()));
336-
NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId());
337-
assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId()));
338-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools
339-
.threadPoolUsageStatsMap()
340-
.get(ThreadPool.Names.WRITE);
341-
assertNotNull(writeThreadPoolStats);
342-
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
343-
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
344-
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
345-
}
346-
} finally {
347-
clearWriteLoadDeciderEnablementSetting();
316+
/** Verify that each node has usage stats reported. */
317+
ClusterState state = getInstanceFromNode(ClusterService.class).state();
318+
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
319+
for (DiscoveryNode node : state.nodes()) {
320+
assertTrue(nodeThreadPoolStats.containsKey(node.getId()));
321+
NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId());
322+
assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId()));
323+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap()
324+
.get(ThreadPool.Names.WRITE);
325+
assertNotNull(writeThreadPoolStats);
326+
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
327+
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
328+
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
348329
}
349330
}
350331

@@ -361,10 +342,10 @@ public void testShardWriteLoadsArePresent() {
361342

362343
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
363344

364-
// Explicitly disable write load decider
365-
setWriteLoadDeciderEnablement(WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED);
366-
367345
try {
346+
// Explicitly disable write load decider
347+
setWriteLoadDeciderEnablement(WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED);
348+
368349
// Stats should not be collected when the decider is disabled
369350
{
370351
ClusterInfoServiceUtils.refresh(clusterInfoService);

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ void execute() {
218218
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled());
219219
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
220220
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
221-
maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
221+
fetchNodesUsageStatsForThreadPools();
222222
}
223223
}
224224

@@ -257,34 +257,25 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) {
257257
}
258258
}
259259

260-
private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) {
261-
if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
262-
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
263-
fetchNodesUsageStatsForThreadPools();
264-
}
265-
} else {
266-
logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled");
267-
nodeThreadPoolUsageStatsPerNode = Map.of();
268-
}
269-
}
270-
271260
private void fetchNodesUsageStatsForThreadPools() {
272-
nodeUsageStatsForThreadPoolsCollector.collectUsageStats(
273-
client,
274-
clusterStateSupplier.get(),
275-
ActionListener.releaseAfter(new ActionListener<>() {
276-
@Override
277-
public void onResponse(Map<String, NodeUsageStatsForThreadPools> threadPoolStats) {
278-
nodeThreadPoolUsageStatsPerNode = threadPoolStats;
279-
}
261+
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
262+
nodeUsageStatsForThreadPoolsCollector.collectUsageStats(
263+
client,
264+
clusterStateSupplier.get(),
265+
ActionListener.releaseAfter(new ActionListener<>() {
266+
@Override
267+
public void onResponse(Map<String, NodeUsageStatsForThreadPools> threadPoolStats) {
268+
nodeThreadPoolUsageStatsPerNode = threadPoolStats;
269+
}
280270

281-
@Override
282-
public void onFailure(Exception e) {
283-
logger.warn("failed to fetch thread pool usage estimates for nodes", e);
284-
nodeThreadPoolUsageStatsPerNode = Map.of();
285-
}
286-
}, fetchRefs.acquire())
287-
);
271+
@Override
272+
public void onFailure(Exception e) {
273+
logger.warn("failed to fetch thread pool usage estimates for nodes", e);
274+
nodeThreadPoolUsageStatsPerNode = Map.of();
275+
}
276+
}, fetchRefs.acquire())
277+
);
278+
}
288279
}
289280

290281
private void fetchNodesEstimatedHeapUsage() {

0 commit comments

Comments
 (0)