Skip to content

Commit 420739e

Browse files
committed
Setup metrics separately to constructor
1 parent 2c36b97 commit 420739e

File tree

21 files changed

+65
-158
lines changed

21 files changed

+65
-158
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import org.elasticsearch.rest.action.RestToXContentListener;
7272
import org.elasticsearch.tasks.Task;
7373
import org.elasticsearch.tasks.TaskCancelledException;
74-
import org.elasticsearch.telemetry.metric.MeterRegistry;
7574
import org.elasticsearch.test.ESTestCase;
7675
import org.elasticsearch.test.MockLog;
7776
import org.elasticsearch.test.junit.annotations.TestLogging;
@@ -163,8 +162,7 @@ public void testTraceLogging() {
163162
-1,
164163
EsExecutors.daemonThreadFactory(Settings.EMPTY, "test"),
165164
new ThreadContext(Settings.EMPTY),
166-
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
167-
MeterRegistry.NOOP
165+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
168166
);
169167
resources.add(() -> assertTrue(ThreadPool.terminate(executor, 10, TimeUnit.SECONDS)));
170168
var loggingFinishedLatch = new CountDownLatch(1);

qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
1717
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
1818
import org.elasticsearch.core.TimeValue;
19-
import org.elasticsearch.telemetry.metric.MeterRegistry;
2019
import org.elasticsearch.test.ESTestCase;
2120
import org.junit.After;
2221
import org.junit.Before;
@@ -68,8 +67,7 @@ public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedEx
6867
1,
6968
EsExecutors.daemonThreadFactory("test"),
7069
threadPool.getThreadContext(),
71-
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK),
72-
MeterRegistry.NOOP
70+
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
7371
);
7472
try {
7573
checkExecutionError(getExecuteRunner(fixedExecutor));
@@ -178,8 +176,7 @@ public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws Interrupt
178176
1,
179177
EsExecutors.daemonThreadFactory("test"),
180178
threadPool.getThreadContext(),
181-
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK),
182-
MeterRegistry.NOOP
179+
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
183180
);
184181
try {
185182
checkExecutionException(getExecuteRunner(fixedExecutor), true);

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
import org.elasticsearch.common.settings.Setting.Property;
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.common.unit.Processors;
17-
import org.elasticsearch.core.Nullable;
1817
import org.elasticsearch.core.SuppressForbidden;
1918
import org.elasticsearch.node.Node;
20-
import org.elasticsearch.telemetry.metric.MeterRegistry;
2119

2220
import java.security.AccessController;
2321
import java.security.PrivilegedAction;
@@ -114,16 +112,15 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
114112
* rejection handler.
115113
*/
116114
public static EsThreadPoolExecutor newScaling(
117-
QualifiedName name,
115+
String name,
118116
int min,
119117
int max,
120118
long keepAliveTime,
121119
TimeUnit unit,
122120
boolean rejectAfterShutdown,
123121
ThreadFactory threadFactory,
124122
ThreadContext contextHolder,
125-
TaskTrackingConfig config,
126-
MeterRegistry meterRegistry
123+
TaskTrackingConfig config
127124
) {
128125
LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
129126
// Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty),
@@ -141,12 +138,11 @@ public static EsThreadPoolExecutor newScaling(
141138
threadFactory,
142139
new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
143140
contextHolder,
144-
config,
145-
meterRegistry
141+
config
146142
);
147143
} else {
148144
return new EsThreadPoolExecutor(
149-
name.toCompositeString(),
145+
name,
150146
min,
151147
max,
152148
keepAliveTime,
@@ -185,16 +181,15 @@ public static EsThreadPoolExecutor newScaling(
185181
ThreadContext contextHolder
186182
) {
187183
return newScaling(
188-
new QualifiedName(name),
184+
name,
189185
min,
190186
max,
191187
keepAliveTime,
192188
unit,
193189
rejectAfterShutdown,
194190
threadFactory,
195191
contextHolder,
196-
TaskTrackingConfig.DO_NOT_TRACK,
197-
MeterRegistry.NOOP
192+
TaskTrackingConfig.DO_NOT_TRACK
198193
);
199194
}
200195

@@ -204,20 +199,7 @@ public static EsThreadPoolExecutor newFixed(
204199
int queueCapacity,
205200
ThreadFactory threadFactory,
206201
ThreadContext contextHolder,
207-
TaskTrackingConfig config,
208-
MeterRegistry meterRegistry
209-
) {
210-
return newFixed(new QualifiedName(name), size, queueCapacity, threadFactory, contextHolder, config, meterRegistry);
211-
}
212-
213-
public static EsThreadPoolExecutor newFixed(
214-
QualifiedName name,
215-
int size,
216-
int queueCapacity,
217-
ThreadFactory threadFactory,
218-
ThreadContext contextHolder,
219-
TaskTrackingConfig config,
220-
MeterRegistry meterRegistry
202+
TaskTrackingConfig config
221203
) {
222204
final BlockingQueue<Runnable> queue;
223205
final EsRejectedExecutionHandler rejectedExecutionHandler;
@@ -240,12 +222,11 @@ public static EsThreadPoolExecutor newFixed(
240222
threadFactory,
241223
rejectedExecutionHandler,
242224
contextHolder,
243-
config,
244-
meterRegistry
225+
config
245226
);
246227
} else {
247228
return new EsThreadPoolExecutor(
248-
name.toCompositeString(),
229+
name,
249230
size,
250231
size,
251232
0,
@@ -632,15 +613,4 @@ public double getEwmaAlpha() {
632613
return ewmaAlpha;
633614
}
634615
}
635-
636-
public record QualifiedName(@Nullable String nodeName, String threadPoolName) {
637-
638-
public QualifiedName(String threadPoolName) {
639-
this(null, threadPoolName);
640-
}
641-
642-
public String toCompositeString() {
643-
return nodeName == null ? threadPoolName : nodeName + "/" + threadPoolName;
644-
}
645-
}
646616
}

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
1414
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
1515
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.telemetry.metric.Instrument;
1617
import org.elasticsearch.telemetry.metric.LongGauge;
1718
import org.elasticsearch.telemetry.metric.LongWithAttributes;
1819
import org.elasticsearch.telemetry.metric.MeterRegistry;
@@ -44,10 +45,9 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
4445
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
4546
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
4647
private final ExponentialBucketHistogram queueLatencyHistogram = new ExponentialBucketHistogram();
47-
private final LongGauge queueLatencyGauge;
4848

4949
TaskExecutionTimeTrackingEsThreadPoolExecutor(
50-
EsExecutors.QualifiedName name,
50+
String name,
5151
int corePoolSize,
5252
int maximumPoolSize,
5353
long keepAliveTime,
@@ -57,26 +57,18 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
5757
ThreadFactory threadFactory,
5858
RejectedExecutionHandler handler,
5959
ThreadContext contextHolder,
60-
TaskTrackingConfig trackingConfig,
61-
MeterRegistry meterRegistry
60+
TaskTrackingConfig trackingConfig
6261
) {
63-
super(
64-
name.toCompositeString(),
65-
corePoolSize,
66-
maximumPoolSize,
67-
keepAliveTime,
68-
unit,
69-
workQueue,
70-
threadFactory,
71-
handler,
72-
contextHolder
73-
);
62+
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);
7463
this.runnableWrapper = runnableWrapper;
7564
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0);
7665
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
77-
this.queueLatencyGauge = meterRegistry.registerLongsGauge(
78-
ThreadPool.THREAD_POOL_METRIC_PREFIX + name.threadPoolName() + THREAD_POOL_METRIC_NAME_QUEUE_TIME,
79-
"Time tasks spent in the queue for the " + name.threadPoolName() + " thread pool",
66+
}
67+
68+
public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
69+
final LongGauge queueLatencyGauge = meterRegistry.registerLongsGauge(
70+
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_QUEUE_TIME,
71+
"Time tasks spent in the queue for the " + threadPoolName + " thread pool",
8072
"milliseconds",
8173
() -> {
8274
List<LongWithAttributes> metricValues = Arrays.stream(LATENCY_PERCENTILES_TO_REPORT)
@@ -91,6 +83,7 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
9183
return metricValues;
9284
}
9385
);
86+
return List.of(queueLatencyGauge);
9487
}
9588

9689
@Override

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.node.Node;
4040
import org.elasticsearch.plugins.ClusterCoordinationPlugin;
4141
import org.elasticsearch.plugins.MetadataUpgrader;
42-
import org.elasticsearch.telemetry.metric.MeterRegistry;
4342
import org.elasticsearch.threadpool.ThreadPool;
4443
import org.elasticsearch.transport.TransportService;
4544

@@ -411,8 +410,7 @@ static class AsyncPersistedState extends InMemoryPersistedState {
411410
1,
412411
daemonThreadFactory(nodeName, THREAD_NAME),
413412
threadPool.getThreadContext(),
414-
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
415-
MeterRegistry.NOOP
413+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
416414
);
417415
this.persistedState = persistedState;
418416
}

