From 4037ccfe1626006660d33149008b0b195688e9ac Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 30 Jun 2025 17:47:04 -0700 Subject: [PATCH 01/10] Add allocation write load stats to write thread pool Instrument the WRITE thread pool to collect: 1) pool utilization EWMA 2) queue latency EWMA Relates ES-12233 --- .../threadpool/SimpleThreadPoolIT.java | 36 ++++- .../common/settings/ClusterSettings.java | 2 + .../common/util/concurrent/EsExecutors.java | 108 +++++++++++++-- ...utionTimeTrackingEsThreadPoolExecutor.java | 78 ++++++++++- .../DefaultBuiltInExecutorBuilders.java | 13 +- .../elasticsearch/threadpool/ThreadPool.java | 60 ++++++++ ...TimeTrackingEsThreadPoolExecutorTests.java | 87 ++++++++++-- .../threadpool/ThreadPoolTests.java | 131 +++++++++++------- 8 files changed, 436 insertions(+), 79 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index b9f2a5eb79f22..8355090c702de 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -40,7 +40,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; +import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; +import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; +import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; +import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -234,19 +238,41 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu } } - public void testWriteThreadpoolEwmaAlphaSetting() { + public void testWriteThreadpoolsEwmaAlphaSetting() { Settings settings = Settings.EMPTY; - var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; + var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; + var queueLatencyEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; + var threadUtilizationEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; if (randomBoolean()) { - ewmaAlpha = randomDoubleBetween(0.0, 1.0, true); - settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), ewmaAlpha).build(); + executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); + queueLatencyEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); + threadUtilizationEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); + settings = Settings.builder() + .put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha) + .put(WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.getKey(), queueLatencyEwmaAlpha) + .put(WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA.getKey(), threadUtilizationEwmaAlpha) + .build(); } var nodeName = internalCluster().startNode(settings); var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); + + // Verify that the write thread pools all use the tracking executor. for (var name : List.of(ThreadPool.Names.WRITE, ThreadPool.Names.SYSTEM_WRITE, ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) { assertThat(threadPool.executor(name), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); final var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(name); - assertThat(Double.compare(executor.getEwmaAlpha(), ewmaAlpha), CoreMatchers.equalTo(0)); + assertThat(Double.compare(executor.getExecutionEwmaAlpha(), executionEwmaAlpha), CoreMatchers.equalTo(0)); + + // Only the WRITE thread pool should enable further tracking. + if (name.equals(ThreadPool.Names.WRITE) == false) { + assertFalse(executor.trackingQueueLatencyEwma()); + assertFalse(executor.trackUtilizationEwma()); + } else { + // Verify that the WRITE thread pool has extra tracking enabled. + assertTrue(executor.trackingQueueLatencyEwma()); + assertTrue(executor.trackUtilizationEwma()); + assertThat(Double.compare(executor.getQueueLatencyEwmaAlpha(), queueLatencyEwmaAlpha), CoreMatchers.equalTo(0)); + assertThat(Double.compare(executor.getPoolUtilizationEwmaAlpha(), threadUtilizationEwmaAlpha), CoreMatchers.equalTo(0)); + } } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1fbc8993cc5aa..a9c932b5ab7d4 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -547,6 +547,8 @@ public void apply(Settings value, Settings current, Settings previous) { ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING, ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING, + ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA, + ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 4fd5225a29167..27029b7fac37a 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -577,24 +577,94 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } public static class TaskTrackingConfig { - // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable - public static final double DEFAULT_EWMA_ALPHA = 0.3; + public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; + public static final double DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST = 0.6; + public static final double DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST = 0.6; private final boolean trackExecutionTime; private final boolean trackOngoingTasks; - private final double ewmaAlpha; + private final boolean trackQueueLatencyEWMA; + private final boolean trackPoolUtilizationEWMA; + private final double executionTimeEwmaAlpha; + private final double queueLatencyEWMAAlpha; + private final double poolUtilizationEWMAAlpha; + + public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( + false, + false, + false, + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, + DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST, + DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + ); + public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( + true, + false, + false, + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, + DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST, + DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + ); - public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA); - public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA); + public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) { + this( + true, + trackOngoingTasks, + false, + false, + executionTimeEWMAAlpha, + DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST, + DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + ); + } - public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) { - this(true, trackOngoingTasks, ewmaAlpha); + /** + * Execution tracking enabled constructor, with extra options to enable further specialized tracking. + */ + public TaskTrackingConfig( + boolean trackOngoingTasks, + boolean trackQueueLatencyEWMA, + boolean trackPoolUtilizationEWMA, + double executionTimeEWMAAlpha, + double queueLatencyEWMAAlpha, + double poolUtilizationEWMAAlpha + ) { + this( + true, + trackOngoingTasks, + trackQueueLatencyEWMA, + trackPoolUtilizationEWMA, + executionTimeEWMAAlpha, + queueLatencyEWMAAlpha, + poolUtilizationEWMAAlpha + ); } - private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) { + /** + * @param trackExecutionTime Whether to track execution stats + * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks + * @param trackQueueLatencyEWMA Whether to track queue latency {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} + * @param trackPoolUtilizationEWMA Whether to track the EWMA for thread pool thread utilization (percent use). + * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). + * @param queueLatencyEWMAAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage). + * @param poolUtilizationEWMAAlpha The alpha seed for pool utilization EWMA (ExponentiallyWeightedMovingAverage). + */ + private TaskTrackingConfig( + boolean trackExecutionTime, + boolean trackOngoingTasks, + boolean trackQueueLatencyEWMA, + boolean trackPoolUtilizationEWMA, + double executionTimeEWMAAlpha, + double queueLatencyEWMAAlpha, + double poolUtilizationEWMAAlpha + ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; - this.ewmaAlpha = EWMAAlpha; + this.trackQueueLatencyEWMA = trackQueueLatencyEWMA; + this.trackPoolUtilizationEWMA = trackPoolUtilizationEWMA; + this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; + this.queueLatencyEWMAAlpha = queueLatencyEWMAAlpha; + this.poolUtilizationEWMAAlpha = poolUtilizationEWMAAlpha; } public boolean trackExecutionTime() { @@ -605,8 +675,24 @@ public boolean trackOngoingTasks() { return trackOngoingTasks; } - public double getEwmaAlpha() { - return ewmaAlpha; + public boolean trackQueueLatencyEWMA() { + return trackQueueLatencyEWMA; + } + + public boolean trackPoolUtilizationEWMA() { + return trackPoolUtilizationEWMA; + } + + public double getExecutionTimeEwmaAlpha() { + return executionTimeEwmaAlpha; + } + + public double getQueueLatencyEwmaAlpha() { + return queueLatencyEWMAAlpha; + } + + public double getPoolUtilizationEwmaAlpha() { + return poolUtilizationEWMAAlpha; } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 2b1a5ff6e9c0c..8acf4259dd2a6 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,6 +27,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -50,6 +51,11 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private volatile long lastPollTime = System.nanoTime(); private volatile long lastTotalExecutionTime = 0; private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); + private final boolean trackQueueLatencyEWMA; + private final boolean trackUtilizationEWMA; + private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA; + private final ExponentiallyWeightedMovingAverage percentPoolUtilizationEWMA; + private final AtomicReference lastUtilizationValue = new AtomicReference<>(0.0); TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, @@ -65,9 +71,14 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea TaskTrackingConfig trackingConfig ) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); + this.runnableWrapper = runnableWrapper; - this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); + this.trackQueueLatencyEWMA = trackingConfig.trackQueueLatencyEWMA(); + this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0); + this.trackUtilizationEWMA = trackingConfig.trackPoolUtilizationEWMA(); + this.percentPoolUtilizationEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getPoolUtilizationEwmaAlpha(), 0); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -136,6 +147,20 @@ public int getCurrentQueueSize() { return getQueue().size(); } + public double getPercentPoolUtilizationEWMA() { + if (trackUtilizationEWMA == false) { + return 0; + } + return this.percentPoolUtilizationEWMA.getAverage(); + } + + public double getQueuedTaskLatencyMillisEWMA() { + if (trackQueueLatencyEWMA == false) { + return 0; + } + return queueLatencyMillisEWMA.getAverage(); + } + /** * Returns the fraction of the maximum possible thread time that was actually used since the last time * this method was called. @@ -153,20 +178,41 @@ public double pollUtilization() { lastTotalExecutionTime = currentTotalExecutionTimeNanos; lastPollTime = currentPollTimeNanos; + + if (trackUtilizationEWMA) { + percentPoolUtilizationEWMA.addValue(utilizationSinceLastPoll); + // Test only tracking. + assert setUtilizationSinceLastPoll(utilizationSinceLastPoll); + } + return utilizationSinceLastPoll; } + // Test only + private boolean setUtilizationSinceLastPoll(double utilizationSinceLastPoll) { + lastUtilizationValue.set(utilizationSinceLastPoll); + return true; + } + @Override protected void beforeExecute(Thread t, Runnable r) { if (trackOngoingTasks) { ongoingTasks.put(r, System.nanoTime()); } + assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); timedRunnable.beforeExecute(); final long taskQueueLatency = timedRunnable.getQueueTimeNanos(); assert taskQueueLatency >= 0; - queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency)); + var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); + queueLatencyMillisHistogram.addObservation(queueLatencyMillis); + + if (trackQueueLatencyEWMA) { + if (queueLatencyMillis > 0) { + queueLatencyMillisEWMA.addValue(queueLatencyMillis); + } + } } @Override @@ -208,6 +254,12 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { .append("total task execution time = ") .append(TimeValue.timeValueNanos(getTotalTaskExecutionTime())) .append(", "); + if (trackQueueLatencyEWMA) { + sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillisEWMA())).append(", "); + } + if (trackUtilizationEWMA) { + sb.append("thread pool utilization percentage EWMA = ").append(getPercentPoolUtilizationEWMA()).append(", "); + } } /** @@ -222,7 +274,27 @@ public Map getOngoingTasks() { } // Used for testing - public double getEwmaAlpha() { + public double getExecutionEwmaAlpha() { return executionEWMA.getAlpha(); } + + // Used for testing + public double getQueueLatencyEwmaAlpha() { + return queueLatencyMillisEWMA.getAlpha(); + } + + // Used for testing + public double getPoolUtilizationEwmaAlpha() { + return percentPoolUtilizationEWMA.getAlpha(); + } + + // Used for testing + public boolean trackingQueueLatencyEwma() { + return trackQueueLatencyEWMA; + } + + // Used for testing + public boolean trackUtilizationEwma() { + return trackUtilizationEWMA; + } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 336d978358b9f..7be67af612f6b 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -21,6 +21,8 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; +import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; +import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA; public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { @@ -32,6 +34,8 @@ public Map getBuilders(Settings settings, int allocated final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512); final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings); + final double queueLatencyEWMAAlpha = WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.get(settings); + final double threadUtilizationEWMAAlpha = WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA.get(settings); Map result = new HashMap<>(); result.put( @@ -55,7 +59,14 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.WRITE, allocatedProcessors, 10000, - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA) + new EsExecutors.TaskTrackingConfig( + true, + true, + true, + indexAutoscalingEWMA, + queueLatencyEWMAAlpha, + threadUtilizationEWMAAlpha + ) ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 4bc5f88abd65a..c702f32869d01 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -217,6 +217,42 @@ public static ThreadPoolType fromType(String type) { // moving average is 100ms, and we get one task which takes 20s the new EWMA will be ~500ms. public static final double DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA = 0.02; + /** + * If the queue latency reaches a high value (e.g. 10-30 seconds), then this thread pool is overwhelmed. It may be temporary, but that + * spike warrants the allocation balancer adjusting some number of shards, if possible. Therefore, it is alright to react quickly. + * + * As an example, suppose the EWMA is 10_000ms, i.e. 10 seconds. + * A single task in the queue that takes 30_000ms, i.e. 30 seconds, would result in a new EWMA of ~12_000ms + * 0.1 x 30_000ms + 0.9 x 10_000 = 3_000ms + 9_000ms = 12_000ms + */ + public static final double DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = 0.1; + + /** + * The utilization percentage is tracked as a value between 0 and 1. A utilization sample is collected every 60 seconds, and represents + * the average thread pool utilization over that time. The EWMA will not be updated frequently, therefore a new sample will be made to + * have a larger effect. + * + * Suppose a new utilization sample is 90%, and the EWMA is 50%. The new sample will have the following effect with different alphas: + * .2 x .100 + .8 x .50 = .2 + .40 = .60 + * .2 x .100 + .8 x .60 = .2 + .48 = .68 + * .2 x .100 + .8 x .68 = .2 + .544 = .744 + * .2 x .100 + .8 x .744 = .2 + .595 = .795 + * .2 x .100 + .8 x .795 = .2 + .636 = .836 + * + * .3 x .100 + .7 x .50 = .3 + .35 = .65 + * .3 x .100 + .7 x .65 = .3 + .455 = .755 + * .3 x .100 + .7 x .755 = .3 + .5285 = .8285 + * .3 x .100 + .7 x .8285 = .3 + .5799 = .8799 + * .3 x .100 + .7 x .8799 = .3 + .616 = .916 + * It would take 5 minutes at 100% utilization to push the EWMA from 50% to >90% (currently a write load threshold for allocation). + */ + // NOMERGE: this has a dependency on the APM polling interval. Need to figure that out, and make assurances. + public static final double DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA = 0.3; + + /** + * + */ + private final Map executors; private final ThreadPoolInfo threadPoolInfo; @@ -271,6 +307,30 @@ public Collection builders() { Setting.Property.NodeScope ); + /** + * The {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} alpha for tracking task queue latency. + */ + public static final Setting WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = Setting.doubleSetting( + "thread_pool.task_tracking.queue_latency.ewma_alpha", + DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA, + 0, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * The {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} alpha for tracking thread pool thread utilization percentage. + */ + public static final Setting WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA = Setting.doubleSetting( + "thread_pool.threads.percent_utilization.ewma_alpha", + DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA, + 0, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Defines and builds the many thread pools delineated in {@link Names}. * diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 7f720721aebf2..655ac5a0bdc6a 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA; +import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; +import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -51,7 +52,7 @@ public void testExecutionEWMACalculation() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -89,6 +90,51 @@ public void testExecutionEWMACalculation() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } + public void testQueueLatencyEWMACalculation() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); + final var threadPoolName = randomIdentifier(); + TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( + "test-threadpool", + 1, + 1, + 1000, + TimeUnit.MILLISECONDS, + ConcurrentCollections.newBlockingQueue(), + settableQueuingWrapper(TimeUnit.NANOSECONDS.toNanos(1000000)), + EsExecutors.daemonThreadFactory("queuetest"), + new EsAbortPolicy(), + context, + new TaskTrackingConfig( + randomBoolean(), + true, + false, + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, + 0.6, // DEFAULT_QUEUE_LATENCY_EWMA_ALPHA is the default, but no need to break the test if it changes. + DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + ) + ); + executor.setupMetrics(meterRegistry, threadPoolName); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.0); + // Using the settableQueuingWrapper each task will report being queued for 1ms + executeTask(executor, 1); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.6); }); + executeTask(executor, 1); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.84); }); + executeTask(executor, 1); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.936); }); + executeTask(executor, 1); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.9744); }); + executeTask(executor, 1); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.98976); }); + assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + /** Use a runnable wrapper that simulates a task with unknown failures. */ public void testExceptionThrowingTask() throws Exception { ThreadContext context = new ThreadContext(Settings.EMPTY); @@ -103,7 +149,7 @@ public void testExceptionThrowingTask() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -135,7 +181,7 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -170,7 +216,7 @@ public void testQueueLatencyMetrics() { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), new ThreadContext(Settings.EMPTY), - new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA) + new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.setupMetrics(meterRegistry, threadPoolName); @@ -231,6 +277,10 @@ public void testQueueLatencyMetrics() { } } + private void assertDoublesEqual(double expected, double actual) { + assertEquals(expected, actual, 0.00001); + } + private long getPercentile(List measurements, String percentile) { return measurements.stream().filter(m -> m.attributes().get("percentile").equals(percentile)).findFirst().orElseThrow().getLong(); } @@ -240,7 +290,15 @@ private long getPercentile(List measurements, String percentile) { * where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}. */ private Function settableWrapper(long timeTakenNanos) { - return (runnable) -> new SettableTimedRunnable(timeTakenNanos, false); + return (runnable) -> new SettableTimedRunnable(0, timeTakenNanos, false); + } + + /** + * The returned function outputs a WrappedRunnabled that simulates the case + * where {@link TimedRunnable#getQueueTimeNanos()} always returns {@code queueTimeTakenNanos}. + */ + private Function settableQueuingWrapper(long queueTimeTakenNanos) { + return (runnable) -> new SettableTimedRunnable(queueTimeTakenNanos, 0, false); } /** @@ -249,7 +307,7 @@ private Function settableWrapper(long timeTakenNanos) * the job failed or was rejected before it finished. */ private Function exceptionalWrapper() { - return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true); + return (runnable) -> new SettableTimedRunnable(0, TimeUnit.NANOSECONDS.toNanos(-1), true); } /** Execute a blank task {@code times} times for the executor */ @@ -261,23 +319,30 @@ private void executeTask(TaskExecutionTimeTrackingEsThreadPoolExecutor executor, } public class SettableTimedRunnable extends TimedRunnable { - private final long timeTaken; + private final long queuedTimeTakenNanos; + private final long executionTimeTakenNanos; private final boolean testFailedOrRejected; - public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) { + public SettableTimedRunnable(long queuedTimeTakenNanos, long executionTimeTakenNanos, boolean failedOrRejected) { super(() -> {}); - this.timeTaken = timeTaken; + this.queuedTimeTakenNanos = queuedTimeTakenNanos; + this.executionTimeTakenNanos = executionTimeTakenNanos; this.testFailedOrRejected = failedOrRejected; } @Override public long getTotalExecutionNanos() { - return timeTaken; + return executionTimeTakenNanos; } @Override public boolean getFailedOrRejected() { return testFailedOrRejected; } + + @Override + long getQueueTimeNanos() { + return queuedTimeTakenNanos; + } } } diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index ad86c1159f426..02246529765f9 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -41,6 +42,7 @@ import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT; +import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DO_NOT_TRACK; import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING; import static org.elasticsearch.threadpool.ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING; @@ -507,59 +509,91 @@ public void testDetailedUtilizationMetric() throws Exception { threadPool.executor(threadPoolName) ); - final long beforePreviousCollectNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - final long afterPreviousCollectNanos = System.nanoTime(); - metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - equalTo(0.0d) - ); - - final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); - final long beforeStartNanos = System.nanoTime(); - final CyclicBarrier barrier = new CyclicBarrier(2); - Future future = executor.submit(() -> { - long innerStartTimeNanos = System.nanoTime(); - safeSleep(100); - safeAwait(barrier); - minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); - }); - safeAwait(barrier); - safeGet(future); - final long maxDurationNanos = System.nanoTime() - beforeStartNanos; - - // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run - assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); - - final long beforeMetricsCollectedNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - final long afterMetricsCollectedNanos = System.nanoTime(); - - // Calculate upper bound on utilisation metric - final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; - final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); - final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; - - // Calculate lower bound on utilisation metric - final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; - final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); - final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; - - logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); - Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); - metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - matcher + ExponentiallyWeightedMovingAverage expectedUtilizationEWMA = new ExponentiallyWeightedMovingAverage( + DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST, + 0 ); + for (int i = 0; i < 5; ++i) { + runThreadUtilizationMetricCollection( + meterRegistry, + threadPoolName, + metricAsserter, + executor, + threadPoolInfo, + expectedUtilizationEWMA + ); + } } finally { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } } + private void runThreadUtilizationMetricCollection( + RecordingMeterRegistry meterRegistry, + String threadPoolName, + MetricAsserter metricAsserter, + TaskExecutionTimeTrackingEsThreadPoolExecutor executor, + ThreadPool.Info threadPoolInfo, + ExponentiallyWeightedMovingAverage expectedUtilizationEWMA + ) throws Exception { + + final long beforePreviousCollectNanos = System.nanoTime(); + meterRegistry.getRecorder().collect(); + final long afterPreviousCollectNanos = System.nanoTime(); + + var metricValue = metricAsserter.assertLatestMetricValueMatches( + InstrumentType.DOUBLE_GAUGE, + ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, + Measurement::getDouble, + equalTo(0.0d) + ); + logger.info("---> Utilization metric data point for EWMA: " + metricValue); + expectedUtilizationEWMA.addValue(metricValue); + assertThat(executor.getPercentPoolUtilizationEWMA(), equalTo(expectedUtilizationEWMA.getAverage())); + + final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); + final long beforeStartNanos = System.nanoTime(); + final CyclicBarrier barrier = new CyclicBarrier(2); + Future future = executor.submit(() -> { + long innerStartTimeNanos = System.nanoTime(); + safeSleep(100); + safeAwait(barrier); + minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); + }); + safeAwait(barrier); + safeGet(future); + final long maxDurationNanos = System.nanoTime() - beforeStartNanos; + + // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run + assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); + + final long beforeMetricsCollectedNanos = System.nanoTime(); + meterRegistry.getRecorder().collect(); + final long afterMetricsCollectedNanos = System.nanoTime(); + + // Calculate upper bound on utilisation metric + final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; + final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); + final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; + + // Calculate lower bound on utilisation metric + final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; + final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); + final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; + + logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); + Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); + metricValue = metricAsserter.assertLatestMetricValueMatches( + InstrumentType.DOUBLE_GAUGE, + ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, + Measurement::getDouble, + matcher + ); + logger.info("---> Utilization metric data point: " + metricValue); + expectedUtilizationEWMA.addValue(metricValue); + assertThat(executor.getPercentPoolUtilizationEWMA(), equalTo(expectedUtilizationEWMA.getAverage())); + } + public void testThreadCountMetrics() throws Exception { final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders(); @@ -665,7 +699,7 @@ void assertLatestLongValueMatches(String metricName, InstrumentType instrumentTy assertLatestMetricValueMatches(instrumentType, metricName, Measurement::getLong, matcher); } - void assertLatestMetricValueMatches( + T assertLatestMetricValueMatches( InstrumentType instrumentType, String name, Function valueExtractor, @@ -675,6 +709,7 @@ void assertLatestMetricValueMatches( .getMeasurements(instrumentType, ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + name); assertFalse(name + " has no measurements", measurements.isEmpty()); assertThat(valueExtractor.apply(measurements.getLast()), matcher); + return valueExtractor.apply(measurements.getLast()); } } From b8d8e5695ad041f42b46167b491e974fd277809e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 2 Jul 2025 12:04:28 -0700 Subject: [PATCH 02/10] reduce utilization EWMA to parallel polling: allow utilization to be polled by allocation --- .../threadpool/SimpleThreadPoolIT.java | 8 -- .../common/settings/ClusterSettings.java | 1 - .../common/util/concurrent/EsExecutors.java | 49 +------ ...utionTimeTrackingEsThreadPoolExecutor.java | 76 ++++------ .../DefaultBuiltInExecutorBuilders.java | 11 +- .../elasticsearch/threadpool/ThreadPool.java | 39 ----- ...TimeTrackingEsThreadPoolExecutorTests.java | 8 +- .../threadpool/ThreadPoolTests.java | 135 +++++++----------- 8 files changed, 89 insertions(+), 238 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index 8355090c702de..93df39474ead2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -41,10 +41,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; -import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; -import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -242,15 +240,12 @@ public void testWriteThreadpoolsEwmaAlphaSetting() { Settings settings = Settings.EMPTY; var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; var queueLatencyEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; - var threadUtilizationEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; if (randomBoolean()) { executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); queueLatencyEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); - threadUtilizationEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); settings = Settings.builder() .put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha) .put(WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.getKey(), queueLatencyEwmaAlpha) - .put(WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA.getKey(), threadUtilizationEwmaAlpha) .build(); } var nodeName = internalCluster().startNode(settings); @@ -265,13 +260,10 @@ public void testWriteThreadpoolsEwmaAlphaSetting() { // Only the WRITE thread pool should enable further tracking. if (name.equals(ThreadPool.Names.WRITE) == false) { assertFalse(executor.trackingQueueLatencyEwma()); - assertFalse(executor.trackUtilizationEwma()); } else { // Verify that the WRITE thread pool has extra tracking enabled. assertTrue(executor.trackingQueueLatencyEwma()); - assertTrue(executor.trackUtilizationEwma()); assertThat(Double.compare(executor.getQueueLatencyEwmaAlpha(), queueLatencyEwmaAlpha), CoreMatchers.equalTo(0)); - assertThat(Double.compare(executor.getPoolUtilizationEwmaAlpha(), threadUtilizationEwmaAlpha), CoreMatchers.equalTo(0)); } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a9c932b5ab7d4..14b80d8b2fa25 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -548,7 +548,6 @@ public void apply(Settings value, Settings current, Settings previous) { ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING, ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA, - ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 27029b7fac37a..94bc5779ed612 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -579,43 +579,30 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { public static class TaskTrackingConfig { public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; public static final double DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST = 0.6; - public static final double DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST = 0.6; private final boolean trackExecutionTime; private final boolean trackOngoingTasks; private final boolean trackQueueLatencyEWMA; - private final boolean trackPoolUtilizationEWMA; private final double executionTimeEwmaAlpha; private final double queueLatencyEWMAAlpha; - private final double poolUtilizationEWMAAlpha; public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( false, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, - DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST, - DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST ); public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( true, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, - DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST, - DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST ); public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) { - this( - true, - trackOngoingTasks, - false, - false, - executionTimeEWMAAlpha, - DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST, - DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST - ); + this(true, trackOngoingTasks, false, executionTimeEWMAAlpha, DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST); } /** @@ -624,47 +611,31 @@ public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlp public TaskTrackingConfig( boolean trackOngoingTasks, boolean trackQueueLatencyEWMA, - boolean trackPoolUtilizationEWMA, double executionTimeEWMAAlpha, - double queueLatencyEWMAAlpha, - double poolUtilizationEWMAAlpha + double queueLatencyEWMAAlpha ) { - this( - true, - trackOngoingTasks, - trackQueueLatencyEWMA, - trackPoolUtilizationEWMA, - executionTimeEWMAAlpha, - queueLatencyEWMAAlpha, - poolUtilizationEWMAAlpha - ); + this(true, trackOngoingTasks, trackQueueLatencyEWMA, executionTimeEWMAAlpha, queueLatencyEWMAAlpha); } /** * @param trackExecutionTime Whether to track execution stats * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks * @param trackQueueLatencyEWMA Whether to track queue latency {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} - * @param trackPoolUtilizationEWMA Whether to track the EWMA for thread pool thread utilization (percent use). * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). * @param queueLatencyEWMAAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage). - * @param poolUtilizationEWMAAlpha The alpha seed for pool utilization EWMA (ExponentiallyWeightedMovingAverage). */ private TaskTrackingConfig( boolean trackExecutionTime, boolean trackOngoingTasks, boolean trackQueueLatencyEWMA, - boolean trackPoolUtilizationEWMA, double executionTimeEWMAAlpha, - double queueLatencyEWMAAlpha, - double poolUtilizationEWMAAlpha + double queueLatencyEWMAAlpha ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; this.trackQueueLatencyEWMA = trackQueueLatencyEWMA; - this.trackPoolUtilizationEWMA = trackPoolUtilizationEWMA; this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; this.queueLatencyEWMAAlpha = queueLatencyEWMAAlpha; - this.poolUtilizationEWMAAlpha = poolUtilizationEWMAAlpha; } public boolean trackExecutionTime() { @@ -679,10 +650,6 @@ public boolean trackQueueLatencyEWMA() { return trackQueueLatencyEWMA; } - public boolean trackPoolUtilizationEWMA() { - return trackPoolUtilizationEWMA; - } - public double getExecutionTimeEwmaAlpha() { return executionTimeEwmaAlpha; } @@ -690,10 +657,6 @@ public double getExecutionTimeEwmaAlpha() { public double getQueueLatencyEwmaAlpha() { return queueLatencyEWMAAlpha; } - - public double getPoolUtilizationEwmaAlpha() { - return poolUtilizationEWMAAlpha; - } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 8acf4259dd2a6..564e0e52f646d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,7 +27,6 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -48,14 +47,13 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final boolean trackOngoingTasks; // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); - private volatile long lastPollTime = System.nanoTime(); - private volatile long lastTotalExecutionTime = 0; + private volatile long lastPollTimeAPM = System.nanoTime(); + private volatile long lastTotalExecutionTimeAPM = 0; + private volatile long lastPollTimeNanosAllocation = System.nanoTime(); + private volatile long lastTotalExecutionTimeAllocation = 0; private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); private final boolean trackQueueLatencyEWMA; - private final boolean trackUtilizationEWMA; private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA; - private final ExponentiallyWeightedMovingAverage percentPoolUtilizationEWMA; - private final AtomicReference lastUtilizationValue = new AtomicReference<>(0.0); TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, @@ -77,8 +75,6 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); this.trackQueueLatencyEWMA = trackingConfig.trackQueueLatencyEWMA(); this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0); - this.trackUtilizationEWMA = trackingConfig.trackPoolUtilizationEWMA(); - this.percentPoolUtilizationEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getPoolUtilizationEwmaAlpha(), 0); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -106,7 +102,7 @@ public List setupMetrics(MeterRegistry meterRegistry, String threadP ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, "fraction of maximum thread time utilized for " + threadPoolName, "fraction", - () -> new DoubleWithAttributes(pollUtilization(), Map.of()) + () -> new DoubleWithAttributes(pollUtilization(true, false), Map.of()) ) ); } @@ -147,13 +143,6 @@ public int getCurrentQueueSize() { return getQueue().size(); } - public double getPercentPoolUtilizationEWMA() { - if (trackUtilizationEWMA == false) { - return 0; - } - return this.percentPoolUtilizationEWMA.getAverage(); - } - public double getQueuedTaskLatencyMillisEWMA() { if (trackQueueLatencyEWMA == false) { return 0; @@ -162,38 +151,38 @@ public double getQueuedTaskLatencyMillisEWMA() { } /** - * Returns the fraction of the maximum possible thread time that was actually used since the last time - * this method was called. + * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. + * One of the two boolean parameters must be true, while the other false. There are two periodic pulling mechanisms that access + * utilization reporting. * - * @return the utilization as a fraction, in the range [0, 1] + * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started + * earlier, contributing a larger execution time. */ - public double pollUtilization() { + public double pollUtilization(boolean forAPM, boolean forAllocation) { + assert forAPM ^ forAllocation : "Can only collect one or the other, APM: " + forAPM + ", Allocation: " + forAllocation; + final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); final long currentPollTimeNanos = System.nanoTime(); - final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; - final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; + final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - (forAPM + ? lastTotalExecutionTimeAPM + : lastTotalExecutionTimeAllocation); + final long timeSinceLastPoll = currentPollTimeNanos - (forAPM ? lastPollTimeAPM : lastPollTimeNanosAllocation); final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; - lastTotalExecutionTime = currentTotalExecutionTimeNanos; - lastPollTime = currentPollTimeNanos; - - if (trackUtilizationEWMA) { - percentPoolUtilizationEWMA.addValue(utilizationSinceLastPoll); - // Test only tracking. - assert setUtilizationSinceLastPoll(utilizationSinceLastPoll); + if (forAPM) { + lastTotalExecutionTimeAPM = currentTotalExecutionTimeNanos; + lastPollTimeAPM = currentPollTimeNanos; + } else { + assert forAllocation; + lastTotalExecutionTimeAllocation = currentTotalExecutionTimeNanos; + lastPollTimeNanosAllocation = currentPollTimeNanos; } return utilizationSinceLastPoll; } - // Test only - private boolean setUtilizationSinceLastPoll(double utilizationSinceLastPoll) { - lastUtilizationValue.set(utilizationSinceLastPoll); - return true; - } - @Override protected void beforeExecute(Thread t, Runnable r) { if (trackOngoingTasks) { @@ -209,9 +198,7 @@ protected void beforeExecute(Thread t, Runnable r) { queueLatencyMillisHistogram.addObservation(queueLatencyMillis); if (trackQueueLatencyEWMA) { - if (queueLatencyMillis > 0) { - queueLatencyMillisEWMA.addValue(queueLatencyMillis); - } + queueLatencyMillisEWMA.addValue(queueLatencyMillis); } } @@ -257,9 +244,6 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { if (trackQueueLatencyEWMA) { sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillisEWMA())).append(", "); } - if (trackUtilizationEWMA) { - sb.append("thread pool utilization percentage EWMA = ").append(getPercentPoolUtilizationEWMA()).append(", "); - } } /** @@ -283,18 +267,8 @@ public double getQueueLatencyEwmaAlpha() { return queueLatencyMillisEWMA.getAlpha(); } - // Used for testing - public double getPoolUtilizationEwmaAlpha() { - return percentPoolUtilizationEWMA.getAlpha(); - } - // Used for testing public boolean trackingQueueLatencyEwma() { return trackQueueLatencyEWMA; } - - // Used for testing - public boolean trackUtilizationEwma() { - return trackUtilizationEWMA; - } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 7be67af612f6b..9e049e39f0141 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -22,7 +22,6 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; -import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA; public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { @@ -35,7 +34,6 @@ public Map getBuilders(Settings settings, int allocated final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512); final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings); final double queueLatencyEWMAAlpha = WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.get(settings); - final double threadUtilizationEWMAAlpha = WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA.get(settings); Map result = new HashMap<>(); result.put( @@ -59,14 +57,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.WRITE, allocatedProcessors, 10000, - new EsExecutors.TaskTrackingConfig( - true, - true, - true, - indexAutoscalingEWMA, - queueLatencyEWMAAlpha, - threadUtilizationEWMAAlpha - ) + new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA, queueLatencyEWMAAlpha) ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index c702f32869d01..520330ebc294a 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -227,32 +227,6 @@ public static ThreadPoolType fromType(String type) { */ public static final double DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = 0.1; - /** - * The utilization percentage is tracked as a value between 0 and 1. A utilization sample is collected every 60 seconds, and represents - * the average thread pool utilization over that time. The EWMA will not be updated frequently, therefore a new sample will be made to - * have a larger effect. - * - * Suppose a new utilization sample is 90%, and the EWMA is 50%. The new sample will have the following effect with different alphas: - * .2 x .100 + .8 x .50 = .2 + .40 = .60 - * .2 x .100 + .8 x .60 = .2 + .48 = .68 - * .2 x .100 + .8 x .68 = .2 + .544 = .744 - * .2 x .100 + .8 x .744 = .2 + .595 = .795 - * .2 x .100 + .8 x .795 = .2 + .636 = .836 - * - * .3 x .100 + .7 x .50 = .3 + .35 = .65 - * .3 x .100 + .7 x .65 = .3 + .455 = .755 - * .3 x .100 + .7 x .755 = .3 + .5285 = .8285 - * .3 x .100 + .7 x .8285 = .3 + .5799 = .8799 - * .3 x .100 + .7 x .8799 = .3 + .616 = .916 - * It would take 5 minutes at 100% utilization to push the EWMA from 50% to >90% (currently a write load threshold for allocation). - */ - // NOMERGE: this has a dependency on the APM polling interval. Need to figure that out, and make assurances. - public static final double DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA = 0.3; - - /** - * - */ - private final Map executors; private final ThreadPoolInfo threadPoolInfo; @@ -315,19 +289,6 @@ public Collection builders() { DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA, 0, 1, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * The {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} alpha for tracking thread pool thread utilization percentage. - */ - public static final Setting WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA = Setting.doubleSetting( - "thread_pool.threads.percent_utilization.ewma_alpha", - DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA, - 0, - 1, - Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 655ac5a0bdc6a..212608f31680e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -26,7 +26,7 @@ import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; -import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST; +import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -108,10 +108,8 @@ public void testQueueLatencyEWMACalculation() throws Exception { new TaskTrackingConfig( randomBoolean(), true, - false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, - 0.6, // DEFAULT_QUEUE_LATENCY_EWMA_ALPHA is the default, but no need to break the test if it changes. - DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST + DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST ) ); executor.setupMetrics(meterRegistry, threadPoolName); @@ -202,7 +200,7 @@ public void testGetOngoingTasks() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } - public void testQueueLatencyMetrics() { + public void testQueueLatencyHistogramMetrics() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final var threadPoolName = randomIdentifier(); var executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 02246529765f9..dd36eb7302cce 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -42,7 +41,6 @@ import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT; -import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DO_NOT_TRACK; import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING; import static org.elasticsearch.threadpool.ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING; @@ -509,91 +507,66 @@ public void testDetailedUtilizationMetric() throws Exception { threadPool.executor(threadPoolName) ); - ExponentiallyWeightedMovingAverage expectedUtilizationEWMA = new ExponentiallyWeightedMovingAverage( - DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST, - 0 + final long beforePreviousCollectNanos = System.nanoTime(); + meterRegistry.getRecorder().collect(); + double allocationUtilization = executor.pollUtilization(false, true); + final long afterPreviousCollectNanos = System.nanoTime(); + + var metricValue = metricAsserter.assertLatestMetricValueMatches( + InstrumentType.DOUBLE_GAUGE, + ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, + Measurement::getDouble, + equalTo(0.0d) ); - for (int i = 0; i < 5; ++i) { - runThreadUtilizationMetricCollection( - meterRegistry, - threadPoolName, - metricAsserter, - executor, - threadPoolInfo, - expectedUtilizationEWMA - ); - } + logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); + assertThat(allocationUtilization, equalTo(0.0d)); + + final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); + final long beforeStartNanos = System.nanoTime(); + final CyclicBarrier barrier = new CyclicBarrier(2); + Future future = executor.submit(() -> { + long innerStartTimeNanos = System.nanoTime(); + safeSleep(100); + safeAwait(barrier); + minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); + }); + safeAwait(barrier); + safeGet(future); + final long maxDurationNanos = System.nanoTime() - beforeStartNanos; + + // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run + assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); + + final long beforeMetricsCollectedNanos = System.nanoTime(); + meterRegistry.getRecorder().collect(); + allocationUtilization = executor.pollUtilization(false, true); + final long afterMetricsCollectedNanos = System.nanoTime(); + + // Calculate upper bound on utilisation metric + final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; + final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); + final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; + + // Calculate lower bound on utilisation metric + final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; + final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); + final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; + + logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); + Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); + metricValue = metricAsserter.assertLatestMetricValueMatches( + InstrumentType.DOUBLE_GAUGE, + ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, + Measurement::getDouble, + matcher + ); + logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); + assertThat(allocationUtilization, matcher); } finally { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } } - private void runThreadUtilizationMetricCollection( - RecordingMeterRegistry meterRegistry, - String threadPoolName, - MetricAsserter metricAsserter, - TaskExecutionTimeTrackingEsThreadPoolExecutor executor, - ThreadPool.Info threadPoolInfo, - ExponentiallyWeightedMovingAverage expectedUtilizationEWMA - ) throws Exception { - - final long beforePreviousCollectNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - final long afterPreviousCollectNanos = System.nanoTime(); - - var metricValue = metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - equalTo(0.0d) - ); - logger.info("---> Utilization metric data point for EWMA: " + metricValue); - expectedUtilizationEWMA.addValue(metricValue); - assertThat(executor.getPercentPoolUtilizationEWMA(), equalTo(expectedUtilizationEWMA.getAverage())); - - final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); - final long beforeStartNanos = System.nanoTime(); - final CyclicBarrier barrier = new CyclicBarrier(2); - Future future = executor.submit(() -> { - long innerStartTimeNanos = System.nanoTime(); - safeSleep(100); - safeAwait(barrier); - minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); - }); - safeAwait(barrier); - safeGet(future); - final long maxDurationNanos = System.nanoTime() - beforeStartNanos; - - // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run - assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); - - final long beforeMetricsCollectedNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - final long afterMetricsCollectedNanos = System.nanoTime(); - - // Calculate upper bound on utilisation metric - final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; - final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); - final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; - - // Calculate lower bound on utilisation metric - final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; - final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); - final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; - - logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); - Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); - metricValue = metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - matcher - ); - logger.info("---> Utilization metric data point: " + metricValue); - expectedUtilizationEWMA.addValue(metricValue); - assertThat(executor.getPercentPoolUtilizationEWMA(), equalTo(expectedUtilizationEWMA.getAverage())); - } - public void testThreadCountMetrics() throws Exception { final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders(); From 37b20e83b56d27f89b72220d7bf90d2dec251c78 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 3 Jul 2025 15:50:58 -0700 Subject: [PATCH 03/10] sub class pollUtilization into UtilizationTracker; rename from EWMA to average --- .../common/util/concurrent/EsExecutors.java | 30 +++--- ...utionTimeTrackingEsThreadPoolExecutor.java | 93 +++++++++++-------- ...TimeTrackingEsThreadPoolExecutorTests.java | 12 +-- .../threadpool/ThreadPoolTests.java | 5 +- 4 files changed, 80 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 94bc5779ed612..23b3bcb386b46 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -582,9 +582,9 @@ public static class TaskTrackingConfig { private final boolean trackExecutionTime; private final boolean trackOngoingTasks; - private final boolean trackQueueLatencyEWMA; + private final boolean trackQueueLatencyAverage; private final double executionTimeEwmaAlpha; - private final double queueLatencyEWMAAlpha; + private final double queueLatencyEwmaAlpha; public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( false, @@ -610,32 +610,32 @@ public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlp */ public TaskTrackingConfig( boolean trackOngoingTasks, - boolean trackQueueLatencyEWMA, - double executionTimeEWMAAlpha, - double queueLatencyEWMAAlpha + boolean trackQueueLatencyAverage, + double executionTimeEwmaAlpha, + double queueLatencyEwmaAlpha ) { - this(true, trackOngoingTasks, trackQueueLatencyEWMA, executionTimeEWMAAlpha, queueLatencyEWMAAlpha); + this(true, trackOngoingTasks, trackQueueLatencyAverage, executionTimeEwmaAlpha, queueLatencyEwmaAlpha); } /** * @param trackExecutionTime Whether to track execution stats * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks - * @param trackQueueLatencyEWMA Whether to track queue latency {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} + * @param trackQueueLatencyAverage Whether to track the average queue latency. * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). - * @param queueLatencyEWMAAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage). + * @param queueLatencyEwmaAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage). */ private TaskTrackingConfig( boolean trackExecutionTime, boolean trackOngoingTasks, - boolean trackQueueLatencyEWMA, + boolean trackQueueLatencyAverage, double executionTimeEWMAAlpha, - double queueLatencyEWMAAlpha + double queueLatencyEwmaAlpha ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; - this.trackQueueLatencyEWMA = trackQueueLatencyEWMA; + this.trackQueueLatencyAverage = trackQueueLatencyAverage; this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; - this.queueLatencyEWMAAlpha = queueLatencyEWMAAlpha; + this.queueLatencyEwmaAlpha = queueLatencyEwmaAlpha; } public boolean trackExecutionTime() { @@ -646,8 +646,8 @@ public boolean trackOngoingTasks() { return trackOngoingTasks; } - public boolean trackQueueLatencyEWMA() { - return trackQueueLatencyEWMA; + public boolean trackQueueLatencyAverage() { + return trackQueueLatencyAverage; } public double getExecutionTimeEwmaAlpha() { @@ -655,7 +655,7 @@ public double getExecutionTimeEwmaAlpha() { } public double getQueueLatencyEwmaAlpha() { - return queueLatencyEWMAAlpha; + return queueLatencyEwmaAlpha; } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 564e0e52f646d..45f3af5d7c3bc 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -47,14 +47,18 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final boolean trackOngoingTasks; // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); - private volatile long lastPollTimeAPM = System.nanoTime(); - private volatile long lastTotalExecutionTimeAPM = 0; - private volatile long lastPollTimeNanosAllocation = System.nanoTime(); - private volatile long lastTotalExecutionTimeAllocation = 0; private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); - private final boolean trackQueueLatencyEWMA; + private final boolean trackQueueLatencyAverage; private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA; + public enum UtilizationTrackingPurpose { + APM, + ALLOCATION, + } + + private volatile UtilizationTracker apmUtilizationTracker; + private volatile UtilizationTracker allocationUtilizationTracker; + TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, int corePoolSize, @@ -73,8 +77,10 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); - this.trackQueueLatencyEWMA = trackingConfig.trackQueueLatencyEWMA(); + this.trackQueueLatencyAverage = trackingConfig.trackQueueLatencyAverage(); this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0); + this.apmUtilizationTracker = new UtilizationTracker(); + this.allocationUtilizationTracker = new UtilizationTracker(); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -102,7 +108,7 @@ public List setupMetrics(MeterRegistry meterRegistry, String threadP ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, "fraction of maximum thread time utilized for " + threadPoolName, "fraction", - () -> new DoubleWithAttributes(pollUtilization(true, false), Map.of()) + () -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of()) ) ); } @@ -143,8 +149,8 @@ public int getCurrentQueueSize() { return getQueue().size(); } - public double getQueuedTaskLatencyMillisEWMA() { - if (trackQueueLatencyEWMA == false) { + public double getQueuedTaskLatencyMillis() { + if (trackQueueLatencyAverage == false) { return 0; } return queueLatencyMillisEWMA.getAverage(); @@ -152,35 +158,21 @@ public double getQueuedTaskLatencyMillisEWMA() { /** * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. - * One of the two boolean parameters must be true, while the other false. There are two periodic pulling mechanisms that access - * utilization reporting. + * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the + * caller. * * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started * earlier, contributing a larger execution time. */ - public double pollUtilization(boolean forAPM, boolean forAllocation) { - assert forAPM ^ forAllocation : "Can only collect one or the other, APM: " + forAPM + ", Allocation: " + forAllocation; - - final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); - final long currentPollTimeNanos = System.nanoTime(); - - final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - (forAPM - ? lastTotalExecutionTimeAPM - : lastTotalExecutionTimeAllocation); - final long timeSinceLastPoll = currentPollTimeNanos - (forAPM ? lastPollTimeAPM : lastPollTimeNanosAllocation); - final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); - final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; - - if (forAPM) { - lastTotalExecutionTimeAPM = currentTotalExecutionTimeNanos; - lastPollTimeAPM = currentPollTimeNanos; - } else { - assert forAllocation; - lastTotalExecutionTimeAllocation = currentTotalExecutionTimeNanos; - lastPollTimeNanosAllocation = currentPollTimeNanos; + public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) { + switch (utilizationTrackingPurpose) { + case APM: + return apmUtilizationTracker.pollUtilization(); + case ALLOCATION: + return allocationUtilizationTracker.pollUtilization(); + default: + throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]"); } - - return utilizationSinceLastPoll; } @Override @@ -197,7 +189,7 @@ protected void beforeExecute(Thread t, Runnable r) { var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); queueLatencyMillisHistogram.addObservation(queueLatencyMillis); - if (trackQueueLatencyEWMA) { + if (trackQueueLatencyAverage) { queueLatencyMillisEWMA.addValue(queueLatencyMillis); } } @@ -241,8 +233,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { .append("total task execution time = ") .append(TimeValue.timeValueNanos(getTotalTaskExecutionTime())) .append(", "); - if (trackQueueLatencyEWMA) { - sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillisEWMA())).append(", "); + if (trackQueueLatencyAverage) { + sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillis())).append(", "); } } @@ -269,6 +261,33 @@ public double getQueueLatencyEwmaAlpha() { // Used for testing public boolean trackingQueueLatencyEwma() { - return trackQueueLatencyEWMA; + return trackQueueLatencyAverage; + } + + /** + * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization + * since the last poll can be calculated for the next polling request. + * + * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. + */ + private class UtilizationTracker { + volatile long lastPollTime = System.nanoTime();; + volatile long lastTotalExecutionTime = 0; + + public double pollUtilization() { + final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); + final long currentPollTimeNanos = System.nanoTime(); + + final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; + final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; + + final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); + final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; + + lastTotalExecutionTime = currentTotalExecutionTimeNanos; + lastPollTime = currentPollTimeNanos; + + return utilizationSinceLastPoll; + } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 212608f31680e..cdf62a7ee4fdb 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -116,18 +116,18 @@ public void testQueueLatencyEWMACalculation() throws Exception { executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); - assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.0); + assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.0); // Using the settableQueuingWrapper each task will report being queued for 1ms executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.6); }); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.6); }); executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.84); }); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.84); }); executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.936); }); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.936); }); executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.9744); }); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.9744); }); executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillisEWMA(), 0.98976); }); + assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.98976); }); assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index dd36eb7302cce..04c23c9873f24 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.InstrumentType; @@ -509,7 +510,7 @@ public void testDetailedUtilizationMetric() throws Exception { final long beforePreviousCollectNanos = System.nanoTime(); meterRegistry.getRecorder().collect(); - double allocationUtilization = executor.pollUtilization(false, true); + double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); final long afterPreviousCollectNanos = System.nanoTime(); var metricValue = metricAsserter.assertLatestMetricValueMatches( @@ -539,7 +540,7 @@ public void testDetailedUtilizationMetric() throws Exception { final long beforeMetricsCollectedNanos = System.nanoTime(); meterRegistry.getRecorder().collect(); - allocationUtilization = executor.pollUtilization(false, true); + allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); final long afterMetricsCollectedNanos = System.nanoTime(); // Calculate upper bound on utilisation metric From bd2e0f15c110abc4dc37962ac8c9a6cd0a2283a0 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 8 Jul 2025 17:34:13 -0700 Subject: [PATCH 04/10] rename --- .../common/util/concurrent/EsExecutors.java | 14 +++++++------- ...kExecutionTimeTrackingEsThreadPoolExecutor.java | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 23b3bcb386b46..90c86cd6e9d7f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -582,7 +582,7 @@ public static class TaskTrackingConfig { private final boolean trackExecutionTime; private final boolean trackOngoingTasks; - private final boolean trackQueueLatencyAverage; + private final boolean trackAverageQueueLatency; private final double executionTimeEwmaAlpha; private final double queueLatencyEwmaAlpha; @@ -610,30 +610,30 @@ public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlp */ public TaskTrackingConfig( boolean trackOngoingTasks, - boolean trackQueueLatencyAverage, + boolean trackAverageQueueLatency, double executionTimeEwmaAlpha, double queueLatencyEwmaAlpha ) { - this(true, trackOngoingTasks, trackQueueLatencyAverage, executionTimeEwmaAlpha, queueLatencyEwmaAlpha); + this(true, trackOngoingTasks, trackAverageQueueLatency, executionTimeEwmaAlpha, queueLatencyEwmaAlpha); } /** * @param trackExecutionTime Whether to track execution stats * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks - * @param trackQueueLatencyAverage Whether to track the average queue latency. + * @param trackAverageQueueLatency Whether to track the average queue latency. * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). * @param queueLatencyEwmaAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage). */ private TaskTrackingConfig( boolean trackExecutionTime, boolean trackOngoingTasks, - boolean trackQueueLatencyAverage, + boolean trackAverageQueueLatency, double executionTimeEWMAAlpha, double queueLatencyEwmaAlpha ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; - this.trackQueueLatencyAverage = trackQueueLatencyAverage; + this.trackAverageQueueLatency = trackAverageQueueLatency; this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; this.queueLatencyEwmaAlpha = queueLatencyEwmaAlpha; } @@ -647,7 +647,7 @@ public boolean trackOngoingTasks() { } public boolean trackQueueLatencyAverage() { - return trackQueueLatencyAverage; + return trackAverageQueueLatency; } public double getExecutionTimeEwmaAlpha() { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 45f3af5d7c3bc..7de6ad4b3f8da 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -48,7 +48,7 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); - private final boolean trackQueueLatencyAverage; + private final boolean trackAverageQueueLatency; private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA; public enum UtilizationTrackingPurpose { @@ -77,7 +77,7 @@ public enum UtilizationTrackingPurpose { this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); - this.trackQueueLatencyAverage = trackingConfig.trackQueueLatencyAverage(); + this.trackAverageQueueLatency = trackingConfig.trackQueueLatencyAverage(); this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0); this.apmUtilizationTracker = new UtilizationTracker(); this.allocationUtilizationTracker = new UtilizationTracker(); @@ -150,7 +150,7 @@ public int getCurrentQueueSize() { } public double getQueuedTaskLatencyMillis() { - if (trackQueueLatencyAverage == false) { + if (trackAverageQueueLatency == false) { return 0; } return queueLatencyMillisEWMA.getAverage(); @@ -189,7 +189,7 @@ protected void beforeExecute(Thread t, Runnable r) { var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); queueLatencyMillisHistogram.addObservation(queueLatencyMillis); - if (trackQueueLatencyAverage) { + if (trackAverageQueueLatency) { queueLatencyMillisEWMA.addValue(queueLatencyMillis); } } @@ -233,7 +233,7 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { .append("total task execution time = ") .append(TimeValue.timeValueNanos(getTotalTaskExecutionTime())) .append(", "); - if (trackQueueLatencyAverage) { + if (trackAverageQueueLatency) { sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillis())).append(", "); } } @@ -261,7 +261,7 @@ public double getQueueLatencyEwmaAlpha() { // Used for testing public boolean trackingQueueLatencyEwma() { - return trackQueueLatencyAverage; + return trackAverageQueueLatency; } /** From b0fdaf27d9ae0cac21f171b82d323fe312765d14 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 9 Jul 2025 14:10:42 -0700 Subject: [PATCH 05/10] Remove EWMA; add Max --- .../threadpool/SimpleThreadPoolIT.java | 14 +-- .../common/settings/ClusterSettings.java | 1 - .../common/util/concurrent/EsExecutors.java | 40 +++----- ...utionTimeTrackingEsThreadPoolExecutor.java | 37 ++++---- .../DefaultBuiltInExecutorBuilders.java | 4 +- .../elasticsearch/threadpool/ThreadPool.java | 21 ----- ...TimeTrackingEsThreadPoolExecutorTests.java | 91 +++++++++++++------ 7 files changed, 99 insertions(+), 109 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index 93df39474ead2..fa81ee40cb76d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -40,9 +40,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; -import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; -import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -239,14 +237,9 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu public void testWriteThreadpoolsEwmaAlphaSetting() { Settings settings = Settings.EMPTY; var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; - var queueLatencyEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; if (randomBoolean()) { executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); - queueLatencyEwmaAlpha = randomDoubleBetween(0.0, 1.0, true); - settings = Settings.builder() - .put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha) - .put(WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.getKey(), queueLatencyEwmaAlpha) - .build(); + settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha).build(); } var nodeName = internalCluster().startNode(settings); var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); @@ -259,11 +252,10 @@ public void testWriteThreadpoolsEwmaAlphaSetting() { // Only the WRITE thread pool should enable further tracking. if (name.equals(ThreadPool.Names.WRITE) == false) { - assertFalse(executor.trackingQueueLatencyEwma()); + assertFalse(executor.trackingMaxQueueLatency()); } else { // Verify that the WRITE thread pool has extra tracking enabled. - assertTrue(executor.trackingQueueLatencyEwma()); - assertThat(Double.compare(executor.getQueueLatencyEwmaAlpha(), queueLatencyEwmaAlpha), CoreMatchers.equalTo(0)); + assertTrue(executor.trackingMaxQueueLatency()); } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 14b80d8b2fa25..1fbc8993cc5aa 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -547,7 +547,6 @@ public void apply(Settings value, Settings current, Settings previous) { ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING, ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING, - ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 90c86cd6e9d7f..ca8447bfc47e1 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -578,64 +578,52 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { public static class TaskTrackingConfig { public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; - public static final double DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST = 0.6; private final boolean trackExecutionTime; private final boolean trackOngoingTasks; - private final boolean trackAverageQueueLatency; + private final boolean trackMaxQueueLatency; private final double executionTimeEwmaAlpha; - private final double queueLatencyEwmaAlpha; public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( false, false, false, - DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, - DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( true, false, false, - DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, - DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST + DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) { - this(true, trackOngoingTasks, false, executionTimeEWMAAlpha, DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST); + this(true, trackOngoingTasks, false, executionTimeEWMAAlpha); } /** * Execution tracking enabled constructor, with extra options to enable further specialized tracking. */ - public TaskTrackingConfig( - boolean trackOngoingTasks, - boolean trackAverageQueueLatency, - double executionTimeEwmaAlpha, - double queueLatencyEwmaAlpha - ) { - this(true, trackOngoingTasks, trackAverageQueueLatency, executionTimeEwmaAlpha, queueLatencyEwmaAlpha); + public TaskTrackingConfig(boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha) { + this(true, trackOngoingTasks, trackMaxQueueLatency, executionTimeEwmaAlpha); } /** * @param trackExecutionTime Whether to track execution stats * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks - * @param trackAverageQueueLatency Whether to track the average queue latency. + * @param trackMaxQueueLatency Whether to track max queue latency. * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). - * @param queueLatencyEwmaAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage). */ private TaskTrackingConfig( boolean trackExecutionTime, boolean trackOngoingTasks, - boolean trackAverageQueueLatency, - double executionTimeEWMAAlpha, - double queueLatencyEwmaAlpha + boolean trackMaxQueueLatency, + double executionTimeEWMAAlpha ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; - this.trackAverageQueueLatency = trackAverageQueueLatency; + this.trackMaxQueueLatency = trackMaxQueueLatency; this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; - this.queueLatencyEwmaAlpha = queueLatencyEwmaAlpha; } public boolean trackExecutionTime() { @@ -646,17 +634,13 @@ public boolean trackOngoingTasks() { return trackOngoingTasks; } - public boolean trackQueueLatencyAverage() { - return trackAverageQueueLatency; + public boolean trackMaxQueueLatency() { + return trackMaxQueueLatency; } public double getExecutionTimeEwmaAlpha() { return executionTimeEwmaAlpha; } - - public double getQueueLatencyEwmaAlpha() { - return queueLatencyEwmaAlpha; - } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 7de6ad4b3f8da..611e05a439b61 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -13,6 +13,8 @@ import org.elasticsearch.common.metrics.ExponentialBucketHistogram; import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.Instrument; import org.elasticsearch.telemetry.metric.LongWithAttributes; @@ -27,6 +29,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -37,6 +40,7 @@ * An extension to thread pool executor, which tracks statistics for the task execution time. */ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { + private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingEsThreadPoolExecutor.class); public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18; private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 }; @@ -48,8 +52,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); - private final boolean trackAverageQueueLatency; - private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA; + private final boolean trackMaxQueueLatency; + private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); public enum UtilizationTrackingPurpose { APM, @@ -77,8 +81,7 @@ public enum UtilizationTrackingPurpose { this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); - this.trackAverageQueueLatency = trackingConfig.trackQueueLatencyAverage(); - this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0); + this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); this.apmUtilizationTracker = new UtilizationTracker(); this.allocationUtilizationTracker = new UtilizationTracker(); } @@ -149,11 +152,13 @@ public int getCurrentQueueSize() { return getQueue().size(); } - public double getQueuedTaskLatencyMillis() { - if (trackAverageQueueLatency == false) { + public long getMaxQueueLatencyMillisSinceLastPollAndReset() { + if (trackMaxQueueLatency == false) { + logger.info("~~~trackMaxQueueLatency is false"); return 0; } - return queueLatencyMillisEWMA.getAverage(); + logger.info("~~~getting max"); + return maxQueueLatencyMillisSinceLastPoll.getThenReset(); } /** @@ -189,8 +194,10 @@ protected void beforeExecute(Thread t, Runnable r) { var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); queueLatencyMillisHistogram.addObservation(queueLatencyMillis); - if (trackAverageQueueLatency) { - queueLatencyMillisEWMA.addValue(queueLatencyMillis); + logger.info("~~~queueLatencyMillis: " + queueLatencyMillis); + if (trackMaxQueueLatency) { + logger.info("~~~adding: " + queueLatencyMillis); + maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis); } } @@ -233,9 +240,6 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { .append("total task execution time = ") .append(TimeValue.timeValueNanos(getTotalTaskExecutionTime())) .append(", "); - if (trackAverageQueueLatency) { - sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillis())).append(", "); - } } /** @@ -255,13 +259,8 @@ public double getExecutionEwmaAlpha() { } // Used for testing - public double getQueueLatencyEwmaAlpha() { - return queueLatencyMillisEWMA.getAlpha(); - } - - // Used for testing - public boolean trackingQueueLatencyEwma() { - return trackAverageQueueLatency; + public boolean trackingMaxQueueLatency() { + return trackMaxQueueLatency; } /** diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index d513cc8d4fa7b..63f0257cb8aba 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -21,7 +21,6 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; -import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA; public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { @@ -33,7 +32,6 @@ public Map getBuilders(Settings settings, int allocated final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512); final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings); - final double queueLatencyEWMAAlpha = WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.get(settings); Map result = new HashMap<>(); result.put( @@ -58,7 +56,7 @@ public Map getBuilders(Settings settings, int allocated allocatedProcessors, // 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores. Math.max(allocatedProcessors * 750, 10000), - new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA, queueLatencyEWMAAlpha) + new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA) ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 5d97dfa425a08..58ac4635b2a4e 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -219,16 +219,6 @@ public static ThreadPoolType fromType(String type) { // moving average is 100ms, and we get one task which takes 20s the new EWMA will be ~500ms. public static final double DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA = 0.02; - /** - * If the queue latency reaches a high value (e.g. 10-30 seconds), then this thread pool is overwhelmed. It may be temporary, but that - * spike warrants the allocation balancer adjusting some number of shards, if possible. Therefore, it is alright to react quickly. - * - * As an example, suppose the EWMA is 10_000ms, i.e. 10 seconds. - * A single task in the queue that takes 30_000ms, i.e. 30 seconds, would result in a new EWMA of ~12_000ms - * 0.1 x 30_000ms + 0.9 x 10_000 = 3_000ms + 9_000ms = 12_000ms - */ - public static final double DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = 0.1; - private final Map executors; private final ThreadPoolInfo threadPoolInfo; @@ -283,17 +273,6 @@ public Collection builders() { Setting.Property.NodeScope ); - /** - * The {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} alpha for tracking task queue latency. - */ - public static final Setting WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = Setting.doubleSetting( - "thread_pool.task_tracking.queue_latency.ewma_alpha", - DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA, - 0, - 1, - Setting.Property.NodeScope - ); - /** * Defines and builds the many thread pools delineated in {@link Names}. * diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index cdf62a7ee4fdb..6fe1e7ad7ed0b 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -26,7 +26,6 @@ import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; -import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -90,10 +89,15 @@ public void testExecutionEWMACalculation() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } - public void testQueueLatencyEWMACalculation() throws Exception { + public void testMaxQueueLatency() throws Exception { ThreadContext context = new ThreadContext(Settings.EMPTY); RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final var threadPoolName = randomIdentifier(); + final var barrier = new CyclicBarrier(2); + var adjustableTimedRunnable = new AdjustableQueueTimeWithExecutionBarrierTimedRunnable( + barrier, + TimeUnit.NANOSECONDS.toNanos(1000000) + ); TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( "test-threadpool", 1, @@ -101,33 +105,34 @@ public void testQueueLatencyEWMACalculation() throws Exception { 1000, TimeUnit.MILLISECONDS, ConcurrentCollections.newBlockingQueue(), - settableQueuingWrapper(TimeUnit.NANOSECONDS.toNanos(1000000)), - EsExecutors.daemonThreadFactory("queuetest"), + (runnable) -> adjustableTimedRunnable, + EsExecutors.daemonThreadFactory("queue-latency-test"), new EsAbortPolicy(), context, - new TaskTrackingConfig( - randomBoolean(), - true, - DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST, - DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST - ) + new TaskTrackingConfig(randomBoolean(), true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); executor.setupMetrics(meterRegistry, threadPoolName); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); - assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.0); - // Using the settableQueuingWrapper each task will report being queued for 1ms - executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.6); }); - executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.84); }); - executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.936); }); + // Check that the max is zero initially and after a reset. + assertEquals("The queue latency should be initialized zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + executor.execute(() -> {}); + safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. + assertEquals("Ran one task of 1ms, should be the max", 1, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + + // Check that the max is kept across multiple calls, where the last is not the max. + adjustableTimedRunnable.setQueuedTimeTakenNanos(5000000); executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.9744); }); + safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. + adjustableTimedRunnable.setQueuedTimeTakenNanos(1000000); executeTask(executor, 1); - assertBusy(() -> { assertDoublesEqual(executor.getQueuedTaskLatencyMillis(), 0.98976); }); + safeAwait(barrier); + assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + + // Clean up. assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); @@ -275,10 +280,6 @@ public void testQueueLatencyHistogramMetrics() { } } - private void assertDoublesEqual(double expected, double actual) { - assertEquals(expected, actual, 0.00001); - } - private long getPercentile(List measurements, String percentile) { return measurements.stream().filter(m -> m.attributes().get("percentile").equals(percentile)).findFirst().orElseThrow().getLong(); } @@ -292,7 +293,7 @@ private Function settableWrapper(long timeTakenNanos) } /** - * The returned function outputs a WrappedRunnabled that simulates the case + * The returned function outputs a WrappedRunnable that simulates the case * where {@link TimedRunnable#getQueueTimeNanos()} always returns {@code queueTimeTakenNanos}. */ private Function settableQueuingWrapper(long queueTimeTakenNanos) { @@ -317,7 +318,7 @@ private void executeTask(TaskExecutionTimeTrackingEsThreadPoolExecutor executor, } public class SettableTimedRunnable extends TimedRunnable { - private final long queuedTimeTakenNanos; + private long queuedTimeTakenNanos; private final long executionTimeTakenNanos; private final boolean testFailedOrRejected; @@ -328,6 +329,10 @@ public SettableTimedRunnable(long queuedTimeTakenNanos, long executionTimeTakenN this.testFailedOrRejected = failedOrRejected; } + public void setQueuedTimeTakenNanos(long timeTakenNanos) { + this.queuedTimeTakenNanos = timeTakenNanos; + } + @Override public long getTotalExecutionNanos() { return executionTimeTakenNanos; @@ -343,4 +348,38 @@ long getQueueTimeNanos() { return queuedTimeTakenNanos; } } + + /** + * This TimedRunnable override provides the following: + *
    + *
  • Overrides {@link TimedRunnable#getQueueTimeNanos()} so that arbitrary queue latencies can be set for the thread pool.
  • + *
  • Replaces any submitted Runnable task to the thread pool with a Runnable that only waits on a {@link CyclicBarrier}.
  • + *
+ * This allows dynamically manipulating the queue time with {@link #setQueuedTimeTakenNanos}, and provides a means of waiting for a task + * to start by calling {@code safeAwait(barrier)} after submitting a task. + *

+ * Look at {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#wrapRunnable} for how the ThreadPool uses this as a wrapper around all + * submitted tasks. + */ + public class AdjustableQueueTimeWithExecutionBarrierTimedRunnable extends TimedRunnable { + private long queuedTimeTakenNanos; + + /** + * @param barrier A barrier that the caller can wait upon to ensure a task starts. + * @param queuedTimeTakenNanos The default queue time reported for all tasks. + */ + public AdjustableQueueTimeWithExecutionBarrierTimedRunnable(CyclicBarrier barrier, long queuedTimeTakenNanos) { + super(() -> { safeAwait(barrier); }); + this.queuedTimeTakenNanos = queuedTimeTakenNanos; + } + + public void setQueuedTimeTakenNanos(long timeTakenNanos) { + this.queuedTimeTakenNanos = timeTakenNanos; + } + + @Override + long getQueueTimeNanos() { + return queuedTimeTakenNanos; + } + } } From 3ecac4880bc8ba6212ca2a09b9f4760c5d799403 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 9 Jul 2025 14:15:55 -0700 Subject: [PATCH 06/10] typo fix --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 611e05a439b61..f4e25dc27565d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -270,7 +270,7 @@ public boolean trackingMaxQueueLatency() { * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. */ private class UtilizationTracker { - volatile long lastPollTime = System.nanoTime();; + volatile long lastPollTime = System.nanoTime(); volatile long lastTotalExecutionTime = 0; public double pollUtilization() { From 4986fa5a586a44ab3d84d112b451a57f7f043a2b Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 9 Jul 2025 14:34:04 -0700 Subject: [PATCH 07/10] remove debug logging; tidying up --- ...utionTimeTrackingEsThreadPoolExecutor.java | 10 ++------ ...TimeTrackingEsThreadPoolExecutorTests.java | 25 +++---------------- 2 files changed, 5 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index f4e25dc27565d..2bed9f1f22da7 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -60,8 +60,8 @@ public enum UtilizationTrackingPurpose { ALLOCATION, } - private volatile UtilizationTracker apmUtilizationTracker; - private volatile UtilizationTracker allocationUtilizationTracker; + private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); + private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, @@ -82,8 +82,6 @@ public enum UtilizationTrackingPurpose { this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); - this.apmUtilizationTracker = new UtilizationTracker(); - this.allocationUtilizationTracker = new UtilizationTracker(); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -154,10 +152,8 @@ public int getCurrentQueueSize() { public long getMaxQueueLatencyMillisSinceLastPollAndReset() { if (trackMaxQueueLatency == false) { - logger.info("~~~trackMaxQueueLatency is false"); return 0; } - logger.info("~~~getting max"); return maxQueueLatencyMillisSinceLastPoll.getThenReset(); } @@ -194,9 +190,7 @@ protected void beforeExecute(Thread t, Runnable r) { var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); queueLatencyMillisHistogram.addObservation(queueLatencyMillis); - logger.info("~~~queueLatencyMillis: " + queueLatencyMillis); if (trackMaxQueueLatency) { - logger.info("~~~adding: " + queueLatencyMillis); maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis); } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 6fe1e7ad7ed0b..47de722c197a5 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -289,15 +289,7 @@ private long getPercentile(List measurements, String percentile) { * where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}. */ private Function settableWrapper(long timeTakenNanos) { - return (runnable) -> new SettableTimedRunnable(0, timeTakenNanos, false); - } - - /** - * The returned function outputs a WrappedRunnable that simulates the case - * where {@link TimedRunnable#getQueueTimeNanos()} always returns {@code queueTimeTakenNanos}. - */ - private Function settableQueuingWrapper(long queueTimeTakenNanos) { - return (runnable) -> new SettableTimedRunnable(queueTimeTakenNanos, 0, false); + return (runnable) -> new SettableTimedRunnable(timeTakenNanos, false); } /** @@ -306,7 +298,7 @@ private Function settableQueuingWrapper(long queueTim * the job failed or was rejected before it finished. */ private Function exceptionalWrapper() { - return (runnable) -> new SettableTimedRunnable(0, TimeUnit.NANOSECONDS.toNanos(-1), true); + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true); } /** Execute a blank task {@code times} times for the executor */ @@ -318,21 +310,15 @@ private void executeTask(TaskExecutionTimeTrackingEsThreadPoolExecutor executor, } public class SettableTimedRunnable extends TimedRunnable { - private long queuedTimeTakenNanos; private final long executionTimeTakenNanos; private final boolean testFailedOrRejected; - public SettableTimedRunnable(long queuedTimeTakenNanos, long executionTimeTakenNanos, boolean failedOrRejected) { + public SettableTimedRunnable(long executionTimeTakenNanos, boolean failedOrRejected) { super(() -> {}); - this.queuedTimeTakenNanos = queuedTimeTakenNanos; this.executionTimeTakenNanos = executionTimeTakenNanos; this.testFailedOrRejected = failedOrRejected; } - public void setQueuedTimeTakenNanos(long timeTakenNanos) { - this.queuedTimeTakenNanos = timeTakenNanos; - } - @Override public long getTotalExecutionNanos() { return executionTimeTakenNanos; @@ -342,11 +328,6 @@ public long getTotalExecutionNanos() { public boolean getFailedOrRejected() { return testFailedOrRejected; } - - @Override - long getQueueTimeNanos() { - return queuedTimeTakenNanos; - } } /** From 7ded14f493c917bd9c07fa0d72f3555a3b5a928b Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 9 Jul 2025 14:38:03 -0700 Subject: [PATCH 08/10] add try-finally in test --- ...TimeTrackingEsThreadPoolExecutorTests.java | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 47de722c197a5..b19a118cb87bd 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -111,31 +111,33 @@ public void testMaxQueueLatency() throws Exception { context, new TaskTrackingConfig(randomBoolean(), true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) ); - executor.setupMetrics(meterRegistry, threadPoolName); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - // Check that the max is zero initially and after a reset. - assertEquals("The queue latency should be initialized zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); - executor.execute(() -> {}); - safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. - assertEquals("Ran one task of 1ms, should be the max", 1, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); - assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); - - // Check that the max is kept across multiple calls, where the last is not the max. - adjustableTimedRunnable.setQueuedTimeTakenNanos(5000000); - executeTask(executor, 1); - safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. - adjustableTimedRunnable.setQueuedTimeTakenNanos(1000000); - executeTask(executor, 1); - safeAwait(barrier); - assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); - assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + try { + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); - // Clean up. - assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); + // Check that the max is zero initially and after a reset. + assertEquals("The queue latency should be initialized zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + executor.execute(() -> { + }); + safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. + assertEquals("Ran one task of 1ms, should be the max", 1, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + + // Check that the max is kept across multiple calls, where the last is not the max. + adjustableTimedRunnable.setQueuedTimeTakenNanos(5000000); + executeTask(executor, 1); + safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. + adjustableTimedRunnable.setQueuedTimeTakenNanos(1000000); + executeTask(executor, 1); + safeAwait(barrier); + assertEquals("Max should not be the last task", 5, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); + } finally { + // Clean up. + assertThat(executor.getOngoingTasks().toString(), executor.getOngoingTasks().size(), equalTo(0)); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } } /** Use a runnable wrapper that simulates a task with unknown failures. */ From f94233228b42c22fd718beeb67b79714444f4cbc Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 9 Jul 2025 21:47:59 +0000 Subject: [PATCH 09/10] [CI] Auto commit changes from spotless --- .../TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index b19a118cb87bd..d3ae1165a3aee 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -117,8 +117,7 @@ public void testMaxQueueLatency() throws Exception { // Check that the max is zero initially and after a reset. assertEquals("The queue latency should be initialized zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); - executor.execute(() -> { - }); + executor.execute(() -> {}); safeAwait(barrier); // Wait for the task to start, which means implies has finished the queuing stage. assertEquals("Ran one task of 1ms, should be the max", 1, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); assertEquals("The max was just reset, should be zero", 0, executor.getMaxQueueLatencyMillisSinceLastPollAndReset()); From 505732478bbbb884f30467b3f0ec2ce61937b35d Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 10 Jul 2025 08:39:47 -0700 Subject: [PATCH 10/10] some improvements from Nick's review. --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 6 +++--- .../java/org/elasticsearch/threadpool/ThreadPoolTests.java | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 2bed9f1f22da7..3e2ad9ce5a091 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -264,10 +264,10 @@ public boolean trackingMaxQueueLatency() { * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. */ private class UtilizationTracker { - volatile long lastPollTime = System.nanoTime(); - volatile long lastTotalExecutionTime = 0; + long lastPollTime = System.nanoTime(); + long lastTotalExecutionTime = 0; - public double pollUtilization() { + public synchronized double pollUtilization() { final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); final long currentPollTimeNanos = System.nanoTime(); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 04c23c9873f24..2cd166e002637 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -682,8 +682,9 @@ T assertLatestMetricValueMatches( List measurements = meterRegistry.getRecorder() .getMeasurements(instrumentType, ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + name); assertFalse(name + " has no measurements", measurements.isEmpty()); - assertThat(valueExtractor.apply(measurements.getLast()), matcher); - return valueExtractor.apply(measurements.getLast()); + var latestMetric = valueExtractor.apply(measurements.getLast()); + assertThat(latestMetric, matcher); + return latestMetric; } }