Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4037ccf
Add allocation write load stats to write thread pool
DiannaHohensee Jul 1, 2025
b8d8e56
reduce utilization EWMA to parallel polling: allow utilization to be …
DiannaHohensee Jul 2, 2025
fd8f602
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 2, 2025
47debcd
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 3, 2025
37b20e8
sub class pollUtilization into UtilizationTracker; rename from EWMA t…
DiannaHohensee Jul 3, 2025
be50439
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 3, 2025
45f6836
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
bd2e0f1
rename
DiannaHohensee Jul 9, 2025
629ca35
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
b0fdaf2
Remove EWMA; add Max
DiannaHohensee Jul 9, 2025
46a659b
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
3ecac48
typo fix
DiannaHohensee Jul 9, 2025
4986fa5
remove debug logging; tidying up
DiannaHohensee Jul 9, 2025
7ded14f
add try-finally in test
DiannaHohensee Jul 9, 2025
f942332
[CI] Auto commit changes from spotless
Jul 9, 2025
f9488fe
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 9, 2025
5057324
some improvements from Nick's review.
DiannaHohensee Jul 10, 2025
fb05d84
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 10, 2025
326f9e1
Merge branch 'main' into 2025/06/30/add-node-level-write-load-stats
DiannaHohensee Jul 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,29 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu
}
}

public void testWriteThreadpoolEwmaAlphaSetting() {
public void testWriteThreadpoolsEwmaAlphaSetting() {
Settings settings = Settings.EMPTY;
var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
if (randomBoolean()) {
ewmaAlpha = randomDoubleBetween(0.0, 1.0, true);
settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), ewmaAlpha).build();
executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha).build();
}
var nodeName = internalCluster().startNode(settings);
var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);

// Verify that the write thread pools all use the tracking executor.
for (var name : List.of(ThreadPool.Names.WRITE, ThreadPool.Names.SYSTEM_WRITE, ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
assertThat(threadPool.executor(name), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
final var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(name);
assertThat(Double.compare(executor.getEwmaAlpha(), ewmaAlpha), CoreMatchers.equalTo(0));
assertThat(Double.compare(executor.getExecutionEwmaAlpha(), executionEwmaAlpha), CoreMatchers.equalTo(0));

// Only the WRITE thread pool should enable further tracking.
if (name.equals(ThreadPool.Names.WRITE) == false) {
assertFalse(executor.trackingMaxQueueLatency());
} else {
// Verify that the WRITE thread pool has extra tracking enabled.
assertTrue(executor.trackingMaxQueueLatency());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -577,24 +577,53 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
}

public static class TaskTrackingConfig {
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
public static final double DEFAULT_EWMA_ALPHA = 0.3;
public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;

private final boolean trackExecutionTime;
private final boolean trackOngoingTasks;
private final double ewmaAlpha;
private final boolean trackMaxQueueLatency;
private final double executionTimeEwmaAlpha;

public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
false,
false,
false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
);
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
true,
false,
false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
);
Copy link
Contributor

@nicktindall nicktindall Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think when you get to three consecutive booleans, it's worth making a builder for readability, e.g.

TaskTrackingConfig config = TaskTrackingConfig.builder()
    .trackingOngoingTasks()
    .trackingQueueLatencyAverage()
    // ... etc
    .build();

A side benefit would be you could default all to false and just specify the ones that were true.

Happy to be challenged on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide this is a good idea, also happy for it to be a separate PR because it'll add a fair bit of noise I imagine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll put up a separate PR for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively I wonder if the max latency tracking is so cheap that we don't need to have a config for it. The ongoing task tracking is quite expensive, so it makes sense to be able to opt out of it. It would also reduce the surface area of this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems wasteful to track information unnecessarily, and it could confuse readers wondering why it isn't used, so I'd prefer not to. Also, you never know how the code will grow over time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To close this out, I've merged a TaskTrackingConfig Builder subclass and updated the callers with it.


public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA);
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA);
/**
* Creates a config with execution time tracking enabled and max queue latency tracking disabled.
*
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
* @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
*/
public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) {
this(true, trackOngoingTasks, false, executionTimeEWMAAlpha);
}

