Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;

import java.util.List;
Expand Down Expand Up @@ -584,19 +585,12 @@ public static class TaskTrackingConfig {
private final boolean trackOngoingTasks;
private final boolean trackMaxQueueLatency;
private final double executionTimeEwmaAlpha;
private final TimeValue utilizationRefreshInterval;

public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
false,
false,
false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
);
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
true,
false,
false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
);
public static final TaskTrackingConfig DO_NOT_TRACK = TaskTrackingConfig.builder().build();
public static final TaskTrackingConfig DEFAULT = TaskTrackingConfig.builder()
.trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
.build();

/**
* @param trackExecutionTime Whether to track execution stats
Expand All @@ -608,12 +602,14 @@ private TaskTrackingConfig(
boolean trackExecutionTime,
boolean trackOngoingTasks,
boolean trackMaxQueueLatency,
double executionTimeEWMAAlpha
double executionTimeEWMAAlpha,
TimeValue utilizationRefreshInterval
) {
this.trackExecutionTime = trackExecutionTime;
this.trackOngoingTasks = trackOngoingTasks;
this.trackMaxQueueLatency = trackMaxQueueLatency;
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
this.utilizationRefreshInterval = utilizationRefreshInterval;
}

public boolean trackExecutionTime() {
Expand All @@ -632,6 +628,10 @@ public double getExecutionTimeEwmaAlpha() {
return executionTimeEwmaAlpha;
}

public TimeValue getUtilizationRefreshInterval() {
return utilizationRefreshInterval;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -641,6 +641,7 @@ public static class Builder {
private boolean trackOngoingTasks = false;
private boolean trackMaxQueueLatency = false;
private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST;
private TimeValue utilizationRefreshInterval = TimeValue.timeValueSeconds(30);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that 30s might be too short given we saw > 1.0 utilization in the APM metrics which were calculated every 60s. Perhaps we should use 60 or split the difference and do 45?


public Builder() {}

Expand All @@ -660,8 +661,19 @@ public Builder trackMaxQueueLatency() {
return this;
}

public Builder utilizationRefreshInterval(TimeValue utilizationRefreshInterval) {
this.utilizationRefreshInterval = utilizationRefreshInterval;
return this;
}

public TaskTrackingConfig build() {
return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha);
return new TaskTrackingConfig(
trackExecutionTime,
trackOngoingTasks,
trackMaxQueueLatency,
ewmaAlpha,
utilizationRefreshInterval
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
Expand All @@ -49,15 +50,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
private final boolean trackMaxQueueLatency;
private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);

public enum UtilizationTrackingPurpose {
APM,
ALLOCATION,
}

private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker();
private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker();
private final LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);
private final UtilizationTracker utilizationTracker;

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
Expand All @@ -78,6 +72,7 @@ public enum UtilizationTrackingPurpose {
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency();
this.utilizationTracker = new UtilizationTracker(trackingConfig.getUtilizationRefreshInterval());
}

public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
Expand Down Expand Up @@ -105,7 +100,7 @@ public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadP
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + threadPoolName,
"fraction",
() -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of())
() -> new DoubleWithAttributes(getUtilization(), Map.of())
)
);
}
Expand Down Expand Up @@ -154,22 +149,15 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
}

/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called.
* There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the
* caller.
* Returns the fraction of the maximum possible thread time that was actually used recently.
*
* This value is updated approximately every {@link TaskTrackingConfig#getUtilizationRefreshInterval()}
*
* @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started
* earlier, contributing a larger execution time.
*/
public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) {
switch (utilizationTrackingPurpose) {
case APM:
return apmUtilizationTracker.pollUtilization();
case ALLOCATION:
return allocationUtilizationTracker.pollUtilization();
default:
throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]");
}
public double getUtilization() {
return utilizationTracker.getUtilization();
}

