diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..e9cedf54d188e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + protected final SearchResponseMetrics searchResponseMetrics; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -114,7 +116,8 @@ abstract class AbstractSearchAsyncAction exten SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchResponseMetrics searchResponseMetrics ) { super(name); this.namedWriteableRegistry = namedWriteableRegistry; @@ -155,6 +158,7 @@ abstract class AbstractSearchAsyncAction exten // at the end of the search addReleasable(resultConsumer); this.clusters = clusters; + this.searchResponseMetrics = searchResponseMetrics; } protected void notifyListShards( @@ -665,7 +669,7 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti * @see #onShardFailure(int, SearchShardTarget, Exception) * @see #onShardResult(SearchPhaseResult) */ - private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() + protected void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() executeNextPhase(getName(), this::getNextPhase); } @@ -684,6 +688,10 @@ public SearchTransportService getSearchTransport() { return searchTransportService; } + public SearchResponseMetrics getSearchResponseMetrics() { + return searchResponseMetrics; + } + public final void execute(Runnable command) { executor.execute(command); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index d49542313a712..a47947c81c18f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters, - Client client + Client client, + SearchResponseMetrics searchResponseMetrics ) { super( "dfs", @@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), - clusters + clusters, + searchResponseMetrics ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; addReleasable(queryPhaseResultConsumer); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 1a7c0f26f4b21..486e805ca3969 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -92,6 +93,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction shardIndexMap) { + phaseStartTimeNanos = System.nanoTime(); if (this.batchQueryPhase == false) { super.doRun(shardIndexMap); return; @@ -564,6 +569,13 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP } } + @Override + protected void onPhaseDone() { + final long tookInNanos = System.nanoTime() - phaseStartTimeNanos; + searchResponseMetrics.recordQueryPhaseDuration(tookInNanos); + super.onPhaseDone(); + } + public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; static void registerNodeSearchAction( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index f9ae44c473014..868a7094c61e9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -70,6 +71,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction(shardIterators.size()), searchRequest.getMaxConcurrentShardRequests(), - clusters + clusters, + searchResponseMetrics ) { @Override protected void executePhaseOnShard( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 16389f6137f81..b66fa8d1f509b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1688,7 +1688,8 @@ public void runNewSearchPhase( clusterState, task, clusters, - client + client, + searchResponseMetrics ); } else { assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType(); @@ -1709,7 +1710,8 @@ public void runNewSearchPhase( task, clusters, client, - searchService.batchQueryPhase() + searchService.batchQueryPhase(), + searchResponseMetrics ); } success = true; diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java b/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java index ae9f14de4c24d..7645dc4b32fbd 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Container class for aggregated metrics about search responses. @@ -42,9 +43,11 @@ public String getDisplayName() { public static final String TOOK_DURATION_TOTAL_HISTOGRAM_NAME = "es.search_response.took_durations.histogram"; public static final String RESPONSE_COUNT_TOTAL_COUNTER_NAME = "es.search_response.response_count.total"; + public static final String QUERY_SEARCH_PHASE_METRIC = "es.search_response.coordinator_phases.query.duration.histogram"; private final LongHistogram tookDurationTotalMillisHistogram; private final LongCounter responseCountTotalCounter; + private final LongHistogram queryPhaseDurationHistogram; public SearchResponseMetrics(MeterRegistry meterRegistry) { this( @@ -59,13 +62,23 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) { + "success, partial failure, or failure, expressed as a single total counter and individual " + "attribute counters", "count" + ), + meterRegistry.registerLongHistogram( + QUERY_SEARCH_PHASE_METRIC, + "Query search phase execution times at the coordinator level, expressed as a histogram", + "millis" ) ); } - private SearchResponseMetrics(LongHistogram tookDurationTotalMillisHistogram, LongCounter responseCountTotalCounter) { + private SearchResponseMetrics( + LongHistogram tookDurationTotalMillisHistogram, + LongCounter responseCountTotalCounter, + LongHistogram queryPhaseDurationHistogram + ) { this.tookDurationTotalMillisHistogram = tookDurationTotalMillisHistogram; this.responseCountTotalCounter = responseCountTotalCounter; + this.queryPhaseDurationHistogram = queryPhaseDurationHistogram; } public long recordTookTimeForSearchScroll(long tookTime) { @@ -91,4 +104,8 @@ public void incrementResponseCount(ResponseCountTotalStatus responseCountTotalSt attributesWithStatus.put(RESPONSE_COUNT_TOTAL_STATUS_ATTRIBUTE_NAME, responseCountTotalStatus.getDisplayName()); responseCountTotalCounter.incrementBy(1L, attributesWithStatus); } + + public void recordQueryPhaseDuration(long tookInNanos) { + queryPhaseDurationHistogram.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos)); + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..388e4472093af 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -18,11 +18,13 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; @@ -88,7 +90,8 @@ private AbstractSearchAsyncAction createAction( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 16e90064ca929..4ed49e4368857 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -18,9 +18,11 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ShardSearchContextId; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.transport.Transport; import org.junit.Assert; @@ -66,7 +68,8 @@ public MockSearchPhaseContext(int numShards) { new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()), new ArraySearchPhaseResults<>(numShards), 5, - null + null, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ); this.numShards = numShards; numSuccess = new AtomicInteger(numShards); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index afd3bee4c4ab8..713c41f3ee35e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -21,10 +21,12 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; @@ -111,7 +113,8 @@ public void testSkipSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -218,7 +221,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -334,7 +338,8 @@ public void sendFreeContext( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -464,7 +469,8 @@ public void sendFreeContext( null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected void executePhaseOnShard( @@ -572,7 +578,8 @@ public void testAllowPartialResults() throws InterruptedException { null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -670,7 +677,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(searchShardIterators.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 2a9d12b27507d..a8d69cec03fc0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.lucene.grouping.TopFieldGroups; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -38,6 +39,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.transport.Transport; @@ -211,7 +213,8 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, null, - false + false, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected SearchPhase getNextPhase() { @@ -407,7 +410,8 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, null, - false + false, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/rest/action/search/SearchResponseMetricsTests.java b/server/src/test/java/org/elasticsearch/rest/action/search/SearchResponseMetricsTests.java new file mode 100644 index 0000000000000..74df14137de1d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/rest/action/search/SearchResponseMetricsTests.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.action.search; + +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; +import static org.elasticsearch.rest.action.search.SearchResponseMetrics.QUERY_SEARCH_PHASE_METRIC; +import static org.elasticsearch.rest.action.search.SearchResponseMetrics.TOOK_DURATION_TOTAL_HISTOGRAM_NAME; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; + +public class SearchResponseMetricsTests extends ESSingleNodeTestCase { + private static final String indexName = "test_coordinator_search_phase_metrics"; + private final int num_primaries = randomIntBetween(2, 7); + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @Before + private void setUpIndex() throws Exception { + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(indexName); + + prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get(); + } + + @After + private void afterTest() { + resetMeter(); + } + + @Override + protected Collection> getPlugins() { + return pluginList(TestTelemetryPlugin.class); + } + + public void testSearchQueryThenFetch() throws InterruptedException { + assertSearchHitsWithoutFailures( + client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), + "1" + ); + assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, TOOK_DURATION_TOTAL_HISTOGRAM_NAME)); + } + + private void resetMeter() { + getTestTelemetryPlugin().resetMeter(); + } + + private TestTelemetryPlugin getTestTelemetryPlugin() { + return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0); + } + + private void assertMeasurements(Collection metricNames) { + for (var metricName : metricNames) { + List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); + assertThat(measurements, hasSize(1)); + assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); + } + } +}