Skip to content

Commit 4ae62d6

Browse files
Can match phase coordinator duration APM metric (#136828)
Add a new APM metric to measure the duration of the can-match phase at the coordinator: es.search_response.took_durations.can_match.histogram
1 parent 2c64f47 commit 4ae62d6

File tree

8 files changed

+151
-21
lines changed

8 files changed

+151
-21
lines changed

docs/changelog/136828.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136828
2+
summary: Can match phase coordinator duration APM metric
3+
area: Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.core.Nullable;
2020
import org.elasticsearch.index.query.CoordinatorRewriteContext;
2121
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
22+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
2223
import org.elasticsearch.search.CanMatchShardResponse;
2324
import org.elasticsearch.search.SearchService;
2425
import org.elasticsearch.search.SearchShardTarget;
@@ -58,6 +59,8 @@
5859
*/
5960
final class CanMatchPreFilterSearchPhase {
6061

62+
private static final String PHASE_NAME = "can_match";
63+
6164
private final Logger logger;
6265
private final SearchRequest request;
6366
private final List<SearchShardIterator> shardsIts;
@@ -136,20 +139,35 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
136139
TransportSearchAction.SearchTimeProvider timeProvider,
137140
SearchTask task,
138141
boolean requireAtLeastOneMatch,
139-
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
142+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
143+
SearchResponseMetrics searchResponseMetrics
140144
) {
141145
if (shardsIts.isEmpty()) {
142146
return SubscribableListener.newSucceeded(List.of());
143147
}
144148
final SubscribableListener<List<SearchShardIterator>> listener = new SubscribableListener<>();
149+
long phaseStartTimeInNanos = System.nanoTime();
150+
151+
listener.addListener(new ActionListener<>() {
152+
@Override
153+
public void onResponse(List<SearchShardIterator> shardsIts) {
154+
searchResponseMetrics.recordSearchPhaseDuration(PHASE_NAME, System.nanoTime() - phaseStartTimeInNanos);
155+
}
156+
157+
@Override
158+
public void onFailure(Exception e) {
159+
// do not record duration of failed phases
160+
}
161+
});
162+
145163
// Note that the search is failed when this task is rejected by the executor
146164
executor.execute(new AbstractRunnable() {
147165
@Override
148166
public void onFailure(Exception e) {
149167
if (logger.isDebugEnabled()) {
150168
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
151169
}
152-
listener.onFailure(new SearchPhaseExecutionException("can_match", "start", e, ShardSearchFailure.EMPTY_ARRAY));
170+
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "start", e, ShardSearchFailure.EMPTY_ARRAY));
153171
}
154172

155173
@Override
@@ -249,7 +267,7 @@ private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAnd
249267

250268
private void checkNoMissingShards(List<SearchShardIterator> shards) {
251269
assert assertSearchCoordinationThread();
252-
SearchPhase.doCheckNoMissingShards("can_match", request, shards);
270+
SearchPhase.doCheckNoMissingShards(PHASE_NAME, request, shards);
253271
}
254272

255273
private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchShardIterator> shards) {
@@ -386,7 +404,7 @@ public void onFailure(Exception e) {
386404
if (logger.isDebugEnabled()) {
387405
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
388406
}
389-
listener.onFailure(new SearchPhaseExecutionException("can_match", "round", e, ShardSearchFailure.EMPTY_ARRAY));
407+
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "round", e, ShardSearchFailure.EMPTY_ARRAY));
390408
}
391409
}
392410

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ public void runNewSearchPhase(
184184
timeProvider,
185185
task,
186186
false,
187-
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
187+
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
188+
searchResponseMetrics
188189
)
189190
.addListener(
190191
listener.delegateFailureAndWrap(

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1788,7 +1788,8 @@ public void runNewSearchPhase(
17881788
timeProvider,
17891789
task,
17901790
requireAtLeastOneMatch,
1791-
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
1791+
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
1792+
searchResponseMetrics
17921793
)
17931794
.addListener(
17941795
listener.delegateFailureAndWrap(

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.index.Index;
2626
import org.elasticsearch.index.query.Rewriteable;
2727
import org.elasticsearch.injection.guice.Inject;
28+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
2829
import org.elasticsearch.search.SearchService;
2930
import org.elasticsearch.search.builder.SearchSourceBuilder;
3031
import org.elasticsearch.search.internal.AliasFilter;
@@ -60,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
6061
private final ProjectResolver projectResolver;
6162
private final IndexNameExpressionResolver indexNameExpressionResolver;
6263
private final ThreadPool threadPool;
64+
private final SearchResponseMetrics searchResponseMetrics;
6365

6466
@Inject
6567
public TransportSearchShardsAction(
@@ -70,7 +72,8 @@ public TransportSearchShardsAction(
7072
TransportSearchAction transportSearchAction,
7173
SearchTransportService searchTransportService,
7274
ProjectResolver projectResolver,
73-
IndexNameExpressionResolver indexNameExpressionResolver
75+
IndexNameExpressionResolver indexNameExpressionResolver,
76+
SearchResponseMetrics searchResponseMetrics
7477
) {
7578
super(
7679
TYPE.name(),
@@ -88,6 +91,7 @@ public TransportSearchShardsAction(
8891
this.projectResolver = projectResolver;
8992
this.indexNameExpressionResolver = indexNameExpressionResolver;
9093
this.threadPool = transportService.getThreadPool();
94+
this.searchResponseMetrics = searchResponseMetrics;
9195
}
9296

9397
@Override
@@ -180,7 +184,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
180184
timeProvider,
181185
(SearchTask) task,
182186
false,
183-
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
187+
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
188+
searchResponseMetrics
184189
)
185190
.addListener(
186191
delegate.map(

server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) {
6666
);
6767

6868
phaseNameToDurationHistogram = Map.of(
69+
"can_match",
70+
meterRegistry.registerLongHistogram(
71+
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "can_match"),
72+
"The search phase can_match duration in milliseconds at the coordinator, expressed as a histogram",
73+
"millis"
74+
),
6975
"dfs",
7076
meterRegistry.registerLongHistogram(
7177
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "dfs"),

server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.shard.ShardId;
4444
import org.elasticsearch.index.shard.ShardLongFieldRange;
4545
import org.elasticsearch.indices.DateFieldRangeInfo;
46+
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
4647
import org.elasticsearch.search.CanMatchShardResponse;
4748
import org.elasticsearch.search.aggregations.AggregationBuilder;
4849
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
@@ -53,6 +54,7 @@
5354
import org.elasticsearch.search.sort.SortBuilders;
5455
import org.elasticsearch.search.sort.SortOrder;
5556
import org.elasticsearch.search.suggest.SuggestBuilder;
57+
import org.elasticsearch.telemetry.TelemetryProvider;
5658
import org.elasticsearch.test.ESTestCase;
5759
import org.elasticsearch.threadpool.TestThreadPool;
5860
import org.elasticsearch.threadpool.ThreadPool;
@@ -161,7 +163,8 @@ public void sendCanMatch(
161163
timeProvider,
162164
null,
163165
true,
164-
EMPTY_CONTEXT_PROVIDER
166+
EMPTY_CONTEXT_PROVIDER,
167+
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
165168
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
166169
result.set(iter);
167170
latch.countDown();
@@ -256,7 +259,8 @@ public void sendCanMatch(
256259
timeProvider,
257260
null,
258261
true,
259-
EMPTY_CONTEXT_PROVIDER
262+
EMPTY_CONTEXT_PROVIDER,
263+
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
260264
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
261265
result.set(iter);
262266
latch.countDown();
@@ -347,7 +351,8 @@ public void sendCanMatch(
347351
timeProvider,
348352
null,
349353
true,
350-
EMPTY_CONTEXT_PROVIDER
354+
EMPTY_CONTEXT_PROVIDER,
355+
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
351356
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
352357
result.set(iter);
353358
latch.countDown();
@@ -446,7 +451,8 @@ public void sendCanMatch(
446451
timeProvider,
447452
null,
448453
shardsIter.size() > shardToSkip.size(),
449-
EMPTY_CONTEXT_PROVIDER
454+
EMPTY_CONTEXT_PROVIDER,
455+
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
450456
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
451457
result.set(iter);
452458
latch.countDown();
@@ -1418,7 +1424,8 @@ public void sendCanMatch(
14181424
timeProvider,
14191425
null,
14201426
true,
1421-
contextProvider
1427+
contextProvider,
1428+
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
14221429
),
14231430
requests
14241431
);

server/src/test/java/org/elasticsearch/rest/action/search/SearchPhaseCoordinatorAPMMetricsTests.java

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@
1212
import org.elasticsearch.action.search.ClosePointInTimeRequest;
1313
import org.elasticsearch.action.search.OpenPointInTimeRequest;
1414
import org.elasticsearch.action.search.OpenPointInTimeResponse;
15+
import org.elasticsearch.action.search.SearchRequest;
16+
import org.elasticsearch.action.search.SearchShardsRequest;
1517
import org.elasticsearch.action.search.SearchType;
1618
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
1719
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
20+
import org.elasticsearch.action.search.TransportSearchShardsAction;
1821
import org.elasticsearch.cluster.metadata.IndexMetadata;
1922
import org.elasticsearch.common.bytes.BytesReference;
2023
import org.elasticsearch.common.settings.Settings;
2124
import org.elasticsearch.core.TimeValue;
25+
import org.elasticsearch.index.query.RangeQueryBuilder;
2226
import org.elasticsearch.plugins.Plugin;
2327
import org.elasticsearch.plugins.PluginsService;
2428
import org.elasticsearch.search.builder.PointInTimeBuilder;
@@ -39,8 +43,10 @@
3943

4044
public class SearchPhaseCoordinatorAPMMetricsTests extends ESSingleNodeTestCase {
4145
private static final String indexName = "test_coordinator_search_phase_metrics";
46+
private static final String secondIndexName = "test_coordinator_search_phase_metrics_2";
4247
private final int num_primaries = randomIntBetween(2, 7);
4348

49+
private static final String CAN_MATCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.can_match.histogram";
4450
private static final String DFS_QUERY_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs_query.histogram";
4551
private static final String DFS_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs.histogram";
4652
private static final String FETCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.fetch.histogram";
@@ -63,8 +69,22 @@ private void setUpIndex() throws Exception {
6369
);
6470
ensureGreen(indexName);
6571

66-
prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get();
67-
prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get();
72+
prepareIndex(indexName).setId("1").setSource("body", "doc1", "@timestamp", "2024-11-01").setRefreshPolicy(IMMEDIATE).get();
73+
prepareIndex(indexName).setId("2").setSource("body", "doc2", "@timestamp", "2024-12-01").setRefreshPolicy(IMMEDIATE).get();
74+
prepareIndex(indexName).setId("3").setSource("body", "doc3", "@timestamp", "2025-01-01").setRefreshPolicy(IMMEDIATE).get();
75+
76+
createIndex(
77+
secondIndexName,
78+
Settings.builder()
79+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries)
80+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
81+
.build()
82+
);
83+
ensureGreen(secondIndexName);
84+
85+
prepareIndex(secondIndexName).setId("4").setSource("body", "doc1", "@timestamp", "2025-11-01").setRefreshPolicy(IMMEDIATE).get();
86+
prepareIndex(secondIndexName).setId("5").setSource("body", "doc2", "@timestamp", "2025-12-01").setRefreshPolicy(IMMEDIATE).get();
87+
prepareIndex(secondIndexName).setId("6").setSource("body", "doc3", "@timestamp", "2026-01-01").setRefreshPolicy(IMMEDIATE).get();
6888
}
6989

7090
@After
@@ -82,19 +102,24 @@ public void testSearchQueryThenFetch() {
82102
client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
83103
"1"
84104
);
85-
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
105+
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
106+
assertNotMeasured(
107+
List.of(CAN_MATCH_SEARCH_PHASE_METRIC, DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC)
108+
);
86109
}
87110

88111
public void testDfsSearch() {
89112
assertSearchHitsWithoutFailures(
90113
client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
91114
"1"
92115
);
93-
assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
116+
assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
117+
assertNotMeasured(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
94118
}
95119

96120
public void testPointInTime() {
97121
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10));
122+
request.indexFilter(simpleQueryStringQuery("doc1"));
98123
OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
99124
BytesReference pointInTimeId = response.getPointInTimeId();
100125

@@ -106,12 +131,67 @@ public void testPointInTime() {
106131
.setQuery(simpleQueryStringQuery("doc1")),
107132
"1"
108133
);
109-
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
134+
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
135+
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
136+
} finally {
137+
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
138+
}
139+
}
140+
141+
public void testPointInTimeWithPreFiltering() {
142+
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName, secondIndexName).keepAlive(TimeValue.timeValueMinutes(10));
143+
request.indexFilter(new RangeQueryBuilder("@timestamp").gte("2025-07-01"));
144+
OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
145+
BytesReference pointInTimeId = response.getPointInTimeId();
146+
147+
try {
148+
assertSearchHitsWithoutFailures(
149+
client().prepareSearch()
150+
.setPointInTime(new PointInTimeBuilder(pointInTimeId))
151+
.setSize(1)
152+
.setPreFilterShardSize(1)
153+
.setQuery(simpleQueryStringQuery("doc3")),
154+
"6"
155+
);
156+
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
157+
assertMeasurements(
158+
List.of(CAN_MATCH_SEARCH_PHASE_METRIC),
159+
2 // one during open PIT, one during can-match phase of search
160+
);
161+
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
110162
} finally {
111163
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
112164
}
113165
}
114166

167+
public void testCanMatchSearch() {
168+
assertSearchHitsWithoutFailures(
169+
client().prepareSearch(indexName)
170+
.setSearchType(SearchType.QUERY_THEN_FETCH)
171+
.setPreFilterShardSize(1)
172+
.setQuery(simpleQueryStringQuery("doc1")),
173+
"1"
174+
);
175+
176+
assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC), 1);
177+
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
178+
}
179+
180+
public void testSearchShards() {
181+
var request = new SearchShardsRequest(
182+
new String[] { indexName },
183+
SearchRequest.DEFAULT_INDICES_OPTIONS,
184+
simpleQueryStringQuery("doc1"),
185+
null,
186+
null,
187+
randomBoolean(),
188+
randomBoolean() ? null : randomAlphaOfLength(10)
189+
);
190+
var resp = client().execute(TransportSearchShardsAction.TYPE, request).actionGet();
191+
assertThat(resp.getGroups(), hasSize(num_primaries));
192+
assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC), 1);
193+
}
194+
115195
private void resetMeter() {
116196
getTestTelemetryPlugin().resetMeter();
117197
}
@@ -120,11 +200,18 @@ private TestTelemetryPlugin getTestTelemetryPlugin() {
120200
return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0);
121201
}
122202

123-
private void assertMeasurements(Collection<String> metricNames) {
203+
private void assertNotMeasured(Collection<String> metricNames) {
204+
for (var metricName : metricNames) {
205+
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
206+
assertThat(metricName, measurements, hasSize(0));
207+
}
208+
}
209+
210+
private void assertMeasurements(Collection<String> metricNames, int numberOfMeasurements) {
124211
for (var metricName : metricNames) {
125212
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
126-
assertThat(measurements, hasSize(1));
127-
assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L));
213+
assertThat(metricName, measurements, hasSize(numberOfMeasurements));
214+
assertThat(metricName, measurements.getFirst().getLong(), greaterThanOrEqualTo(0L));
128215
}
129216
}
130217
}

0 commit comments

Comments
 (0)