Commit f96674c

Remove queue latency metric changes
1 parent 51191eb commit f96674c

File tree: 5 files changed (+10, -163 lines)

docs/changelog/120488.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 1 addition & 49 deletions
@@ -10,17 +10,9 @@
 package org.elasticsearch.common.util.concurrent;
 
 import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
-import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
 import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
-import org.elasticsearch.telemetry.metric.Instrument;
-import org.elasticsearch.telemetry.metric.LongWithAttributes;
-import org.elasticsearch.telemetry.metric.MeterRegistry;
-import org.elasticsearch.threadpool.ThreadPool;
-
-import java.util.Arrays;
-import java.util.List;
+
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,16 +22,11 @@
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
-import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME;
-import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION;
-
 /**
  * An extension to thread pool executor, which tracks statistics for the task execution time.
  */
 public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
 
-    private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };
-
     private final Function<Runnable, WrappedRunnable> runnableWrapper;
     private final ExponentiallyWeightedMovingAverage executionEWMA;
     private final LongAdder totalExecutionTime = new LongAdder();
@@ -48,7 +35,6 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
     private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
     private volatile long lastPollTime = System.nanoTime();
    private volatile long lastTotalExecutionTime = 0;
-    private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram();
 
     TaskExecutionTimeTrackingEsThreadPoolExecutor(
         String name,
@@ -69,34 +55,6 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
         this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
     }
 
-    public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
-        return List.of(
-            meterRegistry.registerLongsGauge(
-                ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_QUEUE_TIME,
-                "Time tasks spent in the queue for the " + threadPoolName + " thread pool",
-                "milliseconds",
-                () -> {
-                    List<LongWithAttributes> metricValues = Arrays.stream(LATENCY_PERCENTILES_TO_REPORT)
-                        .mapToObj(
-                            percentile -> new LongWithAttributes(
-                                queueLatencyMillisHistogram.getPercentile(percentile / 100f),
-                                Map.of("percentile", String.valueOf(percentile))
-                            )
-                        )
-                        .toList();
-                    queueLatencyMillisHistogram.clear();
-                    return metricValues;
-                }
-            ),
-            meterRegistry.registerDoubleGauge(
-                ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
-                "fraction of maximum thread time utilized for " + threadPoolName,
-                "fraction",
-                () -> new DoubleWithAttributes(pollUtilization(), Map.of())
-            )
-        );
-    }
-
     @Override
     protected Runnable wrapRunnable(Runnable command) {
         return super.wrapRunnable(this.runnableWrapper.apply(command));
@@ -158,12 +116,6 @@ protected void beforeExecute(Thread t, Runnable r) {
         if (trackOngoingTasks) {
             ongoingTasks.put(r, System.nanoTime());
         }
-        assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
-        final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
-        timedRunnable.beforeExecute();
-        final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
-        assert taskQueueLatency >= 0;
-        queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
     }
 
     @Override
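
Taken together, the hunks above remove the queue-latency measurement end to end: each submitted task was wrapped in a TimedRunnable that records its creation time, stamped again in beforeExecute(), and the elapsed queue time was added to a millisecond histogram. The minimal, self-contained sketch below shows that general wrap-and-timestamp pattern; QueueLatencyTrackingExecutor, QueueTimedTask, and the recordQueueMillis callback are names invented for this illustration and are not Elasticsearch APIs.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Illustrative sketch only, not the Elasticsearch implementation.
class QueueLatencyTrackingExecutor extends ThreadPoolExecutor {

    /** Wraps a submitted task and remembers when it entered the queue. */
    static final class QueueTimedTask implements Runnable {
        final Runnable delegate;
        final long creationTimeNanos = System.nanoTime();

        QueueTimedTask(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    private final LongConsumer recordQueueMillis;

    QueueLatencyTrackingExecutor(int threads, LongConsumer recordQueueMillis) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.recordQueueMillis = recordQueueMillis;
    }

    @Override
    public void execute(Runnable command) {
        // Wrap at submission time so the queue-entry timestamp is captured.
        super.execute(new QueueTimedTask(command));
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable task) {
        super.beforeExecute(thread, task);
        if (task instanceof QueueTimedTask timed) {
            // Queue latency = time between submission and a worker picking the task up.
            long queuedNanos = System.nanoTime() - timed.creationTimeNanos;
            recordQueueMillis.accept(TimeUnit.NANOSECONDS.toMillis(queuedNanos));
        }
    }
}

Any sink can be plugged in as the callback, e.g. new QueueLatencyTrackingExecutor(4, histogram::addObservation) for a histogram with an addObservation(long) method.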

server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

Lines changed: 0 additions & 21 deletions
@@ -18,7 +18,6 @@
 class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
     private final Runnable original;
     private final long creationTimeNanos;
-    private long beforeExecuteTime = -1;
     private long startTimeNanos;
     private long finishTimeNanos = -1;
     private boolean failedOrRejected = false;
@@ -59,19 +58,6 @@ public boolean isForceExecution() {
         return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
     }
 
-    /**
-     * Returns the time in nanoseconds between the creation time and the execution time
-     *
-     * @return The time in nanoseconds or -1 if the task was never de-queued
-     */
-    long getQueueTimeNanos() {
-        if (beforeExecuteTime == -1) {
-            assert false : "beforeExecute must be called before getQueueTimeNanos";
-            return -1;
-        }
-        return beforeExecuteTime - creationTimeNanos;
-    }
-
     /**
      * Return the time this task spent being run.
      * If the task is still running or has not yet been run, returns -1.
@@ -84,13 +70,6 @@ long getTotalExecutionNanos() {
         return Math.max(finishTimeNanos - startTimeNanos, 1);
     }
 
-    /**
-     * Called when the task has reached the front of the queue and is about to be executed
-     */
-    public void beforeExecute() {
-        beforeExecuteTime = System.nanoTime();
-    }
-
     /**
      * If the task was failed or rejected, return true.
      * Otherwise, false.

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 9 additions & 2 deletions
@@ -31,6 +31,7 @@
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.ReportingService;
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 import org.elasticsearch.telemetry.metric.Instrument;
 import org.elasticsearch.telemetry.metric.LongAsyncCounter;
 import org.elasticsearch.telemetry.metric.LongGauge;
@@ -153,7 +154,6 @@ public static class Names {
     public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current";
     public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current";
     public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";
-    public static final String THREAD_POOL_METRIC_NAME_QUEUE_TIME = ".threads.queue.latency.histogram";
 
     public enum ThreadPoolType {
         FIXED("fixed"),
@@ -379,7 +379,14 @@ private static ArrayList<Instrument> setupMetrics(MeterRegistry meterRegistry, S
             }
 
             if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) {
-                instruments.addAll(timeTrackingExecutor.setupMetrics(meterRegistry, name));
+                instruments.add(
+                    meterRegistry.registerDoubleGauge(
+                        prefix + THREAD_POOL_METRIC_NAME_UTILIZATION,
+                        "fraction of maximum thread time utilized for " + name,
+                        "fraction",
+                        () -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at)
+                    )
+                );
             }
         }
         return instruments;
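
The replacement registration calls timeTrackingExecutor.pollUtilization(), which is not part of this diff. For orientation only, the sketch below shows how a poll-to-poll utilization fraction is commonly derived from counters like the totalExecutionTime, lastTotalExecutionTime, and lastPollTime fields visible in the executor above; UtilizationPoller and its method names are invented for this sketch and are not the Elasticsearch implementation.

import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only: fraction of available thread time spent executing
// tasks since the previous poll.
class UtilizationPoller {
    private final LongAdder totalExecutionTimeNanos = new LongAdder();
    private final int maxThreads;
    private volatile long lastPollTimeNanos = System.nanoTime();
    private volatile long lastTotalExecutionTimeNanos = 0;

    UtilizationPoller(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    /** Called by worker threads after each task completes. */
    void addExecutionTime(long nanos) {
        totalExecutionTimeNanos.add(nanos);
    }

    /** Intended to be called by a single metrics poller. */
    double pollUtilization() {
        long now = System.nanoTime();
        long total = totalExecutionTimeNanos.sum();

        long executedSinceLastPoll = total - lastTotalExecutionTimeNanos;
        long elapsedSinceLastPoll = now - lastPollTimeNanos;

        lastTotalExecutionTimeNanos = total;
        lastPollTimeNanos = now;

        long maxAvailableNanos = elapsedSinceLastPoll * maxThreads;
        return maxAvailableNanos <= 0 ? 0.0 : (double) executedSinceLastPoll / maxAvailableNanos;
    }
}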

server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java

Lines changed: 0 additions & 86 deletions
@@ -9,27 +9,18 @@
 
 package org.elasticsearch.common.util.concurrent;
 
-import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
-import org.elasticsearch.telemetry.InstrumentType;
-import org.elasticsearch.telemetry.Measurement;
-import org.elasticsearch.telemetry.RecordingMeterRegistry;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
 
 /**
  * Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests}
@@ -156,83 +147,6 @@ public void testGetOngoingTasks() throws Exception {
         executor.awaitTermination(10, TimeUnit.SECONDS);
     }
 
-    public void testQueueLatencyMetrics() {
-        RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
-        final var threadPoolName = randomIdentifier();
-        var executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
-            threadPoolName,
-            1,
-            1,
-            1000,
-            TimeUnit.MILLISECONDS,
-            ConcurrentCollections.newBlockingQueue(),
-            TimedRunnable::new,
-            EsExecutors.daemonThreadFactory("queuetest"),
-            new EsAbortPolicy(),
-            new ThreadContext(Settings.EMPTY),
-            new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA)
-        );
-        executor.setupMetrics(meterRegistry, threadPoolName);
-
-        try {
-            final var barrier = new CyclicBarrier(2);
-            final ExponentialBucketHistogram expectedHistogram = new ExponentialBucketHistogram();
-
-            /*
-             * The thread pool has a single thread, so we submit a task that will occupy that thread
-             * and cause subsequent tasks to be queued
-             */
-            Future<?> runningTask = executor.submit(() -> {
-                safeAwait(barrier);
-                safeAwait(barrier);
-            });
-            safeAwait(barrier); // wait till the first task starts
-            expectedHistogram.addObservation(0L); // the first task should not be delayed
-
-            /*
-             * On each iteration we submit a task - which will be queued because of the
-             * currently running task, pause for some random interval, then unblock the
-             * new task by releasing the currently running task. This gives us a lower
-             * bound for the real delays (the real delays will be greater than or equal
-             * to the synthetic delays we add, i.e. each percentile should be >= our
-             * expected values)
-             */
-            for (int i = 0; i < 10; i++) {
-                Future<?> waitingTask = executor.submit(() -> {
-                    safeAwait(barrier);
-                    safeAwait(barrier);
-                });
-                final long delayTimeMs = randomLongBetween(1, 50);
-                safeSleep(delayTimeMs);
-                safeAwait(barrier); // let the running task complete
-                safeAwait(barrier); // wait for the next task to start
-                safeGet(runningTask); // ensure previous task is complete
-                expectedHistogram.addObservation(delayTimeMs);
-                runningTask = waitingTask;
-            }
-            safeAwait(barrier); // let the last task finish
-            safeGet(runningTask);
-            meterRegistry.getRecorder().collect();
-
-            List<Measurement> measurements = meterRegistry.getRecorder()
-                .getMeasurements(
-                    InstrumentType.LONG_GAUGE,
-                    ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME
-                );
-            assertThat(measurements, hasSize(3));
-            // we have to use greater than or equal to because the actual delay might be higher than what we imposed
-            assertThat(getPercentile(measurements, "99"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.99f)));
-            assertThat(getPercentile(measurements, "90"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.9f)));
-            assertThat(getPercentile(measurements, "50"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.5f)));
-        } finally {
-            ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
-        }
-    }
-
-    private long getPercentile(List<Measurement> measurements, String percentile) {
-        return measurements.stream().filter(m -> m.attributes().get("percentile").equals(percentile)).findFirst().orElseThrow().getLong();
-    }
-
     /**
      * The returned function outputs a WrappedRunnabled that simulates the case
      * where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}.
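
The deleted test asserts the reported percentiles against an ExponentialBucketHistogram, which is referenced in this diff but not defined here. As a rough mental model only, the sketch below shows one way a power-of-two bucketed histogram can answer getPercentile(); the bucket boundaries and the "upper bound of the covering bucket" semantics are assumptions of this sketch, not the actual Elasticsearch class.

import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only, not org.elasticsearch.common.metrics.ExponentialBucketHistogram.
class TinyExponentialHistogram {

    // Bucket 0 holds the value 0; bucket i (i >= 1) holds values in [2^(i-1), 2^i).
    private final LongAdder[] buckets = new LongAdder[64];

    TinyExponentialHistogram() {
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new LongAdder();
        }
    }

    void addObservation(long value) {
        long v = Math.max(value, 0);
        int bucket = Math.min(64 - Long.numberOfLeadingZeros(v), buckets.length - 1);
        buckets[bucket].increment();
    }

    /** Returns an upper bound for the requested percentile, e.g. getPercentile(0.99f). */
    long getPercentile(float percentile) {
        long total = 0;
        for (LongAdder bucket : buckets) {
            total += bucket.sum();
        }
        if (total == 0) {
            return 0;
        }
        long threshold = (long) Math.ceil(total * percentile);
        long seen = 0;
        for (int i = 0; i < buckets.length; i++) {
            seen += buckets[i].sum();
            if (seen >= threshold) {
                if (i == 0) {
                    return 0;
                }
                // Upper bound of bucket i; the top bucket is open-ended.
                return i == buckets.length - 1 ? Long.MAX_VALUE : 1L << i;
            }
        }
        return Long.MAX_VALUE; // unreachable when total > 0
    }
}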
