Skip to content

Commit 9239f46

Browse files
Added es.search.coordinator.phases.query.duration.histogram APM metric
to track the duration of the search query phase.
1 parent 391e78d commit 9239f46

15 files changed

+276
-23
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.Maps;
2727
import org.elasticsearch.common.util.concurrent.AtomicArray;
2828
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
2930
import org.elasticsearch.index.shard.ShardId;
3031
import org.elasticsearch.search.SearchContextMissingException;
3132
import org.elasticsearch.search.SearchPhaseResult;
@@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9394
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
9495
private final AtomicBoolean requestCancelled = new AtomicBoolean();
9596
private final int skippedCount;
97+
protected final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;
9698

9799
// protected for tests
98100
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -114,7 +116,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
114116
SearchTask task,
115117
SearchPhaseResults<Result> resultConsumer,
116118
int maxConcurrentRequestsPerNode,
117-
SearchResponse.Clusters clusters
119+
SearchResponse.Clusters clusters,
120+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
118121
) {
119122
super(name);
120123
this.namedWriteableRegistry = namedWriteableRegistry;
@@ -155,6 +158,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
155158
// at the end of the search
156159
addReleasable(resultConsumer);
157160
this.clusters = clusters;
161+
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
158162
}
159163

160164
protected void notifyListShards(
@@ -666,9 +670,12 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
666670
* @see #onShardResult(SearchPhaseResult)
667671
*/
668672
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
673+
recordPhaseLatency();
669674
executeNextPhase(getName(), this::getNextPhase);
670675
}
671676

