Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136828.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136828
summary: Can match phase coordinator duration APM metric
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -58,6 +59,8 @@
*/
final class CanMatchPreFilterSearchPhase {

private static final String PHASE_NAME = "can_match";

private final Logger logger;
private final SearchRequest request;
private final List<SearchShardIterator> shardsIts;
Expand Down Expand Up @@ -136,20 +139,35 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
SearchResponseMetrics searchResponseMetrics
) {
if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
}
final SubscribableListener<List<SearchShardIterator>> listener = new SubscribableListener<>();
long phaseStartTimeInNanos = System.nanoTime();

listener.addListener(new ActionListener<>() {
@Override
public void onResponse(List<SearchShardIterator> shardsIts) {
searchResponseMetrics.recordSearchPhaseDuration(PHASE_NAME, System.nanoTime() - phaseStartTimeInNanos);
}

@Override
public void onFailure(Exception e) {
// do not record duration of failed phases
}
});

// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "start", e, ShardSearchFailure.EMPTY_ARRAY));
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "start", e, ShardSearchFailure.EMPTY_ARRAY));
}

@Override
Expand Down Expand Up @@ -249,7 +267,7 @@ private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAnd

private void checkNoMissingShards(List<SearchShardIterator> shards) {
assert assertSearchCoordinationThread();
SearchPhase.doCheckNoMissingShards("can_match", request, shards);
SearchPhase.doCheckNoMissingShards(PHASE_NAME, request, shards);
}

private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchShardIterator> shards) {
Expand Down Expand Up @@ -386,7 +404,7 @@ public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "round", e, ShardSearchFailure.EMPTY_ARRAY));
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "round", e, ShardSearchFailure.EMPTY_ARRAY));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void runNewSearchPhase(
timeProvider,
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1788,7 +1788,8 @@ public void runNewSearchPhase(
timeProvider,
task,
requireAtLeastOneMatch,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
private final ProjectResolver projectResolver;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final ThreadPool threadPool;
private final SearchResponseMetrics searchResponseMetrics;

@Inject
public TransportSearchShardsAction(
Expand All @@ -70,7 +72,8 @@ public TransportSearchShardsAction(
TransportSearchAction transportSearchAction,
SearchTransportService searchTransportService,
ProjectResolver projectResolver,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
SearchResponseMetrics searchResponseMetrics
) {
super(
TYPE.name(),
Expand All @@ -88,6 +91,7 @@ public TransportSearchShardsAction(
this.projectResolver = projectResolver;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.threadPool = transportService.getThreadPool();
this.searchResponseMetrics = searchResponseMetrics;
}

@Override
Expand Down Expand Up @@ -180,7 +184,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
timeProvider,
(SearchTask) task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
delegate.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) {
);

phaseNameToDurationHistogram = Map.of(
"can_match",
meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "can_match"),
"The search phase can_match duration in milliseconds at the coordinator, expressed as a histogram",
"millis"
),
"dfs",
meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "dfs"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.indices.DateFieldRangeInfo;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
Expand All @@ -53,6 +54,7 @@
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -161,7 +163,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -256,7 +259,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -347,7 +351,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -446,7 +451,8 @@ public void sendCanMatch(
timeProvider,
null,
shardsIter.size() > shardToSkip.size(),
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -1418,7 +1424,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
contextProvider
contextProvider,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
),
requests
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.search.TransportSearchShardsAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.builder.PointInTimeBuilder;
Expand All @@ -39,8 +43,10 @@

public class SearchPhaseCoordinatorAPMMetricsTests extends ESSingleNodeTestCase {
private static final String indexName = "test_coordinator_search_phase_metrics";
private static final String secondIndexName = "test_coordinator_search_phase_metrics_2";
private final int num_primaries = randomIntBetween(2, 7);

private static final String CAN_MATCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.can_match.histogram";
private static final String DFS_QUERY_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs_query.histogram";
private static final String DFS_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs.histogram";
private static final String FETCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.fetch.histogram";
Expand All @@ -63,8 +69,22 @@ private void setUpIndex() throws Exception {
);
ensureGreen(indexName);

prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get();
prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get();
prepareIndex(indexName).setId("1").setSource("body", "doc1", "@timestamp", "2024-11-01").setRefreshPolicy(IMMEDIATE).get();
prepareIndex(indexName).setId("2").setSource("body", "doc2", "@timestamp", "2024-12-01").setRefreshPolicy(IMMEDIATE).get();
prepareIndex(indexName).setId("3").setSource("body", "doc3", "@timestamp", "2025-01-01").setRefreshPolicy(IMMEDIATE).get();

createIndex(
secondIndexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen(secondIndexName);

prepareIndex(secondIndexName).setId("4").setSource("body", "doc1", "@timestamp", "2025-11-01").setRefreshPolicy(IMMEDIATE).get();
prepareIndex(secondIndexName).setId("5").setSource("body", "doc2", "@timestamp", "2025-12-01").setRefreshPolicy(IMMEDIATE).get();
prepareIndex(secondIndexName).setId("6").setSource("body", "doc3", "@timestamp", "2026-01-01").setRefreshPolicy(IMMEDIATE).get();
}

@After
Expand All @@ -82,19 +102,24 @@ public void testSearchQueryThenFetch() {
client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
"1"
);
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
assertNotMeasured(
List.of(CAN_MATCH_SEARCH_PHASE_METRIC, DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC)
);
}

public void testDfsSearch() {
assertSearchHitsWithoutFailures(
client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
"1"
);
assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
assertNotMeasured(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
}

public void testPointInTime() {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10));
request.indexFilter(simpleQueryStringQuery("doc1"));
OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
BytesReference pointInTimeId = response.getPointInTimeId();

Expand All @@ -106,12 +131,67 @@ public void testPointInTime() {
.setQuery(simpleQueryStringQuery("doc1")),
"1"
);
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
}
}

public void testPointInTimeWithPreFiltering() {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName, secondIndexName).keepAlive(TimeValue.timeValueMinutes(10));
request.indexFilter(new RangeQueryBuilder("@timestamp").gte("2025-07-01"));
OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
BytesReference pointInTimeId = response.getPointInTimeId();

try {
assertSearchHitsWithoutFailures(
client().prepareSearch()
.setPointInTime(new PointInTimeBuilder(pointInTimeId))
.setSize(1)
.setPreFilterShardSize(1)
.setQuery(simpleQueryStringQuery("doc3")),
"6"
);
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
assertMeasurements(
List.of(CAN_MATCH_SEARCH_PHASE_METRIC),
2 // one during open PIT, one during can-match phase of search
);
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
}
}

