diff --git a/docs/changelog/120363.yaml b/docs/changelog/120363.yaml new file mode 100644 index 0000000000000..65e74024bbbbb --- /dev/null +++ b/docs/changelog/120363.yaml @@ -0,0 +1,5 @@ +pr: 120363 +summary: Add thread pool utilization metric +area: "Infra/Metrics" +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index d2e021a8d7436..b9f2a5eb79f22 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; @@ -31,22 +32,18 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Collectors; -import static java.util.function.Function.identity; -import static org.elasticsearch.common.util.Maps.toUnmodifiableSortedMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import 
static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.matchesRegex; @@ -165,37 +162,78 @@ public void testThreadPoolMetrics() throws Exception { registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER)); tps[0].forEach(stats -> { - Map threadPoolStats = List.of( - Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED, stats.completed()), - Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, 0L), - Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, 0L), - Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, (long) stats.largest()), - Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE, 0L) - ).stream().collect(toUnmodifiableSortedMap(e -> stats.name() + e.getKey(), Entry::getValue)); - - Function> measurementExtractor = name -> { - String metricName = ThreadPool.THREAD_POOL_METRIC_PREFIX + name; - assertThat(metricName, in(registeredMetrics)); - - List measurements = name.endsWith(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED) - ? 
plugin.getLongAsyncCounterMeasurement(metricName) - : plugin.getLongGaugeMeasurement(metricName); - return measurements.stream().map(Measurement::getLong).toList(); - }; - - Map> measurements = threadPoolStats.keySet() - .stream() - .collect(toUnmodifiableSortedMap(identity(), measurementExtractor)); + Map> metricDefinitions = Map.of( + ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED, + new MetricDefinition<>(stats.completed(), TestTelemetryPlugin::getLongAsyncCounterMeasurement, Measurement::getLong), + ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, + new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong), + ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, + new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong), + ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, + new MetricDefinition<>((long) stats.largest(), TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong), + ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE, + new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong) + ); - logger.info("Stats of `{}`: {}", stats.name(), threadPoolStats); - logger.info("Measurements of `{}`: {}", stats.name(), measurements); + // TaskExecutionTimeTrackingEsThreadPoolExecutor also publishes a utilization metric + if (tp.executor(stats.name()) instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) { + metricDefinitions = Maps.copyMapWithAddedEntry( + metricDefinitions, + ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, + new MetricDefinition<>(0.0d, TestTelemetryPlugin::getDoubleGaugeMeasurement, Measurement::getDouble) + ); + } - threadPoolStats.forEach( - (metric, value) -> assertThat(measurements, hasEntry(equalTo(metric), contains(greaterThanOrEqualTo(value)))) + metricDefinitions = metricDefinitions.entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(e -> stats.name() + e.getKey(), Map.Entry::getValue)); + + logger.info( + "Measurements of `{}`: {}", + 
stats.name(),
+                metricDefinitions.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMeasurements(plugin, e.getKey())))
             );
+
+            // Validate all metrics
+            metricDefinitions.forEach((name, md) -> md.assertValid(plugin, name));
         });
     }
 