server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,12 @@ ThreadPool.ExecutorHolder build(
152152
int queueSize = settings.queueSize;
153153
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name(), isSystemThread());
154154
final ExecutorService executor = EsExecutors.newFixed(
155-
new EsExecutors.QualifiedName(settings.nodeName, name()),
155+
settings.nodeName + "/" + name(),
156156
size,
157157
queueSize,
158158
threadFactory,
159159
threadContext,
160-
taskTrackingConfig,
161-
meterRegistry
160+
taskTrackingConfig
162161
);
163162
final ThreadPool.Info info = new ThreadPool.Info(
164163
name(),

server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,15 @@ ThreadPool.ExecutorHolder build(
144144
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name());
145145
ExecutorService executor;
146146
executor = EsExecutors.newScaling(
147-
new EsExecutors.QualifiedName(settings.nodeName, name()),
147+
settings.nodeName + "/" + name(),
148148
core,
149149
max,
150150
keepAlive.millis(),
151151
TimeUnit.MILLISECONDS,
152152
rejectAfterShutdown,
153153
threadFactory,
154154
threadContext,
155-
trackingConfig,
156-
meterRegistry
155+
trackingConfig
157156
);
158157
return new ThreadPool.ExecutorHolder(executor, info);
159158
}

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2525
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionHandler;
2626
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
27+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
2728
import org.elasticsearch.common.util.concurrent.ThreadContext;
2829
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
2930
import org.elasticsearch.core.Nullable;
@@ -374,6 +375,10 @@ private static ArrayList<Instrument> setupMetrics(MeterRegistry meterRegistry, S
374375
if (rejectedExecutionHandler instanceof EsRejectedExecutionHandler handler) {
375376
handler.registerCounter(meterRegistry, prefix + THREAD_POOL_METRIC_NAME_REJECTED, name);
376377
}
378+
379+
if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor trackingThreadPoolExecutor) {
380+
instruments.addAll(trackingThreadPoolExecutor.setupMetrics(meterRegistry, name));
381+
}
377382
}
378383
return instruments;
379384
}

