Skip to content

Commit 270ca0a

Browse files
authored
Add thread pool utilisation metric (#120363)
There are existing metrics for the active number of threads, but it seems tricky to go from those to a "utilisation" number because all the pools have different sizes. This commit adds `es.thread_pool.{name}.threads.utilization.current` which will be published by all `TaskExecutionTimeTrackingEsThreadPoolExecutor` thread pools (where `EsExecutors.TaskTrackingConfig#trackExecutionTime` is true). The metric is a double gauge indicating what fraction (in [0.0, 1.0]) of the maximum possible execution time was utilised over the polling interval. It's calculated as actualTaskExecutionTime / maximumTaskExecutionTime, so effectively a "mean" value. The metric interval is 60s so brief spikes won't be apparent in the measure, but the initial goal is to use it to detect hot-spotting so the 60s average will probably suffice. Relates ES-10530
1 parent e53d3ff commit 270ca0a

File tree

5 files changed

+314
-31
lines changed

5 files changed

+314
-31
lines changed

docs/changelog/120363.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120363
2+
summary: Add thread pool utilization metric
3+
area: "Infra/Metrics"
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java

Lines changed: 69 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.index.IndexRequestBuilder;
1313
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.Maps;
1415
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
1516
import org.elasticsearch.index.query.QueryBuilders;
1617
import org.elasticsearch.plugins.Plugin;
@@ -31,22 +32,18 @@
3132
import java.util.HashSet;
3233
import java.util.List;
3334
import java.util.Map;
34-
import java.util.Map.Entry;
3535
import java.util.Set;
36+
import java.util.function.BiFunction;
3637
import java.util.function.Function;
3738
import java.util.regex.Pattern;
39+
import java.util.stream.Collectors;
3840

39-
import static java.util.function.Function.identity;
40-
import static org.elasticsearch.common.util.Maps.toUnmodifiableSortedMap;
4141
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
4242
import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
4343
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
4444
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
4545
import static org.hamcrest.Matchers.contains;
46-
import static org.hamcrest.Matchers.equalTo;
4746
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
48-
import static org.hamcrest.Matchers.hasEntry;
49-
import static org.hamcrest.Matchers.in;
5047
import static org.hamcrest.Matchers.instanceOf;
5148
import static org.hamcrest.Matchers.matchesRegex;
5249

@@ -165,37 +162,78 @@ public void testThreadPoolMetrics() throws Exception {
165162
registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER));
166163

167164
tps[0].forEach(stats -> {
168-
Map<String, Long> threadPoolStats = List.of(
169-
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED, stats.completed()),
170-
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, 0L),
171-
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, 0L),
172-
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, (long) stats.largest()),
173-
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE, 0L)
174-
).stream().collect(toUnmodifiableSortedMap(e -> stats.name() + e.getKey(), Entry::getValue));
175-
176-
Function<String, List<Long>> measurementExtractor = name -> {
177-
String metricName = ThreadPool.THREAD_POOL_METRIC_PREFIX + name;
178-
assertThat(metricName, in(registeredMetrics));
179-
180-
List<Measurement> measurements = name.endsWith(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED)
181-
? plugin.getLongAsyncCounterMeasurement(metricName)
182-
: plugin.getLongGaugeMeasurement(metricName);
183-
return measurements.stream().map(Measurement::getLong).toList();
184-
};
185-
186-
Map<String, List<Long>> measurements = threadPoolStats.keySet()
187-
.stream()
188-
.collect(toUnmodifiableSortedMap(identity(), measurementExtractor));
165+
Map<String, MetricDefinition<?>> metricDefinitions = Map.of(
166+
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
167+
new MetricDefinition<>(stats.completed(), TestTelemetryPlugin::getLongAsyncCounterMeasurement, Measurement::getLong),
168+
ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE,
169+
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
170+
ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
171+
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
172+
ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
173+
new MetricDefinition<>((long) stats.largest(), TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
174+
ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE,
175+
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong)
176+
);
189177

190-
logger.info("Stats of `{}`: {}", stats.name(), threadPoolStats);
191-
logger.info("Measurements of `{}`: {}", stats.name(), measurements);
178+
// TaskExecutionTimeTrackingEsThreadPoolExecutor also publishes a utilization metric
179+
if (tp.executor(stats.name()) instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) {
180+
metricDefinitions = Maps.copyMapWithAddedEntry(
181+
metricDefinitions,
182+
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
183+
new MetricDefinition<>(0.0d, TestTelemetryPlugin::getDoubleGaugeMeasurement, Measurement::getDouble)
184+
);
185+
}
192186

