Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
Expand All @@ -40,6 +41,7 @@
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18;
private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };
private static final long UTILISATION_REFRESH_INTERVAL_NANOS = TimeValue.timeValueSeconds(45).nanos();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a setting perhaps so we can experiment?


private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
Expand All @@ -49,15 +51,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
private final boolean trackMaxQueueLatency;
private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);

public enum UtilizationTrackingPurpose {
APM,
ALLOCATION,
}

private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker();
private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker();
private final LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);
private final UtilizationTracker utilizationTracker = new UtilizationTracker();

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
Expand Down Expand Up @@ -105,7 +100,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + threadPoolName,
"fraction",
() -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of())
() -> new DoubleWithAttributes(getUtilization(), Map.of())
)
);
}
Expand Down Expand Up @@ -154,22 +149,15 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
}

/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
* There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
* caller.
* Returns the fraction of the maximum possible thread time that was actually used recently.
*
* This value is updated approximately every {@link #UTILISATION_REFRESH_INTERVAL_NANOS}
*
* @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started
* earlier, contributing a larger execution time.
*/
public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) {
switch (utilizationTrackingPurpose) {
case APM:
return apmUtilizationTracker.pollUtilization();
case ALLOCATION:
return allocationUtilizationTracker.pollUtilization();
default:
throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]");
}
public double getUtilization() {
return utilizationTracker.getUtilization();
}

@Override
Expand Down Expand Up @@ -213,6 +201,7 @@ protected void afterExecute(Runnable r, Throwable t) {
executionEWMA.addValue(taskExecutionNanos);
totalExecutionTime.add(taskExecutionNanos);
}
utilizationTracker.recalculateUtilizationIfDue();
} finally {
// if trackOngoingTasks is false -> ongoingTasks must be empty
assert trackOngoingTasks || ongoingTasks.isEmpty();
Expand Down Expand Up @@ -254,29 +243,47 @@ public boolean trackingMaxQueueLatency() {
}

/**
* Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization
* since the last poll can be calculated for the next polling request.
* Tracks the utilization of a thread pool by periodically calculating the average since the last time it was calculated. Requires
* that {@link #recalculateUtilizationIfDue()} is called regularly to stay up to date.
*
* Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred.
* Uses the difference of {@link #totalExecutionTime} since the last calculation to determine how much activity has occurred.
*/
private class UtilizationTracker {
long lastPollTime = System.nanoTime();
long lastTotalExecutionTime = 0;

public synchronized double pollUtilization() {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
final long currentPollTimeNanos = System.nanoTime();

final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;

final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;

lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastPollTime = currentPollTimeNanos;
final AtomicLong lastCalculatedTime = new AtomicLong(System.nanoTime());
volatile long lastTotalExecutionTime = 0;
volatile double lastUtilization = 0;

/**
* If our utilization value is stale, recalculate it
*/
public void recalculateUtilizationIfDue() {
final long now = System.nanoTime();
final long lastCalcTimeCopy = lastCalculatedTime.get();
if (now - lastCalcTimeCopy > UTILISATION_REFRESH_INTERVAL_NANOS) {

// UTILISATION_REFRESH_INTERVAL should be large enough that this
// compare-and-swap is enough to avoid concurrency issues here
if (lastCalculatedTime.compareAndSet(lastCalcTimeCopy, now)) {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();

final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
final long timeSinceLastPoll = now - lastCalcTimeCopy;

final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos
/ maximumExecutionTimeSinceLastPollNanos;

lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastUtilization = utilizationSinceLastPoll;
}
}
}

return utilizationSinceLastPoll;
/**
* Get the most recent utilization value calculated
*/
public double getUtilization() {
return lastUtilization;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should also call recalculateUtilizationIfDue in here in case there is zero activity in the pool? probably not an issue for the write pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want that to avoid it being infinitely stale, but then you get into a task tracking issue?

Also, it seems like the value can now be 30s old instead of current?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a recalculate here, in case we end up using this. You're right, because we are recalculating independently of the polling it's possible the utilisation is up to TaskTrackingConfig#utilizationRefreshInterval old. Given that it's an average over the same interval, and our current goal of acting only on persistent hot-spots I think that's probably OK, but hopefully we can get a better utilisation measure.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.telemetry.InstrumentType;
Expand Down Expand Up @@ -510,7 +509,7 @@ public void testDetailedUtilizationMetric() throws Exception {

final long beforePreviousCollectNanos = System.nanoTime();
meterRegistry.getRecorder().collect();
double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION);
double allocationUtilization = executor.getUtilization();
final long afterPreviousCollectNanos = System.nanoTime();

var metricValue = metricAsserter.assertLatestMetricValueMatches(
Expand Down Expand Up @@ -540,7 +539,7 @@ public void testDetailedUtilizationMetric() throws Exception {

final long beforeMetricsCollectedNanos = System.nanoTime();
meterRegistry.getRecorder().collect();
allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION);
allocationUtilization = executor.getUtilization();
final long afterMetricsCollectedNanos = System.nanoTime();

// Calculate upper bound on utilisation metric
Expand Down