- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
Move to a single shared utilization tracker #131796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
df8d980
              c701001
              a573b56
              c8b3de5
              30dd5d5
              0a26722
              6dc97f3
              b5c87b6
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -27,6 +27,7 @@ | |
| import java.util.concurrent.RejectedExecutionHandler; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.concurrent.atomic.LongAccumulator; | ||
| import java.util.concurrent.atomic.LongAdder; | ||
| import java.util.function.Function; | ||
|  | @@ -49,15 +50,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea | |
| private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>(); | ||
| private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); | ||
| private final boolean trackMaxQueueLatency; | ||
| private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); | ||
|  | ||
| public enum UtilizationTrackingPurpose { | ||
| APM, | ||
| ALLOCATION, | ||
| } | ||
|  | ||
| private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); | ||
| private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); | ||
| private final LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); | ||
| private final UtilizationTracker utilizationTracker; | ||
|  | ||
| TaskExecutionTimeTrackingEsThreadPoolExecutor( | ||
| String name, | ||
|  | @@ -78,6 +72,7 @@ public enum UtilizationTrackingPurpose { | |
| this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); | ||
| this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); | ||
| this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); | ||
| this.utilizationTracker = new UtilizationTracker(trackingConfig.getUtilizationRefreshInterval()); | ||
| } | ||
|  | ||
| public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { | ||
|  | @@ -105,7 +100,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP | |
| ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, | ||
| "fraction of maximum thread time utilized for " + threadPoolName, | ||
| "fraction", | ||
| () -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of()) | ||
| () -> new DoubleWithAttributes(getUtilization(), Map.of()) | ||
| ) | ||
| ); | ||
| } | ||
|  | @@ -154,22 +149,15 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() { | |
| } | ||
|  | ||
| /** | ||
| * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. | ||
| * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the | ||
| * caller. | ||
| * Returns the fraction of the maximum possible thread time that was actually used recently. | ||
| * | ||
| * This value is updated approximately every {@link TaskTrackingConfig#getUtilizationRefreshInterval()} | ||
| * | ||
| * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started | ||
| * earlier, contributing a larger execution time. | ||
| */ | ||
| public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) { | ||
| switch (utilizationTrackingPurpose) { | ||
| case APM: | ||
| return apmUtilizationTracker.pollUtilization(); | ||
| case ALLOCATION: | ||
| return allocationUtilizationTracker.pollUtilization(); | ||
| default: | ||
| throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]"); | ||
| } | ||
| public double getUtilization() { | ||
| return utilizationTracker.getUtilization(); | ||
| } | ||
|  | ||
| @Override | ||
|  | @@ -213,6 +201,7 @@ protected void afterExecute(Runnable r, Throwable t) { | |
| executionEWMA.addValue(taskExecutionNanos); | ||
| totalExecutionTime.add(taskExecutionNanos); | ||
| } | ||
| utilizationTracker.recalculateUtilizationIfDue(); | ||
| } finally { | ||
| // if trackOngoingTasks is false -> ongoingTasks must be empty | ||
| assert trackOngoingTasks || ongoingTasks.isEmpty(); | ||
|  | @@ -254,29 +243,53 @@ public boolean trackingMaxQueueLatency() { | |
| } | ||
|  | ||
| /** | ||
| * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization | ||
| * since the last poll can be calculated for the next polling request. | ||
| * Tracks the utilization of a thread pool by periodically calculating the average since the last time it was calculated. Requires | ||
| * that {@link #recalculateUtilizationIfDue()} is called regularly to stay up to date. | ||
| * | ||
| * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. | ||
| * Uses the difference of {@link #totalExecutionTime} since the last calculation to determine how much activity has occurred. | ||
| */ | ||
| private class UtilizationTracker { | ||
| long lastPollTime = System.nanoTime(); | ||
| long lastTotalExecutionTime = 0; | ||
| private final long refreshIntervalNanos; | ||
| private final AtomicLong lastCalculatedTime; | ||
| volatile long lastTotalExecutionTime = 0; | ||
| volatile double lastUtilization = 0; | ||
|  | ||
| UtilizationTracker(TimeValue refreshInterval) { | ||
| this.refreshIntervalNanos = refreshInterval.nanos(); | ||
| this.lastCalculatedTime = new AtomicLong(System.nanoTime() - refreshIntervalNanos); | ||
| } | ||
|  | ||
| /** | ||
| * If our utilization value is stale, recalculate it | ||
| */ | ||
| public void recalculateUtilizationIfDue() { | ||
| final long now = System.nanoTime(); | ||
| final long lastCalcTimeCopy = lastCalculatedTime.get(); | ||
| if (now - lastCalcTimeCopy > refreshIntervalNanos) { | ||
|  | ||
| public synchronized double pollUtilization() { | ||
| final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); | ||
| final long currentPollTimeNanos = System.nanoTime(); | ||
| // `refreshIntervalNanos` should be large enough that this | ||
| // compare-and-swap is enough to avoid concurrency issues here | ||
| if (lastCalculatedTime.compareAndSet(lastCalcTimeCopy, now)) { | ||
| final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); | ||
|  | ||
| final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; | ||
| final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; | ||
| final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; | ||
| final long timeSinceLastPoll = now - lastCalcTimeCopy; | ||
|  | ||
| final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); | ||
| final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; | ||
| final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); | ||
| final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos | ||
| / maximumExecutionTimeSinceLastPollNanos; | ||
|  | ||
| lastTotalExecutionTime = currentTotalExecutionTimeNanos; | ||
| lastPollTime = currentPollTimeNanos; | ||
| lastTotalExecutionTime = currentTotalExecutionTimeNanos; | ||
| lastUtilization = utilizationSinceLastPoll; | ||
| } | ||
| } | ||
| } | ||
|  | ||
| return utilizationSinceLastPoll; | ||
| /** | ||
| * Get the most recent utilization value calculated | ||
| */ | ||
| public double getUtilization() { | ||
| return lastUtilization; | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps we should also call recalculateUtilizationIfDue in here in case there is zero activity in the pool? probably not an issue for the write pool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we want that to avoid it being infinitely stale, but then you get into a task tracking issue? Also, it seems like the value can now be 30s old instead of current? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a recalculate here, in case we end up using this. You're right, because we are recalculating independently of the polling it's possible the utilisation is up to  | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that 30s might be too short given we saw
> 1.0utilization in the APM metrics which were calculated every 60s. Perhaps we should use 60 or split the difference and do 45?