Skip to content

Commit e5559ef

Browse files
authored
Use the last good NodeUsageStatsForThreadPools when a node returns an error (#133896)
1 parent bb32a24 commit e5559ef

File tree

4 files changed

+210
-9
lines changed

4 files changed

+210
-9
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster;
11+
12+
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
13+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
14+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.CollectionUtils;
18+
import org.elasticsearch.common.util.Maps;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.test.ESIntegTestCase;
21+
import org.elasticsearch.test.transport.MockTransportService;
22+
import org.elasticsearch.threadpool.ThreadPool;
23+
import org.elasticsearch.transport.TestTransportChannel;
24+
25+
import java.util.Collection;
26+
import java.util.Objects;
27+
28+
import static org.hamcrest.Matchers.equalTo;
29+
import static org.hamcrest.Matchers.hasKey;
30+
31+
public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase {
32+
33+
@Override
34+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
35+
return Settings.builder()
36+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
37+
// Need to enable write load decider to enable node usage stats collection
38+
.put(
39+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
40+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
41+
)
42+
.build();
43+
}
44+
45+
@Override
46+
protected Collection<Class<? extends Plugin>> nodePlugins() {
47+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
48+
}
49+
50+
public void testMostRecentValueIsUsedWhenNodeRequestFails() {
51+
final var dataNodeName = internalCluster().startDataOnlyNode();
52+
final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName);
53+
final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName);
54+
final var threadPoolName = randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);
55+
56+
// Intercept the node request and return some fake values
57+
final int totalThreadPoolThreads = randomIntBetween(2, 40);
58+
final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true);
59+
final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000);
60+
mockThreadPoolUsageStats(
61+
dataNodeTransportService,
62+
threadPoolName,
63+
totalThreadPoolThreads,
64+
averageThreadPoolUtilization,
65+
maxThreadPoolQueueLatencyMillis
66+
);
67+
68+
// This info should contain our fake values
69+
refreshClusterInfoAndAssertThreadPoolHasStats(
70+
dataNodeClusterService.localNode().getId(),
71+
threadPoolName,
72+
totalThreadPoolThreads,
73+
averageThreadPoolUtilization,
74+
maxThreadPoolQueueLatencyMillis
75+
);
76+
77+
// Now simulate an error
78+
dataNodeTransportService.clearInboundRules();
79+
dataNodeTransportService.addRequestHandlingBehavior(
80+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
81+
(handler, request, channel, task) -> {
82+
channel.sendResponse(new Exception("simulated error"));
83+
}
84+
);
85+
86+
// The next response should also contain our fake values
87+
refreshClusterInfoAndAssertThreadPoolHasStats(
88+
dataNodeClusterService.localNode().getId(),
89+
threadPoolName,
90+
totalThreadPoolThreads,
91+
averageThreadPoolUtilization,
92+
maxThreadPoolQueueLatencyMillis
93+
);
94+
95+
// Now start returning values again
96+
final int newTotalThreadPoolThreads = randomIntBetween(2, 40);
97+
final float newAverageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true);
98+
final long newMaxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000);
99+
mockThreadPoolUsageStats(
100+
dataNodeTransportService,
101+
threadPoolName,
102+
newTotalThreadPoolThreads,
103+
newAverageThreadPoolUtilization,
104+
newMaxThreadPoolQueueLatencyMillis
105+
);
106+
107+
// The next response should contain the current values again
108+
refreshClusterInfoAndAssertThreadPoolHasStats(
109+
dataNodeClusterService.localNode().getId(),
110+
threadPoolName,
111+
newTotalThreadPoolThreads,
112+
newAverageThreadPoolUtilization,
113+
newMaxThreadPoolQueueLatencyMillis
114+
);
115+
}
116+
117+
private static void mockThreadPoolUsageStats(
118+
MockTransportService dataNodeTransportService,
119+
String threadPoolName,
120+
int totalThreadPoolThreads,
121+
float averageThreadPoolUtilization,
122+
long maxThreadPoolQueueLatencyMillis
123+
) {
124+
dataNodeTransportService.clearInboundRules();
125+
dataNodeTransportService.addRequestHandlingBehavior(
126+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
127+
(handler, request, channel, task) -> {
128+
NodeUsageStatsForThreadPoolsAction.NodeResponse response = safeAwait(
129+
l -> handler.messageReceived(
130+
request,
131+
new TestTransportChannel(l.map(res -> (NodeUsageStatsForThreadPoolsAction.NodeResponse) res)),
132+
task
133+
)
134+
);
135+
final var responseStats = response.getNodeUsageStatsForThreadPools();
136+
channel.sendResponse(
137+
new NodeUsageStatsForThreadPoolsAction.NodeResponse(
138+
response.getNode(),
139+
new NodeUsageStatsForThreadPools(
140+
responseStats.nodeId(),
141+
Maps.copyMapWithAddedOrReplacedEntry(
142+
responseStats.threadPoolUsageStatsMap(),
143+
threadPoolName,
144+
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
145+
totalThreadPoolThreads,
146+
averageThreadPoolUtilization,
147+
maxThreadPoolQueueLatencyMillis
148+
)
149+
)
150+
)
151+
)
152+
);
153+
}
154+
);
155+
}
156+
157+
private void refreshClusterInfoAndAssertThreadPoolHasStats(
158+
String nodeId,
159+
String threadPoolName,
160+
int totalThreadPoolThreads,
161+
float averageThreadPoolUtilization,
162+
long maxThreadPoolQueueLatencyMillis
163+
) {
164+
final var clusterInfo = Objects.requireNonNull(refreshClusterInfo());
165+
final var usageStatsMap = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap();
166+
assertThat(usageStatsMap, hasKey(threadPoolName));
167+
final var threadPoolStats = usageStatsMap.get(threadPoolName);
168+
assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads));
169+
assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization));
170+
assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis));
171+
}
172+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.io.IOException;
3030
import java.util.Collection;
31-
import java.util.HashMap;
3231
import java.util.List;
3332
import java.util.Map;
3433
import java.util.Set;
@@ -117,11 +116,9 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
117116
maxQueueLatencyMillis
118117
);
119118