193-
threadPoolStats.forEach(
194-
(metric, value) -> assertThat(measurements, hasEntry(equalTo(metric), contains(greaterThanOrEqualTo(value))))
187+
metricDefinitions = metricDefinitions.entrySet()
188+
.stream()
189+
.collect(Collectors.toUnmodifiableMap(e -> stats.name() + e.getKey(), Map.Entry::getValue));
190+
191+
logger.info(
192+
"Measurements of `{}`: {}",
193+
stats.name(),
194+
metricDefinitions.entrySet()
195+
.stream()
196+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMeasurements(plugin, e.getKey())))
195197
);
198+
199+
// Validate all metrics
200+
metricDefinitions.forEach((name, md) -> md.assertValid(plugin, name));
196201
});
197202
}
198203

204+
private static class MetricDefinition<T extends Comparable<T>> {
205+
206+
private final T minimumValue;
207+
private final BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor;
208+
private final Function<Measurement, T> valueExtractor;
209+
210+
MetricDefinition(
211+
T minimumValue,
212+
BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor,
213+
Function<Measurement, T> valueExtractor
214+
) {
215+
this.minimumValue = minimumValue;
216+
this.metricExtractor = metricExtractor;
217+
this.valueExtractor = valueExtractor;
218+
}
219+
220+
public List<T> getMeasurements(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
221+
return metricExtractor.apply(testTelemetryPlugin, ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix)
222+
.stream()
223+
.map(valueExtractor)
224+
.toList();
225+
}
226+
227+
public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
228+
List<T> metrics = getMeasurements(testTelemetryPlugin, metricSuffix);
229+
assertThat(
230+
ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix + " is populated",
231+
metrics,
232+
contains(greaterThanOrEqualTo(minimumValue))
233+
);
234+
}
235+
}
236+
199237
public void testWriteThreadpoolEwmaAlphaSetting() {
200238
Settings settings = Settings.EMPTY;
201239
var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
3333
private final boolean trackOngoingTasks;
3434
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
3535
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
36+
private volatile long lastPollTime = System.nanoTime();
37+
private volatile long lastTotalExecutionTime = 0;
3638

3739
TaskExecutionTimeTrackingEsThreadPoolExecutor(
3840
String name,
@@ -89,6 +91,26 @@ public int getCurrentQueueSize() {
8991
return getQueue().size();
9092
}
9193

94+
/**
95+
* Returns the fraction of the maximum possible thread time that was actually used since the last time
96+
* this method was called.
97+
*
98+
* @return the utilization as a fraction, in the range [0, 1]
99+
*/
100+
public double pollUtilization() {
101+
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
102+
final long currentPollTimeNanos = System.nanoTime();
103+
104+
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
105+
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
106+
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
107+
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
108+
109+
lastTotalExecutionTime = currentTotalExecutionTimeNanos;
110+
lastPollTime = currentPollTimeNanos;
111+
return utilizationSinceLastPoll;
112+
}
113+
92114
@Override
93115
protected void beforeExecute(Thread t, Runnable r) {
94116
if (trackOngoingTasks) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2525
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionHandler;
2626
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
27+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
2728
import org.elasticsearch.common.util.concurrent.ThreadContext;
2829
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
2930
import org.elasticsearch.core.Nullable;
3031
import org.elasticsearch.core.TimeValue;
3132
import org.elasticsearch.node.Node;
3233
import org.elasticsearch.node.ReportingService;
34+
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
3335
import org.elasticsearch.telemetry.metric.Instrument;
3436
import org.elasticsearch.telemetry.metric.LongAsyncCounter;
3537
import org.elasticsearch.telemetry.metric.LongGauge;
@@ -149,6 +151,7 @@ public static class Names {
149151
public static final String THREAD_POOL_METRIC_NAME_CURRENT = ".threads.count.current";
150152
public static final String THREAD_POOL_METRIC_NAME_QUEUE = ".threads.queue.size";
151153
public static final String THREAD_POOL_METRIC_NAME_ACTIVE = ".threads.active.current";
154+
public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current";
152155
public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current";
153156
public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";
154157

@@ -374,6 +377,17 @@ private static ArrayList<Instrument> setupMetrics(MeterRegistry meterRegistry, S
374377
if (rejectedExecutionHandler instanceof EsRejectedExecutionHandler handler) {
375378
handler.registerCounter(meterRegistry, prefix + THREAD_POOL_METRIC_NAME_REJECTED, name);
376379
}
380+
381+
if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) {
382+
instruments.add(
383+
meterRegistry.registerDoubleGauge(
384+
prefix + THREAD_POOL_METRIC_NAME_UTILIZATION,
385+
"fraction of maximum thread time utilized for " + name,
386+
"fraction",
387+
() -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at)
388+
)
389+
);
390+
}
377391
}
378392
return instruments;
379393
}

0 commit comments

Comments
 (0)