Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TestTransportChannel;

import java.util.Objects;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this annotation necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, fixed in d5b17ab

/**
 * Integration test for the collection of per-node thread pool usage stats into the cluster info.
 * <p>
 * Checks that, once a data node has reported usage stats, a later failed stats request for that node
 * still yields the previously reported values after a cluster info refresh.
 */
public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase {

    /** Name of the transport action whose handling on the data node is intercepted by this test. */
    private static final String INTERCEPTED_ACTION_NAME = TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]";

    @Override
    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal, otherSettings))
            // Need to enable write load decider to enable node usage stats collection
            .put(
                WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
            )
            .build();
    }

    /**
     * Injects fake usage stats for a randomly chosen thread pool into the data node's response, confirms they
     * show up in the refreshed cluster info, then makes the data node answer with an error and confirms the
     * same fake values are still reported.
     */
    public void testMostRecentValueIsUsedWhenNodeRequestFails() {
        final var dataNodeName = internalCluster().startDataOnlyNode();
        final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName);
        final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName);
        final var threadPoolName = randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);

        // Intercept the node request and return some fake values
        final int totalThreadPoolThreads = randomIntBetween(2, 40);
        final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true);
        final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000);
        dataNodeTransportService.addRequestHandlingBehavior(INTERCEPTED_ACTION_NAME, (handler, request, channel, task) -> {
            // Let the real handler run first so that only the chosen thread pool's entry is replaced
            NodeUsageStatsForThreadPoolsAction.NodeResponse response = safeAwait(
                l -> handler.messageReceived(
                    request,
                    new TestTransportChannel(l.map(res -> (NodeUsageStatsForThreadPoolsAction.NodeResponse) res)),
                    task
                )
            );
            final var responseStats = response.getNodeUsageStatsForThreadPools();
            final var fakeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
                totalThreadPoolThreads,
                averageThreadPoolUtilization,
                maxThreadPoolQueueLatencyMillis
            );
            final var fakeNodeStats = new NodeUsageStatsForThreadPools(
                responseStats.nodeId(),
                Maps.copyMapWithAddedOrReplacedEntry(responseStats.threadPoolUsageStatsMap(), threadPoolName, fakeThreadPoolStats)
            );
            channel.sendResponse(new NodeUsageStatsForThreadPoolsAction.NodeResponse(response.getNode(), fakeNodeStats));
        });

        // This info should contain our fake values
        assertThreadPoolHasStats(
            dataNodeClusterService.localNode().getId(),
            threadPoolName,
            totalThreadPoolThreads,
            averageThreadPoolUtilization,
            maxThreadPoolQueueLatencyMillis
        );

        // Now simulate an error
        dataNodeTransportService.addRequestHandlingBehavior(
            INTERCEPTED_ACTION_NAME,
            (handler, request, channel, task) -> channel.sendResponse(new Exception("simulated error"))
        );

        // The next response should also contain our fake values
        assertThreadPoolHasStats(
            dataNodeClusterService.localNode().getId(),
            threadPoolName,
            totalThreadPoolThreads,
            averageThreadPoolUtilization,
            maxThreadPoolQueueLatencyMillis
        );
    }

    /**
     * Refreshes the cluster info and asserts that it holds the expected usage stats for one thread pool of one node.
     *
     * @param nodeId                          ID of the node whose stats are checked
     * @param threadPoolName                  name of the thread pool whose stats are checked
     * @param totalThreadPoolThreads          expected total thread count
     * @param averageThreadPoolUtilization    expected average utilization
     * @param maxThreadPoolQueueLatencyMillis expected maximum queue latency in milliseconds
     */
    private void assertThreadPoolHasStats(
        String nodeId,
        String threadPoolName,
        int totalThreadPoolThreads,
        float averageThreadPoolUtilization,
        long maxThreadPoolQueueLatencyMillis
    ) {
        final var clusterInfo = Objects.requireNonNull(refreshClusterInfo());
        final var usageStatsPerNode = clusterInfo.getNodeUsageStatsForThreadPools();
        // Fail with a descriptive assertion rather than a NullPointerException if the node has no entry
        assertThat(usageStatsPerNode, hasKey(nodeId));
        final var usageStatsMap = usageStatsPerNode.get(nodeId).threadPoolUsageStatsMap();
        assertThat(usageStatsMap, hasKey(threadPoolName));
        final var threadPoolStats = usageStatsMap.get(threadPoolName);
        assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads));
        assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization));
        assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -110,11 +109,9 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
)
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats);
return new NodeUsageStatsForThreadPoolsAction.NodeResponse(
localNode,
new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to do this to use Maps.copyMapWithAddedOrReplacedEntry, seems like it should be immutable anyhow.

);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Collects the thread pool usage stats for each node in the cluster.
* <p>
* Results are returned as a map of node ID to node usage stats.
* Results are returned as a map of node ID to node usage stats. Keeps track of the most recent
* usage stats for each node, which will be returned in the event of a failure response from that node.
*/
public class NodeUsageStatsForThreadPoolsCollector {
public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() {
Expand All @@ -37,6 +43,8 @@ public void collectUsageStats(
"transport_node_usage_stats_for_thread_pools_action"
);

private final Map<String, NodeUsageStatsForThreadPools> lastNodeUsageStatsPerNode = new ConcurrentHashMap<>();

/**
* Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
*
Expand All @@ -47,15 +55,39 @@ public void collectUsageStats(
ClusterState clusterState,
ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener
) {
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new);
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new);
// Discard last-seen values for any nodes no longer present in the cluster state
lastNodeUsageStatsPerNode.keySet().retainAll(Arrays.asList(dataNodeIds));
if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) {
client.execute(
TransportNodeUsageStatsForThreadPoolsAction.TYPE,
new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds),
listener.map(response -> response.getAllNodeUsageStatsForThreadPools())
listener.map(this::replaceFailuresWithLastSeenValues)
);
} else {
listener.onResponse(Map.of());
}
}

private Map<String, NodeUsageStatsForThreadPools> replaceFailuresWithLastSeenValues(
NodeUsageStatsForThreadPoolsAction.Response response
) {
final Map<String, NodeUsageStatsForThreadPools> returnedUsageStats = response.getAllNodeUsageStatsForThreadPools();
// Update the last-seen usage stats
this.lastNodeUsageStatsPerNode.putAll(returnedUsageStats);

if (response.hasFailures() == false) {
return returnedUsageStats;
}

// Add in the last-seen usage stats for any nodes that failed to respond
final Map<String, NodeUsageStatsForThreadPools> cachedValuesForFailed = new HashMap<>(returnedUsageStats);
for (FailedNodeException failedNodeException : response.failures()) {
final var nodeUsageStatsForThreadPools = lastNodeUsageStatsPerNode.get(failedNodeException.nodeId());
Copy link
Member

@ywangd ywangd Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, instead of using last value as a fallback, can we have a specific NodeUsageStatsForThreadPools object representing failure? Other parts of the code will have to check it explicitly to make decision, e.g. write load decider potentially rejecting allocation?

My thinking is that we probably don't want to fall back more than a few times, i.e. the last value needs to expire at a certain point. I guess that's probably the reason you added the received timestamp? In that case, we still have to address what we use to indicate a "failed and expired" entry. If a node fails to respond to ClusterInfo polling, it is likely overloaded, e.g. CBE. So it seems safer to assume rejection or overall no movement for the node?

if (nodeUsageStatsForThreadPools != null) {
cachedValuesForFailed.put(failedNodeException.nodeId(), nodeUsageStatsForThreadPools);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can lastNodeUsageStatsPerNode be returned directly instead? putAll above adds the new values for the nodeId keys. So whatever nodes are missing in the new response will not be overridden in lastNodeUsageStatsPerNode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes perhaps... I wonder if there's some way it could include values for nodes we didn't request? probably not if things happen in the sequence we expect them to happen. I will come back to this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer not to, because lastNodeUsageStatsPerNode is internal state: it's mutable and it will be mutated by the collector (to expire values for nodes no longer in the cluster). I think the way it is now is more explicit: we take the cached value for any node in response.failures() and we return a static map.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, good point about immutability 👍

Would it be sufficient to return a copy of lastNodeUsageStatsPerNode and skip this whole for-loop? No longer present nodes have already been filtered out in a prior stage, and the successful node responses were applied above.

}
}
Copy link
Contributor Author

@nicktindall nicktindall Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether it makes sense to cache these things forever, or put some limit on how long we consider them to be better than nothing. I can't imagine that a node being part of the cluster but returning errors for node usage stats requests is a situation that persists for very long.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the last seen value for each node is cached. That doesn't seem expensive to save and it'll be refreshed frequently.

A WARN message log for each node that fails to respond would be good, along with its error cause/msg. It shouldn't happen often, so I don't expect it'll be noisy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was more worried about whether there's a point where the cached value is so stale it's not useful, but I think it's probably always better than nothing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, always better than nothing 👍

A cluster where a node is repeatedly failing to return stats probably has much bigger problems than this stale value.

return cachedValuesForFailed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ClusterAdminClient;
import org.elasticsearch.client.internal.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.ClusterModule;
Expand Down Expand Up @@ -1572,14 +1573,21 @@ private static void ensureClusterInfoServiceRunning() {
}
}

public static void refreshClusterInfo() {
/**
* Refreshes the cluster info on the master
*
* @return The new cluster info if the refresh was executed, null if the ClusterInfoService was of an unknown type
*/
@Nullable
public static ClusterInfo refreshClusterInfo() {
final ClusterInfoService clusterInfoService = internalCluster().getInstance(
ClusterInfoService.class,
internalCluster().getMasterName()
);
if (clusterInfoService instanceof InternalClusterInfoService) {
ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService));
return ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService));
}
return null;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed useful to return the actual ClusterInfo here?


/**
Expand Down