diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index b9f2a5eb79f22..fa81ee40cb76d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -234,19 +234,29 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu } } - public void testWriteThreadpoolEwmaAlphaSetting() { + public void testWriteThreadpoolsEwmaAlphaSetting() { Settings settings = Settings.EMPTY; - var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; + var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; if (randomBoolean()) { - ewmaAlpha = randomDoubleBetween(0.0, 1.0, true); - settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), ewmaAlpha).build(); + executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); + settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha).build(); } var nodeName = internalCluster().startNode(settings); var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); + + // Verify that the write thread pools all use the tracking executor. for (var name : List.of(ThreadPool.Names.WRITE, ThreadPool.Names.SYSTEM_WRITE, ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) { assertThat(threadPool.executor(name), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); final var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(name); - assertThat(Double.compare(executor.getEwmaAlpha(), ewmaAlpha), CoreMatchers.equalTo(0)); + assertThat(Double.compare(executor.getExecutionEwmaAlpha(), executionEwmaAlpha), CoreMatchers.equalTo(0)); + + // Only the WRITE thread pool should enable further tracking. 
+ if (name.equals(ThreadPool.Names.WRITE) == false) { + assertFalse(executor.trackingMaxQueueLatency()); + } else { + // Verify that the WRITE thread pool has extra tracking enabled. + assertTrue(executor.trackingMaxQueueLatency()); + } } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 4fd5225a29167..ca8447bfc47e1 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -577,24 +577,53 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } public static class TaskTrackingConfig { - // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable - public static final double DEFAULT_EWMA_ALPHA = 0.3; + public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; private final boolean trackExecutionTime; private final boolean trackOngoingTasks; - private final double ewmaAlpha; + private final boolean trackMaxQueueLatency; + private final double executionTimeEwmaAlpha; + + public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( + false, + false, + false, + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST + ); + public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( + true, + false, + false, + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST + ); - public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA); - public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA); + public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) { + this(true, trackOngoingTasks, false, executionTimeEWMAAlpha); + } - public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) { - this(true, 
trackOngoingTasks, ewmaAlpha); + /** + * Execution tracking enabled constructor, with extra options to enable further specialized tracking. + */ + public TaskTrackingConfig(boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha) { + this(true, trackOngoingTasks, trackMaxQueueLatency, executionTimeEwmaAlpha); } - private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) { + /** + * @param trackExecutionTime Whether to track execution stats + * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks + * @param trackMaxQueueLatency Whether to track max queue latency. + * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). + */ + private TaskTrackingConfig( + boolean trackExecutionTime, + boolean trackOngoingTasks, + boolean trackMaxQueueLatency, + double executionTimeEWMAAlpha + ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; - this.ewmaAlpha = EWMAAlpha; + this.trackMaxQueueLatency = trackMaxQueueLatency; + this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; } public boolean trackExecutionTime() { @@ -605,8 +634,12 @@ public boolean trackOngoingTasks() { return trackOngoingTasks; } - public double getEwmaAlpha() { - return ewmaAlpha; + public boolean trackMaxQueueLatency() { + return trackMaxQueueLatency; + } + + public double getExecutionTimeEwmaAlpha() { + return executionTimeEwmaAlpha; } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 2b1a5ff6e9c0c..3e2ad9ce5a091 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ 
b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -13,6 +13,8 @@ import org.elasticsearch.common.metrics.ExponentialBucketHistogram; import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.Instrument; import org.elasticsearch.telemetry.metric.LongWithAttributes; @@ -27,6 +29,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -37,6 +40,7 @@ * An extension to thread pool executor, which tracks statistics for the task execution time. */ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { + private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingEsThreadPoolExecutor.class); public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18; private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 }; @@ -47,9 +51,17 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final boolean trackOngoingTasks; // The set of currently running tasks and the timestamp of when they started execution in the Executor. 
private final Map ongoingTasks = new ConcurrentHashMap<>(); - private volatile long lastPollTime = System.nanoTime(); - private volatile long lastTotalExecutionTime = 0; private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); + private final boolean trackMaxQueueLatency; + private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); + + public enum UtilizationTrackingPurpose { + APM, + ALLOCATION, + } + + private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); + private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, @@ -65,9 +77,11 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea TaskTrackingConfig trackingConfig ) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); + this.runnableWrapper = runnableWrapper; - this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); + this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -95,7 +109,7 @@ public List setupMetrics(MeterRegistry meterRegistry, String threadP ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, "fraction of maximum thread time utilized for " + threadPoolName, "fraction", - () -> new DoubleWithAttributes(pollUtilization(), Map.of()) + () -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of()) ) ); } @@ -136,24 +150,30 @@ public int getCurrentQueueSize() { return getQueue().size(); } + public long 
getMaxQueueLatencyMillisSinceLastPollAndReset() { + if (trackMaxQueueLatency == false) { + return 0; + } + return maxQueueLatencyMillisSinceLastPoll.getThenReset(); + } + /** - * Returns the fraction of the maximum possible thread time that was actually used since the last time - * this method was called. + * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. + * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the + * caller. * - * @return the utilization as a fraction, in the range [0, 1] + * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started + * earlier, contributing a larger execution time. */ - public double pollUtilization() { - final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); - final long currentPollTimeNanos = System.nanoTime(); - - final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; - final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; - final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); - final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; - - lastTotalExecutionTime = currentTotalExecutionTimeNanos; - lastPollTime = currentPollTimeNanos; - return utilizationSinceLastPoll; + public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) { + switch (utilizationTrackingPurpose) { + case APM: + return apmUtilizationTracker.pollUtilization(); + case ALLOCATION: + return allocationUtilizationTracker.pollUtilization(); + default: + throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]"); + } } @Override @@ -161,12 +181,18 @@ protected void beforeExecute(Thread t, 
Runnable r) { if (trackOngoingTasks) { ongoingTasks.put(r, System.nanoTime()); } + assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); timedRunnable.beforeExecute(); final long taskQueueLatency = timedRunnable.getQueueTimeNanos(); assert taskQueueLatency >= 0; - queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency)); + var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); + queueLatencyMillisHistogram.addObservation(queueLatencyMillis); + + if (trackMaxQueueLatency) { + maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis); + } } @Override @@ -222,7 +248,39 @@ public Map getOngoingTasks() { } // Used for testing - public double getEwmaAlpha() { + public double getExecutionEwmaAlpha() { return executionEWMA.getAlpha(); } + + // Used for testing + public boolean trackingMaxQueueLatency() { + return trackMaxQueueLatency; + } + + /** + * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization + * since the last poll can be calculated for the next polling request. + * + * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. 
+ */ + private class UtilizationTracker { + long lastPollTime = System.nanoTime(); + long lastTotalExecutionTime = 0; + + public synchronized double pollUtilization() { + final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); + final long currentPollTimeNanos = System.nanoTime(); + + final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; + final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; + + final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); + final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; + + lastTotalExecutionTime = currentTotalExecutionTimeNanos; + lastPollTime = currentPollTimeNanos; + + return utilizationSinceLastPoll; + } + } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 7b69e3a164d5b..63f0257cb8aba 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -56,7 +56,7 @@ public Map getBuilders(Settings settings, int allocated allocatedProcessors, // 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores. 
Math.max(allocatedProcessors * 750, 10000), - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA) + new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA) ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 7f720721aebf2..d3ae1165a3aee 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA; +import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -51,7 +51,7 @@ public void testExecutionEWMACalculation() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -89,6 +89,56 @@ public void testExecutionEWMACalculation() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } + public void testMaxQueueLatency() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + RecordingMeterRegistry meterRegistry = new 
RecordingMeterRegistry(); + final var threadPoolName = randomIdentifier(); + final var barrier = new CyclicBarrier(2); + var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable( + barrier, + TimeUnit.NANOSECONDS.toNanos(1000000) + ); + TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( + "test-threadpool", + 1, + 1, + 1000, + TimeUnit.MILLISECONDS, + ConcurrentCollections.newBlockingQueue(), + (runnable) -> adjustableTimedRunnable, + EsExecutors.daemonThreadFactory("queue-latency-test"), + new EsAbortPolicy(), + context, + new TaskTrackingConfig(randomBoolean(), true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + ); + try { + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Check that the max is zero initially and after a reset. + assertEquals("The queue latency should be initialized zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + executor.execute(() -> {}); + safeAwait(barrier); // Wait for the task to start, which implies it has finished the queuing stage. + assertEquals("Ran one task of 1ms, should be the max", 1, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + + // Check that the max is kept across multiple calls, where the last is not the max. + adjustableTimedRunnable.setQueuedTimeTakenNanos(5000000); + executeTask(executor, 1); + safeAwait(barrier); // Wait for the task to start, which implies it has finished the queuing stage.
+ adjustableTimedRunnable.setQueuedTimeTakenNanos(1000000); + executeTask(executor, 1); + safeAwait(barrier); + assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + } finally { + // Clean up. + assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + } + /** Use a runnable wrapper that simulates a task with unknown failures. */ public void testExceptionThrowingTask() throws Exception { ThreadContext context = new ThreadContext(Settings.EMPTY); @@ -103,7 +153,7 @@ public void testExceptionThrowingTask() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -135,7 +185,7 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -156,7 +206,7 @@ public void testGetOngoingTasks() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } - public void testQueueLatencyMetrics() { + public void testQueueLatencyHistogramMetrics() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final var threadPoolName = randomIdentifier(); var executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( @@ -170,7 +220,7 @@ public void testQueueLatencyMetrics() { 
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), new ThreadContext(Settings.EMPTY), - new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.setupMetrics(meterRegistry, threadPoolName); @@ -261,18 +311,18 @@ private void executeTask(TaskExecutionTimeTrackingEsThreadPoolExecutor executor, } public class SettableTimedRunnable extends TimedRunnable { - private final long timeTaken; + private final long executionTimeTakenNanos; private final boolean testFailedOrRejected; - public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) { + public SettableTimedRunnable(long executionTimeTakenNanos, boolean failedOrRejected) { super(() -> {}); - this.timeTaken = timeTaken; + this.executionTimeTakenNanos = executionTimeTakenNanos; this.testFailedOrRejected = failedOrRejected; } @Override public long getTotalExecutionNanos() { - return timeTaken; + return executionTimeTakenNanos; } @Override @@ -280,4 +330,38 @@ public boolean getFailedOrRejected() { return testFailedOrRejected; } } + + /** + * This TimedRunnable override provides the following: + *
<ul>
+ * <li>Overrides {@link TimedRunnable#getQueueTimeNanos()} so that arbitrary queue latencies can be set for the thread pool.</li>
+ * <li>Replaces any submitted Runnable task to the thread pool with a Runnable that only waits on a {@link CyclicBarrier}.</li>
+ * </ul>
+ * This allows dynamically manipulating the queue time with {@link #setQueuedTimeTakenNanos}, and provides a means of waiting for a task
+ * to start by calling {@code safeAwait(barrier)} after submitting a task.
+ * <p>
+ * Look at {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#wrapRunnable} for how the ThreadPool uses this as a wrapper around all + * submitted tasks. + */ + public class AdjustableQueueTimeWithExecutionBarrierTimedRunnable extends TimedRunnable { + private long queuedTimeTakenNanos; + + /** + * @param barrier A barrier that the caller can wait upon to ensure a task starts. + * @param queuedTimeTakenNanos The default queue time reported for all tasks. + */ + public AdjustableQueueTimeWithExecutionBarrierTimedRunnable(CyclicBarrier barrier, long queuedTimeTakenNanos) { + super(() -> { safeAwait(barrier); }); + this.queuedTimeTakenNanos = queuedTimeTakenNanos; + } + + public void setQueuedTimeTakenNanos(long timeTakenNanos) { + this.queuedTimeTakenNanos = timeTakenNanos; + } + + @Override + long getQueueTimeNanos() { + return queuedTimeTakenNanos; + } + } } diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index ad86c1159f426..2cd166e002637 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.InstrumentType; @@ -509,13 +510,17 @@ public void testDetailedUtilizationMetric() throws Exception { final long beforePreviousCollectNanos = System.nanoTime(); meterRegistry.getRecorder().collect(); + double allocationUtilization = 
executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); final long afterPreviousCollectNanos = System.nanoTime(); - metricAsserter.assertLatestMetricValueMatches( + + var metricValue = metricAsserter.assertLatestMetricValueMatches( InstrumentType.DOUBLE_GAUGE, ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, Measurement::getDouble, equalTo(0.0d) ); + logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); + assertThat(allocationUtilization, equalTo(0.0d)); final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); final long beforeStartNanos = System.nanoTime(); @@ -535,6 +540,7 @@ public void testDetailedUtilizationMetric() throws Exception { final long beforeMetricsCollectedNanos = System.nanoTime(); meterRegistry.getRecorder().collect(); + allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); final long afterMetricsCollectedNanos = System.nanoTime(); // Calculate upper bound on utilisation metric @@ -549,12 +555,14 @@ public void testDetailedUtilizationMetric() throws Exception { logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); - metricAsserter.assertLatestMetricValueMatches( + metricValue = metricAsserter.assertLatestMetricValueMatches( InstrumentType.DOUBLE_GAUGE, ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, Measurement::getDouble, matcher ); + logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); + assertThat(allocationUtilization, matcher); } finally { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } @@ -665,7 +673,7 @@ void assertLatestLongValueMatches(String metricName, InstrumentType instrumentTy assertLatestMetricValueMatches(instrumentType, metricName, Measurement::getLong, matcher); } - void assertLatestMetricValueMatches( + T 
assertLatestMetricValueMatches( InstrumentType instrumentType, String name, Function valueExtractor, @@ -674,7 +682,9 @@ void assertLatestMetricValueMatches( List measurements = meterRegistry.getRecorder() .getMeasurements(instrumentType, ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + name); assertFalse(name + " has no measurements", measurements.isEmpty()); - assertThat(valueExtractor.apply(measurements.getLast()), matcher); + var latestMetric = valueExtractor.apply(measurements.getLast()); + assertThat(latestMetric, matcher); + return latestMetric; } }