From 54f39f9f2bb9353acbe98ca1feec04c2ac5f792b Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 9 Jul 2025 15:18:27 -0700 Subject: [PATCH 1/2] Create a Builder for TaskTrackingConfig --- .../common/util/concurrent/EsExecutors.java | 33 ++++++++++++++++--- .../DefaultBuiltInExecutorBuilders.java | 10 +++--- .../util/concurrent/EsExecutorsTests.java | 5 ++- ...TimeTrackingEsThreadPoolExecutorTests.java | 12 ++++--- 4 files changed, 45 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 4fd5225a29167..b438f7d17cee6 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -577,7 +577,8 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } public static class TaskTrackingConfig { - // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable + + // This is a random starting point alpha. public static final double DEFAULT_EWMA_ALPHA = 0.3; private final boolean trackExecutionTime; @@ -587,10 +588,6 @@ public static class TaskTrackingConfig { public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA); public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA); - public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) { - this(true, trackOngoingTasks, ewmaAlpha); - } - private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; @@ -608,6 +605,32 @@ public boolean trackOngoingTasks() { public double getEwmaAlpha() { return ewmaAlpha; } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private boolean trackExecutionTime = false; + private boolean trackOngoingTasks = false; + private double ewmaAlpha = DEFAULT_EWMA_ALPHA; + public Builder() {} + + public Builder trackExecutionTime(double alpha) { + trackExecutionTime = true; + ewmaAlpha = alpha; + return this; + } + + public Builder trackOngoingTasks() { + trackOngoingTasks = true; + return this; + } + + public TaskTrackingConfig build() { + return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, ewmaAlpha); + } + } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 7b69e3a164d5b..a68ed19636bf5 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -56,7 +56,7 @@ public Map getBuilders(Settings settings, int allocated allocatedProcessors, // 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores. Math.max(allocatedProcessors * 750, 10000), - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build() ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); @@ -81,7 +81,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SEARCH, searchOrGetThreadPoolSize, searchOrGetThreadPoolSize * 1000, - new EsExecutors.TaskTrackingConfig(true, searchAutoscalingEWMA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(searchAutoscalingEWMA).build() ) ); result.put( @@ -91,7 +91,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SEARCH_COORDINATION, halfProc, 1000, - new EsExecutors.TaskTrackingConfig(true, searchAutoscalingEWMA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(searchAutoscalingEWMA).build() ) ); result.put( @@ -195,7 +195,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build(), true ) ); @@ -228,7 +228,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SYSTEM_CRITICAL_WRITE, halfProcMaxAt5, 1500, - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build(), true ) ); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 38b14b09cd82e..3dbfacba22cb4 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -675,6 +675,7 @@ public void testScalingWithTaskTimeTracking() { final int max = between(min + 1, 6); { + var executionTimeEwma = randomDoubleBetween(0.01, 0.1, true); ThreadPoolExecutor pool = EsExecutors.newScaling( getClass().getName() + "/" + getTestName(), min, @@ -684,7 +685,9 @@ public void testScalingWithTaskTimeTracking() { randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadContext, - new EsExecutors.TaskTrackingConfig(randomBoolean(), randomDoubleBetween(0.01, 0.1, true)) + randomBoolean() ? + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build() : + EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build() ); assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 7f720721aebf2..4b9faf47a8600 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -51,7 +51,9 @@ public void testExecutionEWMACalculation() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + randomBoolean() ? + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EWMA_ALPHA).build() : + EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EWMA_ALPHA).build() ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -103,7 +105,9 @@ public void testExceptionThrowingTask() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + randomBoolean() ? + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EWMA_ALPHA).build() : + EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EWMA_ALPHA).build() ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -135,7 +139,7 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EWMA_ALPHA).build() ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -170,7 +174,7 @@ public void testQueueLatencyMetrics() { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), new ThreadContext(Settings.EMPTY), - new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EWMA_ALPHA).build() ); executor.setupMetrics(meterRegistry, threadPoolName); From 46ae7cfbfda3acbbd05c7e509c681c2e3fc3ba99 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 10 Jul 2025 13:46:34 -0700 Subject: [PATCH 2/2] styling --- .../common/util/concurrent/EsExecutors.java | 1 + .../DefaultBuiltInExecutorBuilders.java | 6 ++- .../util/concurrent/EsExecutorsTests.java | 6 +-- ...TimeTrackingEsThreadPoolExecutorTests.java | 42 +++++++++++++------ 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index c81c72c84a9e5..c39ce209bf875 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -641,6 +641,7 @@ public static class Builder { private boolean trackOngoingTasks = false; private boolean trackMaxQueueLatency = false; private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; + public Builder() {} public Builder trackExecutionTime(double alpha) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 2d28fdf378ddc..4ebcae1cc2ac0 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -56,7 +56,11 @@ public Map getBuilders(Settings settings, int allocated allocatedProcessors, // 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores. Math.max(allocatedProcessors * 750, 10000), - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackMaxQueueLatency().trackExecutionTime(indexAutoscalingEWMA).build() + EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackMaxQueueLatency() + .trackExecutionTime(indexAutoscalingEWMA) + .build() ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 3dbfacba22cb4..62d4d6d9cbc15 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -685,9 +685,9 @@ public void testScalingWithTaskTimeTracking() { randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadContext, - randomBoolean() ? - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build() : - EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build() + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build() ); assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 25a01732747f5..50156ff4d81c6 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.metrics.ExponentialBucketHistogram; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; @@ -51,9 +50,12 @@ public void testExecutionEWMACalculation() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - randomBoolean() ? - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() : - EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -111,9 +113,16 @@ public void testMaxQueueLatency() throws Exception { EsExecutors.daemonThreadFactory("queue-latency-test"), new EsAbortPolicy(), context, - randomBoolean() ? - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackMaxQueueLatency().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() : - EsExecutors.TaskTrackingConfig.builder().trackMaxQueueLatency().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackMaxQueueLatency() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() + : EsExecutors.TaskTrackingConfig.builder() + .trackMaxQueueLatency() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() ); try { executor.prestartAllCoreThreads(); @@ -157,9 +166,12 @@ public void testExceptionThrowingTask() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - randomBoolean() ? - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() : - EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -191,7 +203,10 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -226,7 +241,10 @@ public void testQueueLatencyHistogramMetrics() { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), new ThreadContext(Settings.EMPTY), - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() ); executor.setupMetrics(meterRegistry, threadPoolName);