Skip to content

Commit b50ea91

Browse files
Add allocation write load stats to write thread pool (#130373)
Instrument the WRITE thread pool to collect, for periodic collection: (1) thread pool utilization, and (2) thread pool max queue latency. This supports collection of node-level write load stats, which will be collected by the balancer in a future patch. Relates ES-12233
1 parent f6a02a9 commit b50ea91

File tree

6 files changed

+247
-52
lines changed

6 files changed

+247
-52
lines changed

server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,19 +234,29 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu
234234
}
235235
}
236236

237-
public void testWriteThreadpoolEwmaAlphaSetting() {
237+
public void testWriteThreadpoolsEwmaAlphaSetting() {
238238
Settings settings = Settings.EMPTY;
239-
var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
239+
var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
240240
if (randomBoolean()) {
241-
ewmaAlpha = randomDoubleBetween(0.0, 1.0, true);
242-
settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), ewmaAlpha).build();
241+
executionEwmaAlpha = randomDoubleBetween(0.0, 1.0, true);
242+
settings = Settings.builder().put(WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.getKey(), executionEwmaAlpha).build();
243243
}
244244
var nodeName = internalCluster().startNode(settings);
245245
var threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
246+
247+
// Verify that the write thread pools all use the tracking executor.
246248
for (var name : List.of(ThreadPool.Names.WRITE, ThreadPool.Names.SYSTEM_WRITE, ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
247249
assertThat(threadPool.executor(name), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
248250
final var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(name);
249-
assertThat(Double.compare(executor.getEwmaAlpha(), ewmaAlpha), CoreMatchers.equalTo(0));
251+
assertThat(Double.compare(executor.getExecutionEwmaAlpha(), executionEwmaAlpha), CoreMatchers.equalTo(0));
252+
253+
// Only the WRITE thread pool should enable further tracking.
254+
if (name.equals(ThreadPool.Names.WRITE) == false) {
255+
assertFalse(executor.trackingMaxQueueLatency());
256+
} else {
257+
// Verify that the WRITE thread pool has extra tracking enabled.
258+
assertTrue(executor.trackingMaxQueueLatency());
259+
}
250260
}
251261
}
252262
}

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -577,24 +577,53 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
577577
}
578578

579579
public static class TaskTrackingConfig {
580-
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
581-
public static final double DEFAULT_EWMA_ALPHA = 0.3;
580+
public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;
582581

583582
private final boolean trackExecutionTime;
584583
private final boolean trackOngoingTasks;
585-
private final double ewmaAlpha;
584+
private final boolean trackMaxQueueLatency;
585+
private final double executionTimeEwmaAlpha;
586+
587+
public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
588+
false,
589+
false,
590+
false,
591+
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
592+
);
593+
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
594+
true,
595+
false,
596+
false,
597+
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
598+
);
586599

587-
public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(false, false, DEFAULT_EWMA_ALPHA);
588-
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(true, false, DEFAULT_EWMA_ALPHA);
600+
public TaskTrackingConfig(boolean trackOngoingTasks, double executionTimeEWMAAlpha) {
601+
this(true, trackOngoingTasks, false, executionTimeEWMAAlpha);
602+
}
589603

590-
public TaskTrackingConfig(boolean trackOngoingTasks, double ewmaAlpha) {
591-
this(true, trackOngoingTasks, ewmaAlpha);
604+
/**
605+
* Execution tracking enabled constructor, with extra options to enable further specialized tracking.
606+
*/
607+
public TaskTrackingConfig(boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha) {
608+
this(true, trackOngoingTasks, trackMaxQueueLatency, executionTimeEwmaAlpha);
592609
}
593610

594-
private TaskTrackingConfig(boolean trackExecutionTime, boolean trackOngoingTasks, double EWMAAlpha) {
611+
/**
612+
* @param trackExecutionTime Whether to track execution stats
613+
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
614+
* @param trackMaxQueueLatency Whether to track max queue latency.
615+
* @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
616+
*/
617+
private TaskTrackingConfig(
618+
boolean trackExecutionTime,
619+
boolean trackOngoingTasks,
620+
boolean trackMaxQueueLatency,
621+
double executionTimeEWMAAlpha
622+
) {
595623
this.trackExecutionTime = trackExecutionTime;
596624
this.trackOngoingTasks = trackOngoingTasks;
597-
this.ewmaAlpha = EWMAAlpha;
625+
this.trackMaxQueueLatency = trackMaxQueueLatency;
626+
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
598627
}
599628

600629
public boolean trackExecutionTime() {
@@ -605,8 +634,12 @@ public boolean trackOngoingTasks() {
605634
return trackOngoingTasks;
606635
}
607636

608-
public double getEwmaAlpha() {
609-
return ewmaAlpha;
637+
public boolean trackMaxQueueLatency() {
638+
return trackMaxQueueLatency;
639+
}
640+
641+
public double getExecutionTimeEwmaAlpha() {
642+
return executionTimeEwmaAlpha;
610643
}
611644
}
612645

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 79 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
1414
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
1515
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.logging.LogManager;
17+
import org.elasticsearch.logging.Logger;
1618
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
1719
import org.elasticsearch.telemetry.metric.Instrument;
1820
import org.elasticsearch.telemetry.metric.LongWithAttributes;
@@ -27,6 +29,7 @@
2729
import java.util.concurrent.RejectedExecutionHandler;
2830
import java.util.concurrent.ThreadFactory;
2931
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.LongAccumulator;
3033
import java.util.concurrent.atomic.LongAdder;
3134
import java.util.function.Function;
3235

@@ -37,6 +40,7 @@
3740
* An extension to thread pool executor, which tracks statistics for the task execution time.
3841
*/
3942
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
43+
private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingEsThreadPoolExecutor.class);
4044

4145
public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18;
4246
private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };
@@ -47,9 +51,17 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
4751
private final boolean trackOngoingTasks;
4852
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
4953
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
50-
private volatile long lastPollTime = System.nanoTime();
51-
private volatile long lastTotalExecutionTime = 0;
5254
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
55+
private final boolean trackMaxQueueLatency;
56+
private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);
57+
58+
public enum UtilizationTrackingPurpose {
59+
APM,
60+
ALLOCATION,
61+
}
62+
63+
private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker();
64+
private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker();
5365

