From 18806fb00e843d515b8aee4c059856b73a99ed86 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Wed, 1 Oct 2025 09:54:41 -0500 Subject: [PATCH 1/3] checkpoint --- .../search/AbstractSearchAsyncAction.java | 18 +- .../search/CanMatchPreFilterSearchPhase.java | 1 + .../action/search/DfsQueryPhase.java | 1 + .../action/search/ExpandSearchPhase.java | 1 + .../action/search/FetchLookupFieldsPhase.java | 1 + .../action/search/FetchSearchPhase.java | 1 + .../SearchDfsQueryThenFetchAsyncAction.java | 7 +- .../SearchQueryThenFetchAsyncAction.java | 7 +- .../TransportOpenPointInTimeAction.java | 13 +- .../action/search/TransportSearchAction.java | 12 +- .../CoordinatorSearchPhaseAPMMetrics.java | 60 ++++++ .../elasticsearch/node/NodeConstruction.java | 6 + .../AbstractSearchAsyncActionTests.java | 4 +- .../action/search/MockSearchPhaseContext.java | 4 +- .../action/search/SearchAsyncActionTests.java | 19 +- .../SearchQueryThenFetchAsyncActionTests.java | 7 +- .../search/TransportSearchActionTests.java | 4 +- ...CoordinatorSearchPhaseAPMMetricsTests.java | 190 ++++++++++++++++++ .../snapshots/SnapshotResiliencyTests.java | 4 +- 19 files changed, 338 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/search/stats/CoordinatorSearchPhaseAPMMetrics.java create mode 100644 server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..5be31f8922da1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Releasable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchPhaseResult; @@ -93,6 +94,8 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseMetrics; + private long phaseStartTimeNanos; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -114,7 +117,8 @@ abstract class AbstractSearchAsyncAction exten SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics ) { super(name); this.namedWriteableRegistry = namedWriteableRegistry; @@ -155,6 +159,7 @@ abstract class AbstractSearchAsyncAction exten // at the end of the search addReleasable(resultConsumer); this.clusters = clusters; + this.coordinatorSearchPhaseMetrics = coordinatorSearchPhaseAPMMetrics; } protected void notifyListShards( @@ -374,7 +379,9 @@ protected void executeNextPhase(String currentPhase, Supplier nextP private void executePhase(SearchPhase phase) { try { + phaseStartTimeNanos = timeProvider.relativeCurrentNanosProvider().getAsLong(); phase.run(); + } catch (RuntimeException e) { if (logger.isDebugEnabled()) { logger.debug(() -> format("Failed to execute [%s] while moving to [%s] phase", request, phase.getName()), e); @@ -621,6 +628,7 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { * @param cause the cause of the phase failure */ public void onPhaseFailure(String phase, String msg, Throwable cause) { + recordPhaseTookTime(phase); raisePhaseFailure(new SearchPhaseExecutionException(phase, msg, cause, buildShardFailures())); } @@ -666,9 +674,17 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti * @see #onShardResult(SearchPhaseResult) */ private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() + recordPhaseTookTime(getName()); executeNextPhase(getName(), this::getNextPhase); } + protected void recordPhaseTookTime(String phaseName) { + coordinatorSearchPhaseMetrics.onCoordinatorPhaseDone( + phaseName, + timeProvider.relativeCurrentNanosProvider().getAsLong() - phaseStartTimeNanos + ); + } + /** * Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be * thrown. diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index e42f8127c5e97..753033dc1c989 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -184,6 +184,7 @@ private static boolean assertSearchCoordinationThread() { private void runCoordinatorRewritePhase() { // TODO: the index filter (i.e, `_index:patten`) should be prefiltered on the coordinator assert assertSearchCoordinationThread(); + final long coordinatorStartTimeNanos = timeProvider.relativeCurrentNanosProvider().getAsLong(); final List matchedShardLevelRequests = new ArrayList<>(); for (SearchShardIterator searchShardIterator : shardsIts) { final CanMatchNodeRequest canMatchNodeRequest = new CanMatchNodeRequest( diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 71eb94459548c..3e7d382ea20de 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -66,6 +66,7 @@ class DfsQueryPhase extends SearchPhase { // protected for testing protected SearchPhase nextPhase(AggregatedDfs dfs) { + context.recordPhaseTookTime(getName()); return SearchQueryThenFetchAsyncAction.nextPhase(client, context, queryResult, dfs); } diff --git a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java index 8055ebb1a7358..1d18f812c6675 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java @@ -182,6 +182,7 @@ private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilde } private void onPhaseDone() { + context.recordPhaseTookTime(getName()); context.executeNextPhase(NAME, this::nextPhase); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java index 9aba4efa03bf4..568d81a245156 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java @@ -144,6 +144,7 @@ public void onFailure(Exception e) { } private void sendResponse() { + context.recordPhaseTookTime(getName()); context.sendSearchResponse(searchResponse, queryResults); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index e63a3ef5b979f..2225b75fa67a7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -260,6 +260,7 @@ private void moveToNextPhase( AtomicArray fetchResultsArr, SearchPhaseController.ReducedQueryPhase reducedQueryPhase ) { + context.recordPhaseTookTime(getName()); context.executeNextPhase(NAME, () -> { var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr); context.addReleasable(resp); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index d49542313a712..898bb83aa6be1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters, - Client client + Client client, + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics ) { super( "dfs", @@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), - clusters + clusters, + coordinatorSearchPhaseAPMMetrics ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; addReleasable(queryPhaseResultConsumer); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a8f22eb1cc572..5ca06b2158d88 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; @@ -108,7 +109,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction(shardIterators.size()), searchRequest.getMaxConcurrentShardRequests(), - clusters + clusters, + coordinatorSearchPhaseAPMMetrics ) { @Override protected void executePhaseOnShard( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index cd8c561a4ad75..3374c4dc41a4c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -67,6 +67,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.ExecutorSelector; @@ -170,6 +171,7 @@ public class TransportSearchAction extends HandledTransportAction histogramsCache = ConcurrentCollections.newConcurrentMap(); + private final MeterRegistry meterRegistry; + + public CoordinatorSearchPhaseAPMMetrics(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + + public void onCoordinatorPhaseDone(String phaseName, long tookInNanos) { + LongHistogram histogram = histogramsCache.computeIfAbsent(phaseName, this::createHistogram); + if (histogram != null) { + recordPhaseLatency(histogram, tookInNanos); + } else { + throw new IllegalStateException("phase [" + phaseName + "] not found"); + } + } + + private LongHistogram createHistogram(String phaseName) { + return meterRegistry.registerLongHistogram( + String.format(Locale.ROOT, metricNameFormat, phaseName), + String.format( + Locale.ROOT, + "%s phase execution times at the coordinator level, expressed as a histogram", + phaseName.toUpperCase(Locale.ROOT) + ), + "ms" + ); + } + + protected void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) { + histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos)); + } +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index f1d5b6cc804bf..9a72c5593b50a 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -122,6 +122,7 @@ import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.RootObjectMapperNamespaceValidator; import org.elasticsearch.index.mapper.SourceFieldMetrics; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.indices.ExecutorSelector; @@ -863,6 +864,10 @@ private void construct( new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry()) ); + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics = new CoordinatorSearchPhaseAPMMetrics( + telemetryProvider.getMeterRegistry() + ); + List slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class); // NOTE: the response of index/search slow log fields below must be calculated dynamically on every call // because the responses may change dynamically at runtime @@ -1360,6 +1365,7 @@ public Map queryFields() { b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService); b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService); b.bind(MergeMetrics.class).toInstance(mergeMetrics); + b.bind(CoordinatorSearchPhaseAPMMetrics.class).toInstance(coordinatorSearchPhaseAPMMetrics); }); if (ReadinessService.enabled(environment)) { diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..9d213f279b3a7 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -88,7 +89,8 @@ private AbstractSearchAsyncAction createAction( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 16e90064ca929..2a66decbc7211 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -66,7 +67,8 @@ public MockSearchPhaseContext(int numShards) { new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()), new ArraySearchPhaseResults<>(numShards), 5, - null + null, + CoordinatorSearchPhaseAPMMetrics.NOOP ); this.numShards = numShards; numSuccess = new AtomicInteger(numShards); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index afd3bee4c4ab8..63848a0123f89 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.Index; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; @@ -111,7 +112,8 @@ public void testSkipSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -218,7 +220,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -334,7 +337,8 @@ public void sendFreeContext( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -464,7 +468,8 @@ public void sendFreeContext( null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected void executePhaseOnShard( @@ -572,7 +577,8 @@ public void testAllowPartialResults() throws InterruptedException { null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override @@ -670,7 +676,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(searchShardIterators.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 2a9d12b27507d..24d3399a882f9 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.lucene.grouping.TopFieldGroups; import org.elasticsearch.search.DocValueFormat; @@ -211,7 +212,8 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, null, - false + false, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected SearchPhase getNextPhase() { @@ -407,7 +409,8 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, null, - false + false, + CoordinatorSearchPhaseAPMMetrics.NOOP ) { @Override protected SearchPhase getNextPhase() { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 61aa05f703018..28cb888bd3984 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -1810,7 +1811,8 @@ protected void doWriteTo(StreamOutput out) throws IOException { null, new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), client, - new UsageService() + new UsageService(), + CoordinatorSearchPhaseAPMMetrics.NOOP ); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java new file mode 100644 index 0000000000000..e237c05f54b7e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java @@ -0,0 +1,190 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search.TelemetryMetrics; + +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; + +public class CoordinatorSearchPhaseAPMMetricsTests extends ESSingleNodeTestCase { + private static final String indexName = "test_coordinator_search_phase_metrics"; + private final int num_primaries = randomIntBetween(128, 133); + + private final Set expectedMetrics = Set.of( + "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram", + "es.search.coordinator.phases.fetch.duration.histogram", + "es.search.coordinator.phases.expand.duration.histogram", + "es.search.coordinator.phases.query.duration.histogram" + ); + + private final Set expectedMetricsWithDfs = Set.of( + "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram", + "es.search.coordinator.phases.fetch.duration.histogram", + "es.search.coordinator.phases.expand.duration.histogram", + "es.search.coordinator.phases.dfs_query.duration.histogram", + "es.search.coordinator.phases.dfs.duration.histogram" + ); + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @Before + private void setUpIndex() throws Exception { + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(indexName); + + prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get(); + prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get(); + + prepareIndex(CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME).setId("1") + .setSource("body", "doc1") + .setRefreshPolicy(IMMEDIATE) + .get(); + prepareIndex(CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME).setId("2") + .setSource("body", "doc2") + .setRefreshPolicy(IMMEDIATE) + .get(); + } + + @After + private void afterTest() { + resetMeter(); + } + + @Override + protected Collection> getPlugins() { + return pluginList(TestTelemetryPlugin.class, CoordinatorSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.class); + } + + public void testMetricsDfsQueryThenFetch() throws InterruptedException { + checkMetricsDfsQueryThenFetch(indexName); + } + + private void checkMetricsDfsQueryThenFetch(String indexName) throws InterruptedException { + assertSearchHitsWithoutFailures( + client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), + "1" + ); + + var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); + assertThat(coordinatorMetrics, hasSize(5)); + assertThat(coordinatorMetrics, equalTo(expectedMetricsWithDfs)); + for (var metricName : coordinatorMetrics) { + List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); + assertThat(measurements, hasSize(1)); + assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); + } + } + + public void testSearchTransportMetricsQueryThenFetch() throws InterruptedException { + checkSearchTransportMetricsQueryThenFetch(indexName); + } + + private void checkSearchTransportMetricsQueryThenFetch(String indexName) throws InterruptedException { + assertSearchHitsWithoutFailures( + client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), + "1" + ); + + var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); + assertThat(coordinatorMetrics, hasSize(4)); + assertThat(coordinatorMetrics, equalTo(expectedMetrics)); + } + + private void resetMeter() { + getTestTelemetryPlugin().resetMeter(); + } + + private TestTelemetryPlugin getTestTelemetryPlugin() { + return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0); + } + + private Set filterForCoordinatorMetrics(List registeredMetrics) { + return registeredMetrics.stream().filter(m -> m.startsWith("es.search.coordinator")).collect(Collectors.toSet()); + } + + public static class TestSystemIndexPlugin extends Plugin implements SystemIndexPlugin { + + static final String INDEX_NAME = ".test-system-index"; + + public TestSystemIndexPlugin() {} + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of( + SystemIndexDescriptor.builder() + .setIndexPattern(INDEX_NAME + "*") + .setPrimaryIndex(INDEX_NAME) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .setMappings(""" + { + "_meta": { + "version": "8.0.0", + "managed_index_mappings_version": 3 + }, + "properties": { + "body": { "type": "keyword" } + } + } + """) + .setThreadPools(ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS) + .setOrigin(ShardSearchPhaseAPMMetricsTests.class.getSimpleName()) + .build() + ); + } + + @Override + public String getFeatureName() { + return ShardSearchPhaseAPMMetricsTests.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "test plugin"; + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d6a57ba4587c5..37ac5ce022766 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -148,6 +148,7 @@ import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; @@ -2771,7 +2772,8 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { EmptySystemIndices.INSTANCE.getExecutorSelector(), new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), client, - usageService + usageService, + CoordinatorSearchPhaseAPMMetrics.NOOP ) ); actions.put( From 8b48c0783056c04df50bf5d8db021c0775061e63 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Thu, 2 Oct 2025 13:30:59 -0500 Subject: [PATCH 2/3] rank phase --- .../search/CanMatchPreFilterSearchPhase.java | 16 +- .../action/search/RankFeaturePhase.java | 1 + .../TransportOpenPointInTimeAction.java | 3 +- .../action/search/TransportSearchAction.java | 3 +- .../search/TransportSearchShardsAction.java | 9 +- .../CoordinatorSearchPhaseAPMMetrics.java | 9 +- .../CanMatchPreFilterSearchPhaseTests.java | 16 +- ...CoordinatorSearchPhaseAPMMetricsTests.java | 242 ++++++++++++++++-- 8 files changed, 261 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 753033dc1c989..a24d92d31696b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -19,6 +19,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.CoordinatorRewriteContext; import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.search.CanMatchShardResponse; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -71,6 +72,7 @@ final class CanMatchPreFilterSearchPhase { private final SearchTask task; private final Executor executor; private final boolean requireAtLeastOneMatch; + private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics; private final FixedBitSet possibleMatches; private final MinAndMax[] minAndMaxes; @@ -90,7 +92,8 @@ private CanMatchPreFilterSearchPhase( SearchTask task, boolean requireAtLeastOneMatch, CoordinatorRewriteContextProvider coordinatorRewriteContextProvider, - ActionListener> listener + ActionListener> listener, + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics ) { this.logger = logger; this.searchTransportService = searchTransportService; @@ -105,6 +108,7 @@ private CanMatchPreFilterSearchPhase( this.requireAtLeastOneMatch = requireAtLeastOneMatch; this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider; this.executor = executor; + this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics; final int size = shardsIts.size(); possibleMatches = new FixedBitSet(size); minAndMaxes = new MinAndMax[size]; @@ -136,7 +140,8 @@ public static SubscribableListener> execute( TransportSearchAction.SearchTimeProvider timeProvider, SearchTask task, boolean requireAtLeastOneMatch, - CoordinatorRewriteContextProvider coordinatorRewriteContextProvider + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider, + CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics ) { if (shardsIts.isEmpty()) { return SubscribableListener.newSucceeded(List.of()); @@ -168,7 +173,8 @@ protected void doRun() { task, requireAtLeastOneMatch, coordinatorRewriteContextProvider, - listener + listener, + coordinatorSearchPhaseAPMMetrics ).runCoordinatorRewritePhase(); } }); @@ -224,6 +230,10 @@ private void runCoordinatorRewritePhase() { checkNoMissingShards(matchedShardLevelRequests); new Round(matchedShardLevelRequests).run(); } + coordinatorSearchPhaseAPMMetrics.onCoordinatorPhaseDone( + "can_match", + timeProvider.relativeCurrentNanosProvider().getAsLong() - coordinatorStartTimeNanos + ); } private void consumeResult(boolean canMatch, ShardSearchRequest request) { diff --git a/server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java b/server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java index 25238711c5c1c..16648a44e5e0f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java @@ -193,6 +193,7 @@ public void onResponse(RankFeatureDoc[] docsWithUpdatedScores) { reducedQueryPhase, topResults ); + context.recordPhaseTookTime(getName()); moveToNextPhase(rankPhaseResults, reducedRankFeaturePhase); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index e7482bba7ae95..e8437bb832b66 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -182,7 +182,8 @@ public void runNewSearchPhase( timeProvider, task, false, - searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis) + searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis), + coordinatorSearchPhaseAPMMetrics ) .addListener( listener.delegateFailureAndWrap( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 3374c4dc41a4c..716617c49bbe6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1627,7 +1627,8 @@ public void runNewSearchPhase( timeProvider, task, requireAtLeastOneMatch, - searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis) + searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis), + coordinatorSearchPhaseAPMMetrics ) .addListener( listener.delegateFailureAndWrap( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java index 507893df0c0c1..3cb2dde87dd9b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -60,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction { result.set(iter); latch.countDown(); @@ -256,7 +258,8 @@ public void sendCanMatch( timeProvider, null, true, - EMPTY_CONTEXT_PROVIDER + EMPTY_CONTEXT_PROVIDER, + CoordinatorSearchPhaseAPMMetrics.NOOP ).addListener(ActionTestUtils.assertNoFailureListener(iter -> { result.set(iter); latch.countDown(); @@ -347,7 +350,8 @@ public void sendCanMatch( timeProvider, null, true, - EMPTY_CONTEXT_PROVIDER + EMPTY_CONTEXT_PROVIDER, + CoordinatorSearchPhaseAPMMetrics.NOOP ).addListener(ActionTestUtils.assertNoFailureListener(iter -> { result.set(iter); latch.countDown(); @@ -446,7 +450,8 @@ public void sendCanMatch( timeProvider, null, shardsIter.size() > shardToSkip.size(), - EMPTY_CONTEXT_PROVIDER + EMPTY_CONTEXT_PROVIDER, + CoordinatorSearchPhaseAPMMetrics.NOOP ).addListener(ActionTestUtils.assertNoFailureListener(iter -> { result.set(iter); latch.countDown(); @@ -1418,7 +1423,8 @@ public void sendCanMatch( timeProvider, null, true, - contextProvider + contextProvider, + CoordinatorSearchPhaseAPMMetrics.NOOP ), requests ); diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java index e237c05f54b7e..9509cf00dd0e3 100644 --- a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/CoordinatorSearchPhaseAPMMetricsTests.java @@ -9,14 +9,37 @@ package org.elasticsearch.search.TelemetryMetrics; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.indices.ExecutorNames; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.rank.RankBuilder; +import org.elasticsearch.search.rank.RankDoc; +import org.elasticsearch.search.rank.RankShardResult; +import org.elasticsearch.search.rank.TestRankBuilder; +import org.elasticsearch.search.rank.TestRankShardResult; +import org.elasticsearch.search.rank.context.QueryPhaseRankCoordinatorContext; +import org.elasticsearch.search.rank.context.QueryPhaseRankShardContext; +import org.elasticsearch.search.rank.context.RankFeaturePhaseRankCoordinatorContext; +import org.elasticsearch.search.rank.context.RankFeaturePhaseRankShardContext; +import org.elasticsearch.search.rank.feature.RankFeatureDoc; +import org.elasticsearch.search.rank.feature.RankFeatureShardResult; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; @@ -24,13 +47,18 @@ import org.junit.After; import org.junit.Before; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -38,21 +66,30 @@ public class CoordinatorSearchPhaseAPMMetricsTests extends ESSingleNodeTestCase { private static final String indexName = "test_coordinator_search_phase_metrics"; - private final int num_primaries = randomIntBetween(128, 133); + private final int num_primaries = randomIntBetween(2, 7); private final Set expectedMetrics = Set.of( - "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram", - "es.search.coordinator.phases.fetch.duration.histogram", + "es.search.coordinator.phases.can_match.duration.histogram", "es.search.coordinator.phases.expand.duration.histogram", + "es.search.coordinator.phases.fetch.duration.histogram", + "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram", "es.search.coordinator.phases.query.duration.histogram" ); private final Set expectedMetricsWithDfs = Set.of( - "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram", + "es.search.coordinator.phases.dfs.duration.histogram", + "es.search.coordinator.phases.dfs_query.duration.histogram", + "es.search.coordinator.phases.expand.duration.histogram", "es.search.coordinator.phases.fetch.duration.histogram", + "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram" + ); + + private final Set expectedMetricsWithRanking = Set.of( "es.search.coordinator.phases.expand.duration.histogram", - "es.search.coordinator.phases.dfs_query.duration.histogram", - "es.search.coordinator.phases.dfs.duration.histogram" + "es.search.coordinator.phases.fetch.duration.histogram", + "es.search.coordinator.phases.fetch_lookup_fields.duration.histogram", + "es.search.coordinator.phases.query.duration.histogram", + "es.search.coordinator.phases.rank_feature.duration.histogram" ); @Override @@ -95,38 +132,195 @@ protected Collection> getPlugins() { } public void testMetricsDfsQueryThenFetch() throws InterruptedException { - checkMetricsDfsQueryThenFetch(indexName); - } - - private void checkMetricsDfsQueryThenFetch(String indexName) throws InterruptedException { assertSearchHitsWithoutFailures( client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), "1" ); var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); - assertThat(coordinatorMetrics, hasSize(5)); assertThat(coordinatorMetrics, equalTo(expectedMetricsWithDfs)); - for (var metricName : coordinatorMetrics) { - List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); - assertThat(measurements, hasSize(1)); - assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); - } + assertMeasurements(expectedMetricsWithDfs); } public void testSearchTransportMetricsQueryThenFetch() throws InterruptedException { - checkSearchTransportMetricsQueryThenFetch(indexName); + assertSearchHitsWithoutFailures( + client().prepareSearch(indexName) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreFilterShardSize(1) + .setQuery(simpleQueryStringQuery("doc1")), + "1" + ); + + var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); + assertThat(coordinatorMetrics, equalTo(expectedMetrics)); + assertMeasurements(expectedMetrics); } - private void checkSearchTransportMetricsQueryThenFetch(String indexName) throws InterruptedException { + public void testSearchMultipleIndices() { assertSearchHitsWithoutFailures( - client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")), + client().prepareSearch(indexName, ShardSearchPhaseAPMMetricsTests.TestSystemIndexPlugin.INDEX_NAME) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreFilterShardSize(1) + .setQuery(simpleQueryStringQuery("doc1")), + "1", "1" ); + var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); + assertThat(coordinatorMetrics, equalTo(expectedMetrics)); + assertMeasurements(expectedMetrics); + } + public void testSearchTransportMetricsScroll() { + assertScrollResponsesAndHitCount( + client(), + TimeValue.timeValueSeconds(60), + client().prepareSearch(indexName).setSize(1).setPreFilterShardSize(1).setQuery(simpleQueryStringQuery("doc1 doc2")), + 2, + (respNum, response) -> {} + ); var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); - assertThat(coordinatorMetrics, hasSize(4)); assertThat(coordinatorMetrics, equalTo(expectedMetrics)); + assertMeasurements(expectedMetrics); + + } + + public void testSearchRanking() { + final String indexName = "index"; + final String rankFeatureFieldName = "field"; + final String searchFieldName = "search_field"; + final String searchFieldValue = "some_value"; + final String fetchFieldName = "fetch_field"; + final String fetchFieldValue = "fetch_value"; + + final int minDocs = 3; + final int maxDocs = 10; + int numDocs = between(minDocs, maxDocs); + createIndex(indexName); + // index some documents + for (int i = 0; i < numDocs; i++) { + prepareIndex(indexName).setId(String.valueOf(i)) + .setSource( + rankFeatureFieldName, + "aardvark_" + i, + searchFieldName, + searchFieldValue, + fetchFieldName, + fetchFieldValue + "_" + i + ) + .get(); + } + indicesAdmin().prepareRefresh(indexName).get(); + + var response = client().prepareSearch(indexName) + .setSource( + new SearchSourceBuilder().query(new TermQueryBuilder(searchFieldName, searchFieldValue)) + .size(2) + .from(2) + .fetchField(fetchFieldName) + .rankBuilder(new TestRankBuilder(RankBuilder.DEFAULT_RANK_WINDOW_SIZE) { + + // no need for more than one queries + @Override + public boolean isCompoundBuilder() { + return false; + } + + @Override + public RankFeaturePhaseRankCoordinatorContext buildRankFeaturePhaseCoordinatorContext( + int size, + int from, + Client client + ) { + return new RankFeaturePhaseRankCoordinatorContext(size, from, DEFAULT_RANK_WINDOW_SIZE, false) { + @Override + protected void computeScores(RankFeatureDoc[] featureDocs, ActionListener scoreListener) { + float[] scores = new float[featureDocs.length]; + for (int i = 0; i < featureDocs.length; i++) { + scores[i] = featureDocs[i].score; + } + scoreListener.onResponse(scores); + } + }; + } + + @Override + public QueryPhaseRankCoordinatorContext buildQueryPhaseCoordinatorContext(int size, int from) { + return new QueryPhaseRankCoordinatorContext(RankBuilder.DEFAULT_RANK_WINDOW_SIZE) { + @Override + public ScoreDoc[] rankQueryPhaseResults( + List querySearchResults, + SearchPhaseController.TopDocsStats topDocStats + ) { + List rankDocs = new ArrayList<>(); + for (int i = 0; i < querySearchResults.size(); i++) { + QuerySearchResult querySearchResult = querySearchResults.get(i); + TestRankShardResult shardResult = (TestRankShardResult) querySearchResult.getRankShardResult(); + for (RankDoc trd : shardResult.testRankDocs) { + trd.shardIndex = i; + rankDocs.add(trd); + } + } + rankDocs.sort(Comparator.comparing((RankDoc doc) -> doc.score).reversed()); + RankDoc[] topResults = rankDocs.stream().limit(rankWindowSize).toArray(RankDoc[]::new); + topDocStats.fetchHits = topResults.length; + return topResults; + } + }; + } + + @Override + public QueryPhaseRankShardContext buildQueryPhaseShardContext(List queries, int from) { + return new QueryPhaseRankShardContext(queries, from) { + + @Override + public int rankWindowSize() { + return DEFAULT_RANK_WINDOW_SIZE; + } + + @Override + public RankShardResult combineQueryPhaseResults(List rankResults) { + // we know we have just 1 query, so return all the docs from it + return new TestRankShardResult( + Arrays.stream(rankResults.getFirst().scoreDocs) + .map(x -> new RankDoc(x.doc, x.score, x.shardIndex)) + .limit(rankWindowSize()) + .toArray(RankDoc[]::new) + ); + } + }; + } + + @Override + public RankFeaturePhaseRankShardContext buildRankFeaturePhaseShardContext() { + return new RankFeaturePhaseRankShardContext(rankFeatureFieldName) { + @Override + public RankShardResult buildRankFeatureShardResult(SearchHits hits, int shardId) { + RankFeatureDoc[] rankFeatureDocs = new RankFeatureDoc[hits.getHits().length]; + for (int i = 0; i < hits.getHits().length; i++) { + SearchHit hit = hits.getHits()[i]; + rankFeatureDocs[i] = new RankFeatureDoc(hit.docId(), hit.getScore(), shardId); + rankFeatureDocs[i].featureData(parseFeatureData(hit, rankFeatureFieldName)); + rankFeatureDocs[i].score = randomFloat(); + rankFeatureDocs[i].rank = i + 1; + } + return new RankFeatureShardResult(rankFeatureDocs); + } + }; + } + }) + ) + .get(); + assertNoFailures(response); + var coordinatorMetrics = filterForCoordinatorMetrics(getTestTelemetryPlugin().getRegisteredMetrics(InstrumentType.LONG_HISTOGRAM)); + assertThat(coordinatorMetrics, equalTo(expectedMetricsWithRanking)); + assertMeasurements(expectedMetricsWithRanking); + } + + private List parseFeatureData(SearchHit hit, String fieldName) { + Object fieldValue = hit.getFields().get(fieldName).getValue(); + @SuppressWarnings("unchecked") + List fieldValues = fieldValue instanceof List ? (List) fieldValue : List.of(String.valueOf(fieldValue)); + return fieldValues; } private void resetMeter() { @@ -141,6 +335,14 @@ private Set filterForCoordinatorMetrics(List registeredMetrics) return registeredMetrics.stream().filter(m -> m.startsWith("es.search.coordinator")).collect(Collectors.toSet()); } + private void assertMeasurements(Collection metricNames) { + for (var metricName : metricNames) { + List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName); + assertThat(measurements, hasSize(1)); + assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L)); + } + } + public static class TestSystemIndexPlugin extends Plugin implements SystemIndexPlugin { static final String INDEX_NAME = ".test-system-index"; From cecc64172be1469ca332da1f6cc02ae5c5da5173 Mon Sep 17 00:00:00 2001 From: Chris Parrinello Date: Thu, 2 Oct 2025 13:43:20 -0500 Subject: [PATCH 3/3] Update docs/changelog/135868.yaml --- docs/changelog/135868.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/135868.yaml diff --git a/docs/changelog/135868.yaml b/docs/changelog/135868.yaml new file mode 100644 index 0000000000000..d440fad2f12d8 --- /dev/null +++ b/docs/changelog/135868.yaml @@ -0,0 +1,5 @@ +pr: 135868 +summary: Add APM metrics for duration of search phases at the coordinating node +area: Search +type: enhancement +issues: []