Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e6ea943
Add thread pool utilisation metric
nicktindall Jan 17, 2025
9cc7af1
Merge remote-tracking branch 'origin/main' into ES-10530_add_thread_p…
nicktindall Jan 17, 2025
4342f95
Update docs/changelog/120363.yaml
nicktindall Jan 17, 2025
b00ee42
Merge branch 'main' into ES-10530_add_thread_pool_utilisation_metric
nicktindall Jan 19, 2025
0cfd844
Merge remote-tracking branch 'origin/ES-10530_add_thread_pool_utilisa…
nicktindall Jan 19, 2025
9dc9e18
Fix area
nicktindall Jan 19, 2025
080bc98
Implement mean utilisation for task execution time tracking thread pools
nicktindall Feb 4, 2025
bfdf9df
Merge remote-tracking branch 'origin/main' into ES-10530_add_thread_p…
nicktindall Feb 4, 2025
aa43210
Make lastXXX fields volatile
nicktindall Feb 4, 2025
d3ccf2f
Fix typo
nicktindall Feb 4, 2025
8ffcffa
finals
nicktindall Feb 4, 2025
54db077
Only publish utilisation for time-tracking executors
nicktindall Mar 2, 2025
755b567
Merge remote-tracking branch 'origin/main' into ES-10530_add_thread_p…
nicktindall Mar 2, 2025
b2f4ef5
Fix tests
nicktindall Mar 2, 2025
b9b8f89
Utilisation -> Utilization, make poll clearer it has side-effects
nicktindall Mar 2, 2025
25e6f5a
Break up steps in pollUtilization
nicktindall Mar 2, 2025
c248419
Improve variable naming
nicktindall Mar 2, 2025
ee90df7
Merge branch 'main' into ES-10530_add_thread_pool_utilisation_metric
nicktindall Mar 28, 2025
d2a4de4
Merge branch 'main' into ES-10530_add_thread_pool_utilisation_metric
nicktindall Apr 2, 2025
c32e8e5
Update server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
nicktindall Apr 16, 2025
287786f
Fix race condition in ThreadPoolTests#testDetailedUtilizationMetric
nicktindall Apr 16, 2025
d6d82f2
Reduce verbosity in metric assertions
nicktindall Apr 16, 2025
a5c4073
Reduce verbosity in metric assertions
nicktindall Apr 16, 2025
436cc0b
Fix other race condition
nicktindall Apr 16, 2025
71b7fb7
Merge remote-tracking branch 'origin/main' into ES-10530_add_thread_p…
nicktindall Apr 16, 2025
dc9fa3f
Merge remote-tracking branch 'origin/main' into ES-10530_add_thread_p…
nicktindall Apr 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120363.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120363
summary: Add thread pool utilization metric
area: "Infra/Metrics"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -31,22 +32,18 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.function.Function.identity;
import static org.elasticsearch.common.util.Maps.toUnmodifiableSortedMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.matchesRegex;

Expand Down Expand Up @@ -165,37 +162,78 @@ public void testThreadPoolMetrics() throws Exception {
registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER));

tps[0].forEach(stats -> {
Map<String, Long> threadPoolStats = List.of(
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED, stats.completed()),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, 0L),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, 0L),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, (long) stats.largest()),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE, 0L)
).stream().collect(toUnmodifiableSortedMap(e -> stats.name() + e.getKey(), Entry::getValue));

Function<String, List<Long>> measurementExtractor = name -> {
String metricName = ThreadPool.THREAD_POOL_METRIC_PREFIX + name;
assertThat(metricName, in(registeredMetrics));

List<Measurement> measurements = name.endsWith(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED)
? plugin.getLongAsyncCounterMeasurement(metricName)
: plugin.getLongGaugeMeasurement(metricName);
return measurements.stream().map(Measurement::getLong).toList();
};

Map<String, List<Long>> measurements = threadPoolStats.keySet()
.stream()
.collect(toUnmodifiableSortedMap(identity(), measurementExtractor));
Map<String, MetricDefinition<?>> metricDefinitions = Map.of(
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
new MetricDefinition<>(stats.completed(), TestTelemetryPlugin::getLongAsyncCounterMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE,
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
new MetricDefinition<>((long) stats.largest(), TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE,
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong)
);

logger.info("Stats of `{}`: {}", stats.name(), threadPoolStats);
logger.info("Measurements of `{}`: {}", stats.name(), measurements);
// TaskExecutionTimeTrackingEsThreadPoolExecutor also publishes a utilization metric
if (tp.executor(stats.name()) instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) {
metricDefinitions = Maps.copyMapWithAddedEntry(
metricDefinitions,
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
new MetricDefinition<>(0.0d, TestTelemetryPlugin::getDoubleGaugeMeasurement, Measurement::getDouble)
);
}

threadPoolStats.forEach(
(metric, value) -> assertThat(measurements, hasEntry(equalTo(metric), contains(greaterThanOrEqualTo(value))))
metricDefinitions = metricDefinitions.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(e -> stats.name() + e.getKey(), Map.Entry::getValue));

logger.info(
"Measurements of `{}`: {}",
stats.name(),
metricDefinitions.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMeasurements(plugin, e.getKey())))
);

// Validate all metrics
metricDefinitions.forEach((name, md) -> md.assertValid(plugin, name));
});
}

