Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Collects the thread pool usage stats for each node in the cluster.
* <p>
* Results are returned as a map of node ID to node usage stats.
* Results are returned as a map of node ID to node usage stats. Keeps track of the most recent
* usage stats for each node, which will be returned in the event of a failure response from that node.
*/
public class NodeUsageStatsForThreadPoolsCollector {
public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() {
Expand All @@ -37,6 +43,8 @@ public void collectUsageStats(
"transport_node_usage_stats_for_thread_pools_action"
);

private final Map<String, NodeUsageStatsForThreadPools> lastNodeUsageStatsPerNode = new ConcurrentHashMap<>();

/**
* Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
*
Expand All @@ -47,15 +55,39 @@ public void collectUsageStats(
ClusterState clusterState,
ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener
) {
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new);
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new);
// Discard last-seen values for any nodes no longer present in the cluster state
lastNodeUsageStatsPerNode.keySet().retainAll(Arrays.asList(dataNodeIds));
if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) {
client.execute(
TransportNodeUsageStatsForThreadPoolsAction.TYPE,
new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds),
listener.map(response -> response.getAllNodeUsageStatsForThreadPools())
listener.map(this::replaceFailuresWithLastSeenValues)
);
} else {
listener.onResponse(Map.of());
}
}

private Map<String, NodeUsageStatsForThreadPools> replaceFailuresWithLastSeenValues(
NodeUsageStatsForThreadPoolsAction.Response response
) {
final Map<String, NodeUsageStatsForThreadPools> returnedUsageStats = response.getAllNodeUsageStatsForThreadPools();
// Update the last-seen usage stats
this.lastNodeUsageStatsPerNode.putAll(returnedUsageStats);

if (response.hasFailures() == false) {
return returnedUsageStats;
}

// Add in the last-seen usage stats for any nodes that failed to respond
final Map<String, NodeUsageStatsForThreadPools> cachedValuesForFailed = new HashMap<>(returnedUsageStats);
for (FailedNodeException failedNodeException : response.failures()) {
final var nodeUsageStatsForThreadPools = lastNodeUsageStatsPerNode.get(failedNodeException.nodeId());
Copy link
Member

@ywangd ywangd Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, instead of using last value as a fallback, can we have a specific NodeUsageStatsForThreadPools object representing failure? Other parts of the code will have to check it explicitly to make decision, e.g. write load decider potentially rejecting allocation?

My thinking is that we probably don't want to fallback more than a few times, i.e. the last value needs to expire at certain point. I guess it's probably the reason you added the received timestamp? In that case, we still have to address what we use to indicate a "failed and expired" entry. If a node fails to respond ClusterInfo polling, it is likely overloaded, e.g. CBE. So seems safter to assume rejection or overall no movement for the node?

if (nodeUsageStatsForThreadPools != null) {
cachedValuesForFailed.put(failedNodeException.nodeId(), nodeUsageStatsForThreadPools);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can lastNodeUsageStatsPerNode be returned directly instead? putAll above adds the new values for the nodeId keys. So whatever nodes are missing in the new response will not be overridden in lastNodeUsageStatsPerNode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes perhaps... I wonder if there's some way it could include values for nodes we didn't request? probably not if things happen in the sequence we expect them to happen. I will come back to this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer not to, because lastNodeUsageStasPerNode is internal state, it's mutable and it will be mutated by the collector (to expire values for nodes no longer in the cluster). I think the way it is is more explicit, we take the cached value for any node in response.failures() and we return a static map.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, good point about immutability 👍

Would it be sufficient to return a copy of lastNodeUsageStatsPerNode and skip this whole for-loop? No longer present nodes have already been filtered out in a prior stage, and the successful node responses were applied above.

}
}
Copy link
Contributor Author

@nicktindall nicktindall Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether it makes sense to cache these things forever, or put some limit on how long we consider them to be better than nothing. I can't imagine being part of the cluster, but returning errors for node usage stats requests is a situation that persists for very long.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the last seen value for each node is cached. That doesn't seem expensive to save and it'll be refreshed frequently.

A WARN message log for each node that fails to respond would be good, along with its error cause/msg. It shouldn't happen often, so I don't expect it'll be noisy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was more worried about whether there's a point where the cached value is so stale it's not useful, but I think it's probably always better than nothing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, always better than nothing 👍

A a cluster where a node is failing repeatedly to return stats probably has much bigger problems than this stale value.

return cachedValuesForFailed;
}
}