server/src/test/java/org/elasticsearch/action/ActionRunnableTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.common.util.concurrent.EsExecutors;
1616
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1717
import org.elasticsearch.common.util.concurrent.ThreadContext;
18-
import org.elasticsearch.telemetry.metric.MeterRegistry;
1918
import org.elasticsearch.test.ESTestCase;
2019
import org.elasticsearch.threadpool.ThreadPool;
2120
import org.hamcrest.Matchers;
@@ -80,8 +79,7 @@ public void testWrapReleasingRejected() throws Exception {
8079
0,
8180
Thread::new,
8281
new ThreadContext(Settings.EMPTY),
83-
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
84-
MeterRegistry.NOOP
82+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
8583
);
8684
try {
8785
final var listener = new PlainActionFuture<Void>();

server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.search.aggregations.InternalAggregations;
2828
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2929
import org.elasticsearch.search.query.QuerySearchResult;
30-
import org.elasticsearch.telemetry.metric.MeterRegistry;
3130
import org.elasticsearch.test.ESTestCase;
3231
import org.elasticsearch.threadpool.TestThreadPool;
3332
import org.elasticsearch.threadpool.ThreadPool;
@@ -82,8 +81,7 @@ public AggregationReduceContext forFinalReduction() {
8281
10,
8382
EsExecutors.daemonThreadFactory("test"),
8483
threadPool.getThreadContext(),
85-
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK),
86-
MeterRegistry.NOOP
84+
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
8785
);
8886
}
8987

0 commit comments

Comments
 (0)