@@ -27,6 +27,7 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;
protected final SearchResponseMetrics searchResponseMetrics;

// protected for tests
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -114,7 +116,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchTask task,
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchResponseMetrics searchResponseMetrics
) {
super(name);
this.namedWriteableRegistry = namedWriteableRegistry;
@@ -155,6 +158,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
// at the end of the search
addReleasable(resultConsumer);
this.clusters = clusters;
this.searchResponseMetrics = searchResponseMetrics;
}

protected void notifyListShards(
@@ -665,7 +669,7 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
* @see #onShardFailure(int, SearchShardTarget, Exception)
* @see #onShardResult(SearchPhaseResult)
*/
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
protected void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
executeNextPhase(getName(), this::getNextPhase);
}

@@ -684,6 +688,10 @@ public SearchTransportService getSearchTransport() {
return searchTransportService;
}

public SearchResponseMetrics getSearchResponseMetrics() {
return searchResponseMetrics;
}

public final void execute(Runnable command) {
executor.execute(command);
}
@@ -14,6 +14,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -47,7 +48,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
Client client
Client client,
SearchResponseMetrics searchResponseMetrics
) {
super(
"dfs",
@@ -66,7 +68,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchResponseMetrics
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
addReleasable(queryPhaseResultConsumer);
@@ -35,6 +35,7 @@
import org.elasticsearch.core.SimpleRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
@@ -92,6 +93,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
private volatile BottomSortValuesCollector bottomSortCollector;
private final Client client;
private final boolean batchQueryPhase;
private long phaseStartTimeNanos;

SearchQueryThenFetchAsyncAction(
Logger logger,
@@ -110,7 +112,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
SearchTask task,
SearchResponse.Clusters clusters,
Client client,
boolean batchQueryPhase
boolean batchQueryPhase,
SearchResponseMetrics searchResponseMetrics
) {
super(
"query",
@@ -129,7 +132,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
task,
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchResponseMetrics
);
this.topDocsSize = getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
@@ -421,6 +425,7 @@ private static boolean isPartOfPIT(

@Override
protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
phaseStartTimeNanos = System.nanoTime();

Member:

This could potentially be streamlined into the run method in the parent class. We may even be able to report the latency at the coordinator in a generic manner, with all the code in AbstractSearchAsyncAction?

Contributor Author:

My other PR attempted the "generic" path as well. There is some weirdness around PIT creation and queries that caused a lot of issues in CI, but I think I have an idea of where the issue was. I'll try to move this code into AbstractSearchAsyncAction, which should cover at least the DFS and query phases.

I'll have to check whether that helps the fetch and other subsequent phases. The issue with them is that they don't subclass AbstractSearchAsyncAction but reference it via a passed-in context, so those phases might not hit run to set the phase start time. I'll have to do some debug tracing.

Member:

I see what you mean. I'd limit the change to tracking the two variations of the query phase and perhaps open point in time, i.e. the AbstractSearchAsyncAction subclasses, to be more concrete. I would not consider the other so-called search phases that important, to be honest. I care about can match, dfs, query, and fetch. We can expand further later as needed.

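To make the suggestion above concrete, here is a minimal, self-contained sketch of the pattern being discussed: record the start time once when the phase runs, report the duration once when it finishes. The class and member names below are hypothetical stand-ins for AbstractSearchAsyncAction#run(), #onPhaseDone(), and SearchResponseMetrics#recordQueryPhaseDuration(long); this is an illustration, not part of the diff.

import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical stand-in for AbstractSearchAsyncAction with the timing pulled up into the parent.
abstract class TimedPhase {
    private final LongConsumer durationMillisRecorder; // stand-in for the query-phase histogram
    private long phaseStartTimeNanos;

    TimedPhase(LongConsumer durationMillisRecorder) {
        this.durationMillisRecorder = durationMillisRecorder;
    }

    final void run() {
        phaseStartTimeNanos = System.nanoTime(); // recorded once, in the parent class
        doRun();
    }

    protected abstract void doRun();

    // Subclasses call this when the phase completes; no per-subclass timing override is needed.
    protected void onPhaseDone() {
        long tookNanos = System.nanoTime() - phaseStartTimeNanos;
        durationMillisRecorder.accept(TimeUnit.NANOSECONDS.toMillis(tookNanos));
    }
}

Whether this actually covers fetch and the other phases depends on whether they go through run(), which is exactly the open question in the thread.
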
if (this.batchQueryPhase == false) {
super.doRun(shardIndexMap);
return;
Expand Down Expand Up @@ -564,6 +569,13 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
}
}

@Override
protected void onPhaseDone() {
final long tookInNanos = System.nanoTime() - phaseStartTimeNanos;
searchResponseMetrics.recordQueryPhaseDuration(tookInNanos);
super.onPhaseDone();
}

public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]";

static void registerNodeSearchAction(
@@ -34,6 +34,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -70,6 +71,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
private final TransportService transportService;
private final SearchService searchService;
private final ClusterService clusterService;
private final SearchResponseMetrics searchResponseMetrics;

@Inject
public TransportOpenPointInTimeAction(
@@ -79,7 +81,8 @@ public TransportOpenPointInTimeAction(
TransportSearchAction transportSearchAction,
SearchTransportService searchTransportService,
NamedWriteableRegistry namedWriteableRegistry,
ClusterService clusterService
ClusterService clusterService,
SearchResponseMetrics searchResponseMetrics
) {
super(TYPE.name(), transportService, actionFilters, OpenPointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.transportService = transportService;
@@ -88,6 +91,7 @@ public TransportOpenPointInTimeAction(
this.searchTransportService = searchTransportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterService = clusterService;
this.searchResponseMetrics = searchResponseMetrics;
transportService.registerRequestHandler(
OPEN_SHARD_READER_CONTEXT_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
@@ -246,7 +250,8 @@ void runOpenPointInTimePhase(
task,
new ArraySearchPhaseResults<>(shardIterators.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters
clusters,
searchResponseMetrics
) {
@Override
protected void executePhaseOnShard(
@@ -1688,7 +1688,8 @@ public void runNewSearchPhase(
clusterState,
task,
clusters,
client
client,
searchResponseMetrics
);
} else {
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
Expand All @@ -1709,7 +1710,8 @@ public void runNewSearchPhase(
task,
clusters,
client,
searchService.batchQueryPhase()
searchService.batchQueryPhase(),
searchResponseMetrics
);
}
success = true;
@@ -16,6 +16,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Container class for aggregated metrics about search responses.
@@ -42,9 +43,11 @@ public String getDisplayName() {

public static final String TOOK_DURATION_TOTAL_HISTOGRAM_NAME = "es.search_response.took_durations.histogram";
public static final String RESPONSE_COUNT_TOTAL_COUNTER_NAME = "es.search_response.response_count.total";
public static final String QUERY_SEARCH_PHASE_METRIC = "es.search_response.coordinator_phases_took_durations.query.histogram";

private final LongHistogram tookDurationTotalMillisHistogram;
private final LongCounter responseCountTotalCounter;
private final LongHistogram queryPhaseDurationHistogram;

public SearchResponseMetrics(MeterRegistry meterRegistry) {
this(
@@ -59,13 +62,23 @@ public SearchResponseMetrics(MeterRegistry meterRegistry) {
+ "success, partial failure, or failure, expressed as a single total counter and individual "
+ "attribute counters",
"count"
),
meterRegistry.registerLongHistogram(
QUERY_SEARCH_PHASE_METRIC,
"Query search phase execution times at the coordinator level, expressed as a histogram",
"millis"
)
);
}

private SearchResponseMetrics(LongHistogram tookDurationTotalMillisHistogram, LongCounter responseCountTotalCounter) {
private SearchResponseMetrics(
LongHistogram tookDurationTotalMillisHistogram,
LongCounter responseCountTotalCounter,
LongHistogram queryPhaseDurationHistogram
) {
this.tookDurationTotalMillisHistogram = tookDurationTotalMillisHistogram;
this.responseCountTotalCounter = responseCountTotalCounter;
this.queryPhaseDurationHistogram = queryPhaseDurationHistogram;
}

public long recordTookTimeForSearchScroll(long tookTime) {
@@ -91,4 +104,8 @@ public void incrementResponseCount(ResponseCountTotalStatus responseCountTotalSt
attributesWithStatus.put(RESPONSE_COUNT_TOTAL_STATUS_ATTRIBUTE_NAME, responseCountTotalStatus.getDisplayName());
responseCountTotalCounter.incrementBy(1L, attributesWithStatus);
}

public void recordQueryPhaseDuration(long tookInNanos) {
queryPhaseDurationHistogram.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
}
}
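
For reference, a minimal usage sketch of the new query-phase histogram, constructed against the no-op meter registry that the tests below use. The wrapper class here is hypothetical and for illustration only; the calls to SearchResponseMetrics and TelemetryProvider mirror what appears in this diff.

import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.telemetry.TelemetryProvider;

// Hypothetical wrapper for illustration; not part of the change.
class QueryPhaseMetricUsageSketch {
    static void example() {
        SearchResponseMetrics metrics = new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry());

        long startNanos = System.nanoTime();
        // ... coordinator-side query phase runs here ...
        // Callers pass elapsed nanoseconds; recordQueryPhaseDuration converts to millis before recording.
        metrics.recordQueryPhaseDuration(System.nanoTime() - startNanos);
    }
}
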
@@ -18,11 +18,13 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;

@@ -88,7 +90,8 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
null,
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {
@Override
protected SearchPhase getNextPhase() {
@@ -18,9 +18,11 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.transport.Transport;
import org.junit.Assert;

@@ -66,7 +68,8 @@ public MockSearchPhaseContext(int numShards) {
new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()),
new ArraySearchPhaseResults<>(numShards),
5,
null
null,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
);
this.numShards = numShards;
numSuccess = new AtomicInteger(numShards);
@@ -21,10 +21,12 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
@@ -111,7 +113,8 @@ public void testSkipSearchShards() throws InterruptedException {
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {

@Override
@@ -218,7 +221,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException {
null,
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {

@Override
@@ -334,7 +338,8 @@ public void sendFreeContext(
null,
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {

@Override
@@ -464,7 +469,8 @@ public void sendFreeContext(
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {
@Override
protected void executePhaseOnShard(
@@ -572,7 +578,8 @@ public void testAllowPartialResults() throws InterruptedException {
null,
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {

@Override
@@ -670,7 +677,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException {
null,
new ArraySearchPhaseResults<>(searchShardIterators.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
) {

@Override