Skip to content

Commit 4d28410

Browse files
committed
WIP
1 parent 27adf20 commit 4d28410

File tree

3 files changed

+127
-17
lines changed

3 files changed

+127
-17
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* An extension to thread pool executor, which tracks statistics for the task execution time.
2727
*/
28-
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
28+
public class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
2929

3030
private final Function<Runnable, WrappedRunnable> runnableWrapper;
3131
private final ExponentiallyWeightedMovingAverage executionEWMA;
@@ -115,15 +115,23 @@ protected void afterExecute(Runnable r, Throwable t) {
115115
+ failedOrRejected;
116116
if (taskExecutionNanos != -1) {
117117
// taskExecutionNanos may be -1 if the task threw an exception
118-
executionEWMA.addValue(taskExecutionNanos);
119-
totalExecutionTime.add(taskExecutionNanos);
118+
trackExecutionTime(r, taskExecutionNanos);
120119
}
121120
} finally {
122-
// if trackOngoingTasks is false -> ongoingTasks must be empty
123-
assert trackOngoingTasks || ongoingTasks.isEmpty();
124-
if (trackOngoingTasks) {
125-
ongoingTasks.remove(r);
126-
}
121+
removeTrackedTask(r);
122+
}
123+
}
124+
125+
protected void trackExecutionTime(Runnable r, long taskTime) {
126+
executionEWMA.addValue(taskTime);
127+
totalExecutionTime.add(taskTime);
128+
}
129+
130+
protected void removeTrackedTask(Runnable r) {
131+
// if trackOngoingTasks is false -> ongoingTasks must be empty
132+
assert trackOngoingTasks || ongoingTasks.isEmpty();
133+
if (trackOngoingTasks) {
134+
ongoingTasks.remove(r);
127135
}
128136
}
129137

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.util.concurrent;
11+
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
import java.util.concurrent.BlockingQueue;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.RejectedExecutionHandler;
17+
import java.util.concurrent.ThreadFactory;
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.LongAdder;
20+
import java.util.function.Function;
21+
22+
// TODO MP add java doc
23+
public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor {
24+
private final ConcurrentHashMap<String, LongAdder> indexExecutionTime;
25+
private final Map<Runnable, String> runnableToIndexName;
26+
// TODO MP do we need also EWMA per-index or per-project?
27+
28+
TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor(
29+
String name,
30+
int corePoolSize,
31+
int maximumPoolSize,
32+
long keepAliveTime,
33+
TimeUnit unit,
34+
BlockingQueue<Runnable> workQueue,
35+
Function<Runnable, WrappedRunnable> runnableWrapper,
36+
ThreadFactory threadFactory,
37+
RejectedExecutionHandler handler,
38+
ThreadContext contextHolder,
39+
EsExecutors.TaskTrackingConfig trackingConfig
40+
) {
41+
super(
42+
name,
43+
corePoolSize,
44+
maximumPoolSize,
45+
keepAliveTime,
46+
unit,
47+
workQueue,
48+
runnableWrapper,
49+
threadFactory,
50+
handler,
51+
contextHolder,
52+
trackingConfig
53+
);
54+
indexExecutionTime = new ConcurrentHashMap<>();
55+
runnableToIndexName = new HashMap<>();
56+
}
57+
58+
public long getSearchLoadPerIndex(String indexName) {
59+
// TODO MP check for null maybe we don't need getOrDefault
60+
return indexExecutionTime.getOrDefault(indexName, new LongAdder()).sum();
61+
}
62+
63+
public long getLoadEMWAPerIndex(String indexName) {
64+
// TODO MP do we need to report load EMWA per index?
65+
throw new UnsupportedOperationException("Not supported yet");
66+
}
67+
68+
public long getSearchLoadPerProject() {
69+
// TODO MP we probably need to report sl per project?
70+
throw new UnsupportedOperationException("Not supported yet");
71+
}
72+
73+
public long getLoadEMWAPerProject() {
74+
// TODO MP we probably need to report load EMWA per project?
75+
throw new UnsupportedOperationException("Not supported yet");
76+
}
77+
78+
public void registerIndexNameForRunnable(String indexName, Runnable r) {
79+
runnableToIndexName.put(r, indexName);
80+
}
81+
82+
private void trackExecutionTimePerIndex(Runnable r, long timeSpentExecuting) {
83+
// TODO MP do we need a LongAdder here?
84+
String indexName = runnableToIndexName.get(r);
85+
if (indexName != null) {
86+
indexExecutionTime.putIfAbsent(indexName, new LongAdder());
87+
indexExecutionTime.get(indexName).add(timeSpentExecuting);
88+
}
89+
}
90+
91+
@Override
92+
protected void trackExecutionTime(Runnable r, long taskTime) {
93+
trackExecutionTimePerIndex(r, taskTime);
94+
super.trackExecutionTime(r, taskTime);
95+
}
96+
}

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4949
import org.elasticsearch.common.util.concurrent.EsExecutors;
5050
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
51+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor;
5152
import org.elasticsearch.core.IOUtils;
5253
import org.elasticsearch.core.RefCounted;
5354
import org.elasticsearch.core.Releasable;
@@ -620,7 +621,7 @@ private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
620621
final Executor executor = getExecutor(shard);
621622
try {
622623
if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) {
623-
runAsync(executor, executable, listener);
624+
runAsync(executor, executable, listener, shard);
624625
return;
625626
}
626627
if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) {
@@ -696,7 +697,7 @@ private void searchReady() {
696697
if (timeoutTask != null) {
697698
timeoutTask.cancel();
698699
}
699-
runAsync(executor, executable, listener);
700+
runAsync(executor, executable, listener, shard);
700701
}
701702
}
702703
});
@@ -719,9 +720,14 @@ private IndexShard getShard(ShardSearchRequest request) {
719720
private static <T extends RefCounted> void runAsync(
720721
Executor executor,
721722
CheckedSupplier<T, Exception> executable,
722-
ActionListener<T> listener
723+
ActionListener<T> listener,
724+
IndexShard shard
723725
) {
724-
executor.execute(ActionRunnable.supplyAndDecRef(listener, executable));
726+
Runnable r = ActionRunnable.supplyAndDecRef(listener, executable);
727+
if (executor instanceof TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexExecutor) {
728+
perIndexExecutor.registerIndexNameForRunnable(shard.shardId().getIndexName(), r);
729+
}
730+
executor.execute(r);
725731
}
726732

727733
/**
@@ -798,7 +804,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
798804
// we handle the failure in the failure listener below
799805
throw e;
800806
}
801-
}, wrapFailureListener(listener, readerContext, markAsUsed));
807+
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
802808
}
803809

804810
private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
@@ -850,7 +856,7 @@ public void executeQueryPhase(
850856
// we handle the failure in the failure listener below
851857
throw e;
852858
}
853-
}, wrapFailureListener(listener, readerContext, markAsUsed));
859+
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
854860
}
855861

856862
/**
@@ -899,7 +905,7 @@ public void executeQueryPhase(
899905
// we handle the failure in the failure listener below
900906
throw e;
901907
}
902-
}, wrapFailureListener(l, readerContext, markAsUsed));
908+
}, wrapFailureListener(l, readerContext, markAsUsed), readerContext.indexShard());
903909
}));
904910
}
905911

@@ -950,7 +956,7 @@ public void executeFetchPhase(
950956
// we handle the failure in the failure listener below
951957
throw e;
952958
}
953-
}, wrapFailureListener(listener, readerContext, markAsUsed));
959+
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
954960
}
955961

956962
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
@@ -987,7 +993,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
987993
// we handle the failure in the failure listener below
988994
throw e;
989995
}
990-
}, wrapFailureListener(l, readerContext, markAsUsed));
996+
}, wrapFailureListener(l, readerContext, markAsUsed), readerContext.indexShard());
991997
}));
992998
}
993999

0 commit comments

Comments
 (0)