Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136828.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136828
summary: Can match phase coordinator duration APM metric
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -58,6 +59,8 @@
*/
final class CanMatchPreFilterSearchPhase {

private static final String PHASE_NAME = "can_match";

private final Logger logger;
private final SearchRequest request;
private final List<SearchShardIterator> shardsIts;
Expand Down Expand Up @@ -90,7 +93,8 @@ private CanMatchPreFilterSearchPhase(
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
ActionListener<List<SearchShardIterator>> listener
ActionListener<List<SearchShardIterator>> listener,
SearchResponseMetrics searchResponseMetrics
) {
this.logger = logger;
this.searchTransportService = searchTransportService;
Expand Down Expand Up @@ -136,20 +140,41 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
SearchResponseMetrics searchResponseMetrics
) {
if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
}
final SubscribableListener<List<SearchShardIterator>> listener = new SubscribableListener<>();
long phaseStartTimeInNanos = System.nanoTime();

listener.addListener(new ActionListener<>() {
@Override
public void onResponse(List<SearchShardIterator> shardsIts) {
// we only want to record the phase duration if this call to execute is running on the coordinator node
// as part of the can-match phase or a search request. It will be null if this is the data node round trip can-match
// execution
// or an open PIT request
if (searchResponseMetrics != null) {
searchResponseMetrics.recordSearchPhaseDuration(PHASE_NAME, System.nanoTime() - phaseStartTimeInNanos);
}
}

@Override
public void onFailure(Exception e) {
// do not record duration of failed phases
}
});

// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "start", e, ShardSearchFailure.EMPTY_ARRAY));
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "start", e, ShardSearchFailure.EMPTY_ARRAY));
}

@Override
Expand All @@ -168,7 +193,8 @@ protected void doRun() {
task,
requireAtLeastOneMatch,
coordinatorRewriteContextProvider,
listener
listener,
searchResponseMetrics
).runCoordinatorRewritePhase();
}
});
Expand Down Expand Up @@ -249,7 +275,7 @@ private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAnd

private void checkNoMissingShards(List<SearchShardIterator> shards) {
assert assertSearchCoordinationThread();
SearchPhase.doCheckNoMissingShards("can_match", request, shards);
SearchPhase.doCheckNoMissingShards(PHASE_NAME, request, shards);
}

private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchShardIterator> shards) {
Expand Down Expand Up @@ -386,7 +412,7 @@ public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "round", e, ShardSearchFailure.EMPTY_ARRAY));
listener.onFailure(new SearchPhaseExecutionException(PHASE_NAME, "round", e, ShardSearchFailure.EMPTY_ARRAY));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void runNewSearchPhase(
timeProvider,
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
null
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,8 @@ public void runNewSearchPhase(
timeProvider,
task,
requireAtLeastOneMatch,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
timeProvider,
(SearchTask) task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
null // do not record a phase duration for can_match in search_shards
)
.addListener(
delegate.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) {
);

phaseNameToDurationHistogram = Map.of(
"can_match",
meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "can_match"),
"The search phase can_match duration in milliseconds at the coordinator, expressed as a histogram",
"millis"
),
"dfs",
meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, SEARCH_PHASE_METRIC_FORMAT, "dfs"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
null
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -256,7 +257,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
null
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -347,7 +349,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
null
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -446,7 +449,8 @@ public void sendCanMatch(
timeProvider,
null,
shardsIter.size() > shardToSkip.size(),
EMPTY_CONTEXT_PROVIDER
EMPTY_CONTEXT_PROVIDER,
null
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -1418,7 +1422,8 @@ public void sendCanMatch(
timeProvider,
null,
true,
contextProvider
contextProvider,
null
),
requests
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class SearchPhaseCoordinatorAPMMetricsTests extends ESSingleNodeTestCase
private static final String indexName = "test_coordinator_search_phase_metrics";
private final int num_primaries = randomIntBetween(2, 7);

private static final String CAN_MATCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.can_match.histogram";
private static final String DFS_QUERY_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs_query.histogram";
private static final String DFS_SEARCH_PHASE_METRIC = "es.search_response.took_durations.dfs.histogram";
private static final String FETCH_SEARCH_PHASE_METRIC = "es.search_response.took_durations.fetch.histogram";
Expand Down Expand Up @@ -83,6 +84,9 @@ public void testSearchQueryThenFetch() {
"1"
);
assertMeasurements(List.of(QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertNotMeasured(
List.of(CAN_MATCH_SEARCH_PHASE_METRIC, DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC)
);
}

public void testDfsSearch() {
Expand All @@ -91,10 +95,12 @@ public void testDfsSearch() {
"1"
);
assertMeasurements(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertNotMeasured(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
}

public void testPointInTime() {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10));
request.indexFilter(simpleQueryStringQuery("doc1"));
OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
BytesReference pointInTimeId = response.getPointInTimeId();

Expand All @@ -107,11 +113,49 @@ public void testPointInTime() {
"1"
);
assertMeasurements(List.of(OPEN_PIT_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC));
assertNotMeasured(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
}
}

public void testPointInTimeWithPreFiltering() {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(10));
request.indexFilter(simpleQueryStringQuery("doc1"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you use a more realistic index filter? For instance a date range query instead? I was also wondering if setting the index filter is required for this test to work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to a more complicated test. For this test to have two runs of can-match, we need to make sure we have at least two shards remaining after the open PIT can match. We have to have and index filter on the open PIT for can-match to run during that operation.

OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
BytesReference pointInTimeId = response.getPointInTimeId();

try {
assertSearchHitsWithoutFailures(
client().prepareSearch()
.setPointInTime(new PointInTimeBuilder(pointInTimeId))
.setSize(1)
.setPreFilterShardSize(1)
.setQuery(simpleQueryStringQuery("doc1")),
"1"
);
assertMeasurements(
List.of(OPEN_PIT_SEARCH_PHASE_METRIC, CAN_MATCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC)
);
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC));
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
}
}

public void testCanMatchSearch() {
assertSearchHitsWithoutFailures(
client().prepareSearch(indexName)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreFilterShardSize(1)
.setQuery(simpleQueryStringQuery("doc1")),
"1"
);

assertMeasurements(List.of(CAN_MATCH_SEARCH_PHASE_METRIC, FETCH_SEARCH_PHASE_METRIC, QUERY_SEARCH_PHASE_METRIC));
assertNotMeasured(List.of(DFS_SEARCH_PHASE_METRIC, DFS_QUERY_SEARCH_PHASE_METRIC, OPEN_PIT_SEARCH_PHASE_METRIC));
}

private void resetMeter() {
getTestTelemetryPlugin().resetMeter();
}
Expand All @@ -120,6 +164,13 @@ private TestTelemetryPlugin getTestTelemetryPlugin() {
return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0);
}

private void assertNotMeasured(Collection<String> metricNames) {
for (var metricName : metricNames) {
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
assertThat(measurements, hasSize(0));
}
}

private void assertMeasurements(Collection<String> metricNames) {
for (var metricName : metricNames) {
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(metricName);
Expand Down