Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136828.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136828
summary: Can match phase coordinator duration APM metric
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -58,6 +59,8 @@
*/
final class CanMatchPreFilterSearchPhase {

private static final String PHASE_NAME = "can_match";

private final Logger logger;
private final SearchRequest request;
private final List<SearchShardIterator> shardsIts;
Expand Down Expand Up @@ -136,20 +139,35 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
SearchResponseMetrics searchResponseMetrics
) {
if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
}
final SubscribableListener<List<SearchShardIterator>> listener = new SubscribableListener<>();
long phaseStartTimeInNanos = System.nanoTime();

listener.addListener(new ActionListener<>() {
@Override
public void onResponse(List<SearchShardIterator> shardsIts) {
searchResponseMetrics.recordSearchPhaseDuration(PHASE_NAME, System.nanoTime() - phaseStartTimeInNanos);
}

@Override
public void onFailure(Exception e) {
// do not record duration of failed phases
}
});

// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "start", e, ShardSearchFailure.EMPTY_ARRAY));
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "start", e, ShardSearchFailure.EMPTY_ARRAY));
}

@Override
Expand Down Expand Up @@ -249,7 +267,7 @@ private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAnd

/**
 * Checks the given shard iterators for missing shards by delegating to
 * {@link SearchPhase#doCheckNoMissingShards}, passing this phase's name and the current request.
 * The assertion documents that this is expected to run on a search coordination thread.
 *
 * @param shards the shard iterators to check
 */
private void checkNoMissingShards(List<SearchShardIterator> shards) {
    assert assertSearchCoordinationThread();
    // Single check using the shared phase-name constant; a second call with the "can_match" literal was redundant.
    SearchPhase.doCheckNoMissingShards(PHASE_NAME, request, shards);
}

private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchShardIterator> shards) {
Expand Down Expand Up @@ -386,7 +404,7 @@ public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "round", e, ShardSearchFailure.EMPTY_ARRAY));
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "round", e, ShardSearchFailure.EMPTY_ARRAY));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void runNewSearchPhase(
timeProvider,
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,8 @@ public void runNewSearchPhase(
timeProvider,
task,
requireAtLeastOneMatch,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
private final ProjectResolver projectResolver;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final ThreadPool threadPool;
private final SearchResponseMetrics searchResponseMetrics;

@Inject
public TransportSearchShardsAction(
Expand All @@ -70,7 +72,8 @@ public TransportSearchShardsAction(
TransportSearchAction transportSearchAction,
SearchTransportService searchTransportService,
ProjectResolver projectResolver,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
SearchResponseMetrics searchResponseMetrics
) {
super(
TYPE.name(),
Expand All @@ -88,6 +91,7 @@ public TransportSearchShardsAction(
this.projectResolver = projectResolver;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.threadPool = transportService.getThreadPool();
this.searchResponseMetrics = searchResponseMetrics;
}

@Override
Expand Down Expand Up @@ -175,7 +179,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
timeProvider,
(SearchTask) task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
delegate.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) {
);

phaseNameToDurationHistogram = Map.of(
"can_match",
meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "can_match"),
"The search phase can_match duration in milliseconds at the coordinator, expressed as a histogram",
"millis"
),
"dfs",
meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "dfs"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.indices.DateFieldRangeInfo;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
Expand All @@ -53,6 +54,7 @@
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -161,7 +163,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -256,7 +259,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -347,7 +351,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -446,7 +451,8 @@ public void sendCanMatch(
timeProvider,
null,
shardsIter.size() > shardToSkip.size(),
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -1418,7 +1424,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
contextProvider
contextProvider,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
),
requests
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class SearchPhaseCoordinatorAPMMetricsTests extends ESSingleNodeTestCase
private static final String indexName = "test_coordinator_search_phase_metrics";
private final int num_primaries = randomIntBetween(2, 7);

private static final String CAN_MATCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.can_match.histogram";
private static final String DFS_QUERY_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs_query.histogram";
private static final String DFS_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs.histogram";
private static final String FETCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.fetch.histogram";
Expand Down Expand Up @@ -82,19 +83,45 @@ public void testSearchQueryThenFetch() {
client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
"1"
);
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
assertNotMeasured(
List.of(CAN_MATCH_SEARCH_PHASE_METRIC, DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC)
);
}

/**
 * Runs a DFS_QUERY_THEN_FETCH search and checks which coordinator phase-duration histograms were recorded:
 * dfs, dfs_query and fetch exactly once each; can_match, query and open_pit not at all.
 */
public void testDfsSearch() {
    assertSearchHitsWithoutFailures(
        client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
        "1"
    );
    // Use only the two-argument helper; the stale one-argument call has been dropped.
    assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
    assertNotMeasured(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
}

/**
 * Opens a point-in-time with an index filter, searches against it, and checks that the open_pit, query and
 * fetch phase-duration histograms were each recorded once while the dfs and dfs_query histograms were not.
 * The point-in-time is always closed afterwards.
 */
public void testPointInTime() {
    OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10));
    openRequest.indexFilter(simpleQueryStringQuery("doc1"));
    BytesReference pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId();

    try {
        var pitSearch = client().prepareSearch()
            .setPointInTime(new PointInTimeBuilder(pitId))
            .setSize(1)
            .setQuery(simpleQueryStringQuery("doc1"));
        assertSearchHitsWithoutFailures(pitSearch, "1");

        final List<String> recordedOnce = List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC);
        final List<String> neverRecorded = List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC);
        assertMeasurements(recordedOnce, 1);
        assertNotMeasured(neverRecorded);
    } finally {
        // Release the point-in-time even when an assertion above fails.
        client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet();
    }
}

