1313import org .elasticsearch .common .metrics .ExponentialBucketHistogram ;
1414import org .elasticsearch .common .util .concurrent .EsExecutors .TaskTrackingConfig ;
1515import org .elasticsearch .core .TimeValue ;
16+ import org .elasticsearch .logging .LogManager ;
17+ import org .elasticsearch .logging .Logger ;
1618import org .elasticsearch .telemetry .metric .DoubleWithAttributes ;
1719import org .elasticsearch .telemetry .metric .Instrument ;
1820import org .elasticsearch .telemetry .metric .LongWithAttributes ;
2729import java .util .concurrent .RejectedExecutionHandler ;
2830import java .util .concurrent .ThreadFactory ;
2931import java .util .concurrent .TimeUnit ;
32+ import java .util .concurrent .atomic .LongAccumulator ;
3033import java .util .concurrent .atomic .LongAdder ;
3134import java .util .function .Function ;
3235
3740 * An extension to thread pool executor, which tracks statistics for the task execution time.
3841 */
3942public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
43+ private static final Logger logger = LogManager .getLogger (TaskExecutionTimeTrackingEsThreadPoolExecutor .class );
4044
4145 public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18 ;
4246 private static final int [] LATENCY_PERCENTILES_TO_REPORT = { 50 , 90 , 99 };
@@ -47,9 +51,17 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
4751 private final boolean trackOngoingTasks ;
4852 // The set of currently running tasks and the timestamp of when they started execution in the Executor.
4953 private final Map <Runnable , Long > ongoingTasks = new ConcurrentHashMap <>();
50- private volatile long lastPollTime = System .nanoTime ();
51- private volatile long lastTotalExecutionTime = 0 ;
5254 private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram (QUEUE_LATENCY_HISTOGRAM_BUCKETS );
55+ private final boolean trackMaxQueueLatency ;
56+ private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator (Long ::max , 0 );
57+
58+ public enum UtilizationTrackingPurpose {
59+ APM ,
60+ ALLOCATION ,
61+ }
62+
63+ private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker ();
64+ private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker ();
5365
5466 TaskExecutionTimeTrackingEsThreadPoolExecutor (
5567 String name ,
@@ -65,9 +77,11 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
6577 TaskTrackingConfig trackingConfig
6678 ) {
6779 super (name , corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory , handler , contextHolder );
80+
6881 this .runnableWrapper = runnableWrapper ;
69- this .executionEWMA = new ExponentiallyWeightedMovingAverage (trackingConfig .getEwmaAlpha (), 0 );
82+ this .executionEWMA = new ExponentiallyWeightedMovingAverage (trackingConfig .getExecutionTimeEwmaAlpha (), 0 );
7083 this .trackOngoingTasks = trackingConfig .trackOngoingTasks ();
84+ this .trackMaxQueueLatency = trackingConfig .trackMaxQueueLatency ();
7185 }
7286
7387 public List <Instrument > setupMetrics (MeterRegistry meterRegistry , String threadPoolName ) {
@@ -95,7 +109,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
95109 ThreadPool .THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION ,
96110 "fraction of maximum thread time utilized for " + threadPoolName ,
97111 "fraction" ,
98- () -> new DoubleWithAttributes (pollUtilization (), Map .of ())
112+ () -> new DoubleWithAttributes (pollUtilization (UtilizationTrackingPurpose . APM ), Map .of ())
99113 )
100114 );
101115 }
@@ -136,37 +150,49 @@ public int getCurrentQueueSize() {
136150 return getQueue ().size ();
137151 }
138152
153+ public long getMaxQueueLatencyMillisSinceLastPollAndReset () {
154+ if (trackMaxQueueLatency == false ) {
155+ return 0 ;
156+ }
157+ return maxQueueLatencyMillisSinceLastPoll .getThenReset ();
158+ }
159+
139160 /**
140- * Returns the fraction of the maximum possible thread time that was actually used since the last time
141- * this method was called.
161+ * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
162+ * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
163+ * caller.
142164 *
143- * @return the utilization as a fraction, in the range [0, 1]
165+ * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started
166+ * earlier, contributing a larger execution time.
144167 */
145- public double pollUtilization () {
146- final long currentTotalExecutionTimeNanos = totalExecutionTime .sum ();
147- final long currentPollTimeNanos = System .nanoTime ();
148-
149- final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime ;
150- final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime ;
151- final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize ();
152- final double utilizationSinceLastPoll = (double ) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos ;
153-
154- lastTotalExecutionTime = currentTotalExecutionTimeNanos ;
155- lastPollTime = currentPollTimeNanos ;
156- return utilizationSinceLastPoll ;
168+ public double pollUtilization (UtilizationTrackingPurpose utilizationTrackingPurpose ) {
169+ switch (utilizationTrackingPurpose ) {
170+ case APM :
171+ return apmUtilizationTracker .pollUtilization ();
172+ case ALLOCATION :
173+ return allocationUtilizationTracker .pollUtilization ();
174+ default :
175+ throw new IllegalStateException ("No operation defined for [" + utilizationTrackingPurpose + "]" );
176+ }
157177 }
158178
159179 @ Override
160180 protected void beforeExecute (Thread t , Runnable r ) {
161181 if (trackOngoingTasks ) {
162182 ongoingTasks .put (r , System .nanoTime ());
163183 }
184+
164185 assert super .unwrap (r ) instanceof TimedRunnable : "expected only TimedRunnables in queue" ;
165186 final TimedRunnable timedRunnable = (TimedRunnable ) super .unwrap (r );
166187 timedRunnable .beforeExecute ();
167188 final long taskQueueLatency = timedRunnable .getQueueTimeNanos ();
168189 assert taskQueueLatency >= 0 ;
169- queueLatencyMillisHistogram .addObservation (TimeUnit .NANOSECONDS .toMillis (taskQueueLatency ));
190+ var queueLatencyMillis = TimeUnit .NANOSECONDS .toMillis (taskQueueLatency );
191+ queueLatencyMillisHistogram .addObservation (queueLatencyMillis );
192+
193+ if (trackMaxQueueLatency ) {
194+ maxQueueLatencyMillisSinceLastPoll .accumulate (queueLatencyMillis );
195+ }
170196 }
171197
172198 @ Override
@@ -222,7 +248,39 @@ public Map<Runnable, Long> getOngoingTasks() {
222248 }
223249
224250 // Used for testing
225- public double getEwmaAlpha () {
251+ public double getExecutionEwmaAlpha () {
226252 return executionEWMA .getAlpha ();
227253 }
254+
255+ // Used for testing
256+ public boolean trackingMaxQueueLatency () {
257+ return trackMaxQueueLatency ;
258+ }
259+
260+ /**
261+ * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization
262+ * since the last poll can be calculated for the next polling request.
263+ *
264+ * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred.
265+ */
266+ private class UtilizationTracker {
267+ long lastPollTime = System .nanoTime ();
268+ long lastTotalExecutionTime = 0 ;
269+
270+ public synchronized double pollUtilization () {
271+ final long currentTotalExecutionTimeNanos = totalExecutionTime .sum ();
272+ final long currentPollTimeNanos = System .nanoTime ();
273+
274+ final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime ;
275+ final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime ;
276+
277+ final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize ();
278+ final double utilizationSinceLastPoll = (double ) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos ;
279+
280+ lastTotalExecutionTime = currentTotalExecutionTimeNanos ;
281+ lastPollTime = currentPollTimeNanos ;
282+
283+ return utilizationSinceLastPoll ;
284+ }
285+ }
228286}
0 commit comments