public void testCanMatchSearch() {
assertSearchHitsWithoutFailures(
client().prepareSearch(indexName)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreFilterShardSize(1)
.setQuery(simpleQueryStringQuery("doc1")),
"1"
);

assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC), 1);
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
}

public void testSearchShards() {
var request = new SearchShardsRequest(
new String[] { indexName },
SearchRequest.DEFAULT_INDICES_OPTIONS,
simpleQueryStringQuery("doc1"),
null,
null,
randomBoolean(),
randomBoolean() ? null : randomAlphaOfLength(10)
);
var resp = client().execute(TransportSearchShardsAction.TYPE, request).actionGet();
assertThat(resp.getGroups(), hasSize(num_primaries));
assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC), 1);
}

private void resetMeter() {
getTestTelemetryPlugin().resetMeter();
}
Expand All @@ -120,11 +200,18 @@ private TestTelemetryPlugin getTestTelemetryPlugin() {
return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0);
}

private void assertMeasurements(Collection<String> metricNames) {
private void assertNotMeasured(Collection<String> metricNames) {
for (var metricName : metricNames) {
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
assertThat(metricName, measurements, hasSize(0));
}
}

private void assertMeasurements(Collection<String> metricNames, int numberOfMeasurements) {
for (var metricName : metricNames) {
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
assertThat(measurements, hasSize(1));
assertThat(measurements.getFirst().getLong(), greaterThanOrEqualTo(0L));
assertThat(metricName, measurements, hasSize(numberOfMeasurements));
assertThat(metricName, measurements.getFirst().getLong(), greaterThanOrEqualTo(0L));
}
}
}