99
1010package org .elasticsearch .action .admin .cluster .node .usage ;
1111
12- import org .apache .logging .log4j .LogManager ;
13- import org .apache .logging .log4j .Logger ;
1412import org .elasticsearch .action .ActionType ;
1513import org .elasticsearch .action .FailedNodeException ;
1614import org .elasticsearch .action .support .ActionFilters ;
1715import org .elasticsearch .action .support .nodes .TransportNodesAction ;
1816import org .elasticsearch .cluster .NodeUsageStatsForThreadPools ;
1917import org .elasticsearch .cluster .NodeUsageStatsForThreadPools .ThreadPoolUsageStats ;
2018import org .elasticsearch .cluster .node .DiscoveryNode ;
19+ import org .elasticsearch .cluster .routing .allocation .AllocationDeciderMetrics ;
2120import org .elasticsearch .cluster .service .ClusterService ;
2221import org .elasticsearch .common .io .stream .StreamInput ;
2322import org .elasticsearch .common .util .concurrent .TaskExecutionTimeTrackingEsThreadPoolExecutor ;
2423import org .elasticsearch .injection .guice .Inject ;
2524import org .elasticsearch .tasks .Task ;
25+ import org .elasticsearch .telemetry .metric .LongWithAttributes ;
2626import org .elasticsearch .threadpool .ThreadPool ;
2727import org .elasticsearch .transport .TransportService ;
2828
2929import java .io .IOException ;
30+ import java .util .Collection ;
3031import java .util .HashMap ;
3132import java .util .List ;
3233import java .util .Map ;
34+ import java .util .Set ;
35+ import java .util .concurrent .atomic .AtomicLong ;
3336
3437/**
3538 * Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in
@@ -42,20 +45,21 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA
4245 NodeUsageStatsForThreadPoolsAction .NodeResponse ,
4346 Void > {
4447
45- private static final Logger logger = LogManager .getLogger (TransportNodeUsageStatsForThreadPoolsAction .class );
46-
4748 public static final String NAME = "internal:monitor/thread_pool/stats" ;
4849 public static final ActionType <NodeUsageStatsForThreadPoolsAction .Response > TYPE = new ActionType <>(NAME );
50+ private static final int NO_VALUE = -1 ;
4951
5052 private final ThreadPool threadPool ;
5153 private final ClusterService clusterService ;
54+ private final AtomicLong lastMaxQueueLatencyMillis = new AtomicLong (NO_VALUE );
5255
5356 @ Inject
5457 public TransportNodeUsageStatsForThreadPoolsAction (
5558 ThreadPool threadPool ,
5659 ClusterService clusterService ,
5760 TransportService transportService ,
58- ActionFilters actionFilters
61+ ActionFilters actionFilters ,
62+ AllocationDeciderMetrics allocationDeciderMetrics
5963 ) {
6064 super (
6165 NAME ,
@@ -67,6 +71,7 @@ public TransportNodeUsageStatsForThreadPoolsAction(
6771 );
6872 this .threadPool = threadPool ;
6973 this .clusterService = clusterService ;
74+ allocationDeciderMetrics .registerWriteLoadDeciderMaxLatencyGauge (this ::getMaxQueueLatencyMetric );
7075 }
7176
7277 @ Override
@@ -99,15 +104,17 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
99104 assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor ;
100105 var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor ) writeExecutor ;
101106
107+ long maxQueueLatencyMillis = Math .max (
108+ trackingForWriteExecutor .getMaxQueueLatencyMillisSinceLastPollAndReset (),
109+ trackingForWriteExecutor .peekMaxQueueLatencyInQueueMillis ()
110+ );
111+ lastMaxQueueLatencyMillis .set (maxQueueLatencyMillis );
102112 ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats (
103113 trackingForWriteExecutor .getMaximumPoolSize (),
104114 (float ) trackingForWriteExecutor .pollUtilization (
105115 TaskExecutionTimeTrackingEsThreadPoolExecutor .UtilizationTrackingPurpose .ALLOCATION
106116 ),
107- Math .max (
108- trackingForWriteExecutor .getMaxQueueLatencyMillisSinceLastPollAndReset (),
109- trackingForWriteExecutor .peekMaxQueueLatencyInQueueMillis ()
110- )
117+ maxQueueLatencyMillis
111118 );
112119
113120 Map <String , ThreadPoolUsageStats > perThreadPool = new HashMap <>();
@@ -117,4 +124,13 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
117124 new NodeUsageStatsForThreadPools (localNode .getId (), perThreadPool )
118125 );
119126 }
127+
128+ private Collection <LongWithAttributes > getMaxQueueLatencyMetric () {
129+ long maxQueueLatencyValue = lastMaxQueueLatencyMillis .getAndSet (NO_VALUE );
130+ if (maxQueueLatencyValue != NO_VALUE ) {
131+ return Set .of (new LongWithAttributes (maxQueueLatencyValue ));
132+ } else {
133+ return Set .of ();
134+ }
135+ }
120136}
0 commit comments