@Override
Expand Down Expand Up @@ -213,6 +201,7 @@ protected void afterExecute(Runnable r, Throwable t) {
executionEWMA.addValue(taskExecutionNanos);
totalExecutionTime.add(taskExecutionNanos);
}
utilizationTracker.recalculateUtilizationIfDue();
} finally {
// if trackOngoingTasks is false -> ongoingTasks must be empty
assert trackOngoingTasks || ongoingTasks.isEmpty();
Expand Down Expand Up @@ -254,29 +243,54 @@ public boolean trackingMaxQueueLatency() {
}

/**
* Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization
* since the last poll can be calculated for the next polling request.
* Tracks the utilization of a thread pool by periodically calculating the average since the last time it was calculated. Requires
* that {@link #recalculateUtilizationIfDue()} is called regularly to stay up to date.
*
* Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred.
* Uses the difference of {@link #totalExecutionTime} since the last calculation to determine how much activity has occurred.
*/
private class UtilizationTracker {
long lastPollTime = System.nanoTime();
long lastTotalExecutionTime = 0;
private final long refreshIntervalNanos;
private final AtomicLong lastCalculatedTime;
volatile long lastTotalExecutionTime = 0;
volatile double lastUtilization = 0;

UtilizationTracker(TimeValue refreshInterval) {
this.refreshIntervalNanos = refreshInterval.nanos();
this.lastCalculatedTime = new AtomicLong(System.nanoTime() - refreshIntervalNanos);
}

/**
* If our utilization value is stale, recalculate it
*/
public void recalculateUtilizationIfDue() {
final long now = System.nanoTime();
final long lastCalcTimeCopy = lastCalculatedTime.get();
if (now - lastCalcTimeCopy > refreshIntervalNanos) {

public synchronized double pollUtilization() {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
final long currentPollTimeNanos = System.nanoTime();
// `refreshIntervalNanos` should be large enough that this
// compare-and-swap is enough to avoid concurrency issues here
if (lastCalculatedTime.compareAndSet(lastCalcTimeCopy, now)) {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();

final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
final long timeSinceLastPoll = now - lastCalcTimeCopy;

final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos
/ maximumExecutionTimeSinceLastPollNanos;

lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastPollTime = currentPollTimeNanos;
lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastUtilization = utilizationSinceLastPoll;
}
}
}

return utilizationSinceLastPoll;
/**
* Get the most recent utilization value calculated
*/
public double getUtilization() {
recalculateUtilizationIfDue();
return lastUtilization;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should also call recalculateUtilizationIfDue in here in case there is zero activity in the pool? probably not an issue for the write pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want that to avoid it being infinitely stale, but then you get into a task tracking issue?

Also, it seems like the value can now be 30s old instead of current?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a recalculate here, in case we end up using this. You're right, because we are recalculating independently of the polling it's possible the utilisation is up to TaskTrackingConfig#utilizationRefreshInterval old. Given that it's an average over the same interval, and our current goal of acting only on persistent hot-spots I think that's probably OK, but hopefully we can get a better utilisation measure.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.telemetry.InstrumentType;
Expand Down Expand Up @@ -491,80 +491,90 @@ public void testScheduledFixedDelayForceExecution() {

public void testDetailedUtilizationMetric() throws Exception {
final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders();

final ThreadPool threadPool = new ThreadPool(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(),
meterRegistry,
builtInExecutorBuilders
final String threadPoolName = randomIdentifier();
final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName);
final int maxThreadPoolSize = randomIntBetween(5, 10);
final int minThreadPoolSize = randomIntBetween(0, maxThreadPoolSize);
final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf(
TaskExecutionTimeTrackingEsThreadPoolExecutor.class,
EsExecutors.newScaling(
threadPoolName,
minThreadPoolSize,
maxThreadPoolSize,
randomLongBetween(5, 10),
TimeUnit.SECONDS,
true,
EsExecutors.daemonThreadFactory(randomIdentifier()),
new ThreadContext(Settings.EMPTY),
EsExecutors.TaskTrackingConfig.builder()
.trackExecutionTime(EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST)
// For this test we should always recalculate
.utilizationRefreshInterval(TimeValue.ZERO)
.build()
)
);
try {
// write thread pool is tracked
final String threadPoolName = ThreadPool.Names.WRITE;
final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName);
final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);
final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf(
TaskExecutionTimeTrackingEsThreadPoolExecutor.class,
threadPool.executor(threadPoolName)
);
executor.setupMetrics(meterRegistry, threadPoolName);

final long beforePreviousCollectNanos = System.nanoTime();
try {
// Utilization should be zero to begin with
meterRegistry.getRecorder().collect();
double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION);
final long afterPreviousCollectNanos = System.nanoTime();

var metricValue = metricAsserter.assertLatestMetricValueMatches(
metricAsserter.assertLatestMetricValueMatches(
InstrumentType.DOUBLE_GAUGE,
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
Measurement::getDouble,
equalTo(0.0d)
);
logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization);
assertThat(allocationUtilization, equalTo(0.0d));

// Run a task through to establish a lower/upper bound on the poll interval
final long beforePreviousPollNanos = System.nanoTime();
safeGet(executor.submit(() -> {}));
// Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run
assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L)));
final long afterPreviousPollNanos = System.nanoTime();

final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE);
final AtomicLong beforeEndNanos = new AtomicLong();
final long beforeStartNanos = System.nanoTime();
final CyclicBarrier barrier = new CyclicBarrier(2);
Future<?> future = executor.submit(() -> {
long innerStartTimeNanos = System.nanoTime();
safeSleep(100);
safeAwait(barrier);
minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos);
beforeEndNanos.set(System.nanoTime());
});
safeAwait(barrier);
safeGet(future);
final long maxDurationNanos = System.nanoTime() - beforeStartNanos;

// Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run
assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L)));
final long afterEndNanos = System.nanoTime();

final long beforeMetricsCollectedNanos = System.nanoTime();
meterRegistry.getRecorder().collect();
allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION);
final long afterMetricsCollectedNanos = System.nanoTime();

// Calculate upper bound on utilisation metric
final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos;
final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax();
// Calculate upper bound on utilization metric
final long minimumPollIntervalNanos = beforeEndNanos.get() - afterPreviousPollNanos;
final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * maxThreadPoolSize;
final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos;

// Calculate lower bound on utilisation metric
final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos;
final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax();
// Calculate lower bound on utilization metric
final long maximumPollIntervalNanos = afterEndNanos - beforePreviousPollNanos;
final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * maxThreadPoolSize;
final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos;

logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization);
Matcher<Double> matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization));
metricValue = metricAsserter.assertLatestMetricValueMatches(
var utilizationValue = metricAsserter.assertLatestMetricValueMatches(
InstrumentType.DOUBLE_GAUGE,
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
Measurement::getDouble,
matcher
);
logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization);
assertThat(allocationUtilization, matcher);
logger.info("---> Utilization metric: " + utilizationValue);
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}

Expand Down