2727import java .util .concurrent .RejectedExecutionHandler ;
2828import java .util .concurrent .ThreadFactory ;
2929import java .util .concurrent .TimeUnit ;
30- import java .util .concurrent .atomic .AtomicReference ;
3130import java .util .concurrent .atomic .LongAdder ;
3231import java .util .function .Function ;
3332
@@ -48,14 +47,13 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
4847 private final boolean trackOngoingTasks ;
4948 // The set of currently running tasks and the timestamp of when they started execution in the Executor.
5049 private final Map <Runnable , Long > ongoingTasks = new ConcurrentHashMap <>();
51- private volatile long lastPollTime = System .nanoTime ();
52- private volatile long lastTotalExecutionTime = 0 ;
50+ private volatile long lastPollTimeAPM = System .nanoTime ();
51+ private volatile long lastTotalExecutionTimeAPM = 0 ;
52+ private volatile long lastPollTimeNanosAllocation = System .nanoTime ();
53+ private volatile long lastTotalExecutionTimeAllocation = 0 ;
5354 private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram (QUEUE_LATENCY_HISTOGRAM_BUCKETS );
5455 private final boolean trackQueueLatencyEWMA ;
55- private final boolean trackUtilizationEWMA ;
5656 private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA ;
57- private final ExponentiallyWeightedMovingAverage percentPoolUtilizationEWMA ;
58- private final AtomicReference <Double > lastUtilizationValue = new AtomicReference <>(0.0 );
5957
6058 TaskExecutionTimeTrackingEsThreadPoolExecutor (
6159 String name ,
@@ -77,8 +75,6 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
7775 this .trackOngoingTasks = trackingConfig .trackOngoingTasks ();
7876 this .trackQueueLatencyEWMA = trackingConfig .trackQueueLatencyEWMA ();
7977 this .queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage (trackingConfig .getQueueLatencyEwmaAlpha (), 0 );
80- this .trackUtilizationEWMA = trackingConfig .trackPoolUtilizationEWMA ();
81- this .percentPoolUtilizationEWMA = new ExponentiallyWeightedMovingAverage (trackingConfig .getPoolUtilizationEwmaAlpha (), 0 );
8278 }
8379
8480 public List <Instrument > setupMetrics (MeterRegistry meterRegistry , String threadPoolName ) {
@@ -106,7 +102,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
106102 ThreadPool .THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION ,
107103 "fraction of maximum thread time utilized for " + threadPoolName ,
108104 "fraction" ,
109- () -> new DoubleWithAttributes (pollUtilization (), Map .of ())
105+ () -> new DoubleWithAttributes (pollUtilization (true , false ), Map .of ())
110106 )
111107 );
112108 }
@@ -147,13 +143,6 @@ public int getCurrentQueueSize() {
147143 return getQueue ().size ();
148144 }
149145
150- public double getPercentPoolUtilizationEWMA () {
151- if (trackUtilizationEWMA == false ) {
152- return 0 ;
153- }
154- return this .percentPoolUtilizationEWMA .getAverage ();
155- }
156-
157146 public double getQueuedTaskLatencyMillisEWMA () {
158147 if (trackQueueLatencyEWMA == false ) {
159148 return 0 ;
@@ -162,38 +151,38 @@ public double getQueuedTaskLatencyMillisEWMA() {
162151 }
163152
164153 /**
165- * Returns the fraction of the maximum possible thread time that was actually used since the last time
166- * this method was called.
154+ * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
155+ * One of the two boolean parameters must be true, while the other false. There are two periodic pulling mechanisms that access
156+ * utilization reporting.
167157 *
168- * @return the utilization as a fraction, in the range [0, 1]
158+ * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started
159+ * earlier, contributing a larger execution time.
169160 */
170- public double pollUtilization () {
161+ public double pollUtilization (boolean forAPM , boolean forAllocation ) {
162+ assert forAPM ^ forAllocation : "Can only collect one or the other, APM: " + forAPM + ", Allocation: " + forAllocation ;
163+
171164 final long currentTotalExecutionTimeNanos = totalExecutionTime .sum ();
172165 final long currentPollTimeNanos = System .nanoTime ();
173166
174- final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime ;
175- final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime ;
167+ final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - (forAPM
168+ ? lastTotalExecutionTimeAPM
169+ : lastTotalExecutionTimeAllocation );
170+ final long timeSinceLastPoll = currentPollTimeNanos - (forAPM ? lastPollTimeAPM : lastPollTimeNanosAllocation );
176171 final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize ();
177172 final double utilizationSinceLastPoll = (double ) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos ;
178173
179- lastTotalExecutionTime = currentTotalExecutionTimeNanos ;
180- lastPollTime = currentPollTimeNanos ;
181-
182- if ( trackUtilizationEWMA ) {
183- percentPoolUtilizationEWMA . addValue ( utilizationSinceLastPoll ) ;
184- // Test only tracking.
185- assert setUtilizationSinceLastPoll ( utilizationSinceLastPoll ) ;
174+ if ( forAPM ) {
175+ lastTotalExecutionTimeAPM = currentTotalExecutionTimeNanos ;
176+ lastPollTimeAPM = currentPollTimeNanos ;
177+ } else {
178+ assert forAllocation ;
179+ lastTotalExecutionTimeAllocation = currentTotalExecutionTimeNanos ;
180+ lastPollTimeNanosAllocation = currentPollTimeNanos ;
186181 }
187182
188183 return utilizationSinceLastPoll ;
189184 }
190185
191- // Test only
192- private boolean setUtilizationSinceLastPoll (double utilizationSinceLastPoll ) {
193- lastUtilizationValue .set (utilizationSinceLastPoll );
194- return true ;
195- }
196-
197186 @ Override
198187 protected void beforeExecute (Thread t , Runnable r ) {
199188 if (trackOngoingTasks ) {
@@ -209,9 +198,7 @@ protected void beforeExecute(Thread t, Runnable r) {
209198 queueLatencyMillisHistogram .addObservation (queueLatencyMillis );
210199
211200 if (trackQueueLatencyEWMA ) {
212- if (queueLatencyMillis > 0 ) {
213- queueLatencyMillisEWMA .addValue (queueLatencyMillis );
214- }
201+ queueLatencyMillisEWMA .addValue (queueLatencyMillis );
215202 }
216203 }
217204
@@ -257,9 +244,6 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
257244 if (trackQueueLatencyEWMA ) {
258245 sb .append ("task queue EWMA = " ).append (TimeValue .timeValueMillis ((long ) getQueuedTaskLatencyMillisEWMA ())).append (", " );
259246 }
260- if (trackUtilizationEWMA ) {
261- sb .append ("thread pool utilization percentage EWMA = " ).append (getPercentPoolUtilizationEWMA ()).append (", " );
262- }
263247 }
264248
265249 /**
@@ -283,18 +267,8 @@ public double getQueueLatencyEwmaAlpha() {
283267 return queueLatencyMillisEWMA .getAlpha ();
284268 }
285269
286- // Used for testing
287- public double getPoolUtilizationEwmaAlpha () {
288- return percentPoolUtilizationEWMA .getAlpha ();
289- }
290-
291270 // Used for testing
292271 public boolean trackingQueueLatencyEwma () {
293272 return trackQueueLatencyEWMA ;
294273 }
295-
296- // Used for testing
297- public boolean trackUtilizationEwma () {
298- return trackUtilizationEWMA ;
299- }
300274}
0 commit comments