Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 82 additions & 98 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,13 +748,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
) {
tracer.startTrace("executeQueryPhase", Map.of());
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
final long beforeQueryTime = System.nanoTime();
var opsListener = context.indexShard().getSearchOperationListener();
opsListener.onPreQueryPhase(context);
try {
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
}
afterQueryTime = executor.success();
afterQueryTime = System.nanoTime();
opsListener.onQueryPhase(context, afterQueryTime - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(context);
}
tracer.stopTrace(task);
}
if (request.numberOfShards() == 1 && (request.source() == null || request.source().rankBuilder() == null)) {
Expand Down Expand Up @@ -812,15 +820,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (
Releasable scope = tracer.withScope(context.getTask());
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
) {
var opsListener = context.indexShard().getSearchOperationListener();
try (Releasable scope = tracer.withScope(context.getTask());) {
opsListener.onPreFetchPhase(context);
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null);
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
executor.success();
opsListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedFetchPhase(context);
}
}
// This will incRef the QuerySearchResult when it gets created
return QueryFetchSearchResult.of(context.queryResult(), context.fetchResult());
Expand All @@ -844,14 +856,21 @@ public void executeQueryPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
) {
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
QueryPhase.execute(searchContext);
executor.success();
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) {
var opsListener = searchContext.indexShard().getSearchOperationListener();
final long beforeQueryTime = System.nanoTime();
opsListener.onPreQueryPhase(searchContext);
try {
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
QueryPhase.execute(searchContext);
opsListener.onQueryPhase(searchContext, System.nanoTime() - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(searchContext);
}
}
readerContext.setRescoreDocIds(searchContext.rescoreDocIds());
// ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed.
return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget());
Expand Down Expand Up @@ -882,18 +901,26 @@ public void executeQueryPhase(
// fork the execution in the search thread pool
runAsync(getExecutor(readerContext.indexShard()), () -> {
readerContext.setAggregatedDfs(request.dfs());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
) {
searchContext.searcher().setAggregatedDfs(request.dfs());
QueryPhase.execute(searchContext);
final QuerySearchResult queryResult = searchContext.queryResult();
if (queryResult.hasSearchContext() == false && readerContext.singleSession()) {
// no hits, we can release the context since there will be no fetch phase
freeReaderContext(readerContext.id());
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) {
final QuerySearchResult queryResult;
var opsListener = searchContext.indexShard().getSearchOperationListener();
final long before = System.nanoTime();
opsListener.onPreQueryPhase(searchContext);
try {
searchContext.searcher().setAggregatedDfs(request.dfs());
QueryPhase.execute(searchContext);
queryResult = searchContext.queryResult();
if (queryResult.hasSearchContext() == false && readerContext.singleSession()) {
// no hits, we can release the context since there will be no fetch phase
freeReaderContext(readerContext.id());
}
opsListener.onQueryPhase(searchContext, System.nanoTime() - before);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(searchContext);
}
}
executor.success();
// Pass the rescoreDocIds to the queryResult to send them the coordinating node
// and receive them back in the fetch phase.
// We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
Expand Down Expand Up @@ -942,16 +969,25 @@ public void executeFetchPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
) {
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
searchContext.addQueryResult();
QueryPhase.execute(searchContext);
final long afterQueryTime = executor.success();
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);) {
var opsListener = readerContext.indexShard().getSearchOperationListener();
final long beforeQueryTime = System.nanoTime();
final long afterQueryTime;
try {
opsListener.onPreQueryPhase(searchContext);
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
searchContext.addQueryResult();
QueryPhase.execute(searchContext);
afterQueryTime = System.nanoTime();
opsListener.onQueryPhase(searchContext, afterQueryTime - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(searchContext);
}
}
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
} catch (Exception e) {
Expand All @@ -975,18 +1011,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
}
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds()));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs()));
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(
searchContext,
true,
System.nanoTime()
)
) {
final long startTime = System.nanoTime();
var opsListener = searchContext.indexShard().getSearchOperationListener();
opsListener.onPreFetchPhase(searchContext);
try {
fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks());
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
executor.success();
opsListener.onFetchPhase(searchContext, System.nanoTime() - startTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedFetchPhase(searchContext);
}
}
var fetchResult = searchContext.fetchResult();
// inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
Expand Down Expand Up @@ -2007,58 +2045,4 @@ public AggregationReduceContext forFinalReduction() {
}
};
}

/**
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
*/
/**
 * Guarantees that exactly one of the success or failure callbacks of a
 * {@link SearchOperationListener} fires for a query or fetch phase. Some listener
 * implementations, such as {@link org.elasticsearch.index.search.stats.ShardSearchStats},
 * rely on this pairing to keep their counters balanced.
 *
 * <p>Usage: construct at phase start (fires the pre-phase callback), call
 * {@link #success()} once the phase completes normally, and rely on
 * try-with-resources to invoke {@link #close()}, which reports either the
 * phase duration (success) or the failure callback (no success recorded).
 */
private static final class SearchOperationListenerExecutor implements AutoCloseable {
    private final SearchOperationListener listener;
    private final SearchContext context;
    // Phase start timestamp in nanos; duration is measured against this on success.
    private final long time;
    // true = fetch phase callbacks, false = query phase callbacks.
    private final boolean fetch;
    // -1 until success() is called; doubles as the "did we succeed" flag in close().
    private long afterQueryTime = -1;
    private boolean closed = false;

    SearchOperationListenerExecutor(SearchContext context) {
        this(context, false, System.nanoTime());
    }

    SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
        this.listener = context.indexShard().getSearchOperationListener();
        this.context = context;
        this.time = startTime;
        this.fetch = fetch;
        // Fire the matching pre-phase callback immediately on construction.
        if (fetch) {
            listener.onPreFetchPhase(context);
        } else {
            listener.onPreQueryPhase(context);
        }
    }

    /**
     * Marks the phase as successful and records its end timestamp.
     *
     * @return the nano timestamp at which success was recorded
     */
    long success() {
        afterQueryTime = System.nanoTime();
        return afterQueryTime;
    }

    @Override
    public void close() {
        assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
        if (closed) {
            return; // idempotent: never double-report to the listener
        }
        closed = true;
        boolean succeeded = afterQueryTime != -1;
        if (succeeded) {
            long tookInNanos = afterQueryTime - time;
            if (fetch) {
                listener.onFetchPhase(context, tookInNanos);
            } else {
                listener.onQueryPhase(context, tookInNanos);
            }
        } else if (fetch) {
            listener.onFailedFetchPhase(context);
        } else {
            listener.onFailedQueryPhase(context);
        }
    }
}
}