677+
protected void recordPhaseLatency() {}
678+
672679
/**
673680
* Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be
674681
* thrown.

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.client.internal.Client;
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
17+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
1718
import org.elasticsearch.search.SearchPhaseResult;
1819
import org.elasticsearch.search.SearchShardTarget;
1920
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
4748
ClusterState clusterState,
4849
SearchTask task,
4950
SearchResponse.Clusters clusters,
50-
Client client
51+
Client client,
52+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
5153
) {
5254
super(
5355
"dfs",
@@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
6668
task,
6769
new ArraySearchPhaseResults<>(shardsIts.size()),
6870
request.getMaxConcurrentShardRequests(),
69-
clusters
71+
clusters,
72+
coordinatorSearchPhaseAPMMetrics
7073
);
7174
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
7275
addReleasable(queryPhaseResultConsumer);

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.core.RefCounted;
3535
import org.elasticsearch.core.SimpleRefCounted;
3636
import org.elasticsearch.core.TimeValue;
37+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
3738
import org.elasticsearch.index.shard.ShardId;
3839
import org.elasticsearch.search.SearchPhaseResult;
3940
import org.elasticsearch.search.SearchService;
@@ -90,6 +91,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
9091
private volatile BottomSortValuesCollector bottomSortCollector;
9192
private final Client client;
9293
private final boolean batchQueryPhase;
94+
private long phaseStartTimeNanos;
9395

9496
SearchQueryThenFetchAsyncAction(
9597
Logger logger,
@@ -108,7 +110,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
108110
SearchTask task,
109111
SearchResponse.Clusters clusters,
110112
Client client,
111-
boolean batchQueryPhase
113+
boolean batchQueryPhase,
114+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
112115
) {
113116
super(
114117
"query",
@@ -127,7 +130,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
127130
task,
128131
resultConsumer,
129132
request.getMaxConcurrentShardRequests(),
130-
clusters
133+
clusters,
134+
coordinatorSearchPhaseAPMMetrics
131135
);
132136
this.topDocsSize = getTopDocsSize(request);
133137
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
@@ -419,6 +423,7 @@ private static boolean isPartOfPIT(
419423

420424
@Override
421425
protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
426+
phaseStartTimeNanos = System.nanoTime();
422427
if (this.batchQueryPhase == false) {
423428
super.doRun(shardIndexMap);
424429
return;
@@ -562,6 +567,12 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
562567
}
563568
}
564569

570+
@Override
571+
protected void recordPhaseLatency() {
572+
final long tookInNanos = System.nanoTime() - phaseStartTimeNanos;
573+
coordinatorSearchPhaseAPMMetrics.onQueryPhaseDone(tookInNanos);
574+
}
575+
565576
public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]";
566577

567578
static void registerNodeSearchAction(

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.io.stream.StreamOutput;
3232
import org.elasticsearch.common.util.concurrent.EsExecutors;
3333
import org.elasticsearch.core.TimeValue;
34+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
3435
import org.elasticsearch.index.shard.ShardId;
3536
import org.elasticsearch.injection.guice.Inject;
3637
import org.elasticsearch.rest.RestStatus;
@@ -70,6 +71,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
7071
private final TransportService transportService;
7172
private final SearchService searchService;
7273
private final ClusterService clusterService;
74+
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;
7375

7476
@Inject
7577
public TransportOpenPointInTimeAction(
@@ -79,7 +81,8 @@ public TransportOpenPointInTimeAction(
7981
TransportSearchAction transportSearchAction,
8082
SearchTransportService searchTransportService,
8183
NamedWriteableRegistry namedWriteableRegistry,
82-
ClusterService clusterService
84+
ClusterService clusterService,
85+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
8386
) {
8487
super(TYPE.name(), transportService, actionFilters, OpenPointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
8588
this.transportService = transportService;
@@ -88,6 +91,7 @@ public TransportOpenPointInTimeAction(
8891
this.searchTransportService = searchTransportService;
8992
this.namedWriteableRegistry = namedWriteableRegistry;
9093
this.clusterService = clusterService;
94+
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
9195
transportService.registerRequestHandler(
9296
OPEN_SHARD_READER_CONTEXT_NAME,
9397
EsExecutors.DIRECT_EXECUTOR_SERVICE,
@@ -240,7 +244,8 @@ void runOpenPointInTimePhase(
240244
task,
241245
new ArraySearchPhaseResults<>(shardIterators.size()),
242246
searchRequest.getMaxConcurrentShardRequests(),
243-
clusters
247+
clusters,
248+
coordinatorSearchPhaseAPMMetrics
244249
) {
245250
@Override
246251
protected void executePhaseOnShard(

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.IndexNotFoundException;
6868
import org.elasticsearch.index.query.QueryBuilder;
6969
import org.elasticsearch.index.query.Rewriteable;
70+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
7071
import org.elasticsearch.index.shard.ShardId;
7172
import org.elasticsearch.index.shard.ShardNotFoundException;
7273
import org.elasticsearch.indices.ExecutorSelector;
@@ -170,6 +171,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
170171
private final UsageService usageService;
171172
private final boolean collectCCSTelemetry;
172173
private final TimeValue forceConnectTimeoutSecs;
174+
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;
173175

174176
@Inject
175177
public TransportSearchAction(
@@ -188,7 +190,8 @@ public TransportSearchAction(
188190
ExecutorSelector executorSelector,
189191
SearchResponseMetrics searchResponseMetrics,
190192
Client client,
191-
UsageService usageService
193+
UsageService usageService,
194+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
192195
) {
193196
super(TYPE.name(), transportService, actionFilters, SearchRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
194197
this.threadPool = threadPool;
@@ -218,6 +221,7 @@ public TransportSearchAction(
218221
this.searchResponseMetrics = searchResponseMetrics;
219222
this.client = client;
220223
this.usageService = usageService;
224+
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
221225
forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
222226
}
223227

@@ -1682,7 +1686,8 @@ public void runNewSearchPhase(
16821686
clusterState,
16831687
task,
16841688
clusters,
1685-
client
1689+
client,
1690+
coordinatorSearchPhaseAPMMetrics
16861691
);
16871692
} else {
16881693
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
@@ -1703,7 +1708,8 @@ public void runNewSearchPhase(
17031708
task,
17041709
clusters,
17051710
client,
1706-
searchService.batchQueryPhase()
1711+
searchService.batchQueryPhase(),
1712+
coordinatorSearchPhaseAPMMetrics
17071713
);
17081714
}
17091715
success = true;

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.service.ClusterService;
2525
import org.elasticsearch.index.Index;
2626
import org.elasticsearch.index.query.Rewriteable;
27+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
2728
import org.elasticsearch.injection.guice.Inject;
2829
import org.elasticsearch.search.SearchService;
2930
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -60,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
6061
private final ProjectResolver projectResolver;
6162
private final IndexNameExpressionResolver indexNameExpressionResolver;
6263
private final ThreadPool threadPool;
64+
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;
6365

6466
@Inject
6567
public TransportSearchShardsAction(
@@ -70,7 +72,8 @@ public TransportSearchShardsAction(
7072
TransportSearchAction transportSearchAction,
7173
SearchTransportService searchTransportService,
7274
ProjectResolver projectResolver,
73-
IndexNameExpressionResolver indexNameExpressionResolver
75+
IndexNameExpressionResolver indexNameExpressionResolver,
76+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
7477
) {
7578
super(
7679
TYPE.name(),
@@ -88,6 +91,7 @@ public TransportSearchShardsAction(
8891
this.projectResolver = projectResolver;
8992
this.indexNameExpressionResolver = indexNameExpressionResolver;
9093
this.threadPool = transportService.getThreadPool();
94+
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
9195
}
9296

9397
@Override
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.search.stats;
11+
12+
import org.elasticsearch.telemetry.metric.LongHistogram;
13+
import org.elasticsearch.telemetry.metric.MeterRegistry;
14+
15+
import java.util.concurrent.TimeUnit;
16+
17+
/**
18+
* Coordinator level APM metrics for search phases. Records phase execution times as histograms.
19+
*/
20+
public class CoordinatorSearchPhaseAPMMetrics {
21+
22+
public static final CoordinatorSearchPhaseAPMMetrics NOOP = new CoordinatorSearchPhaseAPMMetrics(MeterRegistry.NOOP);
23+
24+
public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.coordinator.phases.query.duration.histogram";
25+
private final LongHistogram queryPhaseMetric;
26+
27+
public CoordinatorSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
28+
this.queryPhaseMetric = meterRegistry.registerLongHistogram(
29+
QUERY_SEARCH_PHASE_METRIC,
30+
"Query search phase execution times at the coordinator level, expressed as a histogram",
31+
"ms"
32+
);
33+
}
34+
35+
public void onQueryPhaseDone(long tookInNanos) {
36+
recordPhaseLatency(queryPhaseMetric, tookInNanos);
37+
}
38+
39+
protected void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
40+
histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
41+
}
42+
}

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
import org.elasticsearch.index.mapper.MapperMetrics;
123123
import org.elasticsearch.index.mapper.RootObjectMapperNamespaceValidator;
124124
import org.elasticsearch.index.mapper.SourceFieldMetrics;
125+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
125126
import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics;
126127
import org.elasticsearch.index.shard.SearchOperationListener;
127128
import org.elasticsearch.indices.ExecutorSelector;
@@ -863,6 +864,10 @@ private void construct(
863864
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
864865
);
865866