public void testPointInTimeWithPreFiltering() {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10));
request.indexFilter(simpleQueryStringQuery("doc1"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use a more realistic index filter, for instance a date range query? I was also wondering whether setting the index filter is required for this test to work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to a more complicated test. For this test to have two runs of can-match, we need to make sure at least two shards remain after the can-match run during open PIT. We have to set an index filter on the open PIT request for can-match to run during that operation.

OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
BytesReference pointInTimeId = response.getPointInTimeId();

Expand All @@ -103,15 +130,34 @@ public void testPointInTime() {
client().prepareSearch()
.setPointInTime(new PointInTimeBuilder(pointInTimeId))
.setSize(1)
.setPreFilterShardSize(1)
.setQuery(simpleQueryStringQuery("doc1")),
"1"
);
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC), 1);
assertMeasurements(
List.of(CAN_MATCH_SEARCH_PHASE_METRIC),
2 // one during open PIT, one during can-match phase of search
);
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
}
}

/**
 * Runs a QUERY_THEN_FETCH search with a pre-filter shard size of 1 and checks that the can_match, query and
 * fetch phase-duration histograms were each recorded once while dfs, dfs_query and open_pit were not.
 */
public void testCanMatchSearch() {
    // NOTE(review): a pre-filter shard size of 1 presumably forces the can_match phase to run — confirm.
    var preFilteredSearch = client().prepareSearch(indexName)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setPreFilterShardSize(1)
        .setQuery(simpleQueryStringQuery("doc1"));
    assertSearchHitsWithoutFailures(preFilteredSearch, "1");

    final List<String> recordedOnce = List.of(CAN_MATCH_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC);
    final List<String> neverRecorded = List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC);
    assertMeasurements(recordedOnce, 1);
    assertNotMeasured(neverRecorded);
}

/**
 * Delegates to {@code resetMeter()} on the node's {@link TestTelemetryPlugin}.
 * NOTE(review): presumably this discards previously recorded measurements so each test starts clean — confirm.
 */
private void resetMeter() {
getTestTelemetryPlugin().resetMeter();
}
Expand All @@ -120,11 +166,18 @@ private TestTelemetryPlugin getTestTelemetryPlugin() {
return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0);
}

private void assertMeasurements(Collection<String> metricNames) {
/**
 * Asserts that no long-histogram measurement has been recorded for any of the given metrics.
 * The metric name is used as the assertion reason so a failure identifies the offending metric.
 *
 * @param metricNames names of the histogram metrics expected to have no measurements
 */
private void assertNotMeasured(Collection<String> metricNames) {
    metricNames.forEach(name -> {
        final List<Measurement> recorded = getTestTelemetryPlugin().getLongHistogramMeasurement(name);
        assertThat(name, recorded, hasSize(0));
    });
}

/**
 * Asserts that each given histogram metric has exactly {@code numberOfMeasurements} recorded measurements
 * and that every recorded value is non-negative. The metric name is used as the assertion reason so a
 * failure identifies the offending metric.
 *
 * @param metricNames          names of the histogram metrics to check
 * @param numberOfMeasurements expected number of measurements for each metric
 */
private void assertMeasurements(Collection<String> metricNames, int numberOfMeasurements) {
    for (var metricName : metricNames) {
        List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
        // Size is driven by the parameter; a hard-coded hasSize(1) would reject callers expecting another count.
        assertThat(metricName, measurements, hasSize(numberOfMeasurements));
        // Check every measurement, not only the first; this also avoids getFirst() on an empty list.
        for (Measurement measurement : measurements) {
            assertThat(metricName, measurement.getLong(), greaterThanOrEqualTo(0L));
        }
    }
}
}