public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) {
this(true, trackOngoingTasks, ewmaAlpha);
/**
* Execution tracking enabled constructor, with extra options to enable further specialized tracking.
*
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
* @param trackMaxQueueLatency Whether to track max queue latency.
* @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
*/
public TaskTrackingConfig(boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha) {
this(true, trackOngoingTasks, trackMaxQueueLatency, executionTimeEwmaAlpha);
}

private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) {
/**
* @param trackExecutionTime Whether to track execution stats
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
* @param trackMaxQueueLatency Whether to track max queue latency.
* @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
*/
private TaskTrackingConfig(
boolean trackExecutionTime,
boolean trackOngoingTasks,
boolean trackMaxQueueLatency,
double executionTimeEWMAAlpha
) {
this.trackExecutionTime = trackExecutionTime;
this.trackOngoingTasks = trackOngoingTasks;
this.ewmaAlpha = EWMAAlpha;
this.trackMaxQueueLatency = trackMaxQueueLatency;
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
}

public boolean trackExecutionTime() {
Expand All @@ -605,8 +634,12 @@ public boolean trackOngoingTasks() {
return trackOngoingTasks;
}

public double getEwmaAlpha() {
return ewmaAlpha;
/**
* @return whether max queue latency tracking is enabled in this config
*/
public boolean trackMaxQueueLatency() {
return trackMaxQueueLatency;
}

/**
* @return the alpha seed configured for the execution time EWMA (ExponentiallyWeightedMovingAverage)
*/
public double getExecutionTimeEwmaAlpha() {
return executionTimeEwmaAlpha;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.Instrument;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
Expand All @@ -27,6 +29,7 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

Expand All @@ -37,6 +40,7 @@
* An extension to thread pool executor, which tracks statistics for the task execution time.
*/
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingEsThreadPoolExecutor.class);
Copy link
Contributor

@mashhurs mashhurs Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logger is initialized, but it never seems to be used.
@DiannaHohensee do you plan to remove it, or to log details with it?

The way I found this: the https://github.com/elastic/logstash-filter-elastic_integration Logstash plugin does a minimal (localDistro) build of ES and uses only the modules it requires (mostly processors, plus some modules such as GeoIP). With this PR, the plugin is broken (CI failure example).
At a glance, I can compile lib/elasticsearch-logging and add it to the plugin as a dependency, but

  • since the LogManager isn't used, if you remove it, it would be much appreciated;
  • otherwise, I need to dig more to figure out another NPE error after adding lib/elasticsearch-logging that comes from LogManager.java:36 (I suspect LoggerFactory#INSTANCE is null)
logstash-1       |       # ------------------
logstash-1       |       # --- Caused by: ---
logstash-1       |       # Java::JavaLang::ExceptionInInitializerError:
logstash-1       |       #   Exception java.lang.NullPointerException [in thread "main"]
logstash-1       |       #   org.elasticsearch.logging.LogManager.getLogger(LogManager.java:36)

Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To close the loop here, I've put up a PR to remove the logger. Work is under way to address this on the Logstash module side, but it's a lot easier to change ES right now.


public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18;
private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };
Expand All @@ -47,9 +51,17 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final boolean trackOngoingTasks;
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private volatile long lastPollTime = System.nanoTime();
private volatile long lastTotalExecutionTime = 0;
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
private final boolean trackMaxQueueLatency;
// Running maximum of the queue latencies (in milliseconds) accumulated in beforeExecute since the last call to
// getMaxQueueLatencyMillisSinceLastPollAndReset(). The identity 0 is safe because queue latencies are asserted to be >= 0.
// The reference is never reassigned (resetting goes through getThenReset()), so it is final.
private final LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);

/**
 * Identifies which periodic poller is asking for thread pool utilization. Each purpose is served by its own tracker, so
 * one caller polling does not reset the measurement window of the other.
 */
public enum UtilizationTrackingPurpose {
    /** Polling done by the thread pool utilization metric gauge. */
    APM,
    /** Polling done on behalf of allocation (presumably node write-load stats; the caller is not visible here). */
    ALLOCATION
}

// One tracker per polling purpose, so that each poller measures utilization over its own polling interval.
// The references are assigned once and never replaced, so they are final rather than volatile; the mutable
// per-poll state lives inside UtilizationTracker itself.
private final UtilizationTracker apmUtilizationTracker = new UtilizationTracker();
private final UtilizationTracker allocationUtilizationTracker = new UtilizationTracker();

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
Expand All @@ -65,9 +77,11 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
TaskTrackingConfig trackingConfig
) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);

this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0);
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency();
}