867+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics = new CoordinatorSearchPhaseAPMMetrics(
868+
telemetryProvider.getMeterRegistry()
869+
);
870+
866871
List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
867872
// NOTE: the response of index/search slow log fields below must be calculated dynamically on every call
868873
// because the responses may change dynamically at runtime
@@ -1360,6 +1365,7 @@ public Map<String, String> queryFields() {
13601365
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
13611366
b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService);
13621367
b.bind(MergeMetrics.class).toInstance(mergeMetrics);
1368+
b.bind(CoordinatorSearchPhaseAPMMetrics.class).toInstance(coordinatorSearchPhaseAPMMetrics);
13631369
});
13641370

13651371
if (ReadinessService.enabled(environment)) {

server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.core.Tuple;
1818
import org.elasticsearch.index.Index;
1919
import org.elasticsearch.index.query.MatchAllQueryBuilder;
20+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
2021
import org.elasticsearch.index.shard.ShardId;
2122
import org.elasticsearch.search.SearchPhaseResult;
2223
import org.elasticsearch.search.SearchShardTarget;
@@ -88,7 +89,8 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
8889
null,
8990
results,
9091
request.getMaxConcurrentShardRequests(),
91-
SearchResponse.Clusters.EMPTY
92+
SearchResponse.Clusters.EMPTY,
93+
CoordinatorSearchPhaseAPMMetrics.NOOP
9294
) {
9395
@Override
9496
protected SearchPhase getNextPhase() {

server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1919
import org.elasticsearch.common.util.concurrent.AtomicArray;
2020
import org.elasticsearch.core.Nullable;
21+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
2122
import org.elasticsearch.search.SearchPhaseResult;
2223
import org.elasticsearch.search.SearchShardTarget;
2324
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -66,7 +67,8 @@ public MockSearchPhaseContext(int numShards) {
6667
new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()),
6768
new ArraySearchPhaseResults<>(numShards),
6869
5,
69-
null
70+
null,
71+
CoordinatorSearchPhaseAPMMetrics.NOOP
7072
);
7173
this.numShards = numShards;
7274
numSuccess = new AtomicInteger(numShards);

0 commit comments

Comments
 (0)