@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -146,13 +147,42 @@ public int getCurrentQueueSize() {
return getQueue().size();
}

/**
* Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
* Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
* latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
* {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
*/
public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
if (trackMaxQueueLatency == false) {
return 0;
}
return maxQueueLatencyMillisSinceLastPoll.getThenReset();
}
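
The reset-on-read behaviour described above matches java.util.concurrent.atomic.LongAccumulator#getThenReset. Below is a minimal standalone sketch of that pattern; the executor's actual maxQueueLatencyMillisSinceLastPoll field is declared outside this hunk, so its exact type is an assumption here.

import java.util.concurrent.atomic.LongAccumulator;

// Illustration only: reset-on-read max tracking with a LongAccumulator.
LongAccumulator maxSeen = new LongAccumulator(Long::max, 0L);
maxSeen.accumulate(5);                // a task dequeued after 5 ms
maxSeen.accumulate(3);                // a later task dequeued after 3 ms
long polled = maxSeen.getThenReset(); // returns 5 and resets the max back to 0
long afterReset = maxSeen.get();      // 0 until another task is dequeued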

/**
* Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the
* queue and calculates how long it has been there. Returns zero if there is no queue.
*/
public long peekMaxQueueLatencyInQueue() {
if (trackMaxQueueLatency == false) {
return 0;
}
var queue = getQueue();
if (queue.isEmpty()) {
return 0;
}
assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
var linkedTransferQueue = (LinkedTransferQueue) queue;

var task = linkedTransferQueue.peek();
assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
var wrappedTask = ((WrappedRunnable) task).unwrap();
assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
var timedTask = (TimedRunnable) wrappedTask;
return timedTask.getTimeSinceCreationNanos();
}
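
A hypothetical caller (illustrative names only) could combine the two views to report a worst-case queue latency. Note that, as the methods are written in this hunk, the peek returns nanoseconds while the reset-on-read max is in milliseconds.

// Hypothetical monitoring poll; "executor" is a TaskExecutionTimeTrackingEsThreadPoolExecutor.
long dequeuedMaxMillis = executor.getMaxQueueLatencyMillisSinceLastPollAndReset();
long headOfQueueMillis = TimeUnit.NANOSECONDS.toMillis(executor.peekMaxQueueLatencyInQueue());
long worstCaseMillis = Math.max(dequeuedMaxMillis, headOfQueueMillis);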

/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
* There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
@@ -72,6 +72,13 @@ long getQueueTimeNanos() {
return beforeExecuteTime - creationTimeNanos;
}

/**
* Returns the time in nanoseconds since this task was created.
*/
long getTimeSinceCreationNanos() {
return System.nanoTime() - creationTimeNanos;
}

/**
* Return the time this task spent being run.
* If the task is still running or has not yet been run, returns -1.
@@ -89,18 +89,95 @@ public void testExecutionEWMACalculation() throws Exception {
assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L));
});
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

public void testMaxQueueLatency() throws Exception {
/**
* Verifies that we can peek at the task at the front of the task queue to fetch how long the oldest task has been queued.
* Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}.
*/
public void testFrontOfQueueLatency() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
final var threadPoolName = randomIdentifier();
Contributor Author:

Both of these were unused, so I took the opportunity to delete them. Does mess up the diff, a bit, though.

final var barrier = new CyclicBarrier(2);
// Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
// waiting for task execution to begin via the supplied barrier.
var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
barrier,
TimeUnit.NANOSECONDS.toNanos(1000000)
// This won't actually be used, because it is reported after a task is taken off the queue. This test peeks at the still queued
// tasks.
TimeUnit.MILLISECONDS.toNanos(1)
);
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
"test-threadpool",
1,
1,
1_000,
TimeUnit.MILLISECONDS,
ConcurrentCollections.newBlockingQueue(),
(runnable) -> adjustableTimedRunnable,
EsExecutors.daemonThreadFactory("queue-latency-test"),
new EsAbortPolicy(),
context,
randomBoolean()
? EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackMaxQueueLatency()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
: EsExecutors.TaskTrackingConfig.builder()
.trackMaxQueueLatency()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
);
try {
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);

// Check that peeking at an empty queue returns zero.
assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue());

// Submit two tasks into the thread pool, which has a single worker thread. The second one will be queued (because the pool only has
// one thread) and can be peeked at.
executor.execute(() -> {});
executor.execute(() -> {});

var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
assertBusy(
// Wrap this call in an assertBusy because it's feasible for the thread pool's clock to see no time pass.
Contributor:

I'd rather we were specific about what we're waiting for - it should be enough to check that time has passed, so we should fail the test if time has passed but we don't see anything in the queue for some reason.

(also this won't work anyway because you need to re-read frontOfQueueDuration if you're retrying)

() -> assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L))
);
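
A sketch of the re-read the reviewer asks for (same executor variable as above): moving the peek inside the retry lambda so each assertBusy attempt sees a fresh value.

// Sketch only: re-read the head-of-queue latency on every retry.
assertBusy(() -> assertThat("Expected a task to be queued", executor.peekMaxQueueLatencyInQueue(), greaterThan(0L)));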
safeSleep(10);
Contributor:

Likewise here, the scheduler and/or clock might be coarse enough that no time passes here. We again need to sleep in a loop until we see time pass. See e.g. org.elasticsearch.cluster.service.ClusterServiceIT#waitForTimeToElapse

Contributor Author:

Oh, that's a very fancy method. Hmm, I could make a variation of that method for this file (other one is an IT test, not unit test). But how about I wrap these calls in an assertBusy and document the concern? I've gone ahead with that, let me know if you prefer a variation of ClusterServiceIT#waitForTimeToElapse instead.

Contributor:

Yeah I'd rather we waited just for time to pass. It doesn't need to do everything that waitForTimeToElapse does, this is designed to deal with the caching in ThreadPool::relativeTimeInMillis too (across many different ThreadPool instances) - we just need to check System.nanoTime.

Contributor Author:

Ah, that is simpler than I was initially imagining. Updated in 7b29402, I think that's what you mean
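
A minimal sketch of the "just wait for System.nanoTime to advance" approach described above (hypothetical helper name; the actual change in 7b29402 is not shown in this view):

// Sketch only: sleep in a loop until the clock visibly advances.
private static void waitForNanoTimeToAdvance() throws InterruptedException {
    final long start = System.nanoTime();
    while (System.nanoTime() == start) {
        Thread.sleep(1);
    }
}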

var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
assertBusy(
// Again add an assertBusy to ensure time passes on the thread pool's clock and there are no races.
() -> assertThat(
"Expected a second peek to report a longer duration",
updatedFrontOfQueueDuration,
greaterThan(frontOfQueueDuration)
)
);

// Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty.
safeAwait(barrier);
safeAwait(barrier);
assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue());
} finally {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}
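
The AdjustableQueueTimeWithExecutionBarrierTimedRunnable helper used by both tests is defined outside this diff. Below is a standalone sketch of the idea (an illustrative class, not the real helper): a task that reports a configurable, fake queue time and rendezvouses with the test on a shared two-party barrier so the test controls when execution completes.

import java.util.concurrent.CyclicBarrier;

// Illustration only: a task with an adjustable fake queue time that signals the
// test thread (via the shared barrier) once it starts executing.
class FakeQueueTimeBarrierTask implements Runnable {
    private final CyclicBarrier barrier;
    private volatile long queueTimeNanos;

    FakeQueueTimeBarrierTask(CyclicBarrier barrier, long initialQueueTimeNanos) {
        this.barrier = barrier;
        this.queueTimeNanos = initialQueueTimeNanos;
    }

    void setQueueTimeNanos(long nanos) {
        this.queueTimeNanos = nanos; // what the executor would record as this task's queue latency
    }

    long getQueueTimeNanos() {
        return queueTimeNanos;
    }

    @Override
    public void run() {
        try {
            barrier.await(); // rendezvous with the test thread before completing
        } catch (Exception e) {
            throw new AssertionError(e);
        }
    }
}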

/**
* Verifies that tracking of the max queue latency (captured on task dequeue) is maintained.
* Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()}.
*/
public void testMaxDequeuedQueueLatency() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
final var barrier = new CyclicBarrier(2);
// Replace all tasks submitted to the thread pool with a configurable task that supports configuring queue latency durations and
// waiting for task execution to begin via the supplied barrier.
var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable(
barrier,
TimeUnit.NANOSECONDS.toNanos(1000000) // Until changed, queue latencies will always be 1 millisecond.
);
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
"test-threadpool",
@@ -145,9 +222,7 @@ public void testMaxQueueLatency() throws Exception {
assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset());
} finally {
// Clean up.
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}

@@ -184,8 +259,7 @@ public void testExceptionThrowingTask() throws Exception {
assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L));
assertThat(executor.getActiveCount(), equalTo(0));
assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

public void testGetOngoingTasks() throws Exception {
@@ -222,8 +296,7 @@ public void testGetOngoingTasks() throws Exception {
exitTaskLatch.countDown();
assertBusy(() -> assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)));
assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

public void testQueueLatencyHistogramMetrics() {