Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
}

public static class TaskTrackingConfig {
// This is a random starting point alpha.
public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;

private final boolean trackExecutionTime;
Expand All @@ -597,17 +598,6 @@ public static class TaskTrackingConfig {
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
);

public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) {
this(true, trackOngoingTasks, false, executionTimeEWMAAlpha);
}

/**
* Execution tracking enabled constructor, with extra options to enable further specialized tracking.
*/
public TaskTrackingConfig(boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha) {
this(true, trackOngoingTasks, trackMaxQueueLatency, executionTimeEwmaAlpha);
}

/**
* @param trackExecutionTime Whether to track execution stats
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
Expand Down Expand Up @@ -641,6 +631,39 @@ public boolean trackMaxQueueLatency() {
public double getExecutionTimeEwmaAlpha() {
return executionTimeEwmaAlpha;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private boolean trackExecutionTime = false;
private boolean trackOngoingTasks = false;
private boolean trackMaxQueueLatency = false;
private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST;

public Builder() {}

public Builder trackExecutionTime(double alpha) {
trackExecutionTime = true;
ewmaAlpha = alpha;
return this;
}

public Builder trackOngoingTasks() {
trackOngoingTasks = true;
return this;
}

public Builder trackMaxQueueLatency() {
trackMaxQueueLatency = true;
return this;
}

public TaskTrackingConfig build() {
return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
allocatedProcessors,
// 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores.
Math.max(allocatedProcessors * 750, 10000),
new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA)
EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackMaxQueueLatency()
.trackExecutionTime(indexAutoscalingEWMA)
.build()
)
);
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);
Expand All @@ -81,7 +85,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SEARCH,
searchOrGetThreadPoolSize,
searchOrGetThreadPoolSize * 1000,
new EsExecutors.TaskTrackingConfig(true, searchAutoscalingEWMA)
EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(searchAutoscalingEWMA).build()
)
);
result.put(
Expand All @@ -91,7 +95,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SEARCH_COORDINATION,
halfProc,
1000,
new EsExecutors.TaskTrackingConfig(true, searchAutoscalingEWMA)
EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(searchAutoscalingEWMA).build()
)
);
result.put(
Expand Down Expand Up @@ -195,7 +199,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_WRITE,
halfProcMaxAt5,
1000,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build(),
true
)
);
Expand Down Expand Up @@ -228,7 +232,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_CRITICAL_WRITE,
halfProcMaxAt5,
1500,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(indexAutoscalingEWMA).build(),
true
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ public void testScalingWithTaskTimeTracking() {
final int max = between(min + 1, 6);

{
var executionTimeEwma = randomDoubleBetween(0.01, 0.1, true);
ThreadPoolExecutor pool = EsExecutors.newScaling(
getClass().getName() + "/" + getTestName(),
min,
Expand All @@ -684,7 +685,9 @@ public void testScalingWithTaskTimeTracking() {
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
threadContext,
new EsExecutors.TaskTrackingConfig(randomBoolean(), randomDoubleBetween(0.01, 0.1, true))
randomBoolean()
? EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build()
: EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build()
);
assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
Expand Down Expand Up @@ -51,7 +50,12 @@ public void testExecutionEWMACalculation() throws Exception {
EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(),
context,
new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
randomBoolean()
? EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
: EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build()
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
Expand Down Expand Up @@ -109,7 +113,16 @@ public void testMaxQueueLatency() throws Exception {
EsExecutors.daemonThreadFactory("queue-latency-test"),
new EsAbortPolicy(),
context,
new TaskTrackingConfig(randomBoolean(), true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
randomBoolean()
? EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackMaxQueueLatency()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
: EsExecutors.TaskTrackingConfig.builder()
.trackMaxQueueLatency()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
);
try {
executor.prestartAllCoreThreads();
Expand Down Expand Up @@ -153,7 +166,12 @@ public void testExceptionThrowingTask() throws Exception {
EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(),
context,
new TaskTrackingConfig(randomBoolean(), DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
randomBoolean()
? EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
: EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build()
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
Expand Down Expand Up @@ -185,7 +203,10 @@ public void testGetOngoingTasks() throws Exception {
EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(),
context,
new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
);
var taskRunningLatch = new CountDownLatch(1);
var exitTaskLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -220,7 +241,10 @@ public void testQueueLatencyHistogramMetrics() {
EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(),
new ThreadContext(Settings.EMPTY),
new TaskTrackingConfig(true, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build()
);
executor.setupMetrics(meterRegistry, threadPoolName);

Expand Down