diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index f51e7209314ba..76d0d25b19e79 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -305,46 +305,27 @@ public void testHeapUsageEstimateIsPresent() { } public void testNodeWriteLoadsArePresent() { - // Disable write load decider to begin with - setWriteLoadDeciderEnablement(WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED); - InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + + // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. ClusterInfoServiceUtils.refresh(clusterInfoService); Map nodeThreadPoolStats = clusterInfoService.getClusterInfo() .getNodeUsageStatsForThreadPools(); assertNotNull(nodeThreadPoolStats); - /** Not collecting stats yet because allocation write load stats collection is disabled by default. - * see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */ - assertTrue(nodeThreadPoolStats.isEmpty()); - - // Enable collection for node write loads. - setWriteLoadDeciderEnablement( - randomBoolean() - ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED - : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY - ); - try { - // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. - ClusterInfoServiceUtils.refresh(clusterInfoService); - nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); - /** Verify that each node has usage stats reported. */ - ClusterState state = getInstanceFromNode(ClusterService.class).state(); - assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); - for (DiscoveryNode node : state.nodes()) { - assertTrue(nodeThreadPoolStats.containsKey(node.getId())); - NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId()); - assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId())); - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools - .threadPoolUsageStatsMap() - .get(ThreadPool.Names.WRITE); - assertNotNull(writeThreadPoolStats); - assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); - assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); - assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); - } - } finally { - clearWriteLoadDeciderEnablementSetting(); + /** Verify that each node has usage stats reported. */ + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); + for (DiscoveryNode node : state.nodes()) { + assertTrue(nodeThreadPoolStats.containsKey(node.getId())); + NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId()); + assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId())); + NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + assertNotNull(writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } } @@ -361,10 +342,10 @@ public void testShardWriteLoadsArePresent() { final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); - // Explicitly disable write load decider - setWriteLoadDeciderEnablement(WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED); - try { + // Explicitly disable write load decider + setWriteLoadDeciderEnablement(WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED); + // Stats should not be collected when the decider is disabled { ClusterInfoServiceUtils.refresh(clusterInfoService); diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 9c0c44fa8cbae..e3724d8dbc989 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -218,7 +218,7 @@ void execute() { maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled()); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); - maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); + fetchNodesUsageStatsForThreadPools(); } } @@ -257,34 +257,25 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { } } - private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) { - if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) { - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodesUsageStatsForThreadPools(); - } - } else { - logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled"); - nodeThreadPoolUsageStatsPerNode = Map.of(); - } - } - private void fetchNodesUsageStatsForThreadPools() { - nodeUsageStatsForThreadPoolsCollector.collectUsageStats( - client, - clusterStateSupplier.get(), - ActionListener.releaseAfter(new ActionListener<>() { - @Override - public void onResponse(Map threadPoolStats) { - nodeThreadPoolUsageStatsPerNode = threadPoolStats; - } + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + nodeUsageStatsForThreadPoolsCollector.collectUsageStats( + client, + clusterStateSupplier.get(), + ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(Map threadPoolStats) { + nodeThreadPoolUsageStatsPerNode = threadPoolStats; + } - @Override - public void onFailure(Exception e) { - logger.warn("failed to fetch thread pool usage estimates for nodes", e); - nodeThreadPoolUsageStatsPerNode = Map.of(); - } - }, fetchRefs.acquire()) - ); + @Override + public void onFailure(Exception e) { + logger.warn("failed to fetch thread pool usage estimates for nodes", e); + nodeThreadPoolUsageStatsPerNode = Map.of(); + } + }, fetchRefs.acquire()) + ); + } } private void fetchNodesEstimatedHeapUsage() {