
Commit 62b2e86

Publish queue latency metrics from tracked thread pools (elastic#120488)
Closes: ES-10531
1 parent b108e39 commit 62b2e86

File tree: 5 files changed, +168 −10 lines

docs/changelog/120488.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 120488
+summary: Publish queue latency metrics from tracked thread pools
+area: "Infra/Metrics"
+type: enhancement
+issues: []

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 52 additions & 1 deletion
@@ -10,9 +10,17 @@
 package org.elasticsearch.common.util.concurrent;
 
 import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
+import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
 import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
 import org.elasticsearch.core.TimeValue;
-
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
+import org.elasticsearch.telemetry.metric.Instrument;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -22,11 +30,17 @@
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
+import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME;
+import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION;
+
 /**
  * An extension to thread pool executor, which tracks statistics for the task execution time.
  */
 public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
 
+    public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18;
+    private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };
+
     private final Function<Runnable, WrappedRunnable> runnableWrapper;
     private final ExponentiallyWeightedMovingAverage executionEWMA;
     private final LongAdder totalExecutionTime = new LongAdder();
@@ -35,6 +49,7 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
     private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
     private volatile long lastPollTime = System.nanoTime();
     private volatile long lastTotalExecutionTime = 0;
+    private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
 
     TaskExecutionTimeTrackingEsThreadPoolExecutor(
         String name,
@@ -55,6 +70,36 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
         this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
     }
 
+    public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
+        return List.of(
+            meterRegistry.registerLongsGauge(
+                ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_QUEUE_TIME,
+                "Time tasks spent in the queue for the " + threadPoolName + " thread pool",
+                "milliseconds",
+                () -> {
+                    long[] snapshot = queueLatencyMillisHistogram.getSnapshot();
+                    int[] bucketUpperBounds = queueLatencyMillisHistogram.calculateBucketUpperBounds();
+                    List<LongWithAttributes> metricValues = Arrays.stream(LATENCY_PERCENTILES_TO_REPORT)
+                        .mapToObj(
+                            percentile -> new LongWithAttributes(
+                                queueLatencyMillisHistogram.getPercentile(percentile / 100f, snapshot, bucketUpperBounds),
+                                Map.of("percentile", String.valueOf(percentile))
+                            )
+                        )
+                        .toList();
+                    queueLatencyMillisHistogram.clear();
+                    return metricValues;
+                }
+            ),
+            meterRegistry.registerDoubleGauge(
+                ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
+                "fraction of maximum thread time utilized for " + threadPoolName,
+                "fraction",
+                () -> new DoubleWithAttributes(pollUtilization(), Map.of())
+            )
+        );
+    }
+
     @Override
     protected Runnable wrapRunnable(Runnable command) {
         return super.wrapRunnable(this.runnableWrapper.apply(command));
@@ -116,6 +161,12 @@ protected void beforeExecute(Thread t, Runnable r) {
         if (trackOngoingTasks) {
             ongoingTasks.put(r, System.nanoTime());
         }
+        assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
+        final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
+        timedRunnable.beforeExecute();
+        final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
+        assert taskQueueLatency >= 0;
+        queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
     }
 
     @Override
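
For readers unfamiliar with the histogram type used above, the following is a minimal, self-contained sketch of how an exponential-bucket histogram can turn recorded queue latencies into the percentile values reported by the gauge callback. The class name, the power-of-two bucket scheme, and the method shapes below are illustrative assumptions; the actual ExponentialBucketHistogram in the Elasticsearch codebase may use different boundaries and internals.

import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only; not the real ExponentialBucketHistogram.
class LatencyHistogramSketch {
    private final LongAdder[] buckets;
    private final long[] upperBounds; // upper bound of each bucket; the last bucket catches everything above

    LatencyHistogramSketch(int bucketCount) {
        buckets = new LongAdder[bucketCount];
        upperBounds = new long[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            buckets[i] = new LongAdder();
            upperBounds[i] = 1L << i; // assumed power-of-two boundaries: 1, 2, 4, 8, ...
        }
    }

    void addObservation(long millis) {
        for (int i = 0; i < buckets.length; i++) {
            if (millis <= upperBounds[i] || i == buckets.length - 1) {
                buckets[i].increment();
                return;
            }
        }
    }

    // Returns the upper bound of the first bucket whose cumulative count reaches the requested fraction.
    long getPercentile(float fraction) {
        long[] counts = new long[buckets.length];
        long total = 0;
        for (int i = 0; i < buckets.length; i++) {
            counts[i] = buckets[i].sum();
            total += counts[i];
        }
        if (total == 0) {
            return 0;
        }
        long threshold = (long) Math.ceil(total * fraction);
        long seen = 0;
        for (int i = 0; i < counts.length; i++) {
            seen += counts[i];
            if (seen >= threshold) {
                return upperBounds[i];
            }
        }
        return upperBounds[upperBounds.length - 1];
    }

    public static void main(String[] args) {
        LatencyHistogramSketch histogram = new LatencyHistogramSketch(18);
        for (long latencyMillis : new long[] { 1, 2, 3, 5, 8, 13, 40, 120 }) {
            histogram.addObservation(latencyMillis);
        }
        System.out.println("p50 <= " + histogram.getPercentile(0.50f) + " ms");
        System.out.println("p99 <= " + histogram.getPercentile(0.99f) + " ms");
    }
}

Note that the gauge callback in the diff clears the histogram after each read, so each reported percentile describes only the interval since the previous metric collection.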

server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

Lines changed: 21 additions & 0 deletions
@@ -18,6 +18,7 @@
 class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
     private final Runnable original;
     private final long creationTimeNanos;
+    private long beforeExecuteTime = -1;
     private long startTimeNanos;
     private long finishTimeNanos = -1;
     private boolean failedOrRejected = false;
@@ -58,6 +59,19 @@ public boolean isForceExecution() {
         return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
     }
 
+    /**
+     * Returns the time in nanoseconds between the creation time and the execution time
+     *
+     * @return The time in nanoseconds or -1 if the task was never de-queued
+     */
+    long getQueueTimeNanos() {
+        if (beforeExecuteTime == -1) {
+            assert false : "beforeExecute must be called before getQueueTimeNanos";
+            return -1;
+        }
+        return beforeExecuteTime - creationTimeNanos;
+    }
+
     /**
      * Return the time this task spent being run.
      * If the task is still running or has not yet been run, returns -1.
@@ -70,6 +84,13 @@ long getTotalExecutionNanos() {
         return Math.max(finishTimeNanos - startTimeNanos, 1);
     }
 
+    /**
+     * Called when the task has reached the front of the queue and is about to be executed
+     */
+    public void beforeExecute() {
+        beforeExecuteTime = System.nanoTime();
+    }
+
     /**
      * If the task was failed or rejected, return true.
      * Otherwise, false.
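
To make the timestamp flow above concrete, here is a small standalone sketch of the queue-latency measurement lifecycle: the creation time is captured when the task is wrapped and enqueued, beforeExecute() records the moment the task is pulled off the queue, and the difference is the queue latency. The class below is an illustrative simplification, not the actual TimedRunnable.

import java.util.concurrent.TimeUnit;

// Illustrative simplification of the TimedRunnable queue-latency bookkeeping.
class QueueTimedTask implements Runnable {
    private final Runnable original;
    private final long creationTimeNanos = System.nanoTime(); // captured when the task is wrapped and enqueued
    private long beforeExecuteTimeNanos = -1;                 // captured when the task is de-queued

    QueueTimedTask(Runnable original) {
        this.original = original;
    }

    // Called by the executor just before the task starts running.
    void beforeExecute() {
        beforeExecuteTimeNanos = System.nanoTime();
    }

    // Queue latency in nanoseconds, or -1 if the task was never de-queued.
    long getQueueTimeNanos() {
        return beforeExecuteTimeNanos == -1 ? -1 : beforeExecuteTimeNanos - creationTimeNanos;
    }

    @Override
    public void run() {
        original.run();
    }

    public static void main(String[] args) throws InterruptedException {
        QueueTimedTask task = new QueueTimedTask(() -> {});
        Thread.sleep(25); // simulate time spent sitting in the queue
        task.beforeExecute();
        task.run();
        System.out.println("queued for ~" + TimeUnit.NANOSECONDS.toMillis(task.getQueueTimeNanos()) + " ms");
    }
}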

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 2 additions & 9 deletions
@@ -31,7 +31,6 @@
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.ReportingService;
-import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 import org.elasticsearch.telemetry.metric.Instrument;
 import org.elasticsearch.telemetry.metric.LongAsyncCounter;
 import org.elasticsearch.telemetry.metric.LongGauge;
@@ -154,6 +153,7 @@ public static class Names {
     public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current";
     public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current";
     public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";
+    public static final String THREAD_POOL_METRIC_NAME_QUEUE_TIME = ".queue.latency.histogram";
 
     public enum ThreadPoolType {
         FIXED("fixed"),
@@ -379,14 +379,7 @@ private static ArrayList<Instrument> setupMetrics(MeterRegistry meterRegistry, S
         }
 
         if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) {
-            instruments.add(
-                meterRegistry.registerDoubleGauge(
-                    prefix + THREAD_POOL_METRIC_NAME_UTILIZATION,
-                    "fraction of maximum thread time utilized for " + name,
-                    "fraction",
-                    () -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at)
-                )
-            );
+            instruments.addAll(timeTrackingExecutor.setupMetrics(meterRegistry, name));
        }
     }
     return instruments;
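
With the new constant in place, each tracked pool publishes its queue-latency gauge under a per-pool metric name. The sketch below shows how that name is assembled; only the ".queue.latency.histogram" suffix appears in this diff, so the prefix value is an assumption for illustration, not the real ThreadPool constant.

public class QueueLatencyMetricNameSketch {
    public static void main(String[] args) {
        // Assumed prefix for illustration; the real value is defined elsewhere in ThreadPool.
        String threadPoolMetricPrefix = "es.thread_pool.";
        String queueTimeSuffix = ".queue.latency.histogram";
        String poolName = "search";

        String metricName = threadPoolMetricPrefix + poolName + queueTimeSuffix;
        // One long-gauge value is reported per configured percentile (50, 90, 99),
        // each tagged with a "percentile" attribute.
        System.out.println(metricName);
    }
}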

server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java

Lines changed: 88 additions & 0 deletions
@@ -9,18 +9,27 @@
 
 package org.elasticsearch.common.util.concurrent;
 
+import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
+import org.elasticsearch.telemetry.InstrumentType;
+import org.elasticsearch.telemetry.Measurement;
+import org.elasticsearch.telemetry.RecordingMeterRegistry;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
 
 /**
  * Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests}
@@ -147,6 +156,85 @@ public void testGetOngoingTasks() throws Exception {
         executor.awaitTermination(10, TimeUnit.SECONDS);
     }
 
+    public void testQueueLatencyMetrics() {
+        RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
+        final var threadPoolName = randomIdentifier();
+        var executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
+            threadPoolName,
+            1,
+            1,
+            1000,
+            TimeUnit.MILLISECONDS,
+            ConcurrentCollections.newBlockingQueue(),
+            TimedRunnable::new,
+            EsExecutors.daemonThreadFactory("queuetest"),
+            new EsAbortPolicy(),
+            new ThreadContext(Settings.EMPTY),
+            new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA)
+        );
+        executor.setupMetrics(meterRegistry, threadPoolName);
+
+        try {
+            final var barrier = new CyclicBarrier(2);
+            final ExponentialBucketHistogram expectedHistogram = new ExponentialBucketHistogram(
+                TaskExecutionTimeTrackingEsThreadPoolExecutor.QUEUE_LATENCY_HISTOGRAM_BUCKETS
+            );
+
+            /*
+             * The thread pool has a single thread, so we submit a task that will occupy that thread
+             * and cause subsequent tasks to be queued
+             */
+            Future<?> runningTask = executor.submit(() -> {
+                safeAwait(barrier);
+                safeAwait(barrier);
+            });
+            safeAwait(barrier); // wait till the first task starts
+            expectedHistogram.addObservation(0L); // the first task should not be delayed
+
+            /*
+             * On each iteration we submit a task - which will be queued because of the
+             * currently running task, pause for some random interval, then unblock the
+             * new task by releasing the currently running task. This gives us a lower
+             * bound for the real delays (the real delays will be greater than or equal
+             * to the synthetic delays we add, i.e. each percentile should be >= our
+             * expected values)
+             */
+            for (int i = 0; i < 10; i++) {
+                Future<?> waitingTask = executor.submit(() -> {
+                    safeAwait(barrier);
+                    safeAwait(barrier);
+                });
+                final long delayTimeMs = randomLongBetween(1, 50);
+                safeSleep(delayTimeMs);
+                safeAwait(barrier); // let the running task complete
+                safeAwait(barrier); // wait for the next task to start
+                safeGet(runningTask); // ensure previous task is complete
+                expectedHistogram.addObservation(delayTimeMs);
+                runningTask = waitingTask;
+            }
+            safeAwait(barrier); // let the last task finish
+            safeGet(runningTask);
+            meterRegistry.getRecorder().collect();
+
+            List<Measurement> measurements = meterRegistry.getRecorder()
+                .getMeasurements(
+                    InstrumentType.LONG_GAUGE,
+                    ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME
+                );
+            assertThat(measurements, hasSize(3));
+            // we have to use greater than or equal to because the actual delay might be higher than what we imposed
+            assertThat(getPercentile(measurements, "99"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.99f)));
+            assertThat(getPercentile(measurements, "90"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.9f)));
+            assertThat(getPercentile(measurements, "50"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.5f)));
+        } finally {
+            ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    private long getPercentile(List<Measurement> measurements, String percentile) {
+        return measurements.stream().filter(m -> m.attributes().get("percentile").equals(percentile)).findFirst().orElseThrow().getLong();
+    }
+
     /**
      * The returned function outputs a WrappedRunnabled that simulates the case
      * where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}.
