Skip to content

Commit e8c7e42

Browse files
Search query phase coordinator duration APM metrics. (#136059)
Added the following metrics for duration of the search phases at the coordinator: * es.search_response.took_durations.query.histogram * es.search_response.took_durations.dfs.histogram * es.search_response.took_durations.open_pit.histogram
1 parent 64fcdcc commit e8c7e42

11 files changed

+231
-34
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.util.concurrent.AtomicArray;
2828
import org.elasticsearch.core.Releasable;
2929
import org.elasticsearch.index.shard.ShardId;
30+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
3031
import org.elasticsearch.search.SearchContextMissingException;
3132
import org.elasticsearch.search.SearchPhaseResult;
3233
import org.elasticsearch.search.SearchShardTarget;
@@ -93,6 +94,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9394
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
9495
private final AtomicBoolean requestCancelled = new AtomicBoolean();
9596
private final int skippedCount;
97+
protected final SearchResponseMetrics searchResponseMetrics;
98+
protected long phaseStartTimeInNanos;
9699

97100
// protected for tests
98101
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -114,7 +117,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
114117
SearchTask task,
115118
SearchPhaseResults<Result> resultConsumer,
116119
int maxConcurrentRequestsPerNode,
117-
SearchResponse.Clusters clusters
120+
SearchResponse.Clusters clusters,
121+
SearchResponseMetrics searchResponseMetrics
118122
) {
119123
super(name);
120124
this.namedWriteableRegistry = namedWriteableRegistry;
@@ -155,6 +159,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
155159
// at the end of the search
156160
addReleasable(resultConsumer);
157161
this.clusters = clusters;
162+
this.searchResponseMetrics = searchResponseMetrics;
158163
}
159164