/**
 * Test helper that bundles, for a single metric, the smallest value the metric is expected to report
 * together with the two functions needed to read it back: one that pulls the raw {@link Measurement}s
 * for a metric name out of the {@link TestTelemetryPlugin}, and one that converts a measurement to a
 * comparable value.
 *
 * @param <T> the value type of the metric (compared against the minimum using its natural ordering)
 */
private static class MetricDefinition<T extends Comparable<T>> {

    // Lower bound (inclusive) that the recorded measurement must meet.
    private final T minimumValue;
    // Fetches the raw measurements for a fully-qualified metric name from the telemetry plugin.
    private final BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor;
    // Converts a raw measurement into the comparable value type.
    private final Function<Measurement, T> valueExtractor;

    MetricDefinition(
        T minimumValue,
        BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor,
        Function<Measurement, T> valueExtractor
    ) {
        this.minimumValue = minimumValue;
        this.metricExtractor = metricExtractor;
        this.valueExtractor = valueExtractor;
    }

    /**
     * Reads the values recorded for this metric.
     *
     * @param testTelemetryPlugin the plugin holding the recorded measurements
     * @param metricSuffix the metric name without {@link ThreadPool#THREAD_POOL_METRIC_PREFIX}
     * @return the recorded values, in the order the extractor returned them
     */
    public List<T> getMeasurements(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
        final String qualifiedName = ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix;
        final List<Measurement> recorded = metricExtractor.apply(testTelemetryPlugin, qualifiedName);
        return recorded.stream().map(valueExtractor).toList();
    }

    /**
     * Asserts that exactly one value was recorded for this metric and that it is at least the
     * configured minimum.
     *
     * @param testTelemetryPlugin the plugin holding the recorded measurements
     * @param metricSuffix the metric name without {@link ThreadPool#THREAD_POOL_METRIC_PREFIX}
     */
    public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
        final List<T> observed = getMeasurements(testTelemetryPlugin, metricSuffix);
        final String reason = ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix + " is populated";
        assertThat(reason, observed, contains(greaterThanOrEqualTo(minimumValue)));
    }
}

public void testWriteThreadpoolEwmaAlphaSetting() {
Settings settings = Settings.EMPTY;
var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final boolean trackOngoingTasks;
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private volatile long lastPollTime = System.nanoTime();
private volatile long lastTotalExecutionTime = 0;

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
Expand Down Expand Up @@ -89,6 +91,26 @@ public int getCurrentQueueSize() {
return getQueue().size();
}

/**
 * Returns the fraction of the maximum possible thread time that was actually used since the last time
 * this method was called.
 * <p>
 * The maximum possible thread time is the wall-clock time elapsed since the previous poll multiplied by
 * {@code getMaximumPoolSize()}. Calling this method has a side effect: it moves the start of the
 * measurement window to the current call, so each caller only sees the interval since the previous poll.
 * <p>
 * The two baseline fields are read and updated without mutual exclusion, so concurrent callers may
 * compute overlapping windows.
 * NOTE(review): if execution time is only added to {@code totalExecutionTime} when a task finishes, a
 * long-running task could push a single reading above 1 - confirm against the code that updates it.
 *
 * @return the utilization as a fraction, nominally in the range [0, 1]; {@code 0.0} when no time has
 *         elapsed since the previous poll
 */
public double pollUtilization() {
    final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
    final long currentPollTimeNanos = System.nanoTime();

    final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
    final long timeSinceLastPollNanos = currentPollTimeNanos - lastPollTime;
    final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPollNanos * getMaximumPoolSize();

    // Two polls can land on the same nanoTime() value, or a racing caller may already have advanced
    // lastPollTime past our timestamp. Dividing would then yield NaN, Infinity or a negative number.
    // Report zero and leave the baselines alone so the execution time is counted by the next poll.
    if (maximumExecutionTimeSinceLastPollNanos <= 0) {
        return 0.0d;
    }

    final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;

    lastTotalExecutionTime = currentTotalExecutionTimeNanos;
    lastPollTime = currentPollTimeNanos;
    return utilizationSinceLastPoll;
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionHandler;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.Instrument;
import org.elasticsearch.telemetry.metric.LongAsyncCounter;
import org.elasticsearch.telemetry.metric.LongGauge;
Expand Down Expand Up @@ -149,6 +151,7 @@ public static class Names {
public static final String THREAD_POOL_METRIC_NAME_CURRENT = ".threads.count.current";
public static final String THREAD_POOL_METRIC_NAME_QUEUE = ".threads.queue.size";
public static final String THREAD_POOL_METRIC_NAME_ACTIVE = ".threads.active.current";
public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current";
public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current";
public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";

Expand Down Expand Up @@ -374,6 +377,17 @@ private static ArrayList<Instrument> setupMetrics(MeterRegistry meterRegistry, S
if (rejectedExecutionHandler instanceof EsRejectedExecutionHandler handler) {
handler.registerCounter(meterRegistry, prefix + THREAD_POOL_METRIC_NAME_REJECTED, name);
}

if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) {
instruments.add(
meterRegistry.registerDoubleGauge(
prefix + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + name,
"fraction",
() -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at)
)
);
}
}
return instruments;
}
Expand Down
Loading
Loading