Skip to content

Commit 18806fb

Browse files
checkpoint
1 parent 1521291 commit 18806fb

19 files changed

+338
-22
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.Maps;
2727
import org.elasticsearch.common.util.concurrent.AtomicArray;
2828
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
2930
import org.elasticsearch.index.shard.ShardId;
3031
import org.elasticsearch.search.SearchContextMissingException;
3132
import org.elasticsearch.search.SearchPhaseResult;
@@ -93,6 +94,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9394
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
9495
private final AtomicBoolean requestCancelled = new AtomicBoolean();
9596
private final int skippedCount;
97+
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseMetrics;
98+
private long phaseStartTimeNanos;
9699

97100
// protected for tests
98101
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -114,7 +117,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
114117
SearchTask task,
115118
SearchPhaseResults<Result> resultConsumer,
116119
int maxConcurrentRequestsPerNode,
117-
SearchResponse.Clusters clusters
120+
SearchResponse.Clusters clusters,
121+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
118122
) {
119123
super(name);
120124
this.namedWriteableRegistry = namedWriteableRegistry;
@@ -155,6 +159,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
155159
// at the end of the search
156160
addReleasable(resultConsumer);
157161
this.clusters = clusters;
162+
this.coordinatorSearchPhaseMetrics = coordinatorSearchPhaseAPMMetrics;
158163
}
159164

