Skip to content

Commit 37b20e8

Browse files
sub class pollUtilization into UtilizationTracker; rename from EWMA to average
1 parent 47debcd commit 37b20e8

File tree

4 files changed

+80
-60
lines changed

4 files changed

+80
-60
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -582,9 +582,9 @@ public static class TaskTrackingConfig {
582582

583583
private final boolean trackExecutionTime;
584584
private final boolean trackOngoingTasks;
585-
private final boolean trackQueueLatencyEWMA;
585+
private final boolean trackQueueLatencyAverage;
586586
private final double executionTimeEwmaAlpha;
587-
private final double queueLatencyEWMAAlpha;
587+
private final double queueLatencyEwmaAlpha;
588588

589589
public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
590590
false,
@@ -610,32 +610,32 @@ public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlp
610610
*/
611611
public TaskTrackingConfig(
612612
boolean trackOngoingTasks,
613-
boolean trackQueueLatencyEWMA,
614-
double executionTimeEWMAAlpha,
615-
double queueLatencyEWMAAlpha
613+
boolean trackQueueLatencyAverage,
614+
double executionTimeEwmaAlpha,
615+
double queueLatencyEwmaAlpha
616616
) {
617-
this(true, trackOngoingTasks, trackQueueLatencyEWMA, executionTimeEWMAAlpha, queueLatencyEWMAAlpha);
617+
this(true, trackOngoingTasks, trackQueueLatencyAverage, executionTimeEwmaAlpha, queueLatencyEwmaAlpha);
618618
}
619619

620620
/**
621621
* @param trackExecutionTime Whether to track execution stats
622622
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
623-
* @param trackQueueLatencyEWMA Whether to track queue latency {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage}
623+
* @param trackQueueLatencyAverage Whether to track the average queue latency.
624624
* @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
625-
* @param queueLatencyEWMAAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage).
625+
* @param queueLatencyEwmaAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage).
626626
*/
627627
private TaskTrackingConfig(
628628
boolean trackExecutionTime,
629629
boolean trackOngoingTasks,
630-
boolean trackQueueLatencyEWMA,
630+
boolean trackQueueLatencyAverage,
631631
double executionTimeEWMAAlpha,
632-
double queueLatencyEWMAAlpha
632+
double queueLatencyEwmaAlpha
633633
) {
634634
this.trackExecutionTime = trackExecutionTime;
635635
this.trackOngoingTasks = trackOngoingTasks;
636-
this.trackQueueLatencyEWMA = trackQueueLatencyEWMA;
636+
this.trackQueueLatencyAverage = trackQueueLatencyAverage;
637637
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
638-
this.queueLatencyEWMAAlpha = queueLatencyEWMAAlpha;
638+
this.queueLatencyEwmaAlpha = queueLatencyEwmaAlpha;
639639
}
640640

641641
public boolean trackExecutionTime() {
@@ -646,16 +646,16 @@ public boolean trackOngoingTasks() {
646646
return trackOngoingTasks;
647647
}
648648

649-
public boolean trackQueueLatencyEWMA() {
650-
return trackQueueLatencyEWMA;
649+
public boolean trackQueueLatencyAverage() {
650+
return trackQueueLatencyAverage;
651651
}
652652

653653
public double getExecutionTimeEwmaAlpha() {
654654
return executionTimeEwmaAlpha;
655655
}
656656

657657
public double getQueueLatencyEwmaAlpha() {
658-
return queueLatencyEWMAAlpha;
658+
return queueLatencyEwmaAlpha;
659659
}
660660
}
661661

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,18 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
4747
private final boolean trackOngoingTasks;
4848
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
4949
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
50-
private volatile long lastPollTimeAPM = System.nanoTime();
51-
private volatile long lastTotalExecutionTimeAPM = 0;
52-
private volatile long lastPollTimeNanosAllocation = System.nanoTime();
53-
private volatile long lastTotalExecutionTimeAllocation = 0;
5450
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
55-
private final boolean trackQueueLatencyEWMA;
51+
private final boolean trackQueueLatencyAverage;
5652
private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA;
5753

54+
public enum UtilizationTrackingPurpose {
55+
APM,
56+
ALLOCATION,
57+
}
58+
59+
private volatile UtilizationTracker apmUtilizationTracker;
60+
private volatile UtilizationTracker allocationUtilizationTracker;
61+
5862
TaskExecutionTimeTrackingEsThreadPoolExecutor(
5963
String name,
6064
int corePoolSize,
@@ -73,8 +77,10 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
7377
this.runnableWrapper = runnableWrapper;
7478
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
7579
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
76-
this.trackQueueLatencyEWMA = trackingConfig.trackQueueLatencyEWMA();
80+
this.trackQueueLatencyAverage = trackingConfig.trackQueueLatencyAverage();
7781
this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0);
82+
this.apmUtilizationTracker = new UtilizationTracker();
83+
this.allocationUtilizationTracker = new UtilizationTracker();
7884
}
7985

8086
public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
@@ -102,7 +108,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
102108
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
103109
"fraction of maximum thread time utilized for " + threadPoolName,
104110
"fraction",
105-
() -> new DoubleWithAttributes(pollUtilization(true, false), Map.of())
111+
() -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of())
106112
)
107113
);
108114
}
@@ -143,44 +149,30 @@ public int getCurrentQueueSize() {
143149
return getQueue().size();
144150
}
145151

