From 9239f461779ba8f88422320cb86c87a50a01e78f Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Mon, 6 Oct 2025 15:44:48 -0500 Subject: [PATCH 1/6] Added es.search.coordinator.phases.query.duration.histogram APM metric to track the duration of the search query phase. --- .../search/AbstractSearchAsyncAction.java | 9 +- .../SearchDfsQueryThenFetchAsyncAction.java | 7 +- .../SearchQueryThenFetchAsyncAction.java | 15 +- .../TransportOpenPointInTimeAction.java | 9 +- .../action/search/TransportSearchAction.java | 12 +- .../search/TransportSearchShardsAction.java | 6 +- .../CoordinatorSearchPhaseAPMMetrics.java | 42 +++++ .../elasticsearch/node/NodeConstruction.java | 6 + .../AbstractSearchAsyncActionTests.java | 4 +- .../action/search/MockSearchPhaseContext.java | 4 +- .../action/search/SearchAsyncActionTests.java | 19 ++- .../SearchQueryThenFetchAsyncActionTests.java | 7 +- .../search/TransportSearchActionTests.java | 4 +- ...CoordinatorSearchPhaseAPMMetricsTests.java | 151 ++++++++++++++++++ .../snapshots/SnapshotResiliencyTests.java | 4 +- 15 files changed, 276 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java create mode 100644 server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..adc30977cd445 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Releasable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchPhaseResult; @@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + protected final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -114,7 +116,8 @@ abstract class AbstractSearchAsyncAction exten SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics ) { super(name); this.namedWriteableRegistry = namedWriteableRegistry; @@ -155,6 +158,7 @@ abstract class AbstractSearchAsyncAction exten // at the end of the search addReleasable(resultConsumer); this.clusters = clusters; + this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics; } protected void notifyListShards( @@ -666,9 +670,12 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti * @see #onShardResult(SearchPhaseResult) */ private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() + recordPhaseLatency(); executeNextPhase(getName(), this::getNextPhase); } + protected void recordPhaseLatency() {} + /** * Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be * thrown. diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index d49542313a712..898bb83aa6be1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters, - Client client + Client client, + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics ) { super( "dfs", @@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), - clusters + clusters, + coordinatorSearchPhaseAPMMetrics ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; addReleasable(queryPhaseResultConsumer); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a8f22eb1cc572..4147907f532d2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; @@ -90,6 +91,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction shardIndexMap) { + phaseStartTimeNanos = System.nanoTime(); if (this.batchQueryPhase == false) { super.doRun(shardIndexMap); return; @@ -562,6 +567,12 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP } } + @Override + protected void recordPhaseLatency() { + final long tookInNanos = System.nanoTime() - phaseStartTimeNanos; + coordinatorSearchPhaseAPMMetrics.onQueryPhaseDone(tookInNanos); + } + public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; static void registerNodeSearchAction( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 1038308bc6bf3..5cf9a68bb1668 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.rest.RestStatus; @@ -70,6 +71,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction(shardIterators.size()), searchRequest.getMaxConcurrentShardRequests(), - clusters + clusters, + coordinatorSearchPhaseAPMMetrics ) { @Override protected void executePhaseOnShard( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index cd8c561a4ad75..3374c4dc41a4c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -67,6 +67,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.ExecutorSelector; @@ -170,6 +171,7 @@ public class TransportSearchAction extends HandledTransportAction slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class); // NOTE: the response of index/search slow log fields below must be calculated dynamically on every call // because the responses may change dynamically at runtime @@ -1360,6 +1365,7 @@ public Map queryFields() { b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService); b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService); b.bind(MergeMetrics.class).toInstance(mergeMetrics); + b.bind(CoordinatorSearchPhaseAPMMetrics.class).toInstance(coordinatorSearchPhaseAPMMetrics); }); if (ReadinessService.enabled(environment)) { diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..9d213f279b3a7 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -88,7 +89,8 @@ private AbstractSearchAsyncAction createAction( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 16e90064ca929..2a66decbc7211 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -66,7 +67,8 @@ public MockSearchPhaseContext(int numShards) { new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()), new ArraySearchPhaseResults<>(numShards), 5, - null + null, + CoordinatorSearchPhaseAPMMetrics.NOOP ); this.numShards = numShards; numSuccess = new AtomicInteger(numShards); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index afd3bee4c4ab8..63848a0123f89 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.Index; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; @@ -111,7 +112,8 @@ public void testSkipSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -218,7 +220,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -334,7 +337,8 @@ public void sendFreeContext( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -464,7 +468,8 @@ public void sendFreeContext( null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected void executePhaseOnShard( @@ -572,7 +577,8 @@ public void testAllowPartialResults() throws InterruptedException { null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -670,7 +676,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(searchShardIterators.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 2a9d12b27507d..24d3399a882f9 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.lucene.grouping.TopFieldGroups; import org.elasticsearch.search.DocValueFormat; @@ -211,7 +212,8 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, null, - false + false, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected SearchPhase getNextPhase() { @@ -407,7 +409,8 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, null, - false + false, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 61aa05f703018..28cb888bd3984 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -1810,7 +1811,8 @@ protected void doWriteTo(StreamOutput out) throws IOException { null, new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), client, - new UsageService() + new UsageService(), + CoordinatorSearchPhaseAPMMetrics.NOOP ); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java new file mode 100644 index 0000000000000..200756cd2f490 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search.TelemetryMetrics; + +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; +import static org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics.QUERY_SEARCH_PHASE_METRIC; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; + +public class CoordinatorSearchPhaseAPMMetricsTests extends ESSingleNodeTestCase { + private static final String indexName = "test_coordinator_search_phase_metrics"; + private final int num_primaries = randomIntBetween(2, 7); + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @Before + private void setUpIndex() throws Exception { + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(indexName); + + prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get(); + + prepareIndex(CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME).setId("1") + .setSource("body", "doc1") + .setRefreshPolicy(IMMEDIATE) + .get(); + prepareIndex(CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME).setId("2") + .setSource("body", "doc2") + .setRefreshPolicy(IMMEDIATE) + .get(); + } + + @After + private void afterTest() { + resetMeter(); + } + + @Override + protected Collection> getPlugins() { + return pluginList(TestTelemetryPlugin.class, CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.class); + } + + public void testSearchQueryThenFetch() throws InterruptedException { + assertSearchHitsWithoutFailures( + client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), + "1" + ); + final List queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC); + assertEquals(1, queryMeasurements.size()); + assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC)); + } + + private void resetMeter() { + getTestTelemetryPlugin().resetMeter(); + } + + private TestTelemetryPlugin getTestTelemetryPlugin() { + return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0); + } + + private void assertMeasurements(Collection metricNames) { + for (var metricName : metricNames) { + List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); + assertThat(measurements, hasSize(1)); + assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); + } + } + + public static class TestSystemIndexPlugin extends Plugin implements SystemIndexPlugin { + + static final String INDEX_NAME = ".test-system-index"; + + public TestSystemIndexPlugin() {} + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of( + SystemIndexDescriptor.builder() + .setIndexPattern(INDEX_NAME + "*") + .setPrimaryIndex(INDEX_NAME) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .setMappings(""" + { + "_meta": { + "version": "8.0.0", + "managed_index_mappings_version": 3 + }, + "properties": { + "body": { "type": "keyword" } + } + } + """) + .setThreadPools(ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS) + .setOrigin(ShardSearchPhaseAPMMetricsTests.class.getSimpleName()) + .build() + ); + } + + @Override + public String getFeatureName() { + return ShardSearchPhaseAPMMetricsTests.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "test plugin"; + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d6a57ba4587c5..37ac5ce022766 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -148,6 +148,7 @@ import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; @@ -2771,7 +2772,8 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { EmptySystemIndices.INSTANCE.getExecutorSelector(), new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), client, - usageService + usageService, + CoordinatorSearchPhaseAPMMetrics.NOOP ) ); actions.put( From 5973b2ccb3284cf1d41b03c411e469c30e207715 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Tue, 7 Oct 2025 09:07:49 -0500 Subject: [PATCH 2/6] clean up unused code --- .../action/search/TransportSearchShardsAction.java | 6 +----- .../CoordinatorSearchPhaseAPMMetricsTests.java | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java index 47f0be9105fb2..507893df0c0c1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.Rewriteable; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -61,7 +60,6 @@ public class TransportSearchShardsAction extends HandledTransportAction queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC); - assertEquals(1, queryMeasurements.size()); + assertThat(queryMeasurements, hasSize(1)); assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC)); } From 58d237e9d10d40469b301253c9e6282dca312c65 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Tue, 7 Oct 2025 11:24:53 -0500 Subject: [PATCH 3/6] remove redundant asserts --- .../TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java index e3185e099e905..44c708eb20ce7 100644 --- a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java @@ -81,8 +81,6 @@ public void testSearchQueryThenFetch() throws InterruptedException { client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), "1" ); - final List queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC); - assertThat(queryMeasurements, hasSize(1)); assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC)); } From 182b619fe87894d86f2bbd457d6b935a7a158bc1 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Tue, 7 Oct 2025 13:21:28 -0500 Subject: [PATCH 4/6] PR fixes --- .../CoordinatorSearchPhaseAPMMetrics.java | 5 +- ...CoordinatorSearchPhaseAPMMetricsTests.java | 60 +------------------ 2 files changed, 2 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java b/server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java index 029d2fc309337..ca717571d9ec5 100644 --- a/server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java +++ b/server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java @@ -33,10 +33,7 @@ public CoordinatorSearchPhaseAPMMetrics(MeterRegistry meterRegistry) { } public void onQueryPhaseDone(long tookInNanos) { - recordPhaseLatency(queryPhaseMetric, tookInNanos); + queryPhaseMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos)); } - protected void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) { - histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos)); - } } diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java index 44c708eb20ce7..5426fcf4fcc28 100644 --- a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java @@ -12,11 +12,8 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indices.ExecutorNames; -import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -55,15 +52,6 @@ private void setUpIndex() throws Exception { prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get(); prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get(); - - prepareIndex(CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME).setId("1") - .setSource("body", "doc1") - .setRefreshPolicy(IMMEDIATE) - .get(); - prepareIndex(CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME).setId("2") - .setSource("body", "doc2") - .setRefreshPolicy(IMMEDIATE) - .get(); } @After @@ -73,7 +61,7 @@ private void afterTest() { @Override protected Collection> getPlugins() { - return pluginList(TestTelemetryPlugin.class, CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.class); + return pluginList(TestTelemetryPlugin.class); } public void testSearchQueryThenFetch() throws InterruptedException { @@ -100,50 +88,4 @@ private void assertMeasurements(Collection metricNames) { } } - public static class TestSystemIndexPlugin extends Plugin implements SystemIndexPlugin { - - static final String INDEX_NAME = ".test-system-index"; - - public TestSystemIndexPlugin() {} - - @Override - public Collection getSystemIndexDescriptors(Settings settings) { - return List.of( - SystemIndexDescriptor.builder() - .setIndexPattern(INDEX_NAME + "*") - .setPrimaryIndex(INDEX_NAME) - .setSettings( - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build() - ) - .setMappings(""" - { - "_meta": { - "version": "8.0.0", - "managed_index_mappings_version": 3 - }, - "properties": { - "body": { "type": "keyword" } - } - } - """) - .setThreadPools(ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS) - .setOrigin(ShardSearchPhaseAPMMetricsTests.class.getSimpleName()) - .build() - ); - } - - @Override - public String getFeatureName() { - return ShardSearchPhaseAPMMetricsTests.class.getSimpleName(); - } - - @Override - public String getFeatureDescription() { - return "test plugin"; - } - } - } From f52cedf2600a99909c329e30e08d97ff00f8bafb Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Tue, 7 Oct 2025 15:01:37 -0500 Subject: [PATCH 5/6] Use SearchResponseMetrics instead, directly call for the query phase --- .../search/AbstractSearchAsyncAction.java | 17 ++++---- .../SearchDfsQueryThenFetchAsyncAction.java | 6 +-- .../SearchQueryThenFetchAsyncAction.java | 11 +++--- .../TransportOpenPointInTimeAction.java | 10 ++--- .../action/search/TransportSearchAction.java | 10 ++--- .../CoordinatorSearchPhaseAPMMetrics.java | 39 ------------------- .../elasticsearch/node/NodeConstruction.java | 6 --- .../action/search/SearchResponseMetrics.java | 19 ++++++++- .../AbstractSearchAsyncActionTests.java | 5 ++- .../action/search/MockSearchPhaseContext.java | 5 ++- .../action/search/SearchAsyncActionTests.java | 15 +++---- .../SearchQueryThenFetchAsyncActionTests.java | 7 ++-- .../search/TransportSearchActionTests.java | 4 +- .../search/SearchResponseMetricsTests.java} | 10 ++--- .../snapshots/SnapshotResiliencyTests.java | 4 +- 15 files changed, 69 insertions(+), 99 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java rename server/src/test/java/org/elasticsearch/{search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java => rest/action/search/SearchResponseMetricsTests.java} (88%) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index adc30977cd445..e9cedf54d188e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -26,8 +26,8 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Releasable; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -94,7 +94,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; - protected final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics; + protected final SearchResponseMetrics searchResponseMetrics; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -117,7 +117,7 @@ abstract class AbstractSearchAsyncAction exten SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters, - CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics + SearchResponseMetrics searchResponseMetrics ) { super(name); this.namedWriteableRegistry = namedWriteableRegistry; @@ -158,7 +158,7 @@ abstract class AbstractSearchAsyncAction exten // at the end of the search addReleasable(resultConsumer); this.clusters = clusters; - this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics; + this.searchResponseMetrics = searchResponseMetrics; } protected void notifyListShards( @@ -669,13 +669,10 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti * @see #onShardFailure(int, SearchShardTarget, Exception) * @see #onShardResult(SearchPhaseResult) */ - private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() - recordPhaseLatency(); + protected void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() executeNextPhase(getName(), this::getNextPhase); } - protected void recordPhaseLatency() {} - /** * Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be * thrown. @@ -691,6 +688,10 @@ public SearchTransportService getSearchTransport() { return searchTransportService; } + public SearchResponseMetrics getSearchResponseMetrics() { + return searchResponseMetrics; + } + public final void execute(Runnable command) { executor.execute(command); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 898bb83aa6be1..a47947c81c18f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -14,7 +14,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -49,7 +49,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction SearchTask task, SearchResponse.Clusters clusters, Client client, - CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics + SearchResponseMetrics searchResponseMetrics ) { super( "dfs", @@ -69,7 +69,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - coordinatorSearchPhaseAPMMetrics + searchResponseMetrics ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; addReleasable(queryPhaseResultConsumer); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 6c33fff3a0b89..486e805ca3969 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -34,8 +34,8 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -113,7 +113,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction(shardIterators.size()), searchRequest.getMaxConcurrentShardRequests(), clusters, - coordinatorSearchPhaseAPMMetrics + searchResponseMetrics ) { @Override protected void executePhaseOnShard( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index cd5fa86c1d5a9..b66fa8d1f509b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -67,7 +67,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.Rewriteable; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.ExecutorSelector; @@ -171,7 +170,6 @@ public class TransportSearchAction extends HandledTransportAction slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class); // NOTE: the response of index/search slow log fields below must be calculated dynamically on every call // because the responses may change dynamically at runtime @@ -1365,7 +1360,6 @@ public Map queryFields() { b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService); b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService); b.bind(MergeMetrics.class).toInstance(mergeMetrics); - b.bind(CoordinatorSearchPhaseAPMMetrics.class).toInstance(coordinatorSearchPhaseAPMMetrics); }); if (ReadinessService.enabled(environment)) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java b/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java index ae9f14de4c24d..89193b258f14a 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Container class for aggregated metrics about search responses. @@ -42,9 +43,11 @@ public String getDisplayName() { public static final String TOOK_DURATION_TOTAL_HISTOGRAM_NAME = "es.search_response.took_durations.histogram"; public static final String RESPONSE_COUNT_TOTAL_COUNTER_NAME = "es.search_response.response_count.total"; + public static final String QUERY_SEARCH_PHASE_METRIC = "es.search_response.coordinator_phases_took_durations.query.histogram"; private final LongHistogram tookDurationTotalMillisHistogram; private final LongCounter responseCountTotalCounter; + private final LongHistogram queryPhaseDurationHistogram; public SearchResponseMetrics(MeterRegistry meterRegistry) { this( @@ -59,13 +62,23 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) { + "success, partial failure, or failure, expressed as a single total counter and individual " + "attribute counters", "count" + ), + meterRegistry.registerLongHistogram( + QUERY_SEARCH_PHASE_METRIC, + "Query search phase execution times at the coordinator level, expressed as a histogram", + "millis" ) ); } - private SearchResponseMetrics(LongHistogram tookDurationTotalMillisHistogram, LongCounter responseCountTotalCounter) { + private SearchResponseMetrics( + LongHistogram tookDurationTotalMillisHistogram, + LongCounter responseCountTotalCounter, + LongHistogram queryPhaseDurationHistogram + ) { this.tookDurationTotalMillisHistogram = tookDurationTotalMillisHistogram; this.responseCountTotalCounter = responseCountTotalCounter; + this.queryPhaseDurationHistogram = queryPhaseDurationHistogram; } public long recordTookTimeForSearchScroll(long tookTime) { @@ -91,4 +104,8 @@ public void incrementResponseCount(ResponseCountTotalStatus responseCountTotalSt attributesWithStatus.put(RESPONSE_COUNT_TOTAL_STATUS_ATTRIBUTE_NAME, responseCountTotalStatus.getDisplayName()); responseCountTotalCounter.incrementBy(1L, attributesWithStatus); } + + public void recordQueryPhaseDuration(long tookInNanos) { + queryPhaseDurationHistogram.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos)); + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 9d213f279b3a7..388e4472093af 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -17,13 +17,14 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; @@ -90,7 +91,7 @@ private AbstractSearchAsyncAction createAction( results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 2a66decbc7211..4ed49e4368857 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -18,10 +18,11 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ShardSearchContextId; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.transport.Transport; import org.junit.Assert; @@ -68,7 +69,7 @@ public MockSearchPhaseContext(int numShards) { new ArraySearchPhaseResults<>(numShards), 5, null, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ); this.numShards = numShards; numSuccess = new AtomicInteger(numShards); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 63848a0123f89..713c41f3ee35e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -20,12 +20,13 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.Index; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; @@ -113,7 +114,7 @@ public void testSkipSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -221,7 +222,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -338,7 +339,7 @@ public void sendFreeContext( results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -469,7 +470,7 @@ public void sendFreeContext( new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected void executePhaseOnShard( @@ -578,7 +579,7 @@ public void testAllowPartialResults() throws InterruptedException { results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override @@ -677,7 +678,7 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(searchShardIterators.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 24d3399a882f9..a8d69cec03fc0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -26,9 +26,9 @@ import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.lucene.grouping.TopFieldGroups; +import org.elasticsearch.rest.action.search.SearchResponseMetrics; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -39,6 +39,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.transport.Transport; @@ -213,7 +214,7 @@ public void sendExecuteQuery( SearchResponse.Clusters.EMPTY, null, false, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected SearchPhase getNextPhase() { @@ -410,7 +411,7 @@ public void sendExecuteQuery( SearchResponse.Clusters.EMPTY, null, false, - CoordinatorSearchPhaseAPMMetrics.NOOP + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()) ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 28cb888bd3984..61aa05f703018 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -61,7 +61,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.TermsQueryBuilder; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -1811,8 +1810,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { null, new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), client, - new UsageService(), - CoordinatorSearchPhaseAPMMetrics.NOOP + new UsageService() ); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/rest/action/search/SearchResponseMetricsTests.java similarity index 88% rename from server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java rename to server/src/test/java/org/elasticsearch/rest/action/search/SearchResponseMetricsTests.java index 5426fcf4fcc28..74df14137de1d 100644 --- a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/search/SearchResponseMetricsTests.java @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.search.TelemetryMetrics; +package org.elasticsearch.rest.action.search; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -25,12 +25,13 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; -import static org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics.QUERY_SEARCH_PHASE_METRIC; +import static org.elasticsearch.rest.action.search.SearchResponseMetrics.QUERY_SEARCH_PHASE_METRIC; +import static org.elasticsearch.rest.action.search.SearchResponseMetrics.TOOK_DURATION_TOTAL_HISTOGRAM_NAME; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; -public class CoordinatorSearchPhaseAPMMetricsTests extends ESSingleNodeTestCase { +public class SearchResponseMetricsTests extends ESSingleNodeTestCase { private static final String indexName = "test_coordinator_search_phase_metrics"; private final int num_primaries = randomIntBetween(2, 7); @@ -69,7 +70,7 @@ public void testSearchQueryThenFetch() throws InterruptedException { client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), "1" ); - assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC)); + assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, TOOK_DURATION_TOTAL_HISTOGRAM_NAME)); } private void resetMeter() { @@ -87,5 +88,4 @@ private void assertMeasurements(Collection metricNames) { assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); } } - } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 37ac5ce022766..d6a57ba4587c5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -148,7 +148,6 @@ import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; -import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; @@ -2772,8 +2771,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { EmptySystemIndices.INSTANCE.getExecutorSelector(), new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), client, - usageService, - CoordinatorSearchPhaseAPMMetrics.NOOP + usageService ) ); actions.put( From 60640d9d3e36f010ab5bcf42eee60d927ba9b684 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Tue, 7 Oct 2025 15:16:31 -0500 Subject: [PATCH 6/6] part of metric name has to be less than 30 characters --- .../elasticsearch/rest/action/search/SearchResponseMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java b/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java index 89193b258f14a..7645dc4b32fbd 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/SearchResponseMetrics.java @@ -43,7 +43,7 @@ public String getDisplayName() { public static final String TOOK_DURATION_TOTAL_HISTOGRAM_NAME = "es.search_response.took_durations.histogram"; public static final String RESPONSE_COUNT_TOTAL_COUNTER_NAME = "es.search_response.response_count.total"; - public static final String QUERY_SEARCH_PHASE_METRIC = "es.search_response.coordinator_phases_took_durations.query.histogram"; + public static final String QUERY_SEARCH_PHASE_METRIC = "es.search_response.coordinator_phases.query.duration.histogram"; private final LongHistogram tookDurationTotalMillisHistogram; private final LongCounter responseCountTotalCounter;