120-
Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
121-
perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats);
122119
return new NodeUsageStatsForThreadPoolsAction.NodeResponse(
123120
localNode,
124-
new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
121+
new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats))
125122
);
126123
}
127124

server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,26 @@
99

1010
package org.elasticsearch.cluster;
1111

12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
1214
import org.elasticsearch.TransportVersion;
1315
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.FailedNodeException;
1417
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
1518
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
1619
import org.elasticsearch.client.internal.Client;
20+
import org.elasticsearch.cluster.node.DiscoveryNode;
1721

22+
import java.util.Arrays;
1823
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.stream.Collectors;
1926

2027
/**
2128
* Collects the thread pool usage stats for each node in the cluster.
2229
* <p>
23-
* Results are returned as a map of node ID to node usage stats.
30+
* Results are returned as a map of node ID to node usage stats. Keeps track of the most recent
31+
* usage stats for each node, which will be returned in the event of a failure response from that node.
2432
*/
2533
public class NodeUsageStatsForThreadPoolsCollector {
2634
public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() {
@@ -37,6 +45,10 @@ public void collectUsageStats(
3745
"transport_node_usage_stats_for_thread_pools_action"
3846
);
3947

48+
private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsCollector.class);
49+
50+
private final Map<String, NodeUsageStatsForThreadPools> lastNodeUsageStatsPerNode = new ConcurrentHashMap<>();
51+
4052
/**
4153
* Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
4254
*
@@ -47,12 +59,24 @@ public void collectUsageStats(
4759
ClusterState clusterState,
4860
ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener
4961
) {
50-
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new);
62+
var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(DiscoveryNode::getId).toArray(String[]::new);
63+
// Discard last-seen values for any nodes no longer present in the cluster state
64+
lastNodeUsageStatsPerNode.keySet().retainAll(Arrays.asList(dataNodeIds));
5165
if (clusterState.getMinTransportVersion().supports(TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) {
5266
client.execute(
5367
TransportNodeUsageStatsForThreadPoolsAction.TYPE,
5468
new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds),
55-
listener.map(response -> response.getAllNodeUsageStatsForThreadPools())
69+
listener.map(response -> {
70+
// Update last seen stats
71+
lastNodeUsageStatsPerNode.putAll(response.getAllNodeUsageStatsForThreadPools());
72+
if (response.failures().isEmpty() == false) {
73+
logger.warn(
74+
"Got no usage stats from nodes [{}], using last known stats for them",
75+
response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.joining(", "))
76+
);
77+
}
78+
return Map.copyOf(lastNodeUsageStatsPerNode);
79+
})
5680
);
5781
} else {
5882
listener.onResponse(Map.of());

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.elasticsearch.client.internal.Client;
7676
import org.elasticsearch.client.internal.ClusterAdminClient;
7777
import org.elasticsearch.client.internal.IndicesAdminClient;
78+
import org.elasticsearch.cluster.ClusterInfo;
7879
import org.elasticsearch.cluster.ClusterInfoService;
7980
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
8081
import org.elasticsearch.cluster.ClusterModule;
@@ -1575,14 +1576,21 @@ private static void ensureClusterInfoServiceRunning() {
15751576
}
15761577
}
15771578

1578-
public static void refreshClusterInfo() {
1579+
/**
1580+
* Refreshes the cluster info on the master
1581+
*
1582+
* @return The new cluster info if the refresh was executed, null if the {@link ClusterInfoService} was of an unknown type
1583+
*/
1584+
@Nullable
1585+
public static ClusterInfo refreshClusterInfo() {
15791586
final ClusterInfoService clusterInfoService = internalCluster().getInstance(
15801587
ClusterInfoService.class,
15811588
internalCluster().getMasterName()
15821589
);
15831590
if (clusterInfoService instanceof InternalClusterInfoService) {
1584-
ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService));
1591+
return ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService));
15851592
}
1593+
return null;
15861594
}
15871595

15881596
/**

0 commit comments

Comments
 (0)