public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
Expand Down Expand Up @@ -95,7 +109,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + threadPoolName,
"fraction",
() -> new DoubleWithAttributes(pollUtilization(), Map.of())
() -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of())
)
);
}
Expand Down Expand Up @@ -136,37 +150,49 @@ public int getCurrentQueueSize() {
return getQueue().size();
}

/**
 * Returns the largest queue latency, in milliseconds, accumulated since the previous call, and resets the
 * accumulator. Always returns 0, without touching the accumulator, when max queue latency tracking is disabled.
 */
public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
    return trackMaxQueueLatency ? maxQueueLatencyMillisSinceLastPoll.getThenReset() : 0;
}

/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time
* this method was called.
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
* There are two periodic polling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
* caller.
*
* @return the utilization as a fraction, in the range [0, 1]
* @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started
* earlier, contributing a larger execution time.
*/
public double pollUtilization() {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
final long currentPollTimeNanos = System.nanoTime();

final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;

lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastPollTime = currentPollTimeNanos;
return utilizationSinceLastPoll;
public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) {
    // Pick the tracker dedicated to the calling poller, then poll it; each tracker keeps its own "since last poll" state.
    final UtilizationTracker trackerForPurpose;
    switch (utilizationTrackingPurpose) {
        case APM:
            trackerForPurpose = apmUtilizationTracker;
            break;
        case ALLOCATION:
            trackerForPurpose = allocationUtilizationTracker;
            break;
        default:
            throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]");
    }
    return trackerForPurpose.pollUtilization();
}

/**
 * Records per-task bookkeeping just before a task starts running: optionally registers the task as ongoing, and
 * records how long the task waited in the queue.
 *
 * @param t the thread that will run the task
 * @param r the task about to run; expected to unwrap to a {@link TimedRunnable}
 */
@Override
protected void beforeExecute(Thread t, Runnable r) {
    if (trackOngoingTasks) {
        // Remember when the task started so that still-running tasks can be accounted for.
        ongoingTasks.put(r, System.nanoTime());
    }

    assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
    final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
    // Must run before getQueueTimeNanos(); presumably stamps the start time the queue time is derived from (confirm in TimedRunnable).
    timedRunnable.beforeExecute();
    final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
    assert taskQueueLatency >= 0;

    // Convert once and add exactly one observation per task: recording the same latency twice would double every
    // bucket count in the histogram.
    final long queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency);
    queueLatencyMillisHistogram.addObservation(queueLatencyMillis);

    if (trackMaxQueueLatency) {
        maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis);
    }
}

@Override
Expand Down Expand Up @@ -222,7 +248,39 @@ public Map<Runnable, Long> getOngoingTasks() {
}

// Used for testing
public double getEwmaAlpha() {
public double getExecutionEwmaAlpha() {
return executionEWMA.getAlpha();
}

/**
* Used for testing.
*
* @return whether this executor was configured to track the max queue latency
*/
public boolean trackingMaxQueueLatency() {
return trackMaxQueueLatency;
}

/**
 * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization
 * since the last poll can be calculated for the next polling request.
 *
 * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred.
 *
 * NOTE(review): the read-compute-write sequence in {@link #pollUtilization()} is not atomic, so each tracker instance
 * presumably expects a single polling caller at a time (confirm against the callers).
 */
private class UtilizationTracker {
    volatile long lastPollTime = System.nanoTime();
    volatile long lastTotalExecutionTime = 0;

    /**
     * Computes the utilization since the previous poll of this tracker and starts a new measurement window.
     *
     * @return the total task execution time recorded since the last poll, divided by the elapsed time multiplied by the
     * maximum pool size; 0.0 if no time has elapsed since the last poll
     */
    public double pollUtilization() {
        final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
        final long currentPollTimeNanos = System.nanoTime();

        final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
        if (timeSinceLastPoll <= 0) {
            // No measurable time has passed (e.g. two polls within the clock's resolution). Dividing by zero would yield
            // NaN or Infinity, so report no utilization and keep the previous baseline: any execution time recorded so
            // far is then attributed to the next poll instead of being lost.
            return 0.0;
        }

        final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
        final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
        final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;

        lastTotalExecutionTime = currentTotalExecutionTimeNanos;
        lastPollTime = currentPollTimeNanos;

        return utilizationSinceLastPoll;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
allocatedProcessors,
// 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores.
Math.max(allocatedProcessors * 750, 10000),
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA)
)
);
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);
Expand Down
Loading