160165
protected void notifyListShards(
@@ -374,7 +379,9 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
374379

375380
private void executePhase(SearchPhase phase) {
376381
try {
382+
phaseStartTimeNanos = timeProvider.relativeCurrentNanosProvider().getAsLong();
377383
phase.run();
384+
378385
} catch (RuntimeException e) {
379386
if (logger.isDebugEnabled()) {
380387
logger.debug(() -> format("Failed to execute [%s] while moving to [%s] phase", request, phase.getName()), e);
@@ -621,6 +628,7 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
621628
* @param cause the cause of the phase failure
622629
*/
623630
public void onPhaseFailure(String phase, String msg, Throwable cause) {
631+
recordPhaseTookTime(phase);
624632
raisePhaseFailure(new SearchPhaseExecutionException(phase, msg, cause, buildShardFailures()));
625633
}
626634

@@ -666,9 +674,17 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
666674
* @see #onShardResult(SearchPhaseResult)
667675
*/
668676
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
677+
recordPhaseTookTime(getName());
669678
executeNextPhase(getName(), this::getNextPhase);
670679
}
671680

681+
protected void recordPhaseTookTime(String phaseName) {
682+
coordinatorSearchPhaseMetrics.onCoordinatorPhaseDone(
683+
phaseName,
684+
timeProvider.relativeCurrentNanosProvider().getAsLong() - phaseStartTimeNanos
685+
);
686+
}
687+
672688
/**
673689
* Returns a connection to the node if connected, otherwise a {@link org.elasticsearch.transport.ConnectTransportException} will be
674690
* thrown.

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ private static boolean assertSearchCoordinationThread() {
184184
private void runCoordinatorRewritePhase() {
185185
// TODO: the index filter (i.e., `_index:pattern`) should be prefiltered on the coordinator
186186
assert assertSearchCoordinationThread();
187+
final long coordinatorStartTimeNanos = timeProvider.relativeCurrentNanosProvider().getAsLong();
187188
final List<SearchShardIterator> matchedShardLevelRequests = new ArrayList<>();
188189
for (SearchShardIterator searchShardIterator : shardsIts) {
189190
final CanMatchNodeRequest canMatchNodeRequest = new CanMatchNodeRequest(

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class DfsQueryPhase extends SearchPhase {
6666

6767
// protected for testing
6868
protected SearchPhase nextPhase(AggregatedDfs dfs) {
69+
context.recordPhaseTookTime(getName());
6970
return SearchQueryThenFetchAsyncAction.nextPhase(client, context, queryResult, dfs);
7071
}
7172

server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilde
182182
}
183183

184184
private void onPhaseDone() {
185+
context.recordPhaseTookTime(getName());
185186
context.executeNextPhase(NAME, this::nextPhase);
186187
}
187188
}

server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public void onFailure(Exception e) {
144144
}
145145

146146
private void sendResponse() {
147+
// Report the elapsed time of this phase on the context before the response goes out.
context.recordPhaseTookTime(getName());
147148
// Pass the search response and the query results to the context for sending.
context.sendSearchResponse(searchResponse, queryResults);
148149
}
149150
}

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ private void moveToNextPhase(
260260
AtomicArray<? extends SearchPhaseResult> fetchResultsArr,
261261
SearchPhaseController.ReducedQueryPhase reducedQueryPhase
262262
) {
263+
context.recordPhaseTookTime(getName());
263264
context.executeNextPhase(NAME, () -> {
264265
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
265266
context.addReleasable(resp);

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.client.internal.Client;
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
17+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
1718
import org.elasticsearch.search.SearchPhaseResult;
1819
import org.elasticsearch.search.SearchShardTarget;
1920
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
4748
ClusterState clusterState,
4849
SearchTask task,
4950
SearchResponse.Clusters clusters,
50-
Client client
51+
Client client,
52+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
5153
) {
5254
super(
5355
"dfs",
@@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
6668
task,
6769
new ArraySearchPhaseResults<>(shardsIts.size()),
6870
request.getMaxConcurrentShardRequests(),
69-
clusters
71+
clusters,
72+
coordinatorSearchPhaseAPMMetrics
7073
);
7174
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
7275
addReleasable(queryPhaseResultConsumer);

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.core.RefCounted;
3535
import org.elasticsearch.core.SimpleRefCounted;
3636
import org.elasticsearch.core.TimeValue;
37+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
3738
import org.elasticsearch.index.shard.ShardId;
3839
import org.elasticsearch.search.SearchPhaseResult;
3940
import org.elasticsearch.search.SearchService;
@@ -108,7 +109,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
108109
SearchTask task,
109110
SearchResponse.Clusters clusters,
110111
Client client,
111-
boolean batchQueryPhase
112+
boolean batchQueryPhase,
113+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
112114
) {
113115
super(
114116
"query",
@@ -127,7 +129,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
127129
task,
128130
resultConsumer,
129131
request.getMaxConcurrentShardRequests(),
130-
clusters
132+
clusters,
133+
coordinatorSearchPhaseAPMMetrics
131134
);
132135
this.topDocsSize = getTopDocsSize(request);
133136
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.io.stream.StreamOutput;
3232
import org.elasticsearch.common.util.concurrent.EsExecutors;
3333
import org.elasticsearch.core.TimeValue;
34+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
3435
import org.elasticsearch.index.shard.ShardId;
3536
import org.elasticsearch.injection.guice.Inject;
3637
import org.elasticsearch.rest.RestStatus;
@@ -40,6 +41,7 @@
4041
import org.elasticsearch.search.internal.AliasFilter;
4142
import org.elasticsearch.search.internal.ShardSearchContextId;
4243
import org.elasticsearch.tasks.Task;
44+
import org.elasticsearch.telemetry.TelemetryProvider;
4345
import org.elasticsearch.threadpool.ThreadPool;
4446
import org.elasticsearch.transport.AbstractTransportRequest;
4547
import org.elasticsearch.transport.Transport;
@@ -70,6 +72,8 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
7072
private final TransportService transportService;
7173
private final SearchService searchService;
7274
private final ClusterService clusterService;
75+
private final TelemetryProvider telemetryProvider;
76+
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;
7377

7478
@Inject
7579
public TransportOpenPointInTimeAction(
@@ -79,7 +83,9 @@ public TransportOpenPointInTimeAction(
7983
TransportSearchAction transportSearchAction,
8084
SearchTransportService searchTransportService,
8185
NamedWriteableRegistry namedWriteableRegistry,
82-
ClusterService clusterService
86+
ClusterService clusterService,
87+
TelemetryProvider telemetryProvider,
88+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
8389
) {
8490
super(TYPE.name(), transportService, actionFilters, OpenPointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
8591
this.transportService = transportService;
@@ -88,6 +94,8 @@ public TransportOpenPointInTimeAction(
8894
this.searchTransportService = searchTransportService;
8995
this.namedWriteableRegistry = namedWriteableRegistry;
9096
this.clusterService = clusterService;
97+
this.telemetryProvider = telemetryProvider;
98+
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
9199
transportService.registerRequestHandler(
92100
OPEN_SHARD_READER_CONTEXT_NAME,
93101
EsExecutors.DIRECT_EXECUTOR_SERVICE,
@@ -240,7 +248,8 @@ void runOpenPointInTimePhase(
240248
task,
241249
new ArraySearchPhaseResults<>(shardIterators.size()),
242250
searchRequest.getMaxConcurrentShardRequests(),
243-
clusters
251+
clusters,
252+
coordinatorSearchPhaseAPMMetrics
244253
) {
245254
@Override
246255
protected void executePhaseOnShard(

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.IndexNotFoundException;
6868
import org.elasticsearch.index.query.QueryBuilder;
6969
import org.elasticsearch.index.query.Rewriteable;
70+
import org.elasticsearch.index.search.stats.CoordinatorSearchPhaseAPMMetrics;
7071
import org.elasticsearch.index.shard.ShardId;
7172
import org.elasticsearch.index.shard.ShardNotFoundException;
7273
import org.elasticsearch.indices.ExecutorSelector;
@@ -170,6 +171,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
170171
private final UsageService usageService;
171172
private final boolean collectCCSTelemetry;
172173
private final TimeValue forceConnectTimeoutSecs;
174+
private final CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics;
173175

174176
@Inject
175177
public TransportSearchAction(
@@ -188,7 +190,8 @@ public TransportSearchAction(
188190
ExecutorSelector executorSelector,
189191
SearchResponseMetrics searchResponseMetrics,
190192
Client client,
191-
UsageService usageService
193+
UsageService usageService,
194+
CoordinatorSearchPhaseAPMMetrics coordinatorSearchPhaseAPMMetrics
192195
) {
193196
super(TYPE.name(), transportService, actionFilters, SearchRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
194197
this.threadPool = threadPool;
@@ -218,6 +221,7 @@ public TransportSearchAction(
218221
this.searchResponseMetrics = searchResponseMetrics;
219222
this.client = client;
220223
this.usageService = usageService;
224+
this.coordinatorSearchPhaseAPMMetrics = coordinatorSearchPhaseAPMMetrics;
221225
forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
222226
}
223227

@@ -1682,7 +1686,8 @@ public void runNewSearchPhase(
16821686
clusterState,
16831687
task,
16841688
clusters,
1685-
client
1689+
client,
1690+
coordinatorSearchPhaseAPMMetrics
16861691
);
16871692
} else {
16881693
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
@@ -1703,7 +1708,8 @@ public void runNewSearchPhase(
17031708
task,
17041709
clusters,
17051710
client,
1706-
searchService.batchQueryPhase()
1711+
searchService.batchQueryPhase(),
1712+
coordinatorSearchPhaseAPMMetrics
17071713
);
17081714
}
17091715
success = true;

0 commit comments

Comments
 (0)