146-
public double getQueuedTaskLatencyMillisEWMA() {
147-
if (trackQueueLatencyEWMA == false) {
152+
public double getQueuedTaskLatencyMillis() {
153+
if (trackQueueLatencyAverage == false) {
148154
return 0;
149155
}
150156
return queueLatencyMillisEWMA.getAverage();
151157
}
152158

153159
/**
154160
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
155-
* One of the two boolean parameters must be true, while the other false. There are two periodic pulling mechanisms that access
156-
* utilization reporting.
161+
* There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
162+
* caller.
157163
*
158164
* @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started
159165
* earlier, contributing a larger execution time.
160166
*/
161-
public double pollUtilization(boolean forAPM, boolean forAllocation) {
162-
assert forAPM ^ forAllocation : "Can only collect one or the other, APM: " + forAPM + ", Allocation: " + forAllocation;
163-
164-
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
165-
final long currentPollTimeNanos = System.nanoTime();
166-
167-
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - (forAPM
168-
? lastTotalExecutionTimeAPM
169-
: lastTotalExecutionTimeAllocation);
170-
final long timeSinceLastPoll = currentPollTimeNanos - (forAPM ? lastPollTimeAPM : lastPollTimeNanosAllocation);
171-
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
172-
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
173-
174-
if (forAPM) {
175-
lastTotalExecutionTimeAPM = currentTotalExecutionTimeNanos;
176-
lastPollTimeAPM = currentPollTimeNanos;
177-
} else {
178-
assert forAllocation;
179-
lastTotalExecutionTimeAllocation = currentTotalExecutionTimeNanos;
180-
lastPollTimeNanosAllocation = currentPollTimeNanos;
167+
public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) {
168+
switch (utilizationTrackingPurpose) {
169+
case APM:
170+
return apmUtilizationTracker.pollUtilization();
171+
case ALLOCATION:
172+
return allocationUtilizationTracker.pollUtilization();
173+
default:
174+
throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]");
181175
}
182-
183-
return utilizationSinceLastPoll;
184176
}
185177

186178
@Override
@@ -197,7 +189,7 @@ protected void beforeExecute(Thread t, Runnable r) {
197189
var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency);
198190
queueLatencyMillisHistogram.addObservation(queueLatencyMillis);
199191

200-
if (trackQueueLatencyEWMA) {
192+
if (trackQueueLatencyAverage) {
201193
queueLatencyMillisEWMA.addValue(queueLatencyMillis);
202194
}
203195
}
@@ -241,8 +233,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
241233
.append("total task execution time = ")
242234
.append(TimeValue.timeValueNanos(getTotalTaskExecutionTime()))
243235
.append(", ");
244-
if (trackQueueLatencyEWMA) {
245-
sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillisEWMA())).append(", ");
236+
if (trackQueueLatencyAverage) {
237+
sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillis())).append(", ");
246238
}
247239
}
248240

@@ -269,6 +261,33 @@ public double getQueueLatencyEwmaAlpha() {
269261

270262
// Used for testing
271263
public boolean trackingQueueLatencyEwma() {
272-
return trackQueueLatencyEWMA;
264+
return trackQueueLatencyAverage;
265+
}
266+
267+
/**
268+
* Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization
269+
* since the last poll can be calculated for the next polling request.
270+
*
271+
* Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred.
272+
*/
273+
private class UtilizationTracker {
274+
volatile long lastPollTime = System.nanoTime();;
275+
volatile long lastTotalExecutionTime = 0;
276+
277+
public double pollUtilization() {
278+
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
279+
final long currentPollTimeNanos = System.nanoTime();
280+
281+
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
282+
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
283+
284+
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
285+
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
286+
287+
lastTotalExecutionTime = currentTotalExecutionTimeNanos;
288+
lastPollTime = currentPollTimeNanos;
289+
290+
return utilizationSinceLastPoll;
291+
}
273292
}
274293
}

server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,18 @@ public void testQueueLatencyEWMACalculation() throws Exception {
116116
executor.prestartAllCoreThreads();
117117
logger.info("--> executor: {}", executor);
118118

119-
assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.0);
119+
assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.0);
120120
// Using the settableQueuingWrapper each task will report being queued for 1ms
121121
executeTask(executor, 1);
122-
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.6); });
122+
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.6); });
123123
executeTask(executor, 1);
124-
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.84); });
124+
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.84); });
125125
executeTask(executor, 1);
126-
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.936); });
126+
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.936); });
127127
executeTask(executor, 1);
128-
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.9744); });
128+
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.9744); });
129129
executeTask(executor, 1);
130-
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.98976); });
130+
assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.98976); });
131131
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
132132
executor.shutdown();
133133
executor.awaitTermination(10, TimeUnit.SECONDS);

server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2121
import org.elasticsearch.common.util.concurrent.FutureUtils;
2222
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
23+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose;
2324
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.node.Node;
2526
import org.elasticsearch.telemetry.InstrumentType;
@@ -509,7 +510,7 @@ public void testDetailedUtilizationMetric() throws Exception {
509510

510511
final long beforePreviousCollectNanos = System.nanoTime();
511512
meterRegistry.getRecorder().collect();
512-
double allocationUtilization = executor.pollUtilization(false, true);
513+
double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION);
513514
final long afterPreviousCollectNanos = System.nanoTime();
514515

515516
var metricValue = metricAsserter.assertLatestMetricValueMatches(
@@ -539,7 +540,7 @@ public void testDetailedUtilizationMetric() throws Exception {
539540

540541
final long beforeMetricsCollectedNanos = System.nanoTime();
541542
meterRegistry.getRecorder().collect();
542-
allocationUtilization = executor.pollUtilization(false, true);
543+
allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION);
543544
final long afterMetricsCollectedNanos = System.nanoTime();
544545

545546
// Calculate upper bound on utilisation metric

0 commit comments

Comments
 (0)