Merged
Commits
19 commits
4037ccf
Add allocation write load stats to write thread pool
DiannaHohensee Jul 1, 2025
b8d8e56
reduce utilization EWMA to parallel polling: allow utilization to be …
DiannaHohensee Jul 2, 2025
fd8f602
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 2, 2025
47debcd
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 3, 2025
37b20e8
sub class pollUtilization into UtilizationTracker; rename from EWMA t…
DiannaHohensee Jul 3, 2025
be50439
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 3, 2025
45f6836
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
bd2e0f1
rename
DiannaHohensee Jul 9, 2025
629ca35
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
b0fdaf2
Remove EWMA; add Max
DiannaHohensee Jul 9, 2025
46a659b
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
3ecac48
typo fix
DiannaHohensee Jul 9, 2025
4986fa5
remove debug logging; tidying up
DiannaHohensee Jul 9, 2025
7ded14f
add try-finally in test
DiannaHohensee Jul 9, 2025
f942332
[CI] Auto commit changes from spotless
Jul 9, 2025
f9488fe
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
5057324
some improvements from Nick's review.
DiannaHohensee Jul 10, 2025
fb05d84
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 10, 2025
326f9e1
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 10, 2025
@@ -40,7 +40,11 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -234,19 +238,41 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu
}
}

public void testWriteThreadpoolEwmaAlphaSetting() {
public void testWriteThreadpoolsEwmaAlphaSetting() {
Settings settings = Settings.EMPTY;
var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
var queueLatencyEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
var threadUtilizationEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA;
if (randomBoolean()) {
ewmaAlpha = randomDoubleBetween(0.0, 1.0, true);
settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), ewmaAlpha).build();
executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
queueLatencyEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
threadUtilizationEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
settings = Settings.builder()
.put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha)
.put(WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.getKey(), queueLatencyEwmaAlpha)
.put(WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA.getKey(), threadUtilizationEwmaAlpha)
.build();
}
var nodeName = internalCluster().startNode(settings);
var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);

// Verify that the write thread pools all use the tracking executor.
for (var name : List.of(ThreadPool.Names.WRITE, ThreadPool.Names.SYSTEM_WRITE, ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
assertThat(threadPool.executor(name), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
final var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(name);
assertThat(Double.compare(executor.getEwmaAlpha(), ewmaAlpha), CoreMatchers.equalTo(0));
assertThat(Double.compare(executor.getExecutionEwmaAlpha(), executionEwmaAlpha), CoreMatchers.equalTo(0));

// Only the WRITE thread pool should enable further tracking.
if (name.equals(ThreadPool.Names.WRITE) == false) {
assertFalse(executor.trackingQueueLatencyEwma());
assertFalse(executor.trackUtilizationEwma());
} else {
// Verify that the WRITE thread pool has extra tracking enabled.
assertTrue(executor.trackingQueueLatencyEwma());
assertTrue(executor.trackUtilizationEwma());
assertThat(Double.compare(executor.getQueueLatencyEwmaAlpha(), queueLatencyEwmaAlpha), CoreMatchers.equalTo(0));
assertThat(Double.compare(executor.getPoolUtilizationEwmaAlpha(), threadUtilizationEwmaAlpha), CoreMatchers.equalTo(0));
}
}
}
}
@@ -547,6 +547,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,
ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING,
ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA,
ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
@@ -577,24 +577,94 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
}

public static class TaskTrackingConfig {
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
public static final double DEFAULT_EWMA_ALPHA = 0.3;
public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;
public static final double DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST = 0.6;
public static final double DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST = 0.6;

private final boolean trackExecutionTime;
private final boolean trackOngoingTasks;
private final double ewmaAlpha;
private final boolean trackQueueLatencyEWMA;
private final boolean trackPoolUtilizationEWMA;
private final double executionTimeEwmaAlpha;
private final double queueLatencyEWMAAlpha;
private final double poolUtilizationEWMAAlpha;

public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
false,
false,
false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST,
DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST,
DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST
);
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
true,
false,
false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST,
DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST,
DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST
);
nicktindall (Contributor) commented on Jul 4, 2025:
Nit: I think when you get to three consecutive booleans, it's worth making a builder for readability, e.g.

TaskTrackingConfig config = TaskTrackingConfig.builder()
    .trackingOngoingTasks()
    .trackingQueueLatencyAverage()
    // ... etc
    .build();

A side benefit would be you could default all to false and just specify the ones that were true.

Happy to be challenged on that.

Contributor:
If we decide this is a good idea, also happy for it to be a separate PR because it'll add a fair bit of noise I imagine

Contributor (Author):
Sure, I'll put up a separate PR for this

Contributor:
Alternatively I wonder if the max latency tracking is so cheap that we don't need to have a config for it. The ongoing task tracking is quite expensive, so it makes sense to be able to opt out of it. It would also reduce the surface area of this change.

Contributor (Author):
It seems a waste to track information unnecessarily, and it could be confusing to wonder why it's not used. I'd prefer not to. Also, you never know how code will grow over time.

Contributor (Author):
To close this out, I've merged a TaskTrackingConfig Builder subclass and updated the callers with it.
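For illustration, a minimal sketch of what a builder along those lines could look like, assuming the flags and FOR_TEST alpha defaults shown in this diff; the Builder actually merged in the follow-up change may differ in naming and defaults.

    // Hypothetical sketch only; mirrors the fields in this diff, not the merged Builder.
    public static class Builder {
        private boolean trackOngoingTasks = false;
        private boolean trackQueueLatencyEWMA = false;
        private boolean trackPoolUtilizationEWMA = false;
        private double executionTimeEwmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST;
        private double queueLatencyEwmaAlpha = DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST;
        private double poolUtilizationEwmaAlpha = DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST;

        public Builder trackOngoingTasks() {
            this.trackOngoingTasks = true;
            return this;
        }

        public Builder trackQueueLatencyEWMA(double alpha) {
            this.trackQueueLatencyEWMA = true;
            this.queueLatencyEwmaAlpha = alpha;
            return this;
        }

        public Builder trackPoolUtilizationEWMA(double alpha) {
            this.trackPoolUtilizationEWMA = true;
            this.poolUtilizationEwmaAlpha = alpha;
            return this;
        }

        public Builder executionTimeEwmaAlpha(double alpha) {
            this.executionTimeEwmaAlpha = alpha;
            return this;
        }

        public TaskTrackingConfig build() {
            // Execution time tracking is always on for builder-created configs in this sketch.
            return new TaskTrackingConfig(
                true,
                trackOngoingTasks,
                trackQueueLatencyEWMA,
                trackPoolUtilizationEWMA,
                executionTimeEwmaAlpha,
                queueLatencyEwmaAlpha,
                poolUtilizationEwmaAlpha
            );
        }
    }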


public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA);
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA);
public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) {
this(
true,
trackOngoingTasks,
false,
false,
executionTimeEWMAAlpha,
DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST,
DEFAULT_POOL_UTILIZATION_EWMA_ALPHA_FOR_TEST
);
}

public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) {
this(true, trackOngoingTasks, ewmaAlpha);
/**
* Execution tracking enabled constructor, with extra options to enable further specialized tracking.
*/
public TaskTrackingConfig(
boolean trackOngoingTasks,
boolean trackQueueLatencyEWMA,
boolean trackPoolUtilizationEWMA,
double executionTimeEWMAAlpha,
double queueLatencyEWMAAlpha,
double poolUtilizationEWMAAlpha
) {
this(
true,
trackOngoingTasks,
trackQueueLatencyEWMA,
trackPoolUtilizationEWMA,
executionTimeEWMAAlpha,
queueLatencyEWMAAlpha,
poolUtilizationEWMAAlpha
);
}

private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) {
/**
* @param trackExecutionTime Whether to track execution stats
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
* @param trackQueueLatencyEWMA Whether to track queue latency {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage}
* @param trackPoolUtilizationEWMA Whether to track the EWMA for thread pool thread utilization (percent use).
* @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
* @param queueLatencyEWMAAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage).
* @param poolUtilizationEWMAAlpha The alpha seed for pool utilization EWMA (ExponentiallyWeightedMovingAverage).
*/
private TaskTrackingConfig(
boolean trackExecutionTime,
boolean trackOngoingTasks,
boolean trackQueueLatencyEWMA,
boolean trackPoolUtilizationEWMA,
double executionTimeEWMAAlpha,
double queueLatencyEWMAAlpha,
double poolUtilizationEWMAAlpha
) {
this.trackExecutionTime = trackExecutionTime;
this.trackOngoingTasks = trackOngoingTasks;
this.ewmaAlpha = EWMAAlpha;
this.trackQueueLatencyEWMA = trackQueueLatencyEWMA;
this.trackPoolUtilizationEWMA = trackPoolUtilizationEWMA;
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
this.queueLatencyEWMAAlpha = queueLatencyEWMAAlpha;
this.poolUtilizationEWMAAlpha = poolUtilizationEWMAAlpha;
}

public boolean trackExecutionTime() {
@@ -605,8 +675,24 @@ public boolean trackOngoingTasks() {
return trackOngoingTasks;
}

public double getEwmaAlpha() {
return ewmaAlpha;
public boolean trackQueueLatencyEWMA() {
return trackQueueLatencyEWMA;
}

public boolean trackPoolUtilizationEWMA() {
return trackPoolUtilizationEWMA;
}

public double getExecutionTimeEwmaAlpha() {
return executionTimeEwmaAlpha;
}

public double getQueueLatencyEwmaAlpha() {
return queueLatencyEWMAAlpha;
}

public double getPoolUtilizationEwmaAlpha() {
return poolUtilizationEWMAAlpha;
}
}

@@ -27,6 +27,7 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

@@ -50,6 +51,11 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private volatile long lastPollTime = System.nanoTime();
private volatile long lastTotalExecutionTime = 0;
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
private final boolean trackQueueLatencyEWMA;
private final boolean trackUtilizationEWMA;
private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA;
private final ExponentiallyWeightedMovingAverage percentPoolUtilizationEWMA;
private final AtomicReference<Double> lastUtilizationValue = new AtomicReference<>(0.0);

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
@@ -65,9 +71,14 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
TaskTrackingConfig trackingConfig
) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);

this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0);
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
this.trackQueueLatencyEWMA = trackingConfig.trackQueueLatencyEWMA();
this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0);
this.trackUtilizationEWMA = trackingConfig.trackPoolUtilizationEWMA();
this.percentPoolUtilizationEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getPoolUtilizationEwmaAlpha(), 0);
}

public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
@@ -136,6 +147,20 @@ public int getCurrentQueueSize() {
return getQueue().size();
}

public double getPercentPoolUtilizationEWMA() {
if (trackUtilizationEWMA == false) {
return 0;
}
return this.percentPoolUtilizationEWMA.getAverage();
}

public double getQueuedTaskLatencyMillisEWMA() {
if (trackQueueLatencyEWMA == false) {
return 0;
}
return queueLatencyMillisEWMA.getAverage();
}

/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time
* this method was called.
@@ -153,20 +178,41 @@ public double pollUtilization() {

lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastPollTime = currentPollTimeNanos;

if (trackUtilizationEWMA) {
percentPoolUtilizationEWMA.addValue(utilizationSinceLastPoll);
// Test only tracking.
assert setUtilizationSinceLastPoll(utilizationSinceLastPoll);
}
Member:
It feels odd to tie utilization polling to APM. I think they can be separate. For example, we can have a separate set of lastTotalExecutionTime and lastPollTime, which should allow us to compute the utilization at a different frequency than the APM?

Contributor:
Yeah, the decision originally was because it avoided the need for an extra thread etc. Given that we're now extending its use, I think it makes sense to rethink all of that.

It sounds like we want a more sophisticated average utilisation number.

If we're willing to wear the cost of a thread to do the polling, I think it makes sense for that thread to maintain THE average (which will become an EWMA) and publish that as the es.thread_pool.{name}.threads.utilization.current value.

Contributor (Author):
Sounds great, I shall go ahead with that then, thanks!

Contributor:
One thing to note though, if we poll more often than once a minute the utilisation > 100% issues will get worse.

We could do what write load does and take running tasks into account, but it all adds to the cost, and it would restrict this calculation to thread pools that have trackOngoingTasks=true. We should consider that.

Member:
> If we're willing to wear the cost of a thread to do the polling

We don't really need a "new" thread for it, right? It seems the options so far are either polling from AverageWriteLoadSampler or InternalClusterInfoService, both of which have an existing thread schedule.

Contributor (Author):
The AverageWriteLoadSampler is part of the auto-scaling reporting from data node to master, and runs every 1 second. The InternalClusterInfoService runs every 30 seconds (INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING).

An EWMA based on samples every 30 seconds would likely be unusable.

I started working on a Runnable that gets rescheduled on the GENERIC thread pool every 3 seconds, say.

> One thing to note though, if we poll more often than once a minute the utilisation > 100% issues will get worse.

We could cap the utilization at 100%?

nicktindall (Contributor) commented on Jul 2, 2025:
> We could cap the utilization at 100%?

I think that would just leave us with wildly inaccurate utilisation numbers. I think 60s is a reasonable interval for polling this stuff. I don't think the > 100% utilisation is really that significant when you look at the thread utilisation dashboards. It seems to render utilisation fairly accurately overall. We could experiment with 30s and see how that looks.

Could we just not use the ewma and instead use the average for the last 60s?

Contributor:
I'd love to see a histogram of the run time for tasks. It's hard to know what a reasonable interval is without seeing that.

Member:
> An EWMA based on samples every 30 seconds would likely be unusable.

I think we should start out simple by letting InternalClusterInfoService poll on a 30s interval without adding extra thread scheduling. We can add that if it proves to be a problem. Personally I think every 30s sounds fine for balancing purposes.

nicktindall (Contributor) commented on Jul 2, 2025:
Also, why not try just using the average we have? I think EWMA adds potentially unnecessary logic. It seems more useful when you are smoothing finer-grained samples; if you're already looking at a 30s window I don't think it's necessary.
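To make the trade-off in this discussion concrete, here is a minimal, illustrative sketch of the standard EWMA update rule versus a plain per-interval average; it is not the Elasticsearch implementation, and the method names are invented for the example.

    // Illustrative only. An EWMA folds each new utilization sample into a running
    // average: alpha near 1.0 follows the newest sample closely, alpha near 0.0
    // smooths heavily over history.
    static double ewma(double previousAverage, double newSample, double alpha) {
        return alpha * newSample + (1.0 - alpha) * previousAverage;
    }

    // The alternative raised above: a plain average over the polling interval,
    // i.e. executed thread time divided by the maximum possible thread time,
    // which needs no state beyond the interval itself.
    static double intervalUtilization(long executedNanos, long maxPossibleNanos) {
        return (double) executedNanos / maxPossibleNanos;
    }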


return utilizationSinceLastPoll;
}

// Test only
private boolean setUtilizationSinceLastPoll(double utilizationSinceLastPoll) {
lastUtilizationValue.set(utilizationSinceLastPoll);
return true;
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {
ongoingTasks.put(r, System.nanoTime());
}

assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
timedRunnable.beforeExecute();
final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
assert taskQueueLatency >= 0;
queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency);
queueLatencyMillisHistogram.addObservation(queueLatencyMillis);

if (trackQueueLatencyEWMA) {
if (queueLatencyMillis > 0) {
queueLatencyMillisEWMA.addValue(queueLatencyMillis);
}
}
}

@Override
@@ -208,6 +254,12 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
.append("total task execution time = ")
.append(TimeValue.timeValueNanos(getTotalTaskExecutionTime()))
.append(", ");
if (trackQueueLatencyEWMA) {
sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillisEWMA())).append(", ");
}
if (trackUtilizationEWMA) {
sb.append("thread pool utilization percentage EWMA = ").append(getPercentPoolUtilizationEWMA()).append(", ");
}
}

/**
@@ -222,7 +274,27 @@ public Map<Runnable, Long> getOngoingTasks() {
}

// Used for testing
public double getEwmaAlpha() {
public double getExecutionEwmaAlpha() {
return executionEWMA.getAlpha();
}

// Used for testing
public double getQueueLatencyEwmaAlpha() {
return queueLatencyMillisEWMA.getAlpha();
}

// Used for testing
public double getPoolUtilizationEwmaAlpha() {
return percentPoolUtilizationEWMA.getAlpha();
}

// Used for testing
public boolean trackingQueueLatencyEwma() {
return trackQueueLatencyEWMA;
}

// Used for testing
public boolean trackUtilizationEwma() {
return trackUtilizationEWMA;
}
}
@@ -21,6 +21,8 @@

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA;

public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders {
@@ -32,6 +34,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512);
final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings);
final double queueLatencyEWMAAlpha = WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.get(settings);
final double threadUtilizationEWMAAlpha = WRITE_THREAD_POOL_THREAD_UTILIZATION_EWMA_ALPHA.get(settings);

Map<String, ExecutorBuilder> result = new HashMap<>();
result.put(
@@ -55,7 +59,14 @@
ThreadPool.Names.WRITE,
allocatedProcessors,
10000,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
new EsExecutors.TaskTrackingConfig(
true,
true,
true,
indexAutoscalingEWMA,
queueLatencyEWMAAlpha,
threadUtilizationEWMAAlpha
)
)
);
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);