Skip to content

Commit 6a83b07

Browse files
committed
Add some metric logs
1 parent 23ad8c6 commit 6a83b07

File tree

3 files changed

+66
-2
lines changed

3 files changed

+66
-2
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.coordination;
11+
12+
import org.elasticsearch.cluster.ClusterChangedEvent;
13+
import org.elasticsearch.cluster.ClusterStateListener;
14+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor;
15+
import org.elasticsearch.index.Index;
16+
17+
public class SearchIndexTimeTrackingCleanupService implements ClusterStateListener {
18+
private TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor;
19+
20+
public SearchIndexTimeTrackingCleanupService(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor) {
21+
this.executor = executor;
22+
}
23+
24+
@Override
25+
public void clusterChanged(ClusterChangedEvent event) {
26+
for (Index index : event.indicesDeleted()) {
27+
executor.stopTrackingIndex(index.getName());
28+
}
29+
}
30+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.cluster.ClusterChangedEvent;
15+
import org.elasticsearch.cluster.ClusterStateListener;
1416
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
1517
import org.elasticsearch.core.Tuple;
18+
import org.elasticsearch.index.Index;
1619

1720
import java.util.concurrent.BlockingQueue;
1821
import java.util.concurrent.ConcurrentHashMap;
@@ -28,8 +31,8 @@
2831
*/
2932
public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor {
3033
private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class);
31-
private final ConcurrentHashMap<String, Tuple<LongAdder, ExponentiallyWeightedMovingAverage>> indexExecutionTime;
32-
private final ConcurrentHashMap<Runnable, String> runnableToIndexName;
34+
public final ConcurrentHashMap<String, Tuple<LongAdder, ExponentiallyWeightedMovingAverage>> indexExecutionTime;
35+
public final ConcurrentHashMap<Runnable, String> runnableToIndexName;
3336

3437
TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor(
3538
String name,

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService;
4242
import org.elasticsearch.cluster.coordination.Coordinator;
4343
import org.elasticsearch.cluster.coordination.MasterHistoryService;
44+
import org.elasticsearch.cluster.coordination.SearchIndexTimeTrackingCleanupService;
4445
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
4546
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
4647
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
@@ -79,6 +80,8 @@
7980
import org.elasticsearch.common.settings.SettingsModule;
8081
import org.elasticsearch.common.util.BigArrays;
8182
import org.elasticsearch.common.util.PageCacheRecycler;
83+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
84+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor;
8285
import org.elasticsearch.common.util.set.Sets;
8386
import org.elasticsearch.core.IOUtils;
8487
import org.elasticsearch.core.SuppressForbidden;
@@ -235,6 +238,8 @@
235238
import java.util.Objects;
236239
import java.util.Optional;
237240
import java.util.Set;
241+
import java.util.Timer;
242+
import java.util.TimerTask;
238243
import java.util.concurrent.TimeUnit;
239244
import java.util.function.Function;
240245
import java.util.function.UnaryOperator;
@@ -700,6 +705,12 @@ private void construct(
700705
ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
701706
clusterService.addStateApplier(scriptService);
702707

708+
// TODO DR - this is a bit of a hack to get the cluster service into the plugins
709+
var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH);
710+
TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexEsThreadPoolExecutor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) executor;
711+
searchLoadMetricsReporter(perIndexEsThreadPoolExecutor);
712+
clusterService.addListener(new SearchIndexTimeTrackingCleanupService(perIndexEsThreadPoolExecutor));
713+
703714
modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
704715

705716
FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry());
@@ -1253,6 +1264,26 @@ public Map<String, String> searchFields() {
12531264
postInjection(clusterModule, actionModule, clusterService, transportService, featureService);
12541265
}
12551266

1267+
private void searchLoadMetricsReporter(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor) {
1268+
Timer timer = new Timer();
1269+
TimerTask task = new TimerTask() {
1270+
@Override
1271+
public void run() {
1272+
if(executor.indexExecutionTime.size() > 0) {
1273+
logger.info("Number of reported indices: {}", executor.indexExecutionTime.size());
1274+
logger.info("Number of runnables: {}", executor.runnableToIndexName.size());
1275+
executor.indexExecutionTime.forEach((index, tuple) -> {
1276+
logger.info("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage());
1277+
});
1278+
logger.info("Total task execution time: {}", executor.getTotalTaskExecutionTime());
1279+
logger.info("----------------------------------------------------------------------------------");
1280+
}
1281+
}
1282+
};
1283+
1284+
timer.scheduleAtFixedRate(task, 0, 4000);
1285+
}
1286+
12561287
/**
12571288
* For each "component" (getter) <em>c</em> of a {@link Record},
12581289
* calls {@link org.elasticsearch.injection.Injector#addInstance(Object) Injector.addInstance}

0 commit comments

Comments
 (0)