Skip to content

Commit 93b16dc

Browse files
Peek at queued task's latency in a thread pool (#131329)
Relates ES-12233
1 parent f9b4337 commit 93b16dc

File tree

3 files changed

+130
-13
lines changed

3 files changed

+130
-13
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.LinkedTransferQueue;
2728
import java.util.concurrent.RejectedExecutionHandler;
2829
import java.util.concurrent.ThreadFactory;
2930
import java.util.concurrent.TimeUnit;
@@ -146,13 +147,42 @@ public int getCurrentQueueSize() {
146147
return getQueue().size();
147148
}
148149

150+
/**
151+
* Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
152+
* Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
153+
* latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
154+
* {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
155+
*/
149156
public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
150157
if (trackMaxQueueLatency == false) {
151158
return 0;
152159
}
153160
return maxQueueLatencyMillisSinceLastPoll.getThenReset();
154161
}
155162

163+
/**
164+
* Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the
165+
* queue and calculates how long it has been there. Returns zero if there is no queue.
166+
*/
167+
public long peekMaxQueueLatencyInQueue() {
168+
if (trackMaxQueueLatency == false) {
169+
return 0;
170+
}
171+
var queue = getQueue();
172+
if (queue.isEmpty()) {
173+
return 0;
174+
}
175+
assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
176+
var linkedTransferQueue = (LinkedTransferQueue) queue;
177+
178+
var task = linkedTransferQueue.peek();
179+
assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
180+
var wrappedTask = ((WrappedRunnable) task).unwrap();
181+
assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
182+
var timedTask = (TimedRunnable) wrappedTask;
183+
return timedTask.getTimeSinceCreationNanos();
184+
}
185+
156186
/**
157187
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
158188
* There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the

server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ long getQueueTimeNanos() {
7272
return beforeExecuteTime - creationTimeNanos;
7373
}
7474

75+
/**
76+
* Returns the time in nanoseconds since this task was created.
77+
*/
78+
long getTimeSinceCreationNanos() {
79+
return System.nanoTime() - creationTimeNanos;
80+
}
81+
7582
/**
7683
* Return the time this task spent being run.
7784
* If the task is still running or has not yet been run, returns -1.

server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.test.ESTestCase;
1818
import org.elasticsearch.threadpool.ThreadPool;
1919

20+
import java.time.Duration;
2021
import java.util.List;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.CyclicBarrier;
@@ -89,18 +90,90 @@ public void testExecutionEWMACalculation() throws Exception {
8990
assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L));
9091
});
9192
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
92-
executor.shutdown();
93-
executor.awaitTermination(10, TimeUnit.SECONDS);
93+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
9494
}
9595

96-
public void testMaxQueueLatency() throws Exception {
96+
/**
97+
* Verifies that we can peek at the task in front of the task queue to fetch the duration that the oldest task has been queued.
98+
* Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}.
99+
*/
100+
public void testFrontOfQueueLatency() throws Exception {
97101
ThreadContext context = new ThreadContext(Settings.EMPTY);
98-
RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
99-
final var threadPoolName = randomIdentifier();
100102
final var barrier = new CyclicBarrier(2);
103+
// Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
104+
// waiting for task execution to begin via the supplied barrier.
101105
var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
102106
barrier,
103-
TimeUnit.NANOSECONDS.toNanos(1000000)
107+
// This won't actually be used, because it is reported after a task is taken off the queue. This test peeks at the still queued
108+
// tasks.
109+
TimeUnit.MILLISECONDS.toNanos(1)
110+
);
111+
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
112+
"test-threadpool",
113+
1,
114+
1,
115+
1_000,
116+
TimeUnit.MILLISECONDS,
117+
ConcurrentCollections.newBlockingQueue(),
118+
(runnable) -> adjustableTimedRunnable,
119+
EsExecutors.daemonThreadFactory("queue-latency-test"),
120+
new EsAbortPolicy(),
121+
context,
122+
randomBoolean()
123+
? EsExecutors.TaskTrackingConfig.builder()
124+
.trackOngoingTasks()
125+
.trackMaxQueueLatency()
126+
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
127+
.build()
128+
: EsExecutors.TaskTrackingConfig.builder()
129+
.trackMaxQueueLatency()
130+
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
131+
.build()
132+
);
133+
try {
134+
executor.prestartAllCoreThreads();
135+
logger.info("--> executor: {}", executor);
136+
137+
// Check that peeking at an empty queue returns zero.
138+
assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue());
139+
140+
// Submit two tasks into the thread pool with a single worker thread. The second one will be queued (because the pool only has
141+
// one thread) and can be peeked at.
142+
executor.execute(() -> {});
143+
executor.execute(() -> {});
144+
145+
waitForTimeToElapse();
146+
var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
147+
assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L));
148+
waitForTimeToElapse();
149+
var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
150+
assertThat(
151+
"Expected a second peek to report a longer duration",
152+
updatedFrontOfQueueDuration,
153+
greaterThan(frontOfQueueDuration)
154+
);
155+
156+
// Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty.
157+
safeAwait(barrier);
158+
safeAwait(barrier);
159+
assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue());
160+
} finally {
161+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
162+
}
163+
}
164+
165+
/**
166+
* Verifies that tracking of the max queue latency (captured on task dequeue) is maintained.
167+
* Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()}.
168+
*/
169+
public void testMaxDequeuedQueueLatency() throws Exception {
170+
ThreadContext context = new ThreadContext(Settings.EMPTY);
171+
final var barrier = new CyclicBarrier(2);
172+
// Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
173+
// waiting for task execution to begin via the supplied barrier.
174+
var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
175+
barrier,
176+
TimeUnit.NANOSECONDS.toNanos(1000000) // Until changed, queue latencies will always be 1 millisecond.
104177
);
105178
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
106179
"test-threadpool",
@@ -145,9 +218,7 @@ public void testMaxQueueLatency() throws Exception {
145218
assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
146219
assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
147220
} finally {
148-
// Clean up.
149-
executor.shutdown();
150-
executor.awaitTermination(10, TimeUnit.SECONDS);
221+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
151222
}
152223
}
153224

@@ -184,8 +255,7 @@ public void testExceptionThrowingTask() throws Exception {
184255
assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L));
185256
assertThat(executor.getActiveCount(), equalTo(0));
186257
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
187-
executor.shutdown();
188-
executor.awaitTermination(10, TimeUnit.SECONDS);
258+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
189259
}
190260

191261
public void testGetOngoingTasks() throws Exception {
@@ -222,8 +292,7 @@ public void testGetOngoingTasks() throws Exception {
222292
exitTaskLatch.countDown();
223293
assertBusy(() -> assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)));
224294
assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L));
225-
executor.shutdown();
226-
executor.awaitTermination(10, TimeUnit.SECONDS);
295+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
227296
}
228297

229298
public void testQueueLatencyHistogramMetrics() {
@@ -387,4 +456,15 @@ long getQueueTimeNanos() {
387456
return queuedTimeTakenNanos;
388457
}
389458
}
459+
460+
/**
461+
* Ensures that the time reported by {@code System.nanoTime()} has advanced. It is otherwise feasible for the clock to report no time
462+
* passing between operations. Call this method if time passing must be guaranteed.
463+
*/
464+
private static void waitForTimeToElapse() throws InterruptedException {
465+
final var startNanoTime = System.nanoTime();
466+
while ((System.nanoTime() - startNanoTime) < 1) {
467+
Thread.sleep(Duration.ofNanos(1));
468+
}
469+
}
390470
}

0 commit comments

Comments
 (0)