160165
protected void notifyListShards(
@@ -221,6 +226,7 @@ public final void start() {
221226

222227
@Override
223228
protected final void run() {
229+
phaseStartTimeInNanos = System.nanoTime();
224230
if (outstandingShards.get() == 0) {
225231
onPhaseDone();
226232
return;
@@ -666,6 +672,7 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
666672
* @see #onShardResult(SearchPhaseResult)
667673
*/
668674
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
675+
searchResponseMetrics.recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos);
669676
executeNextPhase(getName(), this::getNextPhase);
670677
}
671678

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.client.internal.Client;
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
17+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
1718
import org.elasticsearch.search.SearchPhaseResult;
1819
import org.elasticsearch.search.SearchShardTarget;
1920
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
4748
ClusterState clusterState,
4849
SearchTask task,
4950
SearchResponse.Clusters clusters,
50-
Client client
51+
Client client,
52+
SearchResponseMetrics searchResponseMetrics
5153
) {
5254
super(
5355
"dfs",
@@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
6668
task,
6769
new ArraySearchPhaseResults<>(shardsIts.size()),
6870
request.getMaxConcurrentShardRequests(),
69-
clusters
71+
clusters,
72+
searchResponseMetrics
7073
);
7174
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
7275
addReleasable(queryPhaseResultConsumer);

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.core.SimpleRefCounted;
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.index.shard.ShardId;
38+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
3839
import org.elasticsearch.search.SearchPhaseResult;
3940
import org.elasticsearch.search.SearchService;
4041
import org.elasticsearch.search.SearchShardTarget;
@@ -92,6 +93,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
9293
private volatile BottomSortValuesCollector bottomSortCollector;
9394
private final Client client;
9495
private final boolean batchQueryPhase;
96+
private long phaseStartTimeNanos;
9597

9698
SearchQueryThenFetchAsyncAction(
9799
Logger logger,
@@ -110,7 +112,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
110112
SearchTask task,
111113
SearchResponse.Clusters clusters,
112114
Client client,
113-
boolean batchQueryPhase
115+
boolean batchQueryPhase,
116+
SearchResponseMetrics searchResponseMetrics
114117
) {
115118
super(
116119
"query",
@@ -129,7 +132,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
129132
task,
130133
resultConsumer,
131134
request.getMaxConcurrentShardRequests(),
132-
clusters
135+
clusters,
136+
searchResponseMetrics
133137
);
134138
this.topDocsSize = getTopDocsSize(request);
135139
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.index.shard.ShardId;
3535
import org.elasticsearch.injection.guice.Inject;
3636
import org.elasticsearch.rest.RestStatus;
37+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
3738
import org.elasticsearch.search.SearchPhaseResult;
3839
import org.elasticsearch.search.SearchService;
3940
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -70,6 +71,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
7071
private final TransportService transportService;
7172
private final SearchService searchService;
7273
private final ClusterService clusterService;
74+
private final SearchResponseMetrics searchResponseMetrics;
7375

7476
@Inject
7577
public TransportOpenPointInTimeAction(
@@ -79,7 +81,8 @@ public TransportOpenPointInTimeAction(
7981
TransportSearchAction transportSearchAction,
8082
SearchTransportService searchTransportService,
8183
NamedWriteableRegistry namedWriteableRegistry,
82-
ClusterService clusterService
84+
ClusterService clusterService,
85+
SearchResponseMetrics searchResponseMetrics
8386
) {
8487
super(TYPE.name(), transportService, actionFilters, OpenPointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
8588
this.transportService = transportService;
@@ -88,6 +91,7 @@ public TransportOpenPointInTimeAction(
8891
this.searchTransportService = searchTransportService;
8992
this.namedWriteableRegistry = namedWriteableRegistry;
9093
this.clusterService = clusterService;
94+
this.searchResponseMetrics = searchResponseMetrics;
9195
transportService.registerRequestHandler(
9296
OPEN_SHARD_READER_CONTEXT_NAME,
9397
EsExecutors.DIRECT_EXECUTOR_SERVICE,
@@ -230,7 +234,7 @@ void runOpenPointInTimePhase(
230234
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
231235
TransportVersion minTransportVersion = clusterState.getMinTransportVersion();
232236
new AbstractSearchAsyncAction<>(
233-
actionName,
237+
"open_pit",
234238
logger,
235239
namedWriteableRegistry,
236240
searchTransportService,
@@ -246,7 +250,8 @@ void runOpenPointInTimePhase(
246250
task,
247251
new ArraySearchPhaseResults<>(shardIterators.size()),
248252
searchRequest.getMaxConcurrentShardRequests(),
249-
clusters
253+
clusters,
254+
searchResponseMetrics
250255
) {
251256
@Override
252257
protected void executePhaseOnShard(

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1688,7 +1688,8 @@ public void runNewSearchPhase(
16881688
clusterState,
16891689
task,
16901690
clusters,
1691-
client
1691+
client,
1692+
searchResponseMetrics
16921693
);
16931694
} else {
16941695
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
@@ -1709,7 +1710,8 @@ public void runNewSearchPhase(
17091710
task,
17101711
clusters,
17111712
client,
1712-
searchService.batchQueryPhase()
1713+
searchService.batchQueryPhase(),
1714+
searchResponseMetrics
17131715
);
17141716
}
17151717
success = true;

server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
import org.elasticsearch.telemetry.metric.MeterRegistry;
1616

1717
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Locale;
1820
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
1922

2023
/**
2124
* Container class for aggregated metrics about search responses.
@@ -43,31 +46,50 @@ public String getDisplayName() {
4346
public static final String TOOK_DURATION_TOTAL_HISTOGRAM_NAME = "es.search_response.took_durations.histogram";
4447
public static final String RESPONSE_COUNT_TOTAL_COUNTER_NAME = "es.search_response.response_count.total";
4548

49+
private static final String SEARCH_PHASE_METRIC_FORMAT = "es.search_response.took_durations.%s.histogram";
50+
private static final List<String> SEARCH_PHASE_NAMES = List.of("dfs", "open_pit", "query");
51+
4652
private final LongHistogram tookDurationTotalMillisHistogram;
4753
private final LongCounter responseCountTotalCounter;
4854

55+
private final Map<String, LongHistogram> phaseNameToDurationHistogram;
56+
4957
public SearchResponseMetrics(MeterRegistry meterRegistry) {
50-
this(
58+
this.tookDurationTotalMillisHistogram = meterRegistry.registerLongHistogram(
59+
TOOK_DURATION_TOTAL_HISTOGRAM_NAME,
60+
"The SearchResponse.took durations in milliseconds, expressed as a histogram",
61+
"millis"
62+
);
63+
this.responseCountTotalCounter = meterRegistry.registerLongCounter(
64+
RESPONSE_COUNT_TOTAL_COUNTER_NAME,
65+
"The cumulative total of search responses with an attribute to describe "
66+
+ "success, partial failure, or failure, expressed as a single total counter and individual "
67+
+ "attribute counters",
68+
"count"
69+
);
70+
71+
phaseNameToDurationHistogram = Map.of(
72+
"dfs",
73+
meterRegistry.registerLongHistogram(
74+
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "dfs"),
75+
"The search phase dfs duration in milliseconds at the coordinator, expressed as a histogram",
76+
"millis"
77+
),
78+
"open_pit",
5179
meterRegistry.registerLongHistogram(
52-
TOOK_DURATION_TOTAL_HISTOGRAM_NAME,
53-
"The SearchResponse.took durations in milliseconds, expressed as a histogram",
80+
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "open_pit"),
81+
"The search phase open_pit duration in milliseconds at the coordinator, expressed as a histogram",
5482
"millis"
5583
),
56-
meterRegistry.registerLongCounter(
57-
RESPONSE_COUNT_TOTAL_COUNTER_NAME,
58-
"The cumulative total of search responses with an attribute to describe "
59-
+ "success, partial failure, or failure, expressed as a single total counter and individual "
60-
+ "attribute counters",
61-
"count"
84+
"query",
85+
meterRegistry.registerLongHistogram(
86+
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "query"),
87+
"The search phase query duration in milliseconds at the coordinator, expressed as a histogram",
88+
"millis"
6289
)
6390
);
6491
}
6592

66-
private SearchResponseMetrics(LongHistogram tookDurationTotalMillisHistogram, LongCounter responseCountTotalCounter) {
67-
this.tookDurationTotalMillisHistogram = tookDurationTotalMillisHistogram;
68-
this.responseCountTotalCounter = responseCountTotalCounter;
69-
}
70-
7193
public long recordTookTimeForSearchScroll(long tookTime) {
7294
tookDurationTotalMillisHistogram.record(tookTime, SearchRequestAttributesExtractor.SEARCH_SCROLL_ATTRIBUTES);
7395
return tookTime;
@@ -91,4 +113,10 @@ public void incrementResponseCount(ResponseCountTotalStatus responseCountTotalSt
91113
attributesWithStatus.put(RESPONSE_COUNT_TOTAL_STATUS_ATTRIBUTE_NAME, responseCountTotalStatus.getDisplayName());
92114
responseCountTotalCounter.incrementBy(1L, attributesWithStatus);
93115
}
116+
117+
public void recordSearchPhaseDuration(String phaseName, long tookInNanos) {
118+
LongHistogram queryPhaseDurationHistogram = phaseNameToDurationHistogram.get(phaseName);
119+
assert queryPhaseDurationHistogram != null;
120+
queryPhaseDurationHistogram.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
121+
}
94122
}

server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import org.elasticsearch.index.Index;
1919
import org.elasticsearch.index.query.MatchAllQueryBuilder;
2020
import org.elasticsearch.index.shard.ShardId;
21+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
2122
import org.elasticsearch.search.SearchPhaseResult;
2223
import org.elasticsearch.search.SearchShardTarget;
2324
import org.elasticsearch.search.internal.AliasFilter;
2425
import org.elasticsearch.search.internal.ShardSearchContextId;
2526
import org.elasticsearch.search.internal.ShardSearchRequest;
2627
import org.elasticsearch.test.ESTestCase;
2728
import org.elasticsearch.transport.Transport;
29+
import org.mockito.Mockito;
2830

2931
import java.util.ArrayList;
3032
import java.util.Collections;
@@ -88,7 +90,8 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
8890
null,
8991
results,
9092
request.getMaxConcurrentShardRequests(),
91-
SearchResponse.Clusters.EMPTY
93+
SearchResponse.Clusters.EMPTY,
94+
Mockito.mock(SearchResponseMetrics.class)
9295
) {
9396
@Override
9497
protected SearchPhase getNextPhase() {

server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1919
import org.elasticsearch.common.util.concurrent.AtomicArray;
2020
import org.elasticsearch.core.Nullable;
21+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
2122
import org.elasticsearch.search.SearchPhaseResult;
2223
import org.elasticsearch.search.SearchShardTarget;
2324
import org.elasticsearch.search.internal.ShardSearchContextId;
25+
import org.elasticsearch.telemetry.TelemetryProvider;
2426
import org.elasticsearch.transport.Transport;
2527
import org.junit.Assert;
2628

@@ -66,7 +68,8 @@ public MockSearchPhaseContext(int numShards) {
6668
new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()),
6769
new ArraySearchPhaseResults<>(numShards),
6870
5,
69-
null
71+
null,
72+
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
7073
);
7174
this.numShards = numShards;
7275
numSuccess = new AtomicInteger(numShards);

server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.UUIDs;
2222
import org.elasticsearch.index.Index;
2323
import org.elasticsearch.index.shard.ShardId;
24+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
2425
import org.elasticsearch.search.SearchHits;
2526
import org.elasticsearch.search.SearchPhaseResult;
2627
import org.elasticsearch.search.internal.AliasFilter;
@@ -52,6 +53,7 @@
5253
import static org.hamcrest.Matchers.containsString;
5354
import static org.hamcrest.Matchers.equalTo;
5455
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
56+
import static org.mockito.Mockito.mock;
5557

5658
public class SearchAsyncActionTests extends ESTestCase {
5759

@@ -111,7 +113,8 @@ public void testSkipSearchShards() throws InterruptedException {
111113
null,
112114
new ArraySearchPhaseResults<>(shardsIter.size()),
113115
request.getMaxConcurrentShardRequests(),
114-
SearchResponse.Clusters.EMPTY
116+
SearchResponse.Clusters.EMPTY,
117+
mock(SearchResponseMetrics.class)
115118
) {
116119

117120
@Override
@@ -218,7 +221,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException {
218221
null,
219222
results,
220223
request.getMaxConcurrentShardRequests(),
221-
SearchResponse.Clusters.EMPTY
224+
SearchResponse.Clusters.EMPTY,
225+
mock(SearchResponseMetrics.class)
222226
) {
223227

224228
@Override
@@ -334,7 +338,8 @@ public void sendFreeContext(
334338
null,
335339
results,
336340
request.getMaxConcurrentShardRequests(),
337-
SearchResponse.Clusters.EMPTY
341+
SearchResponse.Clusters.EMPTY,
342+
mock(SearchResponseMetrics.class)
338343
) {
339344

340345
@Override
@@ -464,7 +469,8 @@ public void sendFreeContext(
464469
null,
465470
new ArraySearchPhaseResults<>(shardsIter.size()),
466471
request.getMaxConcurrentShardRequests(),
467-
SearchResponse.Clusters.EMPTY
472+
SearchResponse.Clusters.EMPTY,
473+
mock(SearchResponseMetrics.class)
468474
) {
469475
@Override
470476
protected void executePhaseOnShard(
@@ -572,7 +578,8 @@ public void testAllowPartialResults() throws InterruptedException {
572578
null,
573579
results,
574580
request.getMaxConcurrentShardRequests(),
575-
SearchResponse.Clusters.EMPTY
581+
SearchResponse.Clusters.EMPTY,
582+
mock(SearchResponseMetrics.class)
576583
) {
577584

578585
@Override
@@ -670,7 +677,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException {
670677
null,
671678
new ArraySearchPhaseResults<>(searchShardIterators.size()),
672679
request.getMaxConcurrentShardRequests(),
673-
SearchResponse.Clusters.EMPTY
680+
SearchResponse.Clusters.EMPTY,
681+
mock(SearchResponseMetrics.class)
674682
) {
675683

676684
@Override

0 commit comments

Comments
 (0)