@@ -26,6 +26,7 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
@@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;
protected final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;

// protected for tests
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -114,7 +116,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchTask task,
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(name);
this.namedWriteableRegistry = namedWriteableRegistry;
@@ -155,6 +158,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
// at the end of the search
addReleasable(resultConsumer);
this.clusters = clusters;
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
}

protected void notifyListShards(
@@ -666,9 +670,12 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
* @see #onShardResult(SearchPhaseResult)
*/
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
recordPhaseLatency();
executeNextPhase(getName(), this::getNextPhase);
}

protected void recordPhaseLatency() {}

/**
* Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be
* thrown.

@@ -14,6 +14,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
Client client
Client client,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(
"dfs",
Expand All @@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters
clusters,
coordinatorSearchPhaseAPMMetrics
Member:

I am a bit surprised that we don't record latency for this. I don't want to confuse you: I don't mean reporting dfs phase latency at the coordinator. What I mean is that DFS query then fetch has an additional DFS roundtrip at the beginning, but after DFS it executes the query phase, yet the code path is all in SearchDfsQueryThenFetchAction.

Contributor Author:

Yeah, it's a separate code path if you do a DFS query. It joins the "normal" code path when you get back to the Fetch and subsequent phases. In my original PR, it was reporting a "dfs" and a "dfs_query" phase duration. Not for this PR, but for the future one where we record the DFS phase metric: do we want to record a separate DFS roundtrip metric? Also, do we want to differentiate the two query code paths with two different metrics (DFS and non-DFS query phases), or record both paths with the same query phase metric?

Member:

Good question; I'd keep the two variations of the query phase separate.
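
For illustration only, not part of this change: one way to keep the two variations separate would be a second histogram in CoordinatorSearchPhaseAPMMetrics. Everything below is a hypothetical follow-up sketch; the DFS metric name, the onDfsQueryPhaseDone method, and the wiring are assumptions, not decided API.

```java
package org.elasticsearch.index.search.stats;

import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.concurrent.TimeUnit;

/**
 * Hypothetical follow-up sketch, NOT part of this PR: the coordinator metrics class with a
 * second histogram so the DFS variation of the query phase is reported separately from the
 * regular query phase. The DFS metric name and method are assumptions.
 */
public class CoordinatorSearchPhaseAPMMetrics {

    public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.coordinator.phases.query.duration.histogram";
    // Assumed name for the separate DFS-query histogram; not an agreed-upon metric id.
    public static final String DFS_QUERY_SEARCH_PHASE_METRIC = "es.search.coordinator.phases.dfs_query.duration.histogram";

    private final LongHistogram queryPhaseMetric;
    private final LongHistogram dfsQueryPhaseMetric;

    public CoordinatorSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
        this.queryPhaseMetric = meterRegistry.registerLongHistogram(
            QUERY_SEARCH_PHASE_METRIC,
            "Query search phase execution times at the coordinator level, expressed as a histogram",
            "ms"
        );
        this.dfsQueryPhaseMetric = meterRegistry.registerLongHistogram(
            DFS_QUERY_SEARCH_PHASE_METRIC,
            "DFS query search phase execution times at the coordinator level, expressed as a histogram",
            "ms"
        );
    }

    public void onQueryPhaseDone(long tookInNanos) {
        recordPhaseLatency(queryPhaseMetric, tookInNanos);
    }

    // In this sketch, SearchDfsQueryThenFetchAsyncAction's recordPhaseLatency() override would call this.
    public void onDfsQueryPhaseDone(long tookInNanos) {
        recordPhaseLatency(dfsQueryPhaseMetric, tookInNanos);
    }

    protected void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
        // Histograms are registered with "ms" as the unit, so convert from nanos before recording.
        histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
    }
}
```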

);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
addReleasable(queryPhaseResultConsumer);

@@ -34,6 +34,7 @@
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.SimpleRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
@@ -92,6 +93,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
private volatile BottomSortValuesCollector bottomSortCollector;
private final Client client;
private final boolean batchQueryPhase;
private long phaseStartTimeNanos;

SearchQueryThenFetchAsyncAction(
Logger logger,
@@ -110,7 +112,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
SearchTask task,
SearchResponse.Clusters clusters,
Client client,
boolean batchQueryPhase
boolean batchQueryPhase,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(
"query",
@@ -129,7 +132,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
task,
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters
clusters,
coordinatorSearchPhaseAPMMetrics
);
this.topDocsSize = getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
@@ -421,6 +425,7 @@ private static boolean isPartOfPIT(

@Override
protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
phaseStartTimeNanos = System.nanoTime();
Member:

This could potentially be streamlined into the run method in the parent class. We may even be able to report the latency at the coordinator in a generic manner, with all the code in AbstractSearchAsyncAction?

Contributor Author:

My other PR attempted the "generic" path as well. There is some weirdness around PIT creation and queries that caused a lot of issues in CI, but I think I have an idea of where the issue was. I'll try to move this code into AbstractSearchAsyncAction, and that should cover at least the DFS and query phases (a rough sketch of that idea follows this thread).

I'll have to check whether that helps fetch and the other subsequent phases. The issue with them is that they don't subclass AbstractSearchAsyncAction but reference it via a passed-in context, so those phases might not hit run() to set the start time of the phase. I'll have to do some debug tracing.

Member:

I see what you mean. I'd limit the change to tracking the two variations of the query phase and perhaps open point in time; to be more concrete, the AbstractSearchAsyncAction subclasses. I would not consider the other so-called search phases that important, to be honest. I care about can match, dfs, query, fetch. We can expand further later as needed.

Contributor Author:

To be clear, does "dfs" cover both the "DFS roundtrip" and the DFS-specific query implementation (i.e. basically timing SearchDfsQueryThenFetchAction)? Or do we want "DFS roundtrip", "normal query", and "DFS-specific query" in addition to can match and fetch times?
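
For illustration only: a minimal, self-contained sketch of the streamlining the reviewer suggests above, with the start-time capture and the latency hook both living in the abstract parent so every subclass is timed the same way. The class and method names below are generic stand-ins, not the actual Elasticsearch types.

```java
// Generic stand-in for the pattern discussed above; not the actual AbstractSearchAsyncAction.
abstract class TimedAsyncPhase {

    private long phaseStartTimeNanos;

    /** Entry point: the parent records the phase start once, then delegates to the subclass. */
    final void run() {
        phaseStartTimeNanos = System.nanoTime();
        doRun();
    }

    /** Subclass-specific work, e.g. dispatching shard requests for the query or dfs phase. */
    protected abstract void doRun();

    /** Exit point: the parent computes the elapsed time and hands it to the subclass hook. */
    final void onPhaseDone() {
        recordPhaseLatency(System.nanoTime() - phaseStartTimeNanos);
        // ... executeNextPhase(...) would follow here in the real action ...
    }

    /** No-op by default; e.g. the query action would map this onto its query-phase histogram. */
    protected void recordPhaseLatency(long tookInNanos) {}
}
```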

if (this.batchQueryPhase == false) {
super.doRun(shardIndexMap);
return;
@@ -564,6 +569,12 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
}
}

@Override
protected void recordPhaseLatency() {
final long tookInNanos = System.nanoTime() - phaseStartTimeNanos;
coordinatorSearchPhaseAPMMetrics.onQueryPhaseDone(tookInNanos);
}

public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]";

static void registerNodeSearchAction(

@@ -31,6 +31,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
@@ -70,6 +71,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
private final TransportService transportService;
private final SearchService searchService;
private final ClusterService clusterService;
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;

@Inject
public TransportOpenPointInTimeAction(
@@ -79,7 +81,8 @@ public TransportOpenPointInTimeAction(
TransportSearchAction transportSearchAction,
SearchTransportService searchTransportService,
NamedWriteableRegistry namedWriteableRegistry,
ClusterService clusterService
ClusterService clusterService,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(TYPE.name(), transportService, actionFilters, OpenPointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.transportService = transportService;
@@ -88,6 +91,7 @@ public TransportOpenPointInTimeAction(
this.searchTransportService = searchTransportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterService = clusterService;
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
transportService.registerRequestHandler(
OPEN_SHARD_READER_CONTEXT_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
@@ -246,7 +250,8 @@ void runOpenPointInTimePhase(
task,
new ArraySearchPhaseResults<>(shardIterators.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters
clusters,
coordinatorSearchPhaseAPMMetrics
) {
@Override
protected void executePhaseOnShard(

@@ -67,6 +67,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.ExecutorSelector;
@@ -170,6 +171,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final UsageService usageService;
private final boolean collectCCSTelemetry;
private final TimeValue forceConnectTimeoutSecs;
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;

@Inject
public TransportSearchAction(
@@ -188,7 +190,8 @@ public TransportSearchAction(
ExecutorSelector executorSelector,
SearchResponseMetrics searchResponseMetrics,
Client client,
UsageService usageService
UsageService usageService,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(TYPE.name(), transportService, actionFilters, SearchRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
@@ -218,6 +221,7 @@ public TransportSearchAction(
this.searchResponseMetrics = searchResponseMetrics;
this.client = client;
this.usageService = usageService;
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
}

@@ -1688,7 +1692,8 @@ public void runNewSearchPhase(
clusterState,
task,
clusters,
client
client,
coordinatorSearchPhaseAPMMetrics
);
} else {
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
@@ -1709,7 +1714,8 @@ public void runNewSearchPhase(
task,
clusters,
client,
searchService.batchQueryPhase()
searchService.batchQueryPhase(),
coordinatorSearchPhaseAPMMetrics
);
}
success = true;

@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.search.stats;

import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.concurrent.TimeUnit;

/**
* Coordinator level APM metrics for search phases. Records phase execution times as histograms.
*/
public class CoordinatorSearchPhaseAPMMetrics {

public static final CoordinatorSearchPhaseAPMMetrics NOOP = new CoordinatorSearchPhaseAPMMetrics(MeterRegistry.NOOP);

public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.coordinator.phases.query.duration.histogram";
private final LongHistogram queryPhaseMetric;

public CoordinatorSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
this.queryPhaseMetric = meterRegistry.registerLongHistogram(
QUERY_SEARCH_PHASE_METRIC,
"Query search phase execution times at the coordinator level, expressed as a histogram",
"ms"
);
}

public void onQueryPhaseDone(long tookInNanos) {
recordPhaseLatency(queryPhaseMetric, tookInNanos);
}

protected void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
}
}

@@ -122,6 +122,7 @@
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.RootObjectMapperNamespaceValidator;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.ExecutorSelector;
@@ -863,6 +864,10 @@ private void construct(
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
);

CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics = new CoordinatorSearchPhaseAPMMetrics(
telemetryProvider.getMeterRegistry()
);

List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
// NOTE: the response of index/search slow log fields below must be calculated dynamically on every call
// because the responses may change dynamically at runtime
@@ -1360,6 +1365,7 @@ public Map<String, String> queryFields() {
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService);
b.bind(MergeMetrics.class).toInstance(mergeMetrics);
b.bind(CoordinatorSearchPhaseAPMMetrics.class).toInstance(coordinatorSearchPhaseAPMMetrics);
});

if (ReadinessService.enabled(environment)) {

@@ -17,6 +17,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@@ -88,7 +89,8 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
null,
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
CoordinatorSearchPhaseAPMMetrics.NOOP
) {
@Override
protected SearchPhase getNextPhase() {

@@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -66,7 +67,8 @@ public MockSearchPhaseContext(int numShards) {
new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()),
new ArraySearchPhaseResults<>(numShards),
5,
null
null,
CoordinatorSearchPhaseAPMMetrics.NOOP
);
this.numShards = numShards;
numSuccess = new AtomicInteger(numShards);