Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135868.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135868
summary: Add APM metrics for duration of search phases at the coordinating node
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
Expand Down Expand Up @@ -93,6 +94,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseMetrics;
private long phaseStartTimeNanos;

// protected for tests
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
Expand All @@ -114,7 +117,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchTask task,
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(name);
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -155,6 +159,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
// at the end of the search
addReleasable(resultConsumer);
this.clusters = clusters;
this.coordinatorSearchPhaseMetrics = coordinatorSearchPhaseAPMMetrics;
}

protected void notifyListShards(
Expand Down Expand Up @@ -374,7 +379,9 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP

private void executePhase(SearchPhase phase) {
try {
phaseStartTimeNanos = timeProvider.relativeCurrentNanosProvider().getAsLong();
phase.run();

} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while moving to [%s] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -621,6 +628,7 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
* @param cause the cause of the phase failure
*/
public void onPhaseFailure(String phase, String msg, Throwable cause) {
    // Report the elapsed time of the failing phase before the failure is raised.
    recordPhaseTookTime(phase);
    // Bundle the phase name, message, cause and the result of buildShardFailures() into one exception.
    final SearchPhaseExecutionException failure = new SearchPhaseExecutionException(phase, msg, cause, buildShardFailures());
    raisePhaseFailure(failure);
}

Expand Down Expand Up @@ -666,9 +674,17 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
* @see #onShardResult(SearchPhaseResult)
*/
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
// Report the elapsed time for this phase first, then move on to the phase supplied by getNextPhase().
recordPhaseTookTime(getName());
executeNextPhase(getName(), this::getNextPhase);
}

/**
 * Reports to the coordinator search phase metrics how much relative time has passed since
 * {@code phaseStartTimeNanos} was last set.
 *
 * @param phaseName the phase name handed through to {@code onCoordinatorPhaseDone}
 */
protected void recordPhaseTookTime(String phaseName) {
    final long nowNanos = timeProvider.relativeCurrentNanosProvider().getAsLong();
    // NOTE(review): phaseStartTimeNanos is a plain (non-volatile) long field that is assigned in executePhase;
    // if this method can run on a different thread, or before any phase was started, the value read here may be
    // stale or still 0 - confirm against the callers.
    final long elapsedNanos = nowNanos - phaseStartTimeNanos;
    coordinatorSearchPhaseMetrics.onCoordinatorPhaseDone(phaseName, elapsedNanos);
}

/**
* Returns a connection to the node if connected; otherwise a {@link org.elasticsearch.transport.ConnectTransportException} will be
* thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -71,6 +72,7 @@ final class CanMatchPreFilterSearchPhase {
private final SearchTask task;
private final Executor executor;
private final boolean requireAtLeastOneMatch;
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;

private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
Expand All @@ -90,7 +92,8 @@ private CanMatchPreFilterSearchPhase(
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
ActionListener<List<SearchShardIterator>> listener
ActionListener<List<SearchShardIterator>> listener,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
this.logger = logger;
this.searchTransportService = searchTransportService;
Expand All @@ -105,6 +108,7 @@ private CanMatchPreFilterSearchPhase(
this.requireAtLeastOneMatch = requireAtLeastOneMatch;
this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
this.executor = executor;
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
final int size = shardsIts.size();
possibleMatches = new FixedBitSet(size);
minAndMaxes = new MinAndMax<?>[size];
Expand Down Expand Up @@ -136,7 +140,8 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
Expand Down Expand Up @@ -168,7 +173,8 @@ protected void doRun() {
task,
requireAtLeastOneMatch,
coordinatorRewriteContextProvider,
listener
listener,
coordinatorSearchPhaseAPMMetrics
).runCoordinatorRewritePhase();
}
});
Expand All @@ -184,6 +190,7 @@ private static boolean assertSearchCoordinationThread() {
private void runCoordinatorRewritePhase() {
// TODO: the index filter (i.e., `_index:pattern`) should be prefiltered on the coordinator
assert assertSearchCoordinationThread();
final long coordinatorStartTimeNanos = timeProvider.relativeCurrentNanosProvider().getAsLong();
final List<SearchShardIterator> matchedShardLevelRequests = new ArrayList<>();
for (SearchShardIterator searchShardIterator : shardsIts) {
final CanMatchNodeRequest canMatchNodeRequest = new CanMatchNodeRequest(
Expand Down Expand Up @@ -223,6 +230,10 @@ private void runCoordinatorRewritePhase() {
checkNoMissingShards(matchedShardLevelRequests);
new Round(matchedShardLevelRequests).run();
}
coordinatorSearchPhaseAPMMetrics.onCoordinatorPhaseDone(
"can_match",
timeProvider.relativeCurrentNanosProvider().getAsLong() - coordinatorStartTimeNanos
);
}

private void consumeResult(boolean canMatch, ShardSearchRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class DfsQueryPhase extends SearchPhase {

// protected for testing
protected SearchPhase nextPhase(AggregatedDfs dfs) {
    // Report this phase's elapsed time before the follow-up phase is created.
    // NOTE(review): the metric is recorded on every invocation of this method - confirm it is only called once per search.
    context.recordPhaseTookTime(getName());
    final SearchPhase followUp = SearchQueryThenFetchAsyncAction.nextPhase(client, context, queryResult, dfs);
    return followUp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilde
}

private void onPhaseDone() {
// Report this phase's elapsed time to the context before handing control to the next phase.
context.recordPhaseTookTime(getName());
// NOTE(review): the metric above is keyed by getName() while the transition below uses NAME -
// presumably both yield the same value; confirm.
context.executeNextPhase(NAME, this::nextPhase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void onFailure(Exception e) {
}

private void sendResponse() {
// Report this phase's elapsed time first, then pass the search response and the query results to the context.
context.recordPhaseTookTime(getName());
context.sendSearchResponse(searchResponse, queryResults);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private void moveToNextPhase(
AtomicArray<? extends SearchPhaseResult> fetchResultsArr,
SearchPhaseController.ReducedQueryPhase reducedQueryPhase
) {
context.recordPhaseTookTime(getName());
context.executeNextPhase(NAME, () -> {
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
context.addReleasable(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void onResponse(RankFeatureDoc[] docsWithUpdatedScores) {
reducedQueryPhase,
topResults
);
context.recordPhaseTookTime(getName());
moveToNextPhase(rankPhaseResults, reducedRankFeaturePhase);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
Expand Down Expand Up @@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
Client client
Client client,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(
"dfs",
Expand All @@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters
clusters,
coordinatorSearchPhaseAPMMetrics
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
addReleasable(queryPhaseResultConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.SimpleRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
Expand Down Expand Up @@ -108,7 +109,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
SearchTask task,
SearchResponse.Clusters clusters,
Client client,
boolean batchQueryPhase
boolean batchQueryPhase,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(
"query",
Expand All @@ -127,7 +129,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
task,
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters
clusters,
coordinatorSearchPhaseAPMMetrics
);
this.topDocsSize = getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -70,6 +72,8 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
private final TransportService transportService;
private final SearchService searchService;
private final ClusterService clusterService;
private final TelemetryProvider telemetryProvider;
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;

@Inject
public TransportOpenPointInTimeAction(
Expand All @@ -79,7 +83,9 @@ public TransportOpenPointInTimeAction(
TransportSearchAction transportSearchAction,
SearchTransportService searchTransportService,
NamedWriteableRegistry namedWriteableRegistry,
ClusterService clusterService
ClusterService clusterService,
TelemetryProvider telemetryProvider,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(TYPE.name(), transportService, actionFilters, OpenPointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.transportService = transportService;
Expand All @@ -88,6 +94,8 @@ public TransportOpenPointInTimeAction(
this.searchTransportService = searchTransportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterService = clusterService;
this.telemetryProvider = telemetryProvider;
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
transportService.registerRequestHandler(
OPEN_SHARD_READER_CONTEXT_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
Expand Down Expand Up @@ -174,7 +182,8 @@ public void runNewSearchPhase(
timeProvider,
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
coordinatorSearchPhaseAPMMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down Expand Up @@ -240,7 +249,8 @@ void runOpenPointInTimePhase(
task,
new ArraySearchPhaseResults<>(shardIterators.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters
clusters,
coordinatorSearchPhaseAPMMetrics
) {
@Override
protected void executePhaseOnShard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.ExecutorSelector;
Expand Down Expand Up @@ -170,6 +171,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final UsageService usageService;
private final boolean collectCCSTelemetry;
private final TimeValue forceConnectTimeoutSecs;
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;

@Inject
public TransportSearchAction(
Expand All @@ -188,7 +190,8 @@ public TransportSearchAction(
ExecutorSelector executorSelector,
SearchResponseMetrics searchResponseMetrics,
Client client,
UsageService usageService
UsageService usageService,
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
) {
super(TYPE.name(), transportService, actionFilters, SearchRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
Expand Down Expand Up @@ -218,6 +221,7 @@ public TransportSearchAction(
this.searchResponseMetrics = searchResponseMetrics;
this.client = client;
this.usageService = usageService;
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
}

Expand Down Expand Up @@ -1623,7 +1627,8 @@ public void runNewSearchPhase(
timeProvider,
task,
requireAtLeastOneMatch,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
coordinatorSearchPhaseAPMMetrics
)
.addListener(
listener.delegateFailureAndWrap(
Expand Down Expand Up @@ -1682,7 +1687,8 @@ public void runNewSearchPhase(
clusterState,
task,
clusters,
client
client,
coordinatorSearchPhaseAPMMetrics
);
} else {
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
Expand All @@ -1703,7 +1709,8 @@ public void runNewSearchPhase(
task,
clusters,
client,
searchService.batchQueryPhase()
searchService.batchQueryPhase(),
coordinatorSearchPhaseAPMMetrics
);
}
success = true;
Expand Down
Loading