diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
index 762a8c280b7f3..20f480af172bd 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -146,6 +147,12 @@ public int getCurrentQueueSize() {
         return getQueue().size();
     }
 
+    /**
+     * Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
+     * Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
+     * latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
+     * {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
+     */
     public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
         if (trackMaxQueueLatency == false) {
             return 0;
@@ -153,6 +160,30 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
         return maxQueueLatencyMillisSinceLastPoll.getThenReset();
     }
 
+    /**
+     * Returns the queue latency, in nanoseconds, of the next task to be executed that is still in the task queue. Essentially
+     * peeks at the front of the queue and calculates how long ago that task was created. Returns zero if the queue is empty or if
+     * max queue latency tracking is disabled.
+     */
+    public long peekMaxQueueLatencyInQueue() {
+        if (trackMaxQueueLatency == false) {
+            return 0;
+        }
+        var queue = getQueue();
+        // Detect an empty queue from peek() itself. Checking isEmpty() first and peeking afterwards is racy: a worker thread can take
+        // the last task between the two calls, in which case peek() returns null.
+        var task = queue.peek();
+        if (task == null) {
+            return 0;
+        }
+        assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
+        assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
+        var wrappedTask = ((WrappedRunnable) task).unwrap();
+        assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + wrappedTask.getClass();
+        var timedTask = (TimedRunnable) wrappedTask;
+        return timedTask.getTimeSinceCreationNanos();
+    }
+
     /**
      * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
      * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
index de89ad0d8ea3f..a0a3940aebd6d 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
@@ -72,6 +72,13 @@ long getQueueTimeNanos() {
         return beforeExecuteTime - creationTimeNanos;
     }
 
+    /**
+     * Returns the time in nanoseconds since this task was created.
+     */
+    long getTimeSinceCreationNanos() {
+        return System.nanoTime() - creationTimeNanos;
+    }
+
     /**
      * Return the time this task spent being run.
      * If the task is still running or has not yet been run, returns -1.
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
index 505c26409a702..b4b33d1265bcb 100644
--- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
@@ -17,6 +17,7 @@
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -89,18 +90,90 @@ public void testExecutionEWMACalculation() throws Exception {
             assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L));
         });
         assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
-        executor.shutdown();
-        executor.awaitTermination(10, TimeUnit.SECONDS);
+        ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
     }
 
-    public void testMaxQueueLatency() throws Exception {
+    /**
+     * Verifies that we can peek at the task at the front of the task queue to fetch the duration that the oldest task has been queued.
+     * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}.
+     */
+    public void testFrontOfQueueLatency() throws Exception {
         ThreadContext context = new ThreadContext(Settings.EMPTY);
-        RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
-        final var threadPoolName = randomIdentifier();
         final var barrier = new CyclicBarrier(2);
+        // Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
+        // waiting for task execution to begin via the supplied barrier.
         var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
             barrier,
-            TimeUnit.NANOSECONDS.toNanos(1000000)
+            // This won't actually be used, because it is reported after a task is taken off the queue. This test peeks at the still queued
+            // tasks.
+            TimeUnit.MILLISECONDS.toNanos(1)
+        );
+        TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
+            "test-threadpool",
+            1,
+            1,
+            1_000,
+            TimeUnit.MILLISECONDS,
+            ConcurrentCollections.newBlockingQueue(),
+            (runnable) -> adjustableTimedRunnable,
+            EsExecutors.daemonThreadFactory("queue-latency-test"),
+            new EsAbortPolicy(),
+            context,
+            randomBoolean()
+                ? EsExecutors.TaskTrackingConfig.builder()
+                    .trackOngoingTasks()
+                    .trackMaxQueueLatency()
+                    .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
+                    .build()
+                : EsExecutors.TaskTrackingConfig.builder()
+                    .trackMaxQueueLatency()
+                    .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
+                    .build()
+        );
+        try {
+            executor.prestartAllCoreThreads();
+            logger.info("--> executor: {}", executor);
+
+            // Check that peeking at an empty queue returns zero.
+            assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue());
+
+            // Submit two tasks into the thread pool with a single worker thread. The second one will be queued (because the pool only has
+            // one thread) and can be peeked at.
+            executor.execute(() -> {});
+            executor.execute(() -> {});
+
+            waitForTimeToElapse();
+            var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
+            assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L));
+            waitForTimeToElapse();
+            var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
+            assertThat(
+                "Expected a second peek to report a longer duration",
+                updatedFrontOfQueueDuration,
+                greaterThan(frontOfQueueDuration)
+            );
+
+            // Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty.
+            safeAwait(barrier);
+            safeAwait(barrier);
+            assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue());
+        } finally {
+            ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Verifies that tracking of the max queue latency (captured on task dequeue) is maintained.
+     * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()}.
+     */
+    public void testMaxDequeuedQueueLatency() throws Exception {
+        ThreadContext context = new ThreadContext(Settings.EMPTY);
+        final var barrier = new CyclicBarrier(2);
+        // Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
+        // waiting for task execution to begin via the supplied barrier.
+        var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
+            barrier,
+            TimeUnit.NANOSECONDS.toNanos(1000000) // Until changed, queue latencies will always be 1 millisecond.
         );
         TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
             "test-threadpool",
@@ -145,9 +218,7 @@ public void testMaxQueueLatency() throws Exception {
             assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
             assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
         } finally {
-            // Clean up.
-            executor.shutdown();
-            executor.awaitTermination(10, TimeUnit.SECONDS);
+            ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
         }
     }
 
@@ -184,8 +255,7 @@ public void testExceptionThrowingTask() throws Exception {
         assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L));
         assertThat(executor.getActiveCount(), equalTo(0));
         assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
-        executor.shutdown();
-        executor.awaitTermination(10, TimeUnit.SECONDS);
+        ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
     }
 
     public void testGetOngoingTasks() throws Exception {
@@ -222,8 +292,7 @@ public void testGetOngoingTasks() throws Exception {
         exitTaskLatch.countDown();
         assertBusy(() -> assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)));
         assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L));
-        executor.shutdown();
-        executor.awaitTermination(10, TimeUnit.SECONDS);
+        ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
     }
 
     public void testQueueLatencyHistogramMetrics() {
@@ -387,4 +456,15 @@ long getQueueTimeNanos() {
             return queuedTimeTakenNanos;
         }
     }
+
+    /**
+     * Ensures that the time reported by {@code System.nanoTime()} has advanced. It is otherwise feasible for the clock to report no time
+     * passing between operations. Call this method if time passing must be guaranteed.
+     */
+    private static void waitForTimeToElapse() throws InterruptedException {
+        final var startNanoTime = System.nanoTime();
+        while ((System.nanoTime() - startNanoTime) < 1) {
+            Thread.sleep(Duration.ofNanos(1));
+        }
+    }
 }