1111
1212import org .elasticsearch .TransportVersion ;
1313import org .elasticsearch .action .ActionListener ;
14+ import org .elasticsearch .action .FailedNodeException ;
1415import org .elasticsearch .action .admin .cluster .node .usage .NodeUsageStatsForThreadPoolsAction ;
1516import org .elasticsearch .action .admin .cluster .node .usage .TransportNodeUsageStatsForThreadPoolsAction ;
1617import org .elasticsearch .client .internal .Client ;
18+ import org .elasticsearch .cluster .node .DiscoveryNode ;
1719
20+ import java .util .Arrays ;
21+ import java .util .HashMap ;
1822import java .util .Map ;
23+ import java .util .concurrent .ConcurrentHashMap ;
1924
2025/**
2126 * Collects the thread pool usage stats for each node in the cluster.
2227 * <p>
23- * Results are returned as a map of node ID to node usage stats.
28+ * Results are returned as a map of node ID to node usage stats. Keeps track of the most recent
29+ * usage stats for each node, which will be returned in the event of a failure response from that node.
2430 */
2531public class NodeUsageStatsForThreadPoolsCollector {
2632 public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector () {
@@ -37,6 +43,8 @@ public void collectUsageStats(
3743 "transport_node_usage_stats_for_thread_pools_action"
3844 );
3945
46+ private final Map <String , NodeUsageStatsForThreadPools > lastNodeUsageStatsForThreadPools = new ConcurrentHashMap <>();
47+
4048 /**
4149 * Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
4250 *
@@ -47,15 +55,39 @@ public void collectUsageStats(
4755 ClusterState clusterState ,
4856 ActionListener <Map <String , NodeUsageStatsForThreadPools >> listener
4957 ) {
50- var dataNodeIds = clusterState .nodes ().getDataNodes ().values ().stream ().map (node -> node .getId ()).toArray (String []::new );
58+ var dataNodeIds = clusterState .nodes ().getDataNodes ().values ().stream ().map (DiscoveryNode ::getId ).toArray (String []::new );
59+ // Discard last-seen values for any nodes no longer present in the cluster state
60+ lastNodeUsageStatsForThreadPools .keySet ().retainAll (Arrays .asList (dataNodeIds ));
5161 if (clusterState .getMinTransportVersion ().supports (TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION )) {
5262 client .execute (
5363 TransportNodeUsageStatsForThreadPoolsAction .TYPE ,
5464 new NodeUsageStatsForThreadPoolsAction .Request (dataNodeIds ),
55- listener .map (response -> response . getAllNodeUsageStatsForThreadPools () )
65+ listener .map (this :: replaceFailuresWithLastSeenValues )
5666 );
5767 } else {
5868 listener .onResponse (Map .of ());
5969 }
6070 }
71+
72+ private Map <String , NodeUsageStatsForThreadPools > replaceFailuresWithLastSeenValues (
73+ NodeUsageStatsForThreadPoolsAction .Response response
74+ ) {
75+ final Map <String , NodeUsageStatsForThreadPools > returnedUsageStats = response .getAllNodeUsageStatsForThreadPools ();
76+ // Update the last-seen usage stats
77+ this .lastNodeUsageStatsForThreadPools .putAll (returnedUsageStats );
78+
79+ if (response .hasFailures () == false ) {
80+ return returnedUsageStats ;
81+ }
82+
83+ // Add in the last-seen usage stats for any nodes that failed to respond
84+ final Map <String , NodeUsageStatsForThreadPools > cachedValuesForFailed = new HashMap <>(returnedUsageStats );
85+ for (FailedNodeException failedNodeException : response .failures ()) {
86+ NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = lastNodeUsageStatsForThreadPools .get (failedNodeException .nodeId ());
87+ if (nodeUsageStatsForThreadPools != null ) {
88+ cachedValuesForFailed .put (failedNodeException .nodeId (), nodeUsageStatsForThreadPools );
89+ }
90+ }
91+ return cachedValuesForFailed ;
92+ }
6193}
0 commit comments