Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -146,13 +147,42 @@ public int getCurrentQueueSize() {
return getQueue().size();
}

/**
* Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
* Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
* latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
* {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
*/
public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
if (trackMaxQueueLatency == false) {
return 0;
}
return maxQueueLatencyMillisSinceLastPoll.getThenReset();
}

/**
* Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the
* queue and calculates how long it has been there. Returns zero if there is no queue.
*/
public long peekMaxQueueLatencyInQueue() {
if (trackMaxQueueLatency == false) {
return 0;
}
var queue = getQueue();
if (queue.isEmpty()) {
return 0;
}
assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
var linkedTransferQueue = (LinkedTransferQueue) queue;

var task = linkedTransferQueue.peek();
assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
var wrappedTask = ((WrappedRunnable) task).unwrap();
assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
var timedTask = (TimedRunnable) wrappedTask;
return timedTask.getTimeSinceCreationNanos();
}

/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
* There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ long getQueueTimeNanos() {
return beforeExecuteTime - creationTimeNanos;
}

/**
* Returns the time in nanoseconds since this task was created.
*/
long getTimeSinceCreationNanos() {
return System.nanoTime() - creationTimeNanos;
}

/**
* Return the time this task spent being run.
* If the task is still running or has not yet been run, returns -1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,90 @@ public void testExecutionEWMACalculation() throws Exception {
assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L));
});
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

public void testMaxQueueLatency() throws Exception {
/**
* Verifies that we can peek at the task in front of the task queue to fetch the duration that the oldest task has been queued.
* Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}.
*/
public void testFrontOfQueueLatency() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
final var threadPoolName = randomIdentifier();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of these were unused, so I took the opportunity to delete them. Does mess up the diff, a bit, though.

final var barrier = new CyclicBarrier(2);
// Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
// waiting for task execution to begin via the supplied barrier.
var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
barrier,
TimeUnit.NANOSECONDS.toNanos(1000000)
// This won't actually be used, because it is reported after a task is taken off the queue. This test peeks at the still queued
// tasks.
TimeUnit.MILLISECONDS.toNanos(1)
);
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
"test-threadpool",
1,
1,
1_000,
TimeUnit.MILLISECONDS,
ConcurrentCollections.newBlockingQueue(),
(runnable) -> adjustableTimedRunnable,
EsExecutors.daemonThreadFactory("queue-latency-test"),
new EsAbortPolicy(),
context,
randomBoolean()
? EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackMaxQueueLatency()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
: EsExecutors.TaskTrackingConfig.builder()
.trackMaxQueueLatency()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
);
try {
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);

// Check that the peeking at a non-existence queue returns zero.
assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue());

// Submit two tasks, into the thread pool with a single worker thread. The second one will be queued (because the pool only has
// one thread) and can be peeked at.
executor.execute(() -> {});
executor.execute(() -> {});

waitForTimeToElapse();
var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is assuming that System.nanoTime() advances by at least 1ns in between the second call to executor.execute() and the call to peekMaxQueueLatencyInQueue() and that's not a safe assumption, the clock ticks can be coarse enough to see the same time in both places. It needs us to sleep in a loop until we ourselves see the nanoTime() advance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made this an assertBusy, since there's no problem peeking at the front of the queue repeatedly until we see something.

waitForTimeToElapse();
var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
assertThat(
"Expected a second peek to report a longer duration",
updatedFrontOfQueueDuration,
greaterThan(frontOfQueueDuration)
);

// Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty.
safeAwait(barrier);
safeAwait(barrier);
assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue());
} finally {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}

/**
* Verifies that tracking of the max queue latency (captured on task dequeue) is maintained.
* Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()}.
*/
public void testMaxDequeuedQueueLatency() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
final var barrier = new CyclicBarrier(2);
// Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
// waiting for task execution to begin via the supplied barrier.
var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
barrier,
TimeUnit.NANOSECONDS.toNanos(1000000) // Until changed, queue latencies will always be 1 millisecond.
);
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
"test-threadpool",
Expand Down Expand Up @@ -145,9 +217,7 @@ public void testMaxQueueLatency() throws Exception {
assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
} finally {
// Clean up.
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -184,8 +254,7 @@ public void testExceptionThrowingTask() throws Exception {
assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L));
assertThat(executor.getActiveCount(), equalTo(0));
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

public void testGetOngoingTasks() throws Exception {
Expand Down Expand Up @@ -222,8 +291,7 @@ public void testGetOngoingTasks() throws Exception {
exitTaskLatch.countDown();
assertBusy(() -> assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)));
assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

public void testQueueLatencyHistogramMetrics() {
Expand Down Expand Up @@ -387,4 +455,15 @@ long getQueueTimeNanos() {
return queuedTimeTakenNanos;
}
}

/**
* Ensures that the time reported by {@code System.nanoTime()} has advanced. It is otherwise feasible for the clock to report no time
* passing between operations. Call this method if time passing must be guaranteed.
*/
private static void waitForTimeToElapse() throws InterruptedException {
final var startNanoTime = System.nanoTime();
while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) {
Thread.sleep(100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the test will be fine if the clock advances at all, even by 1ns - no need to keep retrying until it hits 100,000,000ns :)

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee Aug 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to an itty-bitty single nano :) Not sure I'll ever write such a small number again. e2f8e78

}
}
}