diff --git a/docs/changelog/136828.yaml b/docs/changelog/136828.yaml new file mode 100644 index 0000000000000..7d4ffe3d892c2 --- /dev/null +++ b/docs/changelog/136828.yaml @@ -0,0 +1,5 @@ +pr: 136828 +summary: Can match phase coordinator duration APM metric +area: Search +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index b5faf88dff125..c9285c23ef53e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -19,6 +19,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.CoordinatorRewriteContext; import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.CanMatchShardResponse; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -58,6 +59,8 @@ */ final class CanMatchPreFilterSearchPhase { + private static final String PHASE_NAME = "can_match"; + private final Logger logger; private final SearchRequest request; private final List shardsIts; @@ -136,12 +139,27 @@ public static SubscribableListener> execute( TransportSearchAction.SearchTimeProvider timeProvider, SearchTask task, boolean requireAtLeastOneMatch, - CoordinatorRewriteContextProvider coordinatorRewriteContextProvider + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider, + SearchResponseMetrics searchResponseMetrics ) { if (shardsIts.isEmpty()) { return SubscribableListener.newSucceeded(List.of()); } final SubscribableListener> listener = new SubscribableListener<>(); + long phaseStartTimeInNanos = System.nanoTime(); + + listener.addListener(new ActionListener<>() { + @Override + public void onResponse(List shardsIts) { + searchResponseMetrics.recordSearchPhaseDuration(PHASE_NAME, System.nanoTime() - phaseStartTimeInNanos); + } + + @Override + public void onFailure(Exception e) { + // do not record duration of failed phases + } + }); + // Note that the search is failed when this task is rejected by the executor executor.execute(new AbstractRunnable() { @Override @@ -149,7 +167,7 @@ public void onFailure(Exception e) { if (logger.isDebugEnabled()) { logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e); } - listener.onFailure(new SearchPhaseExecutionException("can_match", "start", e, ShardSearchFailure.EMPTY_ARRAY)); + listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "start", e, ShardSearchFailure.EMPTY_ARRAY)); } @Override @@ -249,7 +267,7 @@ private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAnd private void checkNoMissingShards(List shards) { assert assertSearchCoordinationThread(); - SearchPhase.doCheckNoMissingShards("can_match", request, shards); + SearchPhase.doCheckNoMissingShards(PHASE_NAME, request, shards); } private Map> groupByNode(List shards) { @@ -386,7 +404,7 @@ public void onFailure(Exception e) { if (logger.isDebugEnabled()) { logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e); } - listener.onFailure(new SearchPhaseExecutionException("can_match", "round", e, ShardSearchFailure.EMPTY_ARRAY)); + listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "round", e, ShardSearchFailure.EMPTY_ARRAY)); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 2563b0f5f26f0..f9e779a610df4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -184,7 +184,8 @@ public void runNewSearchPhase( timeProvider, task, false, - searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis) + searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis), + searchResponseMetrics ) .addListener( listener.delegateFailureAndWrap( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 51dd4cc09a415..077c45eeafc70 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1788,7 +1788,8 @@ public void runNewSearchPhase( timeProvider, task, requireAtLeastOneMatch, - searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis) + searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis), + searchResponseMetrics ) .addListener( listener.delegateFailureAndWrap( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java index 035a5005b4a73..afd806c2b8a3c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; @@ -60,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction { result.set(iter); latch.countDown(); @@ -256,7 +259,8 @@ public void sendCanMatch( timeProvider, null, true, - EMPTY_CONTEXT_PROVIDER + EMPTY_CONTEXT_PROVIDER, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ).addListener(ActionTestUtils.assertNoFailureListener(iter -> { result.set(iter); latch.countDown(); @@ -347,7 +351,8 @@ public void sendCanMatch( timeProvider, null, true, - EMPTY_CONTEXT_PROVIDER + EMPTY_CONTEXT_PROVIDER, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ).addListener(ActionTestUtils.assertNoFailureListener(iter -> { result.set(iter); latch.countDown(); @@ -446,7 +451,8 @@ public void sendCanMatch( timeProvider, null, shardsIter.size() > shardToSkip.size(), - EMPTY_CONTEXT_PROVIDER + EMPTY_CONTEXT_PROVIDER, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ).addListener(ActionTestUtils.assertNoFailureListener(iter -> { result.set(iter); latch.countDown(); @@ -1418,7 +1424,8 @@ public void sendCanMatch( timeProvider, null, true, - contextProvider + contextProvider, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ), requests ); diff --git a/server/src/test/java/org/elasticsearch/rest/action/search/SearchPhaseCoordinatorAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/rest/action/search/SearchPhaseCoordinatorAPMMetricsTests.java index 9cd3a8d9dc68c..70a03419c3a5e 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/search/SearchPhaseCoordinatorAPMMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/search/SearchPhaseCoordinatorAPMMetricsTests.java @@ -12,13 +12,17 @@ import org.elasticsearch.action.search.ClosePointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchShardsRequest; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; +import org.elasticsearch.action.search.TransportSearchShardsAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.builder.PointInTimeBuilder; @@ -39,8 +43,10 @@ public class SearchPhaseCoordinatorAPMMetricsTests extends ESSingleNodeTestCase { private static final String indexName = "test_coordinator_search_phase_metrics"; + private static final String secondIndexName = "test_coordinator_search_phase_metrics_2"; private final int num_primaries = randomIntBetween(2, 7); + private static final String CAN_MATCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.can_match.histogram"; private static final String DFS_QUERY_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs_query.histogram"; private static final String DFS_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs.histogram"; private static final String FETCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.fetch.histogram"; @@ -63,8 +69,22 @@ private void setUpIndex() throws Exception { ); ensureGreen(indexName); - prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get(); - prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(indexName).setId("1").setSource("body", "doc1", "@timestamp", "2024-11-01").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(indexName).setId("2").setSource("body", "doc2", "@timestamp", "2024-12-01").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(indexName).setId("3").setSource("body", "doc3", "@timestamp", "2025-01-01").setRefreshPolicy(IMMEDIATE).get(); + + createIndex( + secondIndexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(secondIndexName); + + prepareIndex(secondIndexName).setId("4").setSource("body", "doc1", "@timestamp", "2025-11-01").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(secondIndexName).setId("5").setSource("body", "doc2", "@timestamp", "2025-12-01").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(secondIndexName).setId("6").setSource("body", "doc3", "@timestamp", "2026-01-01").setRefreshPolicy(IMMEDIATE).get(); } @After @@ -82,7 +102,10 @@ public void testSearchQueryThenFetch() { client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), "1" ); - assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC)); + assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1); + assertNotMeasured( + List.of(CAN_MATCH_SEARCH_PHASE_METRIC, DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC) + ); } public void testDfsSearch() { @@ -90,11 +113,13 @@ public void testDfsSearch() { client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), "1" ); - assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC)); + assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1); + assertNotMeasured(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC)); } public void testPointInTime() { OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10)); + request.indexFilter(simpleQueryStringQuery("doc1")); OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); BytesReference pointInTimeId = response.getPointInTimeId(); @@ -106,12 +131,67 @@ public void testPointInTime() { .setQuery(simpleQueryStringQuery("doc1")), "1" ); - assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC)); + assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1); + assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC)); + } finally { + client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet(); + } + } + + public void testPointInTimeWithPreFiltering() { + OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName, secondIndexName).keepAlive(TimeValue.timeValueMinutes(10)); + request.indexFilter(new RangeQueryBuilder("@timestamp").gte("2025-07-01")); + OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); + BytesReference pointInTimeId = response.getPointInTimeId(); + + try { + assertSearchHitsWithoutFailures( + client().prepareSearch() + .setPointInTime(new PointInTimeBuilder(pointInTimeId)) + .setSize(1) + .setPreFilterShardSize(1) + .setQuery(simpleQueryStringQuery("doc3")), + "6" + ); + assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1); + assertMeasurements( + List.of(CAN_MATCH_SEARCH_PHASE_METRIC), + 2 // one during open PIT, one during can-match phase of search + ); + assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC)); } finally { client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet(); } } + public void testCanMatchSearch() { + assertSearchHitsWithoutFailures( + client().prepareSearch(indexName) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreFilterShardSize(1) + .setQuery(simpleQueryStringQuery("doc1")), + "1" + ); + + assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC), 1); + assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC)); + } + + public void testSearchShards() { + var request = new SearchShardsRequest( + new String[] { indexName }, + SearchRequest.DEFAULT_INDICES_OPTIONS, + simpleQueryStringQuery("doc1"), + null, + null, + randomBoolean(), + randomBoolean() ? null : randomAlphaOfLength(10) + ); + var resp = client().execute(TransportSearchShardsAction.TYPE, request).actionGet(); + assertThat(resp.getGroups(), hasSize(num_primaries)); + assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC), 1); + } + private void resetMeter() { getTestTelemetryPlugin().resetMeter(); } @@ -120,11 +200,18 @@ private TestTelemetryPlugin getTestTelemetryPlugin() { return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0); } - private void assertMeasurements(Collection metricNames) { + private void assertNotMeasured(Collection metricNames) { + for (var metricName : metricNames) { + List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); + assertThat(metricName, measurements, hasSize(0)); + } + } + + private void assertMeasurements(Collection metricNames, int numberOfMeasurements) { for (var metricName : metricNames) { List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); - assertThat(measurements, hasSize(1)); - assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); + assertThat(metricName, measurements, hasSize(numberOfMeasurements)); + assertThat(metricName, measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); } } }