Skip to content

Commit 97b7ec9

Browse files
authored
Track total task execution time in TaskExecutionTimeTrackingEsThreadPoolExecutor (#96944)
Track total task execution time in EWMATrackingEsThreadPoolExecutor and rename it. Relates ES-6249
1 parent ac7d791 commit 97b7ec9

File tree

7 files changed

+83
-36
lines changed

7 files changed

+83
-36
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,16 +122,16 @@ public static EsThreadPoolExecutor newFixed(
122122
int queueCapacity,
123123
ThreadFactory threadFactory,
124124
ThreadContext contextHolder,
125-
boolean trackEWMA
125+
boolean trackExecutionTime
126126
) {
127127
BlockingQueue<Runnable> queue;
128128
if (queueCapacity < 0) {
129129
queue = ConcurrentCollections.newBlockingQueue();
130130
} else {
131131
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
132132
}
133-
if (trackEWMA) {
134-
return new EWMATrackingEsThreadPoolExecutor(
133+
if (trackExecutionTime) {
134+
return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
135135
name,
136136
size,
137137
size,

server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java renamed to server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@
1515
import java.util.concurrent.RejectedExecutionHandler;
1616
import java.util.concurrent.ThreadFactory;
1717
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.LongAdder;
1819
import java.util.function.Function;
1920

2021
/**
21-
* An extension to thread pool executor, which tracks the exponentially weighted moving average of the task execution time.
22+
* An extension to thread pool executor, which tracks statistics for the task execution time.
2223
*/
23-
public final class EWMATrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
24+
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
2425

2526
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
2627
public static double EWMA_ALPHA = 0.3;
2728

2829
private final Function<Runnable, WrappedRunnable> runnableWrapper;
2930
private final ExponentiallyWeightedMovingAverage executionEWMA;
31+
private final LongAdder totalExecutionTime = new LongAdder();
3032

31-
EWMATrackingEsThreadPoolExecutor(
33+
TaskExecutionTimeTrackingEsThreadPoolExecutor(
3234
String name,
3335
int corePoolSize,
3436
int maximumPoolSize,
@@ -67,6 +69,13 @@ public double getTaskExecutionEWMA() {
6769
return executionEWMA.getAverage();
6870
}
6971

72+
/**
73+
* Returns the total time (in nanoseconds) spend executing tasks in this executor.
74+
*/
75+
public long getTotalTaskExecutionTime() {
76+
return totalExecutionTime.sum();
77+
}
78+
7079
/**
7180
* Returns the current queue size (operations that are queued)
7281
*/
@@ -93,12 +102,18 @@ protected void afterExecute(Runnable r, Throwable t) {
93102
if (taskExecutionNanos != -1) {
94103
// taskExecutionNanos may be -1 if the task threw an exception
95104
executionEWMA.addValue(taskExecutionNanos);
105+
totalExecutionTime.add(taskExecutionNanos);
96106
}
97107
}
98108

99109
@Override
100110
protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
101-
sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", ");
111+
sb.append("task execution EWMA = ")
112+
.append(TimeValue.timeValueNanos((long) executionEWMA.getAverage()))
113+
.append(", ")
114+
.append("total task execution time = ")
115+
.append(TimeValue.timeValueNanos(getTotalTaskExecutionTime()))
116+
.append(", ");
102117
}
103118

104119
}

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import org.elasticsearch.common.lucene.MinimumScoreCollector;
3131
import org.elasticsearch.common.lucene.search.FilteredCollector;
3232
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
33-
import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor;
3433
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
34+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
3535
import org.elasticsearch.lucene.queries.SearchAfterSortedDocQuery;
3636
import org.elasticsearch.search.DocValueFormat;
3737
import org.elasticsearch.search.SearchContextSourcePrinter;
@@ -268,10 +268,10 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas
268268
searchWithCollectorManager(searchContext, searcher, query, collectorManager, timeoutRunnable != null);
269269
queryResult.topDocs(topDocsFactory.topDocsAndMaxScore(), topDocsFactory.sortValueFormats);
270270
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
271-
assert executor instanceof EWMATrackingEsThreadPoolExecutor
271+
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
272272
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
273273
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
274-
if (executor instanceof EWMATrackingEsThreadPoolExecutor rExecutor) {
274+
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
275275
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
276276
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
277277
}

server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
2828

2929
private final Setting<Integer> sizeSetting;
3030
private final Setting<Integer> queueSizeSetting;
31-
private final boolean trackEWMA;
31+
private final boolean trackExecutionTime;
3232

3333
/**
3434
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
@@ -37,10 +37,16 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
3737
* @param name the name of the executor
3838
* @param size the fixed number of threads
3939
* @param queueSize the size of the backing queue, -1 for unbounded
40-
* @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
40+
* @param trackExecutionTime whether to track statics about task execution time
4141
*/
42-
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean trackEWMA) {
43-
this(settings, name, size, queueSize, "thread_pool." + name, trackEWMA);
42+
FixedExecutorBuilder(
43+
final Settings settings,
44+
final String name,
45+
final int size,
46+
final int queueSize,
47+
final boolean trackExecutionTime
48+
) {
49+
this(settings, name, size, queueSize, "thread_pool." + name, trackExecutionTime);
4450
}
4551

4652
/**
@@ -51,15 +57,15 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
5157
* @param size the fixed number of threads
5258
* @param queueSize the size of the backing queue, -1 for unbounded
5359
* @param prefix the prefix for the settings keys
54-
* @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
60+
* @param trackExecutionTime whether to track statics about task execution time
5561
*/
5662
public FixedExecutorBuilder(
5763
final Settings settings,
5864
final String name,
5965
final int size,
6066
final int queueSize,
6167
final String prefix,
62-
final boolean trackEWMA
68+
final boolean trackExecutionTime
6369
) {
6470
super(name);
6571
final String sizeKey = settingsKey(prefix, "size");
@@ -71,7 +77,7 @@ public FixedExecutorBuilder(
7177
);
7278
final String queueSizeKey = settingsKey(prefix, "queue_size");
7379
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
74-
this.trackEWMA = trackEWMA;
80+
this.trackExecutionTime = trackExecutionTime;
7581
}
7682

7783
@Override
@@ -98,7 +104,7 @@ ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final Thre
98104
queueSize,
99105
threadFactory,
100106
threadContext,
101-
trackEWMA
107+
trackExecutionTime
102108
);
103109
final ThreadPool.Info info = new ThreadPool.Info(
104110
name(),

server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public String toString() {
308308
assertThat(
309309
message,
310310
either(containsString("on EsThreadPoolExecutor[name = " + getName())).or(
311-
containsString("on EWMATrackingEsThreadPoolExecutor[name = " + getName())
311+
containsString("on TaskExecutionTimeTrackingEsThreadPoolExecutor[name = " + getName())
312312
)
313313
);
314314
assertThat(message, containsString("queue capacity = " + queue));
@@ -353,7 +353,7 @@ public String toString() {
353353
assertThat(
354354
message,
355355
either(containsString("on EsThreadPoolExecutor[name = " + getName())).or(
356-
containsString("on EWMATrackingEsThreadPoolExecutor[name = " + getName())
356+
containsString("on TaskExecutionTimeTrackingEsThreadPoolExecutor[name = " + getName())
357357
)
358358
);
359359
assertThat(message, containsString("queue capacity = " + queue));
Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
* Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests}
2121
* based on the time taken for each event.
2222
*/
23-
public class EWMATrackingEsThreadPoolExecutorTests extends ESTestCase {
23+
public class TaskExecutionTimeTrackingEsThreadPoolExecutorTests extends ESTestCase {
2424

2525
public void testExecutionEWMACalculation() throws Exception {
2626
ThreadContext context = new ThreadContext(Settings.EMPTY);
2727

28-
EWMATrackingEsThreadPoolExecutor executor = new EWMATrackingEsThreadPoolExecutor(
28+
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
2929
"test-threadpool",
3030
1,
3131
1,
3232
1000,
3333
TimeUnit.MILLISECONDS,
3434
ConcurrentCollections.newBlockingQueue(),
35-
fastWrapper(),
35+
settableWrapper(TimeUnit.NANOSECONDS.toNanos(100)),
3636
EsExecutors.daemonThreadFactory("queuetest"),
3737
new EsAbortPolicy(),
3838
context
@@ -41,16 +41,33 @@ public void testExecutionEWMACalculation() throws Exception {
4141
logger.info("--> executor: {}", executor);
4242

4343
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
44+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L));
45+
// Using the settableWrapper each task would take 100ns
4446
executeTask(executor, 1);
45-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); });
47+
assertBusy(() -> {
48+
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L));
49+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(100L));
50+
});
4651
executeTask(executor, 1);
47-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); });
52+
assertBusy(() -> {
53+
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L));
54+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(200L));
55+
});
4856
executeTask(executor, 1);
49-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); });
57+
assertBusy(() -> {
58+
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L));
59+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(300L));
60+
});
5061
executeTask(executor, 1);
51-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); });
62+
assertBusy(() -> {
63+
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L));
64+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(400L));
65+
});
5266
executeTask(executor, 1);
53-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); });
67+
assertBusy(() -> {
68+
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L));
69+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L));
70+
});
5471

