Merged
Changes from 7 commits
@@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TestTransportChannel;

import java.time.Instant;
import java.util.Collection;
import java.util.Objects;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
Member
Is this annotation necessary?

Contributor Author
No, fixed in d5b17ab

public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// Need to enable write load decider to enable node usage stats collection
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

public void testMostRecentValueIsUsedWhenNodeRequestFails() {
final var dataNodeName = internalCluster().startDataOnlyNode();
final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName);
final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName);
final var threadPoolName = randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);

// Intercept the node request and return some fake values
final int totalThreadPoolThreads = randomIntBetween(2, 40);
final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true);
final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000);
dataNodeTransportService.addRequestHandlingBehavior(
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
(handler, request, channel, task) -> {
NodeUsageStatsForThreadPoolsAction.NodeResponse response = safeAwait(
l -> handler.messageReceived(
request,
new TestTransportChannel(l.map(res -> (NodeUsageStatsForThreadPoolsAction.NodeResponse) res)),
task
)
);
final var responseStats = response.getNodeUsageStatsForThreadPools();
channel.sendResponse(
new NodeUsageStatsForThreadPoolsAction.NodeResponse(
response.getNode(),
new NodeUsageStatsForThreadPools(
responseStats.nodeId(),
Maps.copyMapWithAddedOrReplacedEntry(
responseStats.threadPoolUsageStatsMap(),
threadPoolName,
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
totalThreadPoolThreads,
averageThreadPoolUtilization,
maxThreadPoolQueueLatencyMillis
)
),
Instant.now()
)
)
);
}
);

// This info should contain our fake values
final var successfulStats = assertThreadPoolHasStats(
dataNodeClusterService.localNode().getId(),
threadPoolName,
totalThreadPoolThreads,
averageThreadPoolUtilization,
maxThreadPoolQueueLatencyMillis,
null
);

// Now simulate an error
dataNodeTransportService.addRequestHandlingBehavior(
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
(handler, request, channel, task) -> {
channel.sendResponse(new Exception("simulated error"));
}
);

// The next response should also contain our fake values
assertThreadPoolHasStats(
dataNodeClusterService.localNode().getId(),
threadPoolName,
totalThreadPoolThreads,
averageThreadPoolUtilization,
maxThreadPoolQueueLatencyMillis,
successfulStats.timestamp()
);
}

private NodeUsageStatsForThreadPools assertThreadPoolHasStats(
String nodeId,
String threadPoolName,
int totalThreadPoolThreads,
float averageThreadPoolUtilization,
long maxThreadPoolQueueLatencyMillis,
@Nullable Instant timestamp
) {
final var clusterInfo = Objects.requireNonNull(refreshClusterInfo());
final var nodeUsageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId);
if (timestamp != null) {
assertThat(nodeUsageStatsForThreadPools.timestamp(), equalTo(timestamp));
}
final var usageStatsMap = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap();
assertThat(usageStatsMap, hasKey(threadPoolName));
final var threadPoolStats = usageStatsMap.get(threadPoolName);
assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads));
assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization));
assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis));
return nodeUsageStatsForThreadPools;
}
}
@@ -43,6 +43,7 @@
import org.elasticsearch.transport.TransportService;

import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -375,7 +376,7 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
)
);

return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap);
return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap, Instant.now());
}

/**
@@ -348,6 +348,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_API_DISABLE_EIS_RATE_LIMITING = def(9_152_0_00);
public static final TransportVersion GEMINI_THINKING_BUDGET_ADDED = def(9_153_0_00);
public static final TransportVersion VISIT_PERCENTAGE = def(9_154_0_00);
public static final TransportVersion TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS = def(9_155_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -108,7 +108,7 @@ public static class NodeResponse extends BaseNodeResponse {

protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
this.nodeUsageStatsForThreadPools = NodeUsageStatsForThreadPools.readFrom(in);
}

public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) {
@@ -118,7 +118,7 @@ public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageSt

public NodeResponse(StreamInput in) throws IOException {
super(in);
this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
this.nodeUsageStatsForThreadPools = NodeUsageStatsForThreadPools.readFrom(in);
}

public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() {
@@ -27,8 +27,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -117,11 +117,9 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
maxQueueLatencyMillis
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats);
return new NodeUsageStatsForThreadPoolsAction.NodeResponse(
localNode,
new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats), Instant.now())
Contributor Author
This timestamp being added on the source node could be problematic if there were clock skew in the cluster. I wonder if it should be recorded on the client side instead.

Or should any "don't trust this if it's older than" threshold be of a magnitude where we don't need to worry about clock skew?

Contributor
I'm not enthusiastic about the timestamp here. There's no use for the timestamp in the code and it's not obviously logged anyplace. It's invasive to add and seems to be trying to solve a problem we don't have.

Logging a WARN message whenever we fail to get fresh stats from a node would be sufficient to convey the time when that happens -- very rarely -- and that an issue occurred. It's reasonable to log a WARN message because the cluster is going to be in distress if there are repeated failures to fetch stats from a single or multiple nodes.
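
A minimal sketch of what that WARN could look like in the collector, assuming a log4j Logger field on NodeUsageStatsForThreadPoolsCollector and the response.failures() accessor used later in this PR; the helper name and message wording are illustrative only, not part of the change:

private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsCollector.class);

// Hypothetical helper: called once per collection round when the response carries failures.
private void logFailedNodes(NodeUsageStatsForThreadPoolsAction.Response response) {
    for (FailedNodeException failure : response.failures()) {
        // Rare in a healthy cluster, so WARN should not be noisy; includes the cause per the suggestion above.
        logger.warn("failed to collect thread pool usage stats from node [" + failure.nodeId() + "]", failure);
    }
}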

Contributor Author
@nicktindall Sep 9, 2025
Yeah, fair, it also requires the addition of a transport version. @mhl-b wdyt? I can easily revert it. I think I'm with @DiannaHohensee, it's something we can add when we need it.

Contributor
I'm ok with that. I still think unbounded staleness is useless, even harmful. Utilization from 5 minutes ago has no meaning. Maybe we should allow only one missing measurement (without needing a timestamp), but if we miss twice we don't report anything.

Contributor
"it's something we can add when we need it"

I don't think we can tell for sure once we blend together fresh and stale metrics. It would be some lagging node that starts to impact allocation decisions.

Contributor
So you would track a count of misses then?

Contributor Author
No, I mean use the timestamp. Once it's over a certain age we log a warning (I think we do something similar for autoscaling metrics)

Contributor Author
Then, if we see it happening a lot or implicated in issues, we can decide what to do about it.

Contributor
Does that mean you will keep the current version with the Instant, but rather than on the source node, use client-side time tracking?

Contributor Author
It probably makes more sense to track it on the client (in the collector); that way we can probably avoid transport version changes and don't have to worry about clock skew.
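
A rough sketch of what client-side tracking could look like in the collector, recording the receive time locally instead of serializing an Instant; the CachedStats record and onResponse helper are hypothetical names, not code from this PR:

// Hypothetical: pair each node's stats with the time the collector received them,
// avoiding both the transport version bump and any dependence on data node clocks.
private record CachedStats(NodeUsageStatsForThreadPools stats, Instant receivedAt) {}

private final Map<String, CachedStats> lastSeen = new ConcurrentHashMap<>();

private void onResponse(Map<String, NodeUsageStatsForThreadPools> statsByNodeId) {
    final Instant now = Instant.now(); // collector-local clock, so no cross-node skew
    statsByNodeId.forEach((nodeId, stats) -> lastSeen.put(nodeId, new CachedStats(stats, now)));
}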

);
}

@@ -110,7 +110,7 @@ public ClusterInfo(StreamInput in) throws IOException {
this.estimatedHeapUsages = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new);
this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::readFrom);
} else {
this.nodeUsageStatsForThreadPools = Map.of();
}
@@ -9,13 +9,14 @@

package org.elasticsearch.cluster;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;

/**
* Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name.
@@ -24,48 +25,29 @@
* @param threadPoolUsageStatsMap A map of thread pool name ({@link org.elasticsearch.threadpool.ThreadPool.Names}) to the thread pool's
* usage stats ({@link ThreadPoolUsageStats}).
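* @param timestamp The time at which the usage stats were captured (set via Instant.now() when the node builds its response).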
*/
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable {
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap, Instant timestamp)
implements
Writeable {

public NodeUsageStatsForThreadPools(StreamInput in) throws IOException {
this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new));
public static NodeUsageStatsForThreadPools readFrom(StreamInput in) throws IOException {
final var nodeId = in.readString();
final var threadPoolUsageStatsMap = in.readImmutableMap(ThreadPoolUsageStats::new);
final Instant receivedTime;
if (in.getTransportVersion().onOrAfter(TransportVersions.TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS)) {
receivedTime = in.readInstant();
} else {
receivedTime = Instant.now();
}
return new NodeUsageStatsForThreadPools(nodeId, threadPoolUsageStatsMap, receivedTime);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeMap(this.threadPoolUsageStatsMap, StreamOutput::writeWriteable);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, threadPoolUsageStatsMap);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o;
for (var entry : other.threadPoolUsageStatsMap.entrySet()) {
if (nodeId.equals(other.nodeId) == false) {
return false;
}
var loadStats = threadPoolUsageStatsMap.get(entry.getKey());
if (loadStats == null || loadStats.equals(entry.getValue()) == false) {
return false;
}
}
return true;
}
Contributor Author
Redundant because it's a record
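
For reference, a record's generated equals/hashCode/toString are component-wise, which is why the hand-written members above can go; a tiny illustration, not project code:

record Point(int x, int y) {}

// new Point(1, 2).equals(new Point(1, 2))  -> true (component-wise)
// new Point(1, 2).hashCode()               == new Point(1, 2).hashCode()
// new Point(1, 2).toString()               -> "Point[x=1, y=2]"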


@Override
public String toString() {
StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=[");
for (var entry : threadPoolUsageStatsMap.entrySet()) {
builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}");
if (out.getTransportVersion().onOrAfter(TransportVersions.TIMESTAMP_IN_NODE_USAGE_STATS_FOR_THREAD_POOLS)) {
out.writeInstant(this.timestamp);
}
builder.append("]}");
return builder.toString();
}

/**
@@ -11,16 +11,22 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Collects the thread pool usage stats for each node in the cluster.
* <p>
* Results are returned as a map of node ID to node usage stats.
* Results are returned as a map of node ID to node usage stats. Keeps track of the most recent
* usage stats for each node, which will be returned in the event of a failure response from that node.
*/
public class NodeUsageStatsForThreadPoolsCollector {
public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() {
@@ -37,6 +43,8 @@ public void collectUsageStats(
"transport_node_usage_stats_for_thread_pools_action"
);

private final Map<String, NodeUsageStatsForThreadPools> lastNodeUsageStatsPerNode = new ConcurrentHashMap<>();

/**
* Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
*
@@ -47,15 +55,39 @@ public void collectUsageStats(
ClusterState clusterState,
ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener
) {
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new);
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new);
// Discard last-seen values for any nodes no longer present in the cluster state
lastNodeUsageStatsPerNode.keySet().retainAll(Arrays.asList(dataNodeIds));
if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) {
client.execute(
TransportNodeUsageStatsForThreadPoolsAction.TYPE,
new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds),
listener.map(response -> response.getAllNodeUsageStatsForThreadPools())
listener.map(this::replaceFailuresWithLastSeenValues)
);
} else {
listener.onResponse(Map.of());
}
}

private Map<String, NodeUsageStatsForThreadPools> replaceFailuresWithLastSeenValues(
NodeUsageStatsForThreadPoolsAction.Response response
) {
final Map<String, NodeUsageStatsForThreadPools> returnedUsageStats = response.getAllNodeUsageStatsForThreadPools();
// Update the last-seen usage stats
this.lastNodeUsageStatsPerNode.putAll(returnedUsageStats);

if (response.hasFailures() == false) {
return returnedUsageStats;
}

// Add in the last-seen usage stats for any nodes that failed to respond
final Map<String, NodeUsageStatsForThreadPools> cachedValuesForFailed = new HashMap<>(returnedUsageStats);
for (FailedNodeException failedNodeException : response.failures()) {
final var nodeUsageStatsForThreadPools = lastNodeUsageStatsPerNode.get(failedNodeException.nodeId());
Member
@ywangd Sep 9, 2025
I wonder, instead of using the last value as a fallback, can we have a specific NodeUsageStatsForThreadPools object representing failure? Other parts of the code would have to check it explicitly to make a decision, e.g. the write load decider potentially rejecting allocation?

My thinking is that we probably don't want to fall back more than a few times, i.e. the last value needs to expire at a certain point. I guess that's probably the reason you added the received timestamp? In that case, we still have to address what we use to indicate a "failed and expired" entry. If a node fails to respond to ClusterInfo polling, it is likely overloaded, e.g. CBE. So it seems safer to assume rejection or overall no movement for the node?
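
One way to express the "specific object representing failure" idea, sketched only; the UNAVAILABLE constant and isUnavailable helper are hypothetical and not part of this PR:

// Hypothetical sentinel that consumers (e.g. the write load decider) could check explicitly,
// instead of silently treating stale numbers as fresh ones.
public static final NodeUsageStatsForThreadPools UNAVAILABLE =
    new NodeUsageStatsForThreadPools("_unavailable_", Map.of(), Instant.EPOCH);

public static boolean isUnavailable(NodeUsageStatsForThreadPools stats) {
    return stats == UNAVAILABLE;
}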

if (nodeUsageStatsForThreadPools != null) {
cachedValuesForFailed.put(failedNodeException.nodeId(), nodeUsageStatsForThreadPools);
Contributor
Can lastNodeUsageStatsPerNode be returned directly instead? putAll above adds the new values for the nodeId keys. So whatever nodes are missing in the new response will not be overridden in lastNodeUsageStatsPerNode.

Contributor Author
Yes, perhaps... I wonder if there's some way it could include values for nodes we didn't request? Probably not, if things happen in the sequence we expect them to. I will come back to this.

Contributor Author
I think I'd prefer not to, because lastNodeUsageStatsPerNode is internal state: it's mutable and it will be mutated by the collector (to expire values for nodes no longer in the cluster). I think the way it is now is more explicit: we take the cached value for any node in response.failures() and we return a static map.

Contributor
Ah I see, good point about immutability 👍

Would it be sufficient to return a copy of lastNodeUsageStatsPerNode and skip this whole for-loop? No longer present nodes have already been filtered out in a prior stage, and the successful node responses were applied above.
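
The suggested shortcut would amount to something like the following sketch, trading the explicit failure loop for a defensive copy of the collector's internal map:

// After retainAll() on cluster membership and putAll(returnedUsageStats), the cache already
// holds fresh values plus the last-seen values for any nodes that failed to respond.
return Map.copyOf(lastNodeUsageStatsPerNode);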

}
}
Contributor Author
@nicktindall Sep 1, 2025
Not sure whether it makes sense to cache these things forever, or to put some limit on how long we consider them better than nothing. I can't imagine that a node being part of the cluster but returning errors for node usage stats requests is a situation that persists for very long.

Contributor
Only the last seen value for each node is cached. That doesn't seem expensive to save and it'll be refreshed frequently.

A WARN message log for each node that fails to respond would be good, along with its error cause/msg. It shouldn't happen often, so I don't expect it'll be noisy.

Contributor Author
Yeah I was more worried about whether there's a point where the cached value is so stale it's not useful, but I think it's probably always better than nothing.

Contributor
Yes, always better than nothing 👍

A cluster where a node is repeatedly failing to return stats probably has much bigger problems than this stale value.
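
If bounded staleness is wanted later, a small age check could drop cached entries past a cutoff; a sketch assuming the timestamp added in this PR is available on the cached value, with a hypothetical MAX_STALENESS constant:

private static final TimeValue MAX_STALENESS = TimeValue.timeValueMinutes(5); // hypothetical cutoff

// Returns true when the cached stats are too old to be worth reporting.
private boolean isTooStale(NodeUsageStatsForThreadPools cached) {
    return cached.timestamp().isBefore(Instant.now().minusMillis(MAX_STALENESS.millis()));
}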

return cachedValuesForFailed;
}
}
@@ -80,7 +80,8 @@ public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThrea
simulatedNodeWriteLoadDeltas.get(entry.getKey()),
nodesWithMovedAwayShard.contains(entry.getKey())
)
)
),
entry.getValue().timestamp()
);
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue);
} else {