Merged

Commits (19)
4037ccf
Add allocation write load stats to write thread pool
DiannaHohensee Jul 1, 2025
b8d8e56
reduce utilization EWMA to parallel polling: allow utilization to be …
DiannaHohensee Jul 2, 2025
fd8f602
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 2, 2025
47debcd
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 3, 2025
37b20e8
sub class pollUtilization into UtilizationTracker; rename from EWMA t…
DiannaHohensee Jul 3, 2025
be50439
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 3, 2025
45f6836
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
bd2e0f1
rename
DiannaHohensee Jul 9, 2025
629ca35
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
b0fdaf2
Remove EWMA; add Max
DiannaHohensee Jul 9, 2025
46a659b
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
3ecac48
typo fix
DiannaHohensee Jul 9, 2025
4986fa5
remove debug logging; tidying up
DiannaHohensee Jul 9, 2025
7ded14f
add try-finally in test
DiannaHohensee Jul 9, 2025
f942332
[CI] Auto commit changes from spotless
Jul 9, 2025
f9488fe
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
5057324
some improvements from Nick's review.
DiannaHohensee Jul 10, 2025
fb05d84
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 10, 2025
326f9e1
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 10, 2025
@@ -40,7 +40,9 @@

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
+import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
 import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
+import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -234,19 +236,35 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu
}
}

-    public void testWriteThreadpoolEwmaAlphaSetting() {
+    public void testWriteThreadpoolsEwmaAlphaSetting() {
         Settings settings = Settings.EMPTY;
-        var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
+        var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
+        var queueLatencyEwmaAlpha = DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
         if (randomBoolean()) {
-            ewmaAlpha = randomDoubleBetween(0.0, 1.0, true);
-            settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), ewmaAlpha).build();
+            executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
+            queueLatencyEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
+            settings = Settings.builder()
+                .put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha)
+                .put(WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.getKey(), queueLatencyEwmaAlpha)
+                .build();
         }
         var nodeName = internalCluster().startNode(settings);
         var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);

+        // Verify that the write thread pools all use the tracking executor.
         for (var name : List.of(ThreadPool.Names.WRITE, ThreadPool.Names.SYSTEM_WRITE, ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
             assertThat(threadPool.executor(name), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
             final var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(name);
-            assertThat(Double.compare(executor.getEwmaAlpha(), ewmaAlpha), CoreMatchers.equalTo(0));
+            assertThat(Double.compare(executor.getExecutionEwmaAlpha(), executionEwmaAlpha), CoreMatchers.equalTo(0));
+
+            // Only the WRITE thread pool should enable further tracking.
+            if (name.equals(ThreadPool.Names.WRITE) == false) {
+                assertFalse(executor.trackingQueueLatencyEwma());
+            } else {
+                // Verify that the WRITE thread pool has extra tracking enabled.
+                assertTrue(executor.trackingQueueLatencyEwma());
+                assertThat(Double.compare(executor.getQueueLatencyEwmaAlpha(), queueLatencyEwmaAlpha), CoreMatchers.equalTo(0));
+            }
         }
     }
 }
@@ -547,6 +547,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,
ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING,
+            ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
@@ -577,24 +577,65 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
}

public static class TaskTrackingConfig {
-        // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
-        public static final double DEFAULT_EWMA_ALPHA = 0.3;
+        public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;
+        public static final double DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST = 0.6;

         private final boolean trackExecutionTime;
         private final boolean trackOngoingTasks;
-        private final double ewmaAlpha;
+        private final boolean trackQueueLatencyEWMA;
+        private final double executionTimeEwmaAlpha;
+        private final double queueLatencyEWMAAlpha;

+        public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
+            false,
+            false,
+            false,
+            DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST,
+            DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST
+        );
+        public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
+            true,
+            false,
+            false,
+            DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST,
+            DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST
+        );
@nicktindall (Contributor), Jul 4, 2025:

Nit: I think when you get to three consecutive booleans, it's worth making a builder for readability, e.g.

TaskTrackingConfig config = TaskTrackingConfig.builder()
    .trackingOngoingTasks()
    .trackingQueueLatencyAverage()
    // ... etc
    .build();

A side benefit would be you could default all to false and just specify the ones that were true.

Happy to be challenged on that.
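
As a rough sketch of that suggestion (not the merged API; method names and defaults here are illustrative), such a builder could wrap the five-argument private constructor shown further down:

// Hypothetical TaskTrackingConfig.Builder: all tracking defaults to off,
// callers enable only what they need.
public static class Builder {
    private boolean trackOngoingTasks = false;
    private boolean trackQueueLatencyEWMA = false;
    private double executionTimeEwmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST;
    private double queueLatencyEwmaAlpha = DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST;

    public Builder trackingOngoingTasks() {
        this.trackOngoingTasks = true;
        return this;
    }

    public Builder trackingQueueLatencyAverage(double alpha) {
        this.trackQueueLatencyEWMA = true;
        this.queueLatencyEwmaAlpha = alpha;
        return this;
    }

    public Builder executionTimeEwmaAlpha(double alpha) {
        this.executionTimeEwmaAlpha = alpha;
        return this;
    }

    public TaskTrackingConfig build() {
        // Delegate to the five-argument private constructor added in this PR.
        return new TaskTrackingConfig(true, trackOngoingTasks, trackQueueLatencyEWMA, executionTimeEwmaAlpha, queueLatencyEwmaAlpha);
    }
}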

Contributor:

If we decide this is a good idea, also happy for it to be a separate PR because it'll add a fair bit of noise I imagine

@DiannaHohensee (Author):

Sure, I'll put up a separate PR for this

Contributor:

Alternatively I wonder if the max latency tracking is so cheap that we don't need to have a config for it. The ongoing task tracking is quite expensive, so it makes sense to be able to opt out of it. It would also reduce the surface area of this change.

@DiannaHohensee (Author):

It seems a waste to track information unnecessarily, and potentially confusing to wonder why it's not used. I'd prefer not to. Also, you never know how code will grow over time.

@DiannaHohensee (Author):

To close this out, I've merged a TaskTrackingConfig Builder subclass and updated the callers with it.


-        public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA);
-        public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA);
+        public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) {
+            this(true, trackOngoingTasks, false, executionTimeEWMAAlpha, DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST);
+        }

-        public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) {
-            this(true, trackOngoingTasks, ewmaAlpha);
-        }
+        /**
+         * Execution tracking enabled constructor, with extra options to enable further specialized tracking.
+         */
+        public TaskTrackingConfig(
+            boolean trackOngoingTasks,
+            boolean trackQueueLatencyEWMA,
+            double executionTimeEWMAAlpha,
+            double queueLatencyEWMAAlpha
+        ) {
+            this(true, trackOngoingTasks, trackQueueLatencyEWMA, executionTimeEWMAAlpha, queueLatencyEWMAAlpha);
+        }

-        private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) {
+        /**
+         * @param trackExecutionTime Whether to track execution stats
+         * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
+         * @param trackQueueLatencyEWMA Whether to track queue latency {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage}
+         * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
+         * @param queueLatencyEWMAAlpha The alpha seed for task queue latency EWMA (ExponentiallyWeightedMovingAverage).
+         */
+        private TaskTrackingConfig(
+            boolean trackExecutionTime,
+            boolean trackOngoingTasks,
+            boolean trackQueueLatencyEWMA,
+            double executionTimeEWMAAlpha,
+            double queueLatencyEWMAAlpha
+        ) {
             this.trackExecutionTime = trackExecutionTime;
             this.trackOngoingTasks = trackOngoingTasks;
-            this.ewmaAlpha = EWMAAlpha;
+            this.trackQueueLatencyEWMA = trackQueueLatencyEWMA;
+            this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
+            this.queueLatencyEWMAAlpha = queueLatencyEWMAAlpha;
         }

public boolean trackExecutionTime() {
@@ -605,8 +646,16 @@ public boolean trackOngoingTasks() {
return trackOngoingTasks;
}

-        public double getEwmaAlpha() {
-            return ewmaAlpha;
-        }
+        public boolean trackQueueLatencyEWMA() {
+            return trackQueueLatencyEWMA;
+        }
+
+        public double getExecutionTimeEwmaAlpha() {
+            return executionTimeEwmaAlpha;
+        }
+
+        public double getQueueLatencyEwmaAlpha() {
+            return queueLatencyEWMAAlpha;
+        }
}

@@ -47,9 +47,13 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final boolean trackOngoingTasks;
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
-    private volatile long lastPollTime = System.nanoTime();
-    private volatile long lastTotalExecutionTime = 0;
+    private volatile long lastPollTimeAPM = System.nanoTime();
+    private volatile long lastTotalExecutionTimeAPM = 0;
+    private volatile long lastPollTimeNanosAllocation = System.nanoTime();
+    private volatile long lastTotalExecutionTimeAllocation = 0;
     private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
+    private final boolean trackQueueLatencyEWMA;
+    private final ExponentiallyWeightedMovingAverage queueLatencyMillisEWMA;

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
@@ -65,9 +69,12 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
TaskTrackingConfig trackingConfig
) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);

         this.runnableWrapper = runnableWrapper;
-        this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0);
+        this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
         this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
+        this.trackQueueLatencyEWMA = trackingConfig.trackQueueLatencyEWMA();
+        this.queueLatencyMillisEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getQueueLatencyEwmaAlpha(), 0);
}

public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
@@ -95,7 +102,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + threadPoolName,
"fraction",
-                () -> new DoubleWithAttributes(pollUtilization(), Map.of())
+                () -> new DoubleWithAttributes(pollUtilization(true, false), Map.of())
)
);
}
@@ -136,23 +143,43 @@ public int getCurrentQueueSize() {
return getQueue().size();
}

+    public double getQueuedTaskLatencyMillisEWMA() {
+        if (trackQueueLatencyEWMA == false) {
+            return 0;
+        }
+        return queueLatencyMillisEWMA.getAverage();
+    }

/**
-     * Returns the fraction of the maximum possible thread time that was actually used since the last time
-     * this method was called.
+     * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
+     * Exactly one of the two boolean parameters must be true, the other false: there are two periodic polling mechanisms that read
+     * utilization, and each needs its own last-poll bookkeeping.
      *
-     * @return the utilization as a fraction, in the range [0, 1]
+     * @return the utilization as a fraction, typically in the range [0, 1]. This may exceed 1 if a task completed in the polled time
+     *         range but started earlier, contributing a larger execution time than the range itself.
      */
-    public double pollUtilization() {
+    public double pollUtilization(boolean forAPM, boolean forAllocation) {
+        assert forAPM ^ forAllocation : "Can only collect one or the other, APM: " + forAPM + ", Allocation: " + forAllocation;
+
         final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
         final long currentPollTimeNanos = System.nanoTime();

-        final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
-        final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
+        final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - (forAPM
+            ? lastTotalExecutionTimeAPM
+            : lastTotalExecutionTimeAllocation);
+        final long timeSinceLastPoll = currentPollTimeNanos - (forAPM ? lastPollTimeAPM : lastPollTimeNanosAllocation);
         final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
         final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;

-        lastTotalExecutionTime = currentTotalExecutionTimeNanos;
-        lastPollTime = currentPollTimeNanos;
+        if (forAPM) {
+            lastTotalExecutionTimeAPM = currentTotalExecutionTimeNanos;
+            lastPollTimeAPM = currentPollTimeNanos;
+        } else {
+            assert forAllocation;
+            lastTotalExecutionTimeAllocation = currentTotalExecutionTimeNanos;
+            lastPollTimeNanosAllocation = currentPollTimeNanos;
+        }

        return utilizationSinceLastPoll;
    }

@@ -161,12 +188,18 @@ protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {
ongoingTasks.put(r, System.nanoTime());
}

assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
timedRunnable.beforeExecute();
final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
assert taskQueueLatency >= 0;
-        queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
+        var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency);
+        queueLatencyMillisHistogram.addObservation(queueLatencyMillis);
+
+        if (trackQueueLatencyEWMA) {
+            queueLatencyMillisEWMA.addValue(queueLatencyMillis);
+        }
}

@Override
@@ -208,6 +241,9 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
.append("total task execution time = ")
.append(TimeValue.timeValueNanos(getTotalTaskExecutionTime()))
.append(", ");
+        if (trackQueueLatencyEWMA) {
+            sb.append("task queue EWMA = ").append(TimeValue.timeValueMillis((long) getQueuedTaskLatencyMillisEWMA())).append(", ");
+        }
}

/**
@@ -222,7 +258,17 @@ public Map<Runnable, Long> getOngoingTasks() {
}

// Used for testing
-    public double getEwmaAlpha() {
+    public double getExecutionEwmaAlpha() {
         return executionEWMA.getAlpha();
     }
+
+    // Used for testing
+    public double getQueueLatencyEwmaAlpha() {
+        return queueLatencyMillisEWMA.getAlpha();
+    }
+
+    // Used for testing
+    public boolean trackingQueueLatencyEwma() {
+        return trackQueueLatencyEWMA;
+    }
}
@@ -21,6 +21,7 @@

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
+import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA;

public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders {
@@ -32,6 +33,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512);
final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings);
+        final double queueLatencyEWMAAlpha = WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA.get(settings);

Map<String, ExecutorBuilder> result = new HashMap<>();
result.put(
@@ -55,7 +57,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.WRITE,
allocatedProcessors,
10000,
-                new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
+                new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA, queueLatencyEWMAAlpha)
)
);
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);
21 changes: 21 additions & 0 deletions server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -217,6 +217,16 @@ public static ThreadPoolType fromType(String type) {
// moving average is 100ms, and we get one task which takes 20s the new EWMA will be ~500ms.
public static final double DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA = 0.02;

+    /**
+     * If the queue latency reaches a high value (e.g. 10-30 seconds), then this thread pool is overwhelmed. It may be temporary, but
+     * that spike warrants the allocation balancer adjusting some number of shards, if possible. Therefore, it is alright to react
+     * quickly.
+     *
+     * As an example, suppose the EWMA is 10_000ms, i.e. 10 seconds.
+     * A single task that waited 30_000ms, i.e. 30 seconds, in the queue would result in a new EWMA of 12_000ms:
+     * 0.1 x 30_000ms + 0.9 x 10_000ms = 3_000ms + 9_000ms = 12_000ms
+     */
+    public static final double DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = 0.1;
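As a sanity check on the arithmetic above, the EWMA update rule is newAverage = alpha * sample + (1 - alpha) * oldAverage; a minimal standalone sketch (plain arithmetic, not the ExponentiallyWeightedMovingAverage class itself):

// EWMA update from the javadoc example, alpha = 0.1.
double alpha = DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA; // 0.1
double oldAverageMillis = 10_000; // current EWMA: 10 seconds
double sampleMillis = 30_000;     // one task queued for 30 seconds
double newAverageMillis = alpha * sampleMillis + (1 - alpha) * oldAverageMillis;
// 0.1 * 30_000 + 0.9 * 10_000 = 3_000 + 9_000 = 12_000ms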
Member:

Intuitively, I feel this value could use some relationship with the one for executionEWMA since they are updated with the same frequency before and after the task execution, respectively. If the node has N write threads and we observe N large execution times in a row, it feels roughly a similar effect as seeing a single large queue latency, i.e. all threads were busy executing tasks that took long to complete. This makes me think whether this value should be N * executionEWMA_alpha. I could be wrong on this. Just an idea.

Contributor:

I'm struggling a bit with the ewma used in this way. We get a value weighted towards the last N values; those could have arrived in the last 0.5ms or the last 30 seconds, so it seems difficult to reason about. I wonder if it'd be easier to reason about if the queue were polled at a fixed interval (by the same regular thread that we introduce to poll the utilization) and we asked the task at the front of the queue how long it'd been waiting?

That way I think the utilisation and latency would reflect observations over the same fixed time period and we could tweak the alpha values to find a period that was meaningful.

Contributor:

Also I think it means the average doesn't move if there are no tasks moving through the queue? I know it's unlikely, but it seems wrong that the average would just linger at whatever it was most recently until a task was executed. The latency should return to zero when there's nothing happening.

@DiannaHohensee (Author):

> I wonder if it'd be easier to reason about if the queue were polled at a fixed interval (by the same regular thread that we introduce to poll the utilization) and ask the task at the front of the queue how long it'd been waiting?

Our ThreadPools use a LinkedTransferQueue, which has peek(), and the task is TimedRunnable that has a creationTimeNanos private field. So I could grab a work queue reference in TaskExecutionTimeTrackingEsThreadPoolExecutor and add a TimedRunnable#getCreationTimeNanos method. So that is an option.

I had thought the EWMA would be better than periodically checking, as an average, but it does have some limitations. The argument against periodic polling is that we might see a momentary spike and not know it, or, vice versa, miss frequent spikes.
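
A rough sketch of that front-of-queue option, assuming a hypothetical TimedRunnable#getCreationTimeNanos accessor were added (neither the accessor nor this poller exists in this PR as written):

// Hypothetical periodic poller inside TaskExecutionTimeTrackingEsThreadPoolExecutor:
// sample how long the task at the head of the work queue has been waiting.
// Returns zero for an empty queue, so a time-based average fed from this
// naturally decays while the pool is idle.
private long pollFrontOfQueueLatencyNanos() {
    final Runnable head = getQueue().peek(); // LinkedTransferQueue supports peek()
    if (head == null) {
        return 0;
    }
    if (unwrap(head) instanceof TimedRunnable timedRunnable) {
        // getCreationTimeNanos() is the accessor proposed above, not yet real.
        return System.nanoTime() - timedRunnable.getCreationTimeNanos();
    }
    return 0;
}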

@DiannaHohensee (Author):

Pooya was explaining that the executionTimeEWMA is okay to be disconnected from time by the way that auto-scaling uses the value. The idea being that when writes do occur, the executionTimeEWMA reflects the average time needed to serve the requests. Even if there are lulls where no write requests occur, the idea is that when the writes do occur again, the executionTimeEWMA accurately reflects the work requirements.

@DiannaHohensee (Author):

> I had thought the EWMA would be better than periodically checking, as an average, but it does have some limitations. The argument against periodic polling is that we might see a momentary spike and not know it, or, vice versa, miss frequent spikes.

Though, if we are going to create a polling system for thread pool utilization on the data node, per comment, then maybe we also have the opportunity to create polling for the queue latency with a time-stable period EWMA. Then both EWMAs will be relatable to time, instead of the number of tasks.

@DiannaHohensee (Author):

> If the node has N write threads and we observe N large execution times in a row, it feels roughly a similar effect as seeing a single large queue latency, i.e. all threads were busy executing tasks that took long to complete. This makes me think whether this value should be N * executionEWMA_alpha.

To make things a little more concrete, N * executionEWMA_alpha would be, say, 8 * .02 = 0.16.

I was hoping to make hot-spot indicators / node-level write load relatable to our shard-level write loads, so that the balancer could see "node A has X too much load, let's move X shard load off of node A". Shard load is relatable to execution time, and queue time, which is a lot more similar to auto-scaling calculations (I expect, haven't verified).

It's possible that we come up with a mathematical relationship between queue latency and needed execution resources in future, but lacking one right now, the alpha seed simply controls how fast we want the average to respond to new data points. The number of write threads on a node seems relevant if relocating work to nodes with different thread pool sizes, or sizing up like auto-scaling does; not sure otherwise.

@DiannaHohensee (Author), Jul 2, 2025:

> Also I think it means the average doesn't move if there are no tasks moving through the queue? I know it's unlikely, but it seems wrong that the average would just linger at whatever it was most recently until a task was executed. The latency should return to zero when there's nothing happening.

A workaround would be for the balancer to only consider the queue duration when the thread pool utilization is greater than the 90% threshold. That would guarantee that both the thread pool is in use AND the queue latency is bad. An active thread pool with no queuing would quickly bring the EWMA back down again.
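
A minimal sketch of that gating idea (names are illustrative; no such check exists in this PR):

// Hypothetical balancer-side gate: only treat queue latency as a hot-spot
// signal while the pool is demonstrably busy, so a stale EWMA from an idle
// pool is ignored.
boolean writeHotSpot = utilization > 0.90
    && queueLatencyMillisEwma > queueLatencyThresholdMillis;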

After the other discussions, I think we have three options for queue latency:

  1. the master node polls every 30 seconds to see what the current front of the write queue latency is.
  2. continue taking the EWMA of all queue latencies (including zeros)
  3. create a periodic sampler for queue latency to add to an EWMA.

I'm inclined to do (1) as the simplest. I'd implement what I outlined above:

> Our ThreadPools use a LinkedTransferQueue, which has peek(), and the task is TimedRunnable that has a creationTimeNanos private field. So I could grab a work queue reference in TaskExecutionTimeTrackingEsThreadPoolExecutor and add a TimedRunnable#getCreationTimeNanos method.

Actually, the simplest would be to use what I already have, which is (2) 🤷‍♀️

@nicktindall (Contributor), Jul 4, 2025:


I'm still not a fan of the EWMA for this, because the rate at which the value goes down is dependent on the number of tasks moving through the queue. So if our average was 30s, and then we have 5 minutes with no tasks at all (and no queue), it'll still be 30s 5 minutes later despite there being no queue, until we run enough smaller values through it to bring the average down.

I don't feel strongly enough about it to block the change for it, because I think this scenario is probably unusual, but I have my reservations about it.

If we polled the queue regularly I would be happier with the EWMA as it'll decay when the queue is quiet.
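
To make the concern concrete: the EWMA is sample-driven rather than time-driven, as this sketch using the same update rule as above shows.

// With alpha = 0.1 and no tasks arriving, the average never decays:
double alpha = 0.1;
double averageMillis = 30_000; // EWMA reads 30s when traffic stops
// ...five idle minutes later it still reads 30_000ms, because addValue() was
// never called. Only new, small samples bring it down:
for (int i = 0; i < 7; i++) {
    averageMillis = alpha * 0 + (1 - alpha) * averageMillis;
}
// averageMillis ≈ 30_000 * 0.9^7 ≈ 14_300ms: still high after seven fast tasks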


private final Map<String, ExecutorHolder> executors;

private final ThreadPoolInfo threadPoolInfo;
@@ -271,6 +281,17 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

+    /**
+     * The {@link org.elasticsearch.common.ExponentiallyWeightedMovingAverage} alpha for tracking task queue latency.
+     */
+    public static final Setting<Double> WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = Setting.doubleSetting(
+        "thread_pool.task_tracking.queue_latency.ewma_alpha",
+        DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA,
+        0,
+        1,
+        Setting.Property.NodeScope
+    );

/**
* Defines and builds the many thread pools delineated in {@link Names}.
*