diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index c39ce209bf875..e20477a47d950 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import java.util.List; @@ -584,19 +585,12 @@ public static class TaskTrackingConfig { private final boolean trackOngoingTasks; private final boolean trackMaxQueueLatency; private final double executionTimeEwmaAlpha; + private final TimeValue utilizationRefreshInterval; - public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( - false, - false, - false, - DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST - ); - public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( - true, - false, - false, - DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST - ); + public static final TaskTrackingConfig DO_NOT_TRACK = TaskTrackingConfig.builder().build(); + public static final TaskTrackingConfig DEFAULT = TaskTrackingConfig.builder() + .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + .build(); /** * @param trackExecutionTime Whether to track execution stats @@ -608,12 +602,14 @@ private TaskTrackingConfig( boolean trackExecutionTime, boolean trackOngoingTasks, boolean trackMaxQueueLatency, - double executionTimeEWMAAlpha + double executionTimeEWMAAlpha, + TimeValue utilizationRefreshInterval ) { this.trackExecutionTime = trackExecutionTime; this.trackOngoingTasks = trackOngoingTasks; this.trackMaxQueueLatency = trackMaxQueueLatency; this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; + this.utilizationRefreshInterval = utilizationRefreshInterval; } public boolean trackExecutionTime() { @@ -632,6 +628,10 @@ public double getExecutionTimeEwmaAlpha() { return executionTimeEwmaAlpha; } + public TimeValue getUtilizationRefreshInterval() { + return utilizationRefreshInterval; + } + public static Builder builder() { return new Builder(); } @@ -641,6 +641,7 @@ public static class Builder { private boolean trackOngoingTasks = false; private boolean trackMaxQueueLatency = false; private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; + private TimeValue utilizationRefreshInterval = TimeValue.timeValueSeconds(30); public Builder() {} @@ -660,8 +661,19 @@ public Builder trackMaxQueueLatency() { return this; } + public Builder utilizationRefreshInterval(TimeValue utilizationRefreshInterval) { + this.utilizationRefreshInterval = utilizationRefreshInterval; + return this; + } + public TaskTrackingConfig build() { - return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha); + return new TaskTrackingConfig( + trackExecutionTime, + trackOngoingTasks, + trackMaxQueueLatency, + ewmaAlpha, + utilizationRefreshInterval + ); } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 762a8c280b7f3..fb71876183fdd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,6 +27,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -49,15 +50,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final Map ongoingTasks = new ConcurrentHashMap<>(); private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); private final boolean trackMaxQueueLatency; - private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); - - public enum UtilizationTrackingPurpose { - APM, - ALLOCATION, - } - - private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); - private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); + private final LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); + private final UtilizationTracker utilizationTracker; TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, @@ -78,6 +72,7 @@ public enum UtilizationTrackingPurpose { this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); + this.utilizationTracker = new UtilizationTracker(trackingConfig.getUtilizationRefreshInterval()); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -105,7 +100,7 @@ public List setupMetrics(MeterRegistry meterRegistry, String threadP ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, "fraction of maximum thread time utilized for " + threadPoolName, "fraction", - () -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of()) + () -> new DoubleWithAttributes(getUtilization(), Map.of()) ) ); } @@ -154,22 +149,15 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() { } /** - * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. - * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the - * caller. + * Returns the fraction of the maximum possible thread time that was actually used recently. + * + * This value is updated approximately every {@link TaskTrackingConfig#getUtilizationRefreshInterval()} * * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started * earlier, contributing a larger execution time. */ - public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) { - switch (utilizationTrackingPurpose) { - case APM: - return apmUtilizationTracker.pollUtilization(); - case ALLOCATION: - return allocationUtilizationTracker.pollUtilization(); - default: - throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]"); - } + public double getUtilization() { + return utilizationTracker.getUtilization(); } @Override @@ -213,6 +201,7 @@ protected void afterExecute(Runnable r, Throwable t) { executionEWMA.addValue(taskExecutionNanos); totalExecutionTime.add(taskExecutionNanos); } + utilizationTracker.recalculateUtilizationIfDue(); } finally { // if trackOngoingTasks is false -> ongoingTasks must be empty assert trackOngoingTasks || ongoingTasks.isEmpty(); @@ -254,29 +243,54 @@ public boolean trackingMaxQueueLatency() { } /** - * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization - * since the last poll can be calculated for the next polling request. + * Tracks the utilization of a thread pool by periodically calculating the average since the last time it was calculated. Requires + * that {@link #recalculateUtilizationIfDue()} is called regularly to stay up to date. * - * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. + * Uses the difference of {@link #totalExecutionTime} since the last calculation to determine how much activity has occurred. */ private class UtilizationTracker { - long lastPollTime = System.nanoTime(); - long lastTotalExecutionTime = 0; + private final long refreshIntervalNanos; + private final AtomicLong lastCalculatedTime; + volatile long lastTotalExecutionTime = 0; + volatile double lastUtilization = 0; + + UtilizationTracker(TimeValue refreshInterval) { + this.refreshIntervalNanos = refreshInterval.nanos(); + this.lastCalculatedTime = new AtomicLong(System.nanoTime() - refreshIntervalNanos); + } + + /** + * If our utilization value is stale, recalculate it + */ + public void recalculateUtilizationIfDue() { + final long now = System.nanoTime(); + final long lastCalcTimeCopy = lastCalculatedTime.get(); + if (now - lastCalcTimeCopy > refreshIntervalNanos) { - public synchronized double pollUtilization() { - final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); - final long currentPollTimeNanos = System.nanoTime(); + // `refreshIntervalNanos` should be large enough that this + // compare-and-swap is enough to avoid concurrency issues here + if (lastCalculatedTime.compareAndSet(lastCalcTimeCopy, now)) { + final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); - final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; - final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; + final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; + final long timeSinceLastPoll = now - lastCalcTimeCopy; - final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); - final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; + final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); + final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos + / maximumExecutionTimeSinceLastPollNanos; - lastTotalExecutionTime = currentTotalExecutionTimeNanos; - lastPollTime = currentPollTimeNanos; + lastTotalExecutionTime = currentTotalExecutionTimeNanos; + lastUtilization = utilizationSinceLastPoll; + } + } + } - return utilizationSinceLastPoll; + /** + * Get the most recent utilization value calculated + */ + public double getUtilization() { + recalculateUtilizationIfDue(); + return lastUtilization; } } } diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 2cd166e002637..97e322e1d5e9f 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.InstrumentType; @@ -491,38 +491,50 @@ public void testScheduledFixedDelayForceExecution() { public void testDetailedUtilizationMetric() throws Exception { final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); - final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders(); - final ThreadPool threadPool = new ThreadPool( - Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), - meterRegistry, - builtInExecutorBuilders + final String threadPoolName = randomIdentifier(); + final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName); + final int maxThreadPoolSize = randomIntBetween(5, 10); + final int minThreadPoolSize = randomIntBetween(0, maxThreadPoolSize); + final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf( + TaskExecutionTimeTrackingEsThreadPoolExecutor.class, + EsExecutors.newScaling( + threadPoolName, + minThreadPoolSize, + maxThreadPoolSize, + randomLongBetween(5, 10), + TimeUnit.SECONDS, + true, + EsExecutors.daemonThreadFactory(randomIdentifier()), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.builder() + .trackExecutionTime(EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) + // For this test we should always recalculate + .utilizationRefreshInterval(TimeValue.ZERO) + .build() + ) ); - try { - // write thread pool is tracked - final String threadPoolName = ThreadPool.Names.WRITE; - final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName); - final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName); - final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf( - TaskExecutionTimeTrackingEsThreadPoolExecutor.class, - threadPool.executor(threadPoolName) - ); + executor.setupMetrics(meterRegistry, threadPoolName); - final long beforePreviousCollectNanos = System.nanoTime(); + try { + // Utilization should be zero to begin with meterRegistry.getRecorder().collect(); - double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); - final long afterPreviousCollectNanos = System.nanoTime(); - - var metricValue = metricAsserter.assertLatestMetricValueMatches( + metricAsserter.assertLatestMetricValueMatches( InstrumentType.DOUBLE_GAUGE, ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, Measurement::getDouble, equalTo(0.0d) ); - logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); - assertThat(allocationUtilization, equalTo(0.0d)); + + // Run a task through to establish a lower/upper bound on the poll interval + final long beforePreviousPollNanos = System.nanoTime(); + safeGet(executor.submit(() -> {})); + // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run + assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); + final long afterPreviousPollNanos = System.nanoTime(); final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); + final AtomicLong beforeEndNanos = new AtomicLong(); final long beforeStartNanos = System.nanoTime(); final CyclicBarrier barrier = new CyclicBarrier(2); Future future = executor.submit(() -> { @@ -530,6 +542,7 @@ public void testDetailedUtilizationMetric() throws Exception { safeSleep(100); safeAwait(barrier); minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); + beforeEndNanos.set(System.nanoTime()); }); safeAwait(barrier); safeGet(future); @@ -537,34 +550,31 @@ public void testDetailedUtilizationMetric() throws Exception { // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); + final long afterEndNanos = System.nanoTime(); - final long beforeMetricsCollectedNanos = System.nanoTime(); meterRegistry.getRecorder().collect(); - allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); - final long afterMetricsCollectedNanos = System.nanoTime(); - // Calculate upper bound on utilisation metric - final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; - final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); + // Calculate upper bound on utilization metric + final long minimumPollIntervalNanos = beforeEndNanos.get() - afterPreviousPollNanos; + final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * maxThreadPoolSize; final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; - // Calculate lower bound on utilisation metric - final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; - final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); + // Calculate lower bound on utilization metric + final long maximumPollIntervalNanos = afterEndNanos - beforePreviousPollNanos; + final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * maxThreadPoolSize; final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); - metricValue = metricAsserter.assertLatestMetricValueMatches( + var utilizationValue = metricAsserter.assertLatestMetricValueMatches( InstrumentType.DOUBLE_GAUGE, ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, Measurement::getDouble, matcher ); - logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); - assertThat(allocationUtilization, matcher); + logger.info("---> Utilization metric: " + utilizationValue); } finally { - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); } }