-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Peek at queued task's latency in a thread pool #131329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
6b8854f
a14d8c2
e3d7d0e
20641af
99a0637
44e5f18
c61e055
f1e9d12
7b29402
c0f8604
4d030c9
e2f8e78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,18 +89,90 @@ public void testExecutionEWMACalculation() throws Exception { | |
| assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L)); | ||
| }); | ||
| assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); | ||
| executor.shutdown(); | ||
| executor.awaitTermination(10, TimeUnit.SECONDS); | ||
| ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| public void testMaxQueueLatency() throws Exception { | ||
| /** | ||
| * Verifies that we can peek at the task at the front of the task queue to fetch how long the oldest task has been queued. | ||
| * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}. | ||
| */ | ||
| public void testFrontOfQueueLatency() throws Exception { | ||
| ThreadContext context = new ThreadContext(Settings.EMPTY); | ||
| RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); | ||
| final var threadPoolName = randomIdentifier(); | ||
| final var barrier = new CyclicBarrier(2); | ||
| // Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and | ||
| // waiting for task execution to begin via the supplied barrier. | ||
| var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable( | ||
DiannaHohensee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| barrier, | ||
| TimeUnit.NANOSECONDS.toNanos(1000000) | ||
| // This won't actually be used, because it is reported after a task is taken off the queue. This test peeks at the still queued | ||
| // tasks. | ||
| TimeUnit.MILLISECONDS.toNanos(1) | ||
| ); | ||
| TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( | ||
| "test-threadpool", | ||
| 1, | ||
| 1, | ||
| 1_000, | ||
| TimeUnit.MILLISECONDS, | ||
| ConcurrentCollections.newBlockingQueue(), | ||
| (runnable) -> adjustableTimedRunnable, | ||
| EsExecutors.daemonThreadFactory("queue-latency-test"), | ||
| new EsAbortPolicy(), | ||
| context, | ||
| randomBoolean() | ||
| ? EsExecutors.TaskTrackingConfig.builder() | ||
| .trackOngoingTasks() | ||
| .trackMaxQueueLatency() | ||
| .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) | ||
| .build() | ||
| : EsExecutors.TaskTrackingConfig.builder() | ||
| .trackMaxQueueLatency() | ||
| .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) | ||
| .build() | ||
| ); | ||
| try { | ||
| executor.prestartAllCoreThreads(); | ||
| logger.info("--> executor: {}", executor); | ||
|
|
||
| // Check that peeking at a non-existent queue returns zero. | ||
| assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue()); | ||
|
|
||
| // Submit two tasks into the thread pool, which has a single worker thread. The second one will be queued (because the pool only has | ||
| // one thread) and can be peeked at. | ||
| executor.execute(() -> {}); | ||
| executor.execute(() -> {}); | ||
|
|
||
| waitForTimeToElapse(); | ||
| var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue(); | ||
| assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is assuming that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've made this an assertBusy, since there's no problem peeking at the front of the queue repeatedly until we see something. |
||
| waitForTimeToElapse(); | ||
| var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue(); | ||
| assertThat( | ||
| "Expected a second peek to report a longer duration", | ||
| updatedFrontOfQueueDuration, | ||
| greaterThan(frontOfQueueDuration) | ||
| ); | ||
|
|
||
| // Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty. | ||
| safeAwait(barrier); | ||
| safeAwait(barrier); | ||
| assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue()); | ||
| } finally { | ||
| ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Verifies that tracking of the max queue latency (captured on task dequeue) is maintained. | ||
| * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()}. | ||
| */ | ||
| public void testMaxDequeuedQueueLatency() throws Exception { | ||
| ThreadContext context = new ThreadContext(Settings.EMPTY); | ||
| final var barrier = new CyclicBarrier(2); | ||
| // Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and | ||
| // waiting for task execution to begin via the supplied barrier. | ||
| var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable( | ||
| barrier, | ||
| TimeUnit.NANOSECONDS.toNanos(1000000) // Until changed, queue latencies will always be 1 millisecond. | ||
| ); | ||
| TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( | ||
| "test-threadpool", | ||
|
|
@@ -145,9 +217,7 @@ public void testMaxQueueLatency() throws Exception { | |
| assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); | ||
| assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); | ||
| } finally { | ||
| // Clean up. | ||
| executor.shutdown(); | ||
| executor.awaitTermination(10, TimeUnit.SECONDS); | ||
| ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -184,8 +254,7 @@ public void testExceptionThrowingTask() throws Exception { | |
| assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L)); | ||
| assertThat(executor.getActiveCount(), equalTo(0)); | ||
| assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); | ||
| executor.shutdown(); | ||
| executor.awaitTermination(10, TimeUnit.SECONDS); | ||
| ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| public void testGetOngoingTasks() throws Exception { | ||
|
|
@@ -222,8 +291,7 @@ public void testGetOngoingTasks() throws Exception { | |
| exitTaskLatch.countDown(); | ||
| assertBusy(() -> assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0))); | ||
| assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L)); | ||
| executor.shutdown(); | ||
| executor.awaitTermination(10, TimeUnit.SECONDS); | ||
| ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| public void testQueueLatencyHistogramMetrics() { | ||
|
|
@@ -387,4 +455,15 @@ long getQueueTimeNanos() { | |
| return queuedTimeTakenNanos; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Ensures that the time reported by {@code System.nanoTime()} has advanced. It is otherwise feasible for the clock to report no time | ||
| * passing between operations. Call this method if time passing must be guaranteed. | ||
| */ | ||
| private static void waitForTimeToElapse() throws InterruptedException { | ||
| final var startNanoTime = System.nanoTime(); | ||
| while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) { | ||
| Thread.sleep(100); | ||
|
||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both of these were unused, so I took the opportunity to delete them. It does mess up the diff a bit, though.