5572
executor.shutdown();
5673
executor.awaitTermination(10, TimeUnit.SECONDS);
@@ -59,7 +76,7 @@ public void testExecutionEWMACalculation() throws Exception {
5976
/** Use a runnable wrapper that simulates a task with unknown failures. */
6077
public void testExceptionThrowingTask() throws Exception {
6178
ThreadContext context = new ThreadContext(Settings.EMPTY);
62-
EWMATrackingEsThreadPoolExecutor executor = new EWMATrackingEsThreadPoolExecutor(
79+
TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
6380
"test-threadpool",
6481
1,
6582
1,
@@ -74,14 +91,23 @@ public void testExceptionThrowingTask() throws Exception {
7491
executor.prestartAllCoreThreads();
7592
logger.info("--> executor: {}", executor);
7693

94+
// Using the exceptionalWrapper each task's execution time is -1 to simulate unknown failures/rejections.
7795
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
78-
executeTask(executor, 1);
96+
int taskCount = randomIntBetween(1, 100);
97+
executeTask(executor, taskCount);
98+
assertBusy(() -> assertThat(executor.getCompletedTaskCount(), equalTo((long) taskCount)));
99+
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
100+
assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L));
79101
executor.shutdown();
80102
executor.awaitTermination(10, TimeUnit.SECONDS);
81103
}
82104

83-
private Function<Runnable, WrappedRunnable> fastWrapper() {
84-
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false);
105+
/**
106+
* The returned function outputs a WrappedRunnabled that simulates the case
107+
* where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}.
108+
*/
109+
private Function<Runnable, WrappedRunnable> settableWrapper(long timeTakenNanos) {
110+
return (runnable) -> new SettableTimedRunnable(timeTakenNanos, false);
85111
}
86112

87113
/**
@@ -94,7 +120,7 @@ private Function<Runnable, WrappedRunnable> exceptionalWrapper() {
94120
}
95121

96122
/** Execute a blank task {@code times} times for the executor */
97-
private void executeTask(EWMATrackingEsThreadPoolExecutor executor, int times) {
123+
private void executeTask(TaskExecutionTimeTrackingEsThreadPoolExecutor executor, int times) {
98124
logger.info("--> executing a task [{}] times", times);
99125
for (int i = 0; i < times; i++) {
100126
executor.execute(() -> {});

server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
import org.elasticsearch.common.logging.Loggers;
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.common.unit.ByteSizeValue;
17-
import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor;
1817
import org.elasticsearch.common.util.concurrent.EsExecutors;
1918
import org.elasticsearch.common.util.concurrent.FutureUtils;
19+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
2020
import org.elasticsearch.core.TimeValue;
2121
import org.elasticsearch.test.ESTestCase;
2222
import org.elasticsearch.test.MockLogAppender;
@@ -351,10 +351,10 @@ public void testGetMaxSnapshotCores() {
351351
assertThat(getMaxSnapshotThreadPoolSize(allocatedProcessors, ByteSizeValue.ofGb(4)), equalTo(10));
352352
}
353353

354-
public void testWriteThreadPoolUsesEWMATrackingEsThreadPoolExecutor() {
354+
public void testWriteThreadPoolUsesTaskExecutionTimeTrackingEsThreadPoolExecutor() {
355355
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
356356
try {
357-
assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(EWMATrackingEsThreadPoolExecutor.class));
357+
assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
358358
} finally {
359359
assertTrue(terminate(threadPool));
360360
}

0 commit comments

Comments
 (0)