+    /** Pairs the minimum value a metric must report with the accessors used to read it back from the telemetry plugin. */
+    private static class MetricDefinition<T extends Comparable<T>> {
+        private final T minimumValue;
+        private final BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor;
+        private final Function<Measurement, T> valueExtractor;
+
+        MetricDefinition(
+            T minimumValue,
+            BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor,
+            Function<Measurement, T> valueExtractor
+        ) {
+            this.minimumValue = minimumValue;
+            this.metricExtractor = metricExtractor;
+            this.valueExtractor = valueExtractor;
+        }
+
+        /** Returns every value recorded for the metric named {@code THREAD_POOL_METRIC_PREFIX + metricSuffix}. */
+        public List<T> getMeasurements(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
+            final String metricName = ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix;
+            return metricExtractor.apply(testTelemetryPlugin, metricName).stream().map(valueExtractor).toList();
+        }
+
+        /** Asserts that the metric recorded a single value and that this value is at least the minimum value. */
+        public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
+            List<T> metrics = getMeasurements(testTelemetryPlugin, metricSuffix);
+            assertThat(
+                ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix + " is populated",
+                metrics,
+                contains(greaterThanOrEqualTo(minimumValue))
+            );
+        }
+    }
+
     public void testWriteThreadpoolEwmaAlphaSetting() {
         Settings settings = Settings.EMPTY;
         var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
index bcef86f00b2a4..71f57bcc16754 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -33,6 +33,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final boolean trackOngoingTasks;
     // The set of currently running tasks and the timestamp of when they started execution in the Executor.
     private final Map ongoingTasks = new ConcurrentHashMap<>();
+    private volatile long lastPollTime = System.nanoTime();
+    private volatile long lastTotalExecutionTime = 0;
 
     TaskExecutionTimeTrackingEsThreadPoolExecutor(
         String name,
@@ -89,6 +91,37 @@ public int getCurrentQueueSize() {
         return getQueue().size();
     }
 
+    /**
+     * Returns the fraction of the maximum possible thread time that was actually used since the last time
+     * this method was called.
+     * <p>
+     * Each call that returns a measurement also starts a new measurement window. The window state is held
+     * in two volatile fields that are updated without locking.
+     *
+     * @return the utilization as a fraction, nominally in the range [0, 1], or {@code 0.0} when no time
+     *         has elapsed since the window started
+     */
+    public double pollUtilization() {
+        final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
+        final long currentPollTimeNanos = System.nanoTime();
+
+        final long timeSinceLastPollNanos = currentPollTimeNanos - lastPollTime;
+        if (timeSinceLastPollNanos <= 0) {
+            // Zero-length window: dividing would yield NaN or Infinity. Leave the window open so the
+            // execution time accumulated so far is attributed to the next poll.
+            return 0.0;
+        }
+
+        final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
+        // Multiply in floating point so a large pool size cannot overflow the long product
+        final double maximumExecutionTimeSinceLastPollNanos = (double) timeSinceLastPollNanos * getMaximumPoolSize();
+        final double utilizationSinceLastPoll = totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
+
+        lastTotalExecutionTime = currentTotalExecutionTimeNanos;
+        lastPollTime = currentPollTimeNanos;
+        return utilizationSinceLastPoll;
+    }
+
     @Override
     protected void beforeExecute(Thread t, Runnable r) {
         if (trackOngoingTasks) {
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 85ee02b6db856..1533e616b8f28 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -24,12 +24,14 @@
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionHandler;
 import
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.node.ReportingService; +import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.Instrument; import org.elasticsearch.telemetry.metric.LongAsyncCounter; import org.elasticsearch.telemetry.metric.LongGauge; @@ -149,6 +151,7 @@ public static class Names { public static final String THREAD_POOL_METRIC_NAME_CURRENT = ".threads.count.current"; public static final String THREAD_POOL_METRIC_NAME_QUEUE = ".threads.queue.size"; public static final String THREAD_POOL_METRIC_NAME_ACTIVE = ".threads.active.current"; + public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current"; public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current"; public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total"; @@ -374,6 +377,17 @@ private static ArrayList setupMetrics(MeterRegistry meterRegistry, S if (rejectedExecutionHandler instanceof EsRejectedExecutionHandler handler) { handler.registerCounter(meterRegistry, prefix + THREAD_POOL_METRIC_NAME_REJECTED, name); } + + if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) { + instruments.add( + meterRegistry.registerDoubleGauge( + prefix + THREAD_POOL_METRIC_NAME_UTILIZATION, + "fraction of maximum thread time utilized for " + name, + "fraction", + () -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at) + ) + ); + } } return instruments; } diff --git 
a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 808c0a5b88b7e..3762ea0feaee3 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -17,15 +17,28 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; +import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; +import org.hamcrest.Matcher; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DO_NOT_TRACK; @@ -35,8 +48,10 @@ import static org.elasticsearch.threadpool.ThreadPool.getMaxSnapshotThreadPoolSize; import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessorsMaxFive; import static org.hamcrest.CoreMatchers.equalTo; 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThan;
 
 public class ThreadPoolTests extends ESTestCase {
 
@@ -473,6 +488,212 @@ public void testScheduledFixedDelayForceExecution() {
         }
     }
 
+    /**
+     * Runs a single task on the write pool between two metric collections and checks that the published
+     * utilization gauge falls strictly between bounds derived from timestamps taken around the task and
+     * around each collection.
+     */
+    public void testDetailedUtilizationMetric() throws Exception {
+        final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
+        final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders();
+
+        final ThreadPool threadPool = new ThreadPool(
+            Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(),
+            meterRegistry,
+            builtInExecutorBuilders
+        );
+        try {
+            // write thread pool is tracked
+            final String threadPoolName = ThreadPool.Names.WRITE;
+            final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName);
+            final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);
+            final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf(
+                TaskExecutionTimeTrackingEsThreadPoolExecutor.class,
+                threadPool.executor(threadPoolName)
+            );
+
+            final long beforePreviousCollectNanos = System.nanoTime();
+            meterRegistry.getRecorder().collect();
+            final long afterPreviousCollectNanos = System.nanoTime();
+            metricAsserter.assertLatestMetricValueMatches(
+                InstrumentType.DOUBLE_GAUGE,
+                ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
+                Measurement::getDouble,
+                equalTo(0.0d)
+            );
+
+            final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE);
+            final long beforeStartNanos = System.nanoTime();
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+            Future<?> future = executor.submit(() -> {
+                long innerStartTimeNanos = System.nanoTime();
+                safeSleep(100);
+                safeAwait(barrier);
+                minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos);
+            });
+            safeAwait(barrier);
+            safeGet(future);
+            final long maxDurationNanos = System.nanoTime() - beforeStartNanos;
+
+            // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run
+            assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L)));
+
+            final long beforeMetricsCollectedNanos = System.nanoTime();
+            meterRegistry.getRecorder().collect();
+            final long afterMetricsCollectedNanos = System.nanoTime();
+
+            // Calculate upper bound on utilisation metric
+            final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos;
+            final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax();
+            final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos;
+
+            // Calculate lower bound on utilisation metric
+            final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos;
+            final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax();
+            final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos;
+
+            logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization);
+            Matcher<Double> matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization));
+            metricAsserter.assertLatestMetricValueMatches(
+                InstrumentType.DOUBLE_GAUGE,
+                ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
+                Measurement::getDouble,
+                matcher
+            );
+        } finally {
+            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Checks the active, current, completed and largest thread metrics of a randomly chosen pool before
+     * any task is submitted, while tasks are blocked on a barrier, and after they have all completed.
+     */
+    public void testThreadCountMetrics() throws Exception {
+        final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
+        final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders();
+        final ThreadPool threadPool = new ThreadPool(
+            Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(),
+            meterRegistry,
+            builtInExecutorBuilders
+        );
+        try {
+            final String threadPoolName = randomFrom(
+                ThreadPool.Names.GENERIC,
+                ThreadPool.Names.ANALYZE,
+                ThreadPool.Names.WRITE,
+                ThreadPool.Names.SEARCH
+            );
+            final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);
+            final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName);
+
+            meterRegistry.getRecorder().collect();
+            metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, InstrumentType.LONG_GAUGE, equalTo(0L));
+            metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, InstrumentType.LONG_GAUGE, equalTo(0L));
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
+                InstrumentType.LONG_ASYNC_COUNTER,
+                equalTo(0L)
+            );
+            metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, InstrumentType.LONG_GAUGE, equalTo(0L));
+
+            final int numThreads = randomIntBetween(1, Math.min(10, threadPoolInfo.getMax()));
+            final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
+            final List<Future<?>> futures = new ArrayList<>();
+            final EsThreadPoolExecutor executor = asInstanceOf(EsThreadPoolExecutor.class, threadPool.executor(threadPoolName));
+            for (int i = 0; i < numThreads; i++) {
+                futures.add(executor.submit(() -> {
+                    safeAwait(barrier);
+                    safeAwait(barrier);
+                }));
+            }
+            // Wait for all threads to start
+            safeAwait(barrier);
+
+            meterRegistry.getRecorder().collect();
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE,
+                InstrumentType.LONG_GAUGE,
+                equalTo((long) numThreads)
+            );
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
+                InstrumentType.LONG_GAUGE,
+                equalTo((long) numThreads)
+            );
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
+                InstrumentType.LONG_ASYNC_COUNTER,
+                equalTo(0L)
+            );
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
+                InstrumentType.LONG_GAUGE,
+                equalTo((long) numThreads)
+            );
+
+            // Let all threads complete
+            safeAwait(barrier);
+            futures.forEach(ESTestCase::safeGet);
+            // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to complete
+            assertBusy(() -> assertThat(executor.getActiveCount(), equalTo(0)));
+
+            meterRegistry.getRecorder().collect();
+            metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, InstrumentType.LONG_GAUGE, equalTo(0L));
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
+                InstrumentType.LONG_GAUGE,
+                equalTo((long) numThreads)
+            );
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
+                InstrumentType.LONG_ASYNC_COUNTER,
+                equalTo((long) numThreads)
+            );
+            metricAsserter.assertLatestLongValueMatches(
+                ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
+                InstrumentType.LONG_GAUGE,
+                equalTo((long) numThreads)
+            );
+        } finally {
+            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    /** Asserts on the last measurement recorded for a metric of one named thread pool. */
+    private static class MetricAsserter {
+        private final RecordingMeterRegistry meterRegistry;
+        private final String threadPoolName;
+
+        MetricAsserter(RecordingMeterRegistry meterRegistry, String threadPoolName) {
+            this.meterRegistry = meterRegistry;
+            this.threadPoolName = threadPoolName;
+        }
+
+        /** Applies {@code matcher} to the last value of the given metric, read as a {@code long}. */
+        void assertLatestLongValueMatches(String metricName, InstrumentType instrumentType, Matcher<Long> matcher) {
+            assertLatestMetricValueMatches(instrumentType, metricName, Measurement::getLong, matcher);
+        }
+
+        /**
+         * Fails if the metric has no measurements; otherwise applies {@code matcher} to the value extracted
+         * from the last measurement in the recorder's list.
+         *
+         * @param name metric name suffix, appended to the metric prefix and thread pool name
+         */
+        <T> void assertLatestMetricValueMatches(
+            InstrumentType instrumentType,
+            String name,
+            Function<Measurement, T> valueExtractor,
+            Matcher<? super T> matcher
+        ) {
+            List<Measurement> measurements = meterRegistry.getRecorder()
+                .getMeasurements(instrumentType, ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + name);
+            assertFalse(name + " has no measurements", measurements.isEmpty());
+            assertThat(valueExtractor.apply(measurements.getLast()), matcher);
+        }
+    }
+
     private static AbstractRunnable forceExecution(AbstractRunnable delegate) {
         return new
AbstractRunnable() { @Override