5466
TaskExecutionTimeTrackingEsThreadPoolExecutor(
5567
String name,
@@ -65,9 +77,11 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
6577
TaskTrackingConfig trackingConfig
6678
) {
6779
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);
80+
6881
this.runnableWrapper = runnableWrapper;
69-
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0);
82+
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
7083
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
84+
this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency();
7185
}
7286

7387
public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
@@ -95,7 +109,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
95109
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
96110
"fraction of maximum thread time utilized for " + threadPoolName,
97111
"fraction",
98-
() -> new DoubleWithAttributes(pollUtilization(), Map.of())
112+
() -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of())
99113
)
100114
);
101115
}
@@ -136,37 +150,49 @@ public int getCurrentQueueSize() {
136150
return getQueue().size();
137151
}
138152

153+
public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
154+
if (trackMaxQueueLatency == false) {
155+
return 0;
156+
}
157+
return maxQueueLatencyMillisSinceLastPoll.getThenReset();
158+
}
159+
139160
/**
140-
* Returns the fraction of the maximum possible thread time that was actually used since the last time
141-
* this method was called.
161+
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
162+
* There are two periodic polling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
163+
* caller.
142164
*
143-
* @return the utilization as a fraction, in the range [0, 1]
165+
* @return the utilization as a fraction, normally in the range [0, 1]. This may return >1 if a task completed in the time range but started
166+
* earlier, contributing a larger execution time.
144167
*/
145-
public double pollUtilization() {
146-
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
147-
final long currentPollTimeNanos = System.nanoTime();
148-
149-
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
150-
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
151-
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
152-
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
153-
154-
lastTotalExecutionTime = currentTotalExecutionTimeNanos;
155-
lastPollTime = currentPollTimeNanos;
156-
return utilizationSinceLastPoll;
168+
public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) {
169+
switch (utilizationTrackingPurpose) {
170+
case APM:
171+
return apmUtilizationTracker.pollUtilization();
172+
case ALLOCATION:
173+
return allocationUtilizationTracker.pollUtilization();
174+
default:
175+
throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]");
176+
}
157177
}
158178

159179
@Override
160180
protected void beforeExecute(Thread t, Runnable r) {
161181
if (trackOngoingTasks) {
162182
ongoingTasks.put(r, System.nanoTime());
163183
}
184+
164185
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
165186
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
166187
timedRunnable.beforeExecute();
167188
final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
168189
assert taskQueueLatency >= 0;
169-
queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
190+
var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency);
191+
queueLatencyMillisHistogram.addObservation(queueLatencyMillis);
192+
193+
if (trackMaxQueueLatency) {
194+
maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis);
195+
}
170196
}
171197

172198
@Override
@@ -222,7 +248,39 @@ public Map<Runnable, Long> getOngoingTasks() {
222248
}
223249

224250
// Used for testing
225-
public double getEwmaAlpha() {
251+
public double getExecutionEwmaAlpha() {
226252
return executionEWMA.getAlpha();
227253
}
254+
255+
// Used for testing
256+
public boolean trackingMaxQueueLatency() {
257+
return trackMaxQueueLatency;
258+
}
259+
260+
/**
261+
* Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization
262+
* since the last poll can be calculated for the next polling request.
263+
*
264+
* Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred.
265+
*/
266+
private class UtilizationTracker {
267+
long lastPollTime = System.nanoTime();
268+
long lastTotalExecutionTime = 0;
269+
270+
public synchronized double pollUtilization() {
271+
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
272+
final long currentPollTimeNanos = System.nanoTime();
273+
274+
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
275+
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
276+
277+
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
278+
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
279+
280+
lastTotalExecutionTime = currentTotalExecutionTimeNanos;
281+
lastPollTime = currentPollTimeNanos;
282+
283+
return utilizationSinceLastPoll;
284+
}
285+
}
228286
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
5656
allocatedProcessors,
5757
// 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores.
5858
Math.max(allocatedProcessors * 750, 10000),
59-
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
59+
new EsExecutors.TaskTrackingConfig(true, true, indexAutoscalingEWMA)
6060
)
6161
);
6262
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);

0 commit comments

Comments
 (0)