diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index ca8447bfc47e1..c39ce209bf875 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -577,6 +577,7 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } public static class TaskTrackingConfig { + // This is a random starting point alpha. public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; private final boolean trackExecutionTime; @@ -597,17 +598,6 @@ public static class TaskTrackingConfig { DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); - public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) { - this(true, trackOngoingTasks, false, executionTimeEWMAAlpha); - } - - /** - * Execution tracking enabled constructor, with extra options to enable further specialized tracking. - */ - public TaskTrackingConfig(boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha) { - this(true, trackOngoingTasks, trackMaxQueueLatency, executionTimeEwmaAlpha); - } - /** * @param trackExecutionTime Whether to track execution stats * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks @@ -641,6 +631,39 @@ public boolean trackMaxQueueLatency() { public double getExecutionTimeEwmaAlpha() { return executionTimeEwmaAlpha; } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private boolean trackExecutionTime = false; + private boolean trackOngoingTasks = false; + private boolean trackMaxQueueLatency = false; + private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; + + public Builder() {} + + public Builder trackExecutionTime(double alpha) { + trackExecutionTime = true; + ewmaAlpha = alpha; + return this; + } + + public Builder trackOngoingTasks() { + trackOngoingTasks = true; + return this; + } + + public Builder trackMaxQueueLatency() { + trackMaxQueueLatency = true; + return this; + } + + public TaskTrackingConfig build() { + return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha); + } + } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 63f0257cb8aba..4ebcae1cc2ac0 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -56,7 +56,11 @@ public Map getBuilders(Settings settings, int allocated allocatedProcessors, // 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores. Math.max(allocatedProcessors * 750, 10000), - new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA) + EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackMaxQueueLatency() + .trackExecutionTime(indexAutoscalingEWMA) + .build() ) ); int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors); @@ -81,7 +85,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SEARCH, searchOrGetThreadPoolSize, searchOrGetThreadPoolSize * 1000, - new EsExecutors.TaskTrackingConfig(true, searchAutoscalingEWMA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(searchAutoscalingEWMA).build() ) ); result.put( @@ -91,7 +95,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SEARCH_COORDINATION, halfProc, 1000, - new EsExecutors.TaskTrackingConfig(true, searchAutoscalingEWMA) + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(searchAutoscalingEWMA).build() ) ); result.put( @@ -195,7 +199,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build(), true ) ); @@ -228,7 +232,7 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.SYSTEM_CRITICAL_WRITE, halfProcMaxAt5, 1500, - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build(), true ) ); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 38b14b09cd82e..62d4d6d9cbc15 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -675,6 +675,7 @@ public void testScalingWithTaskTimeTracking() { final int max = between(min + 1, 6); { + var executionTimeEwma = randomDoubleBetween(0.01, 0.1, true); ThreadPoolExecutor pool = EsExecutors.newScaling( getClass().getName() + "/" + getTestName(), min, @@ -684,7 +685,9 @@ public void testScalingWithTaskTimeTracking() { randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadContext, - new EsExecutors.TaskTrackingConfig(randomBoolean(), randomDoubleBetween(0.01, 0.1, true)) + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build() ); assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index d3ae1165a3aee..50156ff4d81c6 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.metrics.ExponentialBucketHistogram; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; @@ -51,7 +50,12 @@ public void testExecutionEWMACalculation() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -109,7 +113,16 @@ public void testMaxQueueLatency() throws Exception { EsExecutors.daemonThreadFactory("queue-latency-test"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackMaxQueueLatency() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() + : EsExecutors.TaskTrackingConfig.builder() + .trackMaxQueueLatency() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() ); try { executor.prestartAllCoreThreads(); @@ -153,7 +166,12 @@ public void testExceptionThrowingTask() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + randomBoolean() + ? EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -185,7 +203,10 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context, - new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -220,7 +241,10 @@ public void testQueueLatencyHistogramMetrics() { EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), new ThreadContext(Settings.EMPTY), - new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + EsExecutors.TaskTrackingConfig.builder() + .trackOngoingTasks() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build() ); executor.setupMetrics(meterRegistry, threadPoolName);