From 640f0da5de4778bc69891ff0010a834a0710f610 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 1 Sep 2025 14:04:14 +1000 Subject: [PATCH 01/12] Use the last good NodeUsageStatsForThreadPools when a node returns an error --- ...NodeUsageStatsForThreadPoolsCollector.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index 0921bde87d353..b5e02bc4c115f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -11,16 +11,22 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import java.util.Arrays; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Collects the thread pool usage stats for each node in the cluster. *

- * Results are returned as a map of node ID to node usage stats. + * Results are returned as a map of node ID to node usage stats. Keeps track of the most recent + * usage stats for each node, which will be returned in the event of a failure response from that node. */ public class NodeUsageStatsForThreadPoolsCollector { public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() { @@ -37,6 +43,8 @@ public void collectUsageStats( "transport_node_usage_stats_for_thread_pools_action" ); + private final Map lastNodeUsageStatsForThreadPools = new ConcurrentHashMap<>(); + /** * Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster. * @@ -47,15 +55,39 @@ public void collectUsageStats( ClusterState clusterState, ActionListener> listener ) { - var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new); + var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new); + // Discard last-seen values for any nodes no longer present in the cluster state + lastNodeUsageStatsForThreadPools.keySet().retainAll(Arrays.asList(dataNodeIds)); if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) { client.execute( TransportNodeUsageStatsForThreadPoolsAction.TYPE, new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds), - listener.map(response -> response.getAllNodeUsageStatsForThreadPools()) + listener.map(this::replaceFailuresWithLastSeenValues) ); } else { listener.onResponse(Map.of()); } } + + private Map replaceFailuresWithLastSeenValues( + NodeUsageStatsForThreadPoolsAction.Response response + ) { + final Map returnedUsageStats = response.getAllNodeUsageStatsForThreadPools(); + // Update the last-seen usage stats + this.lastNodeUsageStatsForThreadPools.putAll(returnedUsageStats); + + if (response.hasFailures() == false) { + return returnedUsageStats; + } + + // Add in the last-seen usage stats for any nodes that failed to respond + final Map cachedValuesForFailed = new HashMap<>(returnedUsageStats); + for (FailedNodeException failedNodeException : response.failures()) { + NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = lastNodeUsageStatsForThreadPools.get(failedNodeException.nodeId()); + if (nodeUsageStatsForThreadPools != null) { + cachedValuesForFailed.put(failedNodeException.nodeId(), nodeUsageStatsForThreadPools); + } + } + return cachedValuesForFailed; + } } From 8c087f428fecc548e1a87c81f865edfe8f3c1502 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 1 Sep 2025 14:07:12 +1000 Subject: [PATCH 02/12] Tidy --- .../cluster/NodeUsageStatsForThreadPoolsCollector.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index b5e02bc4c115f..218429b00df6d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -43,7 +43,7 @@ public void collectUsageStats( "transport_node_usage_stats_for_thread_pools_action" ); - private final Map lastNodeUsageStatsForThreadPools = new ConcurrentHashMap<>(); + private final Map lastNodeUsageStatsPerNode = new ConcurrentHashMap<>(); /** * Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster. @@ -57,7 +57,7 @@ public void collectUsageStats( ) { var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new); // Discard last-seen values for any nodes no longer present in the cluster state - lastNodeUsageStatsForThreadPools.keySet().retainAll(Arrays.asList(dataNodeIds)); + lastNodeUsageStatsPerNode.keySet().retainAll(Arrays.asList(dataNodeIds)); if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) { client.execute( TransportNodeUsageStatsForThreadPoolsAction.TYPE, @@ -74,7 +74,7 @@ private Map replaceFailuresWithLastSeenVal ) { final Map returnedUsageStats = response.getAllNodeUsageStatsForThreadPools(); // Update the last-seen usage stats - this.lastNodeUsageStatsForThreadPools.putAll(returnedUsageStats); + this.lastNodeUsageStatsPerNode.putAll(returnedUsageStats); if (response.hasFailures() == false) { return returnedUsageStats; @@ -83,7 +83,7 @@ private Map replaceFailuresWithLastSeenVal // Add in the last-seen usage stats for any nodes that failed to respond final Map cachedValuesForFailed = new HashMap<>(returnedUsageStats); for (FailedNodeException failedNodeException : response.failures()) { - NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = lastNodeUsageStatsForThreadPools.get(failedNodeException.nodeId()); + final var nodeUsageStatsForThreadPools = lastNodeUsageStatsPerNode.get(failedNodeException.nodeId()); if (nodeUsageStatsForThreadPools != null) { cachedValuesForFailed.put(failedNodeException.nodeId(), nodeUsageStatsForThreadPools); } From 94d46a072de30e92f5012057be58e7cf0b6229c1 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 3 Sep 2025 15:13:25 +1000 Subject: [PATCH 03/12] Add test --- ...deUsageStatsForThreadPoolsCollectorIT.java | 126 ++++++++++++++++++ ...ortNodeUsageStatsForThreadPoolsAction.java | 5 +- .../elasticsearch/test/ESIntegTestCase.java | 12 +- 3 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java new file mode 100644 index 0000000000000..b90062b9fb285 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TestTransportChannel; + +import java.util.Objects; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // Need to enable write load decider to enable node usage stats collection + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build(); + } + + public void testMostRecentValueIsUsedWhenNodeRequestFails() { + final var dataNodeName = internalCluster().startDataOnlyNode(); + final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName); + final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName); + final var threadPoolName = randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH); + + // Intercept the node request and return some fake values + final int totalThreadPoolThreads = randomIntBetween(2, 40); + final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); + final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); + dataNodeTransportService.addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> { + NodeUsageStatsForThreadPoolsAction.NodeResponse response = safeAwait( + l -> handler.messageReceived( + request, + new TestTransportChannel(l.map(res -> (NodeUsageStatsForThreadPoolsAction.NodeResponse) res)), + task + ) + ); + final var responseStats = response.getNodeUsageStatsForThreadPools(); + channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse( + response.getNode(), + new NodeUsageStatsForThreadPools( + responseStats.nodeId(), + Maps.copyMapWithAddedOrReplacedEntry( + responseStats.threadPoolUsageStatsMap(), + threadPoolName, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ) + ) + ) + ) + ); + } + ); + + // This info should contain our fake values + assertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // Now simulate an error + dataNodeTransportService.addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> { + channel.sendResponse(new Exception("simulated error")); + } + ); + + // The next response should also contain our fake values + assertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + } + + private void assertThreadPoolHasStats( + String nodeId, + String threadPoolName, + int totalThreadPoolThreads, + float averageThreadPoolUtilization, + long maxThreadPoolQueueLatencyMillis + ) { + final var clusterInfo = Objects.requireNonNull(refreshClusterInfo()); + final var usageStatsMap = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap(); + assertThat(usageStatsMap, hasKey(threadPoolName)); + final var threadPoolStats = usageStatsMap.get(threadPoolName); + assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads)); + assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization)); + assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 93a5c6f7dad88..0981ba84392cf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -110,11 +109,9 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( ) ); - Map perThreadPool = new HashMap<>(); - perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats); return new NodeUsageStatsForThreadPoolsAction.NodeResponse( localNode, - new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool) + new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats)) ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 8bc91ec6396cc..a563f73138fc5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -74,6 +74,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ClusterAdminClient; import org.elasticsearch.client.internal.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoServiceUtils; import org.elasticsearch.cluster.ClusterModule; @@ -1572,14 +1573,21 @@ private static void ensureClusterInfoServiceRunning() { } } - public static void refreshClusterInfo() { + /** + * Refreshes the cluster info on the master + * + * @return The new cluster info if the refresh was executed, null if the ClusterInfoService was of an unknown type + */ + @Nullable + public static ClusterInfo refreshClusterInfo() { final ClusterInfoService clusterInfoService = internalCluster().getInstance( ClusterInfoService.class, internalCluster().getMasterName() ); if (clusterInfoService instanceof InternalClusterInfoService) { - ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService)); + return ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService)); } + return null; } /** From 1c6e23943cefd274d128b4f3ac25485b5ce3e40f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 4 Sep 2025 10:47:47 +1000 Subject: [PATCH 04/12] Fix comment --- .../src/main/java/org/elasticsearch/test/ESIntegTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index a563f73138fc5..ff5d9865514e0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1576,7 +1576,7 @@ private static void ensureClusterInfoServiceRunning() { /** * Refreshes the cluster info on the master * - * @return The new cluster info if the refresh was executed, null if the ClusterInfoService was of an unknown type + * @return The new cluster info if the refresh was executed, null if the {@link ClusterInfoService} was of an unknown type */ @Nullable public static ClusterInfo refreshClusterInfo() { From 8b374dd9bd7a28b2fa841d15a4afccf5720767b4 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 5 Sep 2025 16:59:23 +1000 Subject: [PATCH 05/12] Add timestamp to NodeUsageStatsForThreadPools --- ...deUsageStatsForThreadPoolsCollectorIT.java | 33 +++++++++--- .../decider/WriteLoadConstraintDeciderIT.java | 3 +- .../org/elasticsearch/TransportVersions.java | 1 + .../NodeUsageStatsForThreadPoolsAction.java | 4 +- ...ortNodeUsageStatsForThreadPoolsAction.java | 7 +-- .../elasticsearch/cluster/ClusterInfo.java | 2 +- .../cluster/NodeUsageStatsForThreadPools.java | 52 ++++++------------- .../ShardMovementWriteLoadSimulator.java | 3 +- .../cluster/ClusterInfoTests.java | 6 ++- .../ShardMovementWriteLoadSimulatorTests.java | 3 +- .../WriteLoadConstraintMonitorTests.java | 10 ++-- .../WriteLoadConstraintDeciderTests.java | 3 +- 12 files changed, 69 insertions(+), 58 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java index b90062b9fb285..338d9a4c5d448 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -14,12 +14,17 @@ import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TestTransportChannel; +import java.time.Instant; +import java.util.Collection; import java.util.Objects; import static org.hamcrest.Matchers.equalTo; @@ -40,6 +45,11 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + public void testMostRecentValueIsUsedWhenNodeRequestFails() { final var dataNodeName = internalCluster().startDataOnlyNode(); final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName); @@ -74,7 +84,8 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis ) - ) + ), + Instant.now() ) ) ); @@ -82,12 +93,13 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { ); // This info should contain our fake values - assertThreadPoolHasStats( + final var successfulStats = assertThreadPoolHasStats( dataNodeClusterService.localNode().getId(), threadPoolName, totalThreadPoolThreads, averageThreadPoolUtilization, - maxThreadPoolQueueLatencyMillis + maxThreadPoolQueueLatencyMillis, + null ); // Now simulate an error @@ -104,23 +116,30 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { threadPoolName, totalThreadPoolThreads, averageThreadPoolUtilization, - maxThreadPoolQueueLatencyMillis + maxThreadPoolQueueLatencyMillis, + successfulStats.timestamp() ); } - private void assertThreadPoolHasStats( + private NodeUsageStatsForThreadPools assertThreadPoolHasStats( String nodeId, String threadPoolName, int totalThreadPoolThreads, float averageThreadPoolUtilization, - long maxThreadPoolQueueLatencyMillis + long maxThreadPoolQueueLatencyMillis, + @Nullable Instant timestamp ) { final var clusterInfo = Objects.requireNonNull(refreshClusterInfo()); - final var usageStatsMap = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap(); + final var nodeUsageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId); + if (timestamp != null) { + assertThat(nodeUsageStatsForThreadPools.timestamp(), equalTo(timestamp)); + } + final var usageStatsMap = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap(); assertThat(usageStatsMap, hasKey(threadPoolName)); final var threadPoolStats = usageStatsMap.get(threadPoolName); assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads)); assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization)); assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis)); + return nodeUsageStatsForThreadPools; } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index c8bd9ce10652b..e4c244aae22d7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -43,6 +43,7 @@ import org.elasticsearch.transport.TransportService; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -317,7 +318,7 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( ) ); - return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); + return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap, Instant.now()); } /** diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 85f7d75869085..766fafb6eb194 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -359,6 +359,7 @@ static TransportVersion def(int id) { public static final TransportVersion SEMANTIC_QUERY_MULTIPLE_INFERENCE_IDS = def(9_150_0_00); public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_151_0_00); public static final TransportVersion INFERENCE_API_DISABLE_EIS_RATE_LIMITING = def(9_152_0_00); + public static final TransportVersion TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS = def(9_153_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java index 829d1fd8eceaf..aef980ea040e0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java @@ -108,7 +108,7 @@ public static class NodeResponse extends BaseNodeResponse { protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException { super(in, node); - this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in); + this.nodeUsageStatsForThreadPools = NodeUsageStatsForThreadPools.readFrom(in); } public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) { @@ -118,7 +118,7 @@ public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageSt public NodeResponse(StreamInput in) throws IOException { super(in); - this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in); + this.nodeUsageStatsForThreadPools = NodeUsageStatsForThreadPools.readFrom(in); } public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 0981ba84392cf..b3a15c09f57ed 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -9,8 +9,6 @@ package org.elasticsearch.action.admin.cluster.node.usage; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -27,6 +25,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; @@ -41,8 +40,6 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA NodeUsageStatsForThreadPoolsAction.NodeResponse, Void> { - private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class); - public static final String NAME = "internal:monitor/thread_pool/stats"; public static final ActionType TYPE = new ActionType<>(NAME); @@ -111,7 +108,7 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( return new NodeUsageStatsForThreadPoolsAction.NodeResponse( localNode, - new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats)) + new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats), Instant.now()) ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 810a853c56f90..6bffdcc31f0eb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -110,7 +110,7 @@ public ClusterInfo(StreamInput in) throws IOException { this.estimatedHeapUsages = Map.of(); } if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { - this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new); + this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::readFrom); } else { this.nodeUsageStatsForThreadPools = Map.of(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java index b23cbafdf9bd4..a831149a0abf8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -9,13 +9,14 @@ package org.elasticsearch.cluster; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; +import java.time.Instant; import java.util.Map; -import java.util.Objects; /** * Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name. @@ -24,48 +25,29 @@ * @param threadPoolUsageStatsMap A map of thread pool name ({@link org.elasticsearch.threadpool.ThreadPool.Names}) to the thread pool's * usage stats ({@link ThreadPoolUsageStats}). */ -public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap) implements Writeable { +public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap, Instant timestamp) + implements + Writeable { - public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { - this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new)); + public static NodeUsageStatsForThreadPools readFrom(StreamInput in) throws IOException { + final var nodeId = in.readString(); + final var threadPoolUsageStatsMap = in.readImmutableMap(ThreadPoolUsageStats::new); + final Instant receivedTime; + if (in.getTransportVersion().onOrAfter(TransportVersions.TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS)) { + receivedTime = in.readInstant(); + } else { + receivedTime = Instant.now(); + } + return new NodeUsageStatsForThreadPools(nodeId, threadPoolUsageStatsMap, receivedTime); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(this.nodeId); out.writeMap(this.threadPoolUsageStatsMap, StreamOutput::writeWriteable); - } - - @Override - public int hashCode() { - return Objects.hash(nodeId, threadPoolUsageStatsMap); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o; - for (var entry : other.threadPoolUsageStatsMap.entrySet()) { - if (nodeId.equals(other.nodeId) == false) { - return false; - } - var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); - if (loadStats == null || loadStats.equals(entry.getValue()) == false) { - return false; - } - } - return true; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=["); - for (var entry : threadPoolUsageStatsMap.entrySet()) { - builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}"); + if (out.getTransportVersion().onOrAfter(TransportVersions.TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS)) { + out.writeInstant(this.timestamp); } - builder.append("]}"); - return builder.toString(); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 81eace527d04a..76fa0c11ec80d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -80,7 +80,8 @@ public Map simulatedNodeUsageStatsForThrea simulatedNodeWriteLoadDeltas.get(entry.getKey()), nodesWithMovedAwayShard.contains(entry.getKey()) ) - ) + ), + entry.getValue().timestamp() ); adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue); } else { diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index d32f53fdfdee1..ace7126562ab5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -86,7 +87,10 @@ private static Map randomNodeUsageStatsFor ); Map usageStatsForThreadPools = new HashMap<>(); usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); - nodeUsageStatsForThreadPools.put(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools)); + nodeUsageStatsForThreadPools.put( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools, randomInstantBetween(Instant.MIN, Instant.MAX)) + ); } return nodeUsageStatsForThreadPools; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 6178f25976f51..532346ec783cc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; +import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -212,7 +213,7 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( final var nodeThreadPoolStats = arrayOfNodeThreadPoolStats[i]; if (nodeThreadPoolStats != null) { final var nodeId = "node_" + i; - nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats))); + nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats), Instant.now())); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java index fea79d92c9fbf..05732e9bbd5c7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Instant; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -260,7 +261,8 @@ public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHot tpStats.averageThreadPoolUtilization(), testState.latencyThresholdMillis + randomLongBetween(1, 100_000) ) - ) + ), + Instant.now() ); } return stats; @@ -369,7 +371,8 @@ private static ClusterInfo createClusterInfoWithHotSpots( randomFloatBetween(0f, 1f, true), randomLongBetween(queueLatencyThresholdMillis + 1, queueLatencyThresholdMillis * 2) ) - ) + ), + Instant.now() ); } else { // not-hot-spotting node @@ -382,7 +385,8 @@ private static ClusterInfo createClusterInfoWithHotSpots( randomFloatBetween(0f, maxRatioForUnderUtilised, true), randomLongBetween(0, queueLatencyThresholdMillis) ) - ) + ), + Instant.now() ); } }))) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 12bfd8a0a4789..6c12afb6318b0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Instant; import java.util.HashMap; import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; @@ -301,6 +302,6 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( threadPoolUsageMap.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); // Create the node's thread pool usage map - return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); + return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap, Instant.now()); } } From 5fdf7d1ce9aad9201f15e047b6a56ce217d76c39 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 11:35:19 +1000 Subject: [PATCH 06/12] Revert "Add timestamp to NodeUsageStatsForThreadPools" This reverts commit 8b374dd9 --- ...deUsageStatsForThreadPoolsCollectorIT.java | 25 +++------ .../decider/WriteLoadConstraintDeciderIT.java | 3 +- .../org/elasticsearch/TransportVersions.java | 1 - .../NodeUsageStatsForThreadPoolsAction.java | 4 +- ...ortNodeUsageStatsForThreadPoolsAction.java | 7 ++- .../elasticsearch/cluster/ClusterInfo.java | 2 +- .../cluster/NodeUsageStatsForThreadPools.java | 52 +++++++++++++------ .../ShardMovementWriteLoadSimulator.java | 3 +- .../cluster/ClusterInfoTests.java | 6 +-- .../ShardMovementWriteLoadSimulatorTests.java | 3 +- .../WriteLoadConstraintMonitorTests.java | 10 ++-- .../WriteLoadConstraintDeciderTests.java | 3 +- 12 files changed, 58 insertions(+), 61 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java index 338d9a4c5d448..e32343587e129 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -16,14 +16,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; -import org.elasticsearch.core.Nullable; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TestTransportChannel; -import java.time.Instant; import java.util.Collection; import java.util.Objects; @@ -84,8 +82,7 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis ) - ), - Instant.now() + ) ) ) ); @@ -93,13 +90,12 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { ); // This info should contain our fake values - final var successfulStats = assertThreadPoolHasStats( + assertThreadPoolHasStats( dataNodeClusterService.localNode().getId(), threadPoolName, totalThreadPoolThreads, averageThreadPoolUtilization, - maxThreadPoolQueueLatencyMillis, - null + maxThreadPoolQueueLatencyMillis ); // Now simulate an error @@ -116,30 +112,23 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { threadPoolName, totalThreadPoolThreads, averageThreadPoolUtilization, - maxThreadPoolQueueLatencyMillis, - successfulStats.timestamp() + maxThreadPoolQueueLatencyMillis ); } - private NodeUsageStatsForThreadPools assertThreadPoolHasStats( + private void assertThreadPoolHasStats( String nodeId, String threadPoolName, int totalThreadPoolThreads, float averageThreadPoolUtilization, - long maxThreadPoolQueueLatencyMillis, - @Nullable Instant timestamp + long maxThreadPoolQueueLatencyMillis ) { final var clusterInfo = Objects.requireNonNull(refreshClusterInfo()); - final var nodeUsageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId); - if (timestamp != null) { - assertThat(nodeUsageStatsForThreadPools.timestamp(), equalTo(timestamp)); - } - final var usageStatsMap = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap(); + final var usageStatsMap = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap(); assertThat(usageStatsMap, hasKey(threadPoolName)); final var threadPoolStats = usageStatsMap.get(threadPoolName); assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads)); assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization)); assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis)); - return nodeUsageStatsForThreadPools; } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index bec11c8091a91..ec584b1d0973d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -43,7 +43,6 @@ import org.elasticsearch.transport.TransportService; import java.nio.file.Path; -import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -376,7 +375,7 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( ) ); - return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap, Instant.now()); + return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); } /** diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index b42ffbc905894..9195e23f92ee4 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -344,7 +344,6 @@ static TransportVersion def(int id) { public static final TransportVersion INFERENCE_API_DISABLE_EIS_RATE_LIMITING = def(9_152_0_00); public static final TransportVersion GEMINI_THINKING_BUDGET_ADDED = def(9_153_0_00); public static final TransportVersion VISIT_PERCENTAGE = def(9_154_0_00); - public static final TransportVersion TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS = def(9_155_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java index aef980ea040e0..829d1fd8eceaf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java @@ -108,7 +108,7 @@ public static class NodeResponse extends BaseNodeResponse { protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException { super(in, node); - this.nodeUsageStatsForThreadPools = NodeUsageStatsForThreadPools.readFrom(in); + this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in); } public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) { @@ -118,7 +118,7 @@ public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageSt public NodeResponse(StreamInput in) throws IOException { super(in); - this.nodeUsageStatsForThreadPools = NodeUsageStatsForThreadPools.readFrom(in); + this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in); } public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index eed17eb425e1f..4eea9592bee37 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -9,6 +9,8 @@ package org.elasticsearch.action.admin.cluster.node.usage; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -27,7 +29,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -45,6 +46,8 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA NodeUsageStatsForThreadPoolsAction.NodeResponse, Void> { + private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class); + public static final String NAME = "internal:monitor/thread_pool/stats"; public static final ActionType TYPE = new ActionType<>(NAME); private static final int NO_VALUE = -1; @@ -119,7 +122,7 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( return new NodeUsageStatsForThreadPoolsAction.NodeResponse( localNode, - new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats), Instant.now()) + new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats)) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 6bffdcc31f0eb..810a853c56f90 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -110,7 +110,7 @@ public ClusterInfo(StreamInput in) throws IOException { this.estimatedHeapUsages = Map.of(); } if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { - this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::readFrom); + this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new); } else { this.nodeUsageStatsForThreadPools = Map.of(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java index a831149a0abf8..b23cbafdf9bd4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -9,14 +9,13 @@ package org.elasticsearch.cluster; -import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; -import java.time.Instant; import java.util.Map; +import java.util.Objects; /** * Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name. @@ -25,29 +24,48 @@ * @param threadPoolUsageStatsMap A map of thread pool name ({@link org.elasticsearch.threadpool.ThreadPool.Names}) to the thread pool's * usage stats ({@link ThreadPoolUsageStats}). */ -public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap, Instant timestamp) - implements - Writeable { +public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap) implements Writeable { - public static NodeUsageStatsForThreadPools readFrom(StreamInput in) throws IOException { - final var nodeId = in.readString(); - final var threadPoolUsageStatsMap = in.readImmutableMap(ThreadPoolUsageStats::new); - final Instant receivedTime; - if (in.getTransportVersion().onOrAfter(TransportVersions.TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS)) { - receivedTime = in.readInstant(); - } else { - receivedTime = Instant.now(); - } - return new NodeUsageStatsForThreadPools(nodeId, threadPoolUsageStatsMap, receivedTime); + public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { + this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new)); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(this.nodeId); out.writeMap(this.threadPoolUsageStatsMap, StreamOutput::writeWriteable); - if (out.getTransportVersion().onOrAfter(TransportVersions.TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS)) { - out.writeInstant(this.timestamp); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, threadPoolUsageStatsMap); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o; + for (var entry : other.threadPoolUsageStatsMap.entrySet()) { + if (nodeId.equals(other.nodeId) == false) { + return false; + } + var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); + if (loadStats == null || loadStats.equals(entry.getValue()) == false) { + return false; + } + } + return true; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=["); + for (var entry : threadPoolUsageStatsMap.entrySet()) { + builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}"); } + builder.append("]}"); + return builder.toString(); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 76fa0c11ec80d..81eace527d04a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -80,8 +80,7 @@ public Map simulatedNodeUsageStatsForThrea simulatedNodeWriteLoadDeltas.get(entry.getKey()), nodesWithMovedAwayShard.contains(entry.getKey()) ) - ), - entry.getValue().timestamp() + ) ); adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue); } else { diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index ace7126562ab5..d32f53fdfdee1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.threadpool.ThreadPool; -import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -87,10 +86,7 @@ private static Map randomNodeUsageStatsFor ); Map usageStatsForThreadPools = new HashMap<>(); usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); - nodeUsageStatsForThreadPools.put( - ThreadPool.Names.WRITE, - new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools, randomInstantBetween(Instant.MIN, Instant.MAX)) - ); + nodeUsageStatsForThreadPools.put(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools)); } return nodeUsageStatsForThreadPools; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 532346ec783cc..6178f25976f51 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; -import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -213,7 +212,7 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( final var nodeThreadPoolStats = arrayOfNodeThreadPoolStats[i]; if (nodeThreadPoolStats != null) { final var nodeId = "node_" + i; - nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats), Instant.now())); + nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats))); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java index 05732e9bbd5c7..fea79d92c9fbf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; -import java.time.Instant; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -261,8 +260,7 @@ public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHot tpStats.averageThreadPoolUtilization(), testState.latencyThresholdMillis + randomLongBetween(1, 100_000) ) - ), - Instant.now() + ) ); } return stats; @@ -371,8 +369,7 @@ private static ClusterInfo createClusterInfoWithHotSpots( randomFloatBetween(0f, 1f, true), randomLongBetween(queueLatencyThresholdMillis + 1, queueLatencyThresholdMillis * 2) ) - ), - Instant.now() + ) ); } else { // not-hot-spotting node @@ -385,8 +382,7 @@ private static ClusterInfo createClusterInfoWithHotSpots( randomFloatBetween(0f, maxRatioForUnderUtilised, true), randomLongBetween(0, queueLatencyThresholdMillis) ) - ), - Instant.now() + ) ); } }))) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 6c12afb6318b0..12bfd8a0a4789 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import java.time.Instant; import java.util.HashMap; import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; @@ -302,6 +301,6 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( threadPoolUsageMap.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); // Create the node's thread pool usage map - return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap, Instant.now()); + return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); } } From 6310e24fd1e97416aef8a1d96c5f4dd93a9ab6f2 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 11:55:49 +1000 Subject: [PATCH 07/12] Log when we get no stats back from a node, return a copy of the last seen stats --- ...ortNodeUsageStatsForThreadPoolsAction.java | 4 -- ...NodeUsageStatsForThreadPoolsCollector.java | 38 +++++++------------ 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 4eea9592bee37..727f05ef14915 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -9,8 +9,6 @@ package org.elasticsearch.action.admin.cluster.node.usage; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -46,8 +44,6 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA NodeUsageStatsForThreadPoolsAction.NodeResponse, Void> { - private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class); - public static final String NAME = "internal:monitor/thread_pool/stats"; public static final ActionType TYPE = new ActionType<>(NAME); private static final int NO_VALUE = -1; diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index 218429b00df6d..14fda10669191 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -9,6 +9,8 @@ package org.elasticsearch.cluster; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -18,9 +20,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import java.util.Arrays; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** * Collects the thread pool usage stats for each node in the cluster. @@ -43,6 +45,8 @@ public void collectUsageStats( "transport_node_usage_stats_for_thread_pools_action" ); + private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsCollector.class); + private final Map lastNodeUsageStatsPerNode = new ConcurrentHashMap<>(); /** @@ -62,32 +66,18 @@ public void collectUsageStats( client.execute( TransportNodeUsageStatsForThreadPoolsAction.TYPE, new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds), - listener.map(this::replaceFailuresWithLastSeenValues) + listener.map(response -> { + // Update last seen stats + lastNodeUsageStatsPerNode.putAll(response.getAllNodeUsageStatsForThreadPools()); + logger.warn( + "Got no usage stats from nodes {}", + response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.joining(", ")) + ); + return Map.copyOf(lastNodeUsageStatsPerNode); + }) ); } else { listener.onResponse(Map.of()); } } - - private Map replaceFailuresWithLastSeenValues( - NodeUsageStatsForThreadPoolsAction.Response response - ) { - final Map returnedUsageStats = response.getAllNodeUsageStatsForThreadPools(); - // Update the last-seen usage stats - this.lastNodeUsageStatsPerNode.putAll(returnedUsageStats); - - if (response.hasFailures() == false) { - return returnedUsageStats; - } - - // Add in the last-seen usage stats for any nodes that failed to respond - final Map cachedValuesForFailed = new HashMap<>(returnedUsageStats); - for (FailedNodeException failedNodeException : response.failures()) { - final var nodeUsageStatsForThreadPools = lastNodeUsageStatsPerNode.get(failedNodeException.nodeId()); - if (nodeUsageStatsForThreadPools != null) { - cachedValuesForFailed.put(failedNodeException.nodeId(), nodeUsageStatsForThreadPools); - } - } - return cachedValuesForFailed; - } } From 355f6d9d8931a2af11a071ba939c83d4633a9b7d Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 12:08:07 +1000 Subject: [PATCH 08/12] Naming --- .../cluster/NodeUsageStatsForThreadPoolsCollectorIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java index e32343587e129..8496edd4c6c58 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -90,7 +90,7 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { ); // This info should contain our fake values - assertThreadPoolHasStats( + refreshClusterInfoAndAssertThreadPoolHasStats( dataNodeClusterService.localNode().getId(), threadPoolName, totalThreadPoolThreads, @@ -107,7 +107,7 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { ); // The next response should also contain our fake values - assertThreadPoolHasStats( + refreshClusterInfoAndAssertThreadPoolHasStats( dataNodeClusterService.localNode().getId(), threadPoolName, totalThreadPoolThreads, @@ -116,7 +116,7 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { ); } - private void assertThreadPoolHasStats( + private void refreshClusterInfoAndAssertThreadPoolHasStats( String nodeId, String threadPoolName, int totalThreadPoolThreads, From 3decebd850f5451d2c888b64db9a3041bc25c59a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 12:10:39 +1000 Subject: [PATCH 09/12] Only log when there are failures --- .../cluster/NodeUsageStatsForThreadPoolsCollector.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index 14fda10669191..f30b8e03b7891 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -69,10 +69,12 @@ public void collectUsageStats( listener.map(response -> { // Update last seen stats lastNodeUsageStatsPerNode.putAll(response.getAllNodeUsageStatsForThreadPools()); - logger.warn( - "Got no usage stats from nodes {}", - response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.joining(", ")) - ); + if (response.failures().isEmpty() == false) { + logger.warn( + "Got no usage stats from nodes {}", + response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.joining(", ")) + ); + } return Map.copyOf(lastNodeUsageStatsPerNode); }) ); From ff795ac95aa1fbbe515f2a3d34202594c62bb336 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 19:09:52 +1000 Subject: [PATCH 10/12] Apply suggestion from @ywangd Co-authored-by: Yang Wang --- .../cluster/NodeUsageStatsForThreadPoolsCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index f30b8e03b7891..e455082d4d580 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -71,7 +71,7 @@ public void collectUsageStats( lastNodeUsageStatsPerNode.putAll(response.getAllNodeUsageStatsForThreadPools()); if (response.failures().isEmpty() == false) { logger.warn( - "Got no usage stats from nodes {}", + "Got no usage stats from nodes [{}], using last known stats for them", response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.joining(", ")) ); } From d5b17abaa72121a12417c4f411ecd7930bfbabd3 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 18:59:41 +1000 Subject: [PATCH 11/12] Remove unneeded annotation --- .../cluster/NodeUsageStatsForThreadPoolsCollectorIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java index 8496edd4c6c58..afade773e7426 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -28,7 +28,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase { @Override From ee9acc5bb29f3c2dde9e8babadf93ac00bf5b4b4 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 19:09:24 +1000 Subject: [PATCH 12/12] Test that current values are returned again after errors --- ...deUsageStatsForThreadPoolsCollectorIT.java | 91 +++++++++++++------ 1 file changed, 65 insertions(+), 26 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java index afade773e7426..9abf24939abc2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -57,6 +57,71 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { final int totalThreadPoolThreads = randomIntBetween(2, 40); final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); + mockThreadPoolUsageStats( + dataNodeTransportService, + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // This info should contain our fake values + refreshClusterInfoAndAssertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // Now simulate an error + dataNodeTransportService.clearInboundRules(); + dataNodeTransportService.addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> { + channel.sendResponse(new Exception("simulated error")); + } + ); + + // The next response should also contain our fake values + refreshClusterInfoAndAssertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // Now start returning values again + final int newTotalThreadPoolThreads = randomIntBetween(2, 40); + final float newAverageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); + final long newMaxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); + mockThreadPoolUsageStats( + dataNodeTransportService, + threadPoolName, + newTotalThreadPoolThreads, + newAverageThreadPoolUtilization, + newMaxThreadPoolQueueLatencyMillis + ); + + // The next response should contain the current values again + refreshClusterInfoAndAssertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + newTotalThreadPoolThreads, + newAverageThreadPoolUtilization, + newMaxThreadPoolQueueLatencyMillis + ); + } + + private static void mockThreadPoolUsageStats( + MockTransportService dataNodeTransportService, + String threadPoolName, + int totalThreadPoolThreads, + float averageThreadPoolUtilization, + long maxThreadPoolQueueLatencyMillis + ) { + dataNodeTransportService.clearInboundRules(); dataNodeTransportService.addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> { @@ -87,32 +152,6 @@ public void testMostRecentValueIsUsedWhenNodeRequestFails() { ); } ); - - // This info should contain our fake values - refreshClusterInfoAndAssertThreadPoolHasStats( - dataNodeClusterService.localNode().getId(), - threadPoolName, - totalThreadPoolThreads, - averageThreadPoolUtilization, - maxThreadPoolQueueLatencyMillis - ); - - // Now simulate an error - dataNodeTransportService.addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> { - channel.sendResponse(new Exception("simulated error")); - } - ); - - // The next response should also contain our fake values - refreshClusterInfoAndAssertThreadPoolHasStats( - dataNodeClusterService.localNode().getId(), - threadPoolName, - totalThreadPoolThreads, - averageThreadPoolUtilization, - maxThreadPoolQueueLatencyMillis - ); } private void refreshClusterInfoAndAssertThreadPoolHasStats(