diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java new file mode 100644 index 0000000000000..9abf24939abc2 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollectorIT.java @@ -0,0 +1,172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TestTransportChannel; + +import java.util.Collection; +import java.util.Objects; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; + +public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // Need to enable write load decider to enable node usage stats collection + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + + public void testMostRecentValueIsUsedWhenNodeRequestFails() { + final var dataNodeName = internalCluster().startDataOnlyNode(); + final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName); + final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName); + final var threadPoolName = randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH); + + // Intercept the node request and return some fake values + final int totalThreadPoolThreads = randomIntBetween(2, 40); + final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); + final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); + mockThreadPoolUsageStats( + dataNodeTransportService, + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // This info should contain our fake values + refreshClusterInfoAndAssertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // Now simulate an error + dataNodeTransportService.clearInboundRules(); + dataNodeTransportService.addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> { + channel.sendResponse(new Exception("simulated error")); + } + ); + + // The next response should also contain our fake values + refreshClusterInfoAndAssertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ); + + // Now start returning values again + final int newTotalThreadPoolThreads = randomIntBetween(2, 40); + final float newAverageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); + final long newMaxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); + mockThreadPoolUsageStats( + dataNodeTransportService, + threadPoolName, + newTotalThreadPoolThreads, + newAverageThreadPoolUtilization, + newMaxThreadPoolQueueLatencyMillis + ); + + // The next response should contain the current values again + refreshClusterInfoAndAssertThreadPoolHasStats( + dataNodeClusterService.localNode().getId(), + threadPoolName, + newTotalThreadPoolThreads, + newAverageThreadPoolUtilization, + newMaxThreadPoolQueueLatencyMillis + ); + } + + private static void mockThreadPoolUsageStats( + MockTransportService dataNodeTransportService, + String threadPoolName, + int totalThreadPoolThreads, + float averageThreadPoolUtilization, + long maxThreadPoolQueueLatencyMillis + ) { + dataNodeTransportService.clearInboundRules(); + dataNodeTransportService.addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> { + NodeUsageStatsForThreadPoolsAction.NodeResponse response = safeAwait( + l -> handler.messageReceived( + request, + new TestTransportChannel(l.map(res -> (NodeUsageStatsForThreadPoolsAction.NodeResponse) res)), + task + ) + ); + final var responseStats = response.getNodeUsageStatsForThreadPools(); + channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse( + response.getNode(), + new NodeUsageStatsForThreadPools( + responseStats.nodeId(), + Maps.copyMapWithAddedOrReplacedEntry( + responseStats.threadPoolUsageStatsMap(), + threadPoolName, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + totalThreadPoolThreads, + averageThreadPoolUtilization, + maxThreadPoolQueueLatencyMillis + ) + ) + ) + ) + ); + } + ); + } + + private void refreshClusterInfoAndAssertThreadPoolHasStats( + String nodeId, + String threadPoolName, + int totalThreadPoolThreads, + float averageThreadPoolUtilization, + long maxThreadPoolQueueLatencyMillis + ) { + final var clusterInfo = Objects.requireNonNull(refreshClusterInfo()); + final var usageStatsMap = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap(); + assertThat(usageStatsMap, hasKey(threadPoolName)); + final var threadPoolStats = usageStatsMap.get(threadPoolName); + assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads)); + assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization)); + assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 291a84059b8f4..727f05ef14915 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -117,11 +116,9 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( maxQueueLatencyMillis ); - Map perThreadPool = new HashMap<>(); - perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats); return new NodeUsageStatsForThreadPoolsAction.NodeResponse( localNode, - new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool) + new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats)) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index 0921bde87d353..e455082d4d580 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -9,18 +9,26 @@ package org.elasticsearch.cluster; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import java.util.Arrays; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** * Collects the thread pool usage stats for each node in the cluster. *

- * Results are returned as a map of node ID to node usage stats. + * Results are returned as a map of node ID to node usage stats. Keeps track of the most recent + * usage stats for each node, which will be returned in the event of a failure response from that node. */ public class NodeUsageStatsForThreadPoolsCollector { public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() { @@ -37,6 +45,10 @@ public void collectUsageStats( "transport_node_usage_stats_for_thread_pools_action" ); + private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsCollector.class); + + private final Map lastNodeUsageStatsPerNode = new ConcurrentHashMap<>(); + /** * Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster. * @@ -47,12 +59,24 @@ public void collectUsageStats( ClusterState clusterState, ActionListener> listener ) { - var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new); + var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new); + // Discard last-seen values for any nodes no longer present in the cluster state + lastNodeUsageStatsPerNode.keySet().retainAll(Arrays.asList(dataNodeIds)); if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) { client.execute( TransportNodeUsageStatsForThreadPoolsAction.TYPE, new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds), - listener.map(response -> response.getAllNodeUsageStatsForThreadPools()) + listener.map(response -> { + // Update last seen stats + lastNodeUsageStatsPerNode.putAll(response.getAllNodeUsageStatsForThreadPools()); + if (response.failures().isEmpty() == false) { + logger.warn( + "Got no usage stats from nodes [{}], using last known stats for them", + response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.joining(", ")) + ); + } + return Map.copyOf(lastNodeUsageStatsPerNode); + }) ); } else { listener.onResponse(Map.of()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 8bc91ec6396cc..ff5d9865514e0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -74,6 +74,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ClusterAdminClient; import org.elasticsearch.client.internal.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoServiceUtils; import org.elasticsearch.cluster.ClusterModule; @@ -1572,14 +1573,21 @@ private static void ensureClusterInfoServiceRunning() { } } - public static void refreshClusterInfo() { + /** + * Refreshes the cluster info on the master + * + * @return The new cluster info if the refresh was executed, null if the {@link ClusterInfoService} was of an unknown type + */ + @Nullable + public static ClusterInfo refreshClusterInfo() { final ClusterInfoService clusterInfoService = internalCluster().getInstance( ClusterInfoService.class, internalCluster().getMasterName() ); if (clusterInfoService instanceof InternalClusterInfoService) { - ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService)); + return ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService)); } + return null; } /**