Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 82 additions & 98 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
) {
tracer.startTrace("executeQueryPhase", Map.of());
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
final long beforeQueryTime = System.nanoTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The strategy pattern can be implemented here to avoid the extra branching and the boilerplate code in the SearchService class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but if you think about it, that will probably not save any lines of code and will most likely be heavier on the CPU still than just inlining the logic here (at best that's what the compiler will get us to with escape analysis and inlining).
-> I think I'd vote for just inlining here, even if it looks less nice, simply because we're already having trouble with the i-cache, and as a result a predictable improvement trumps deduplication. (It's also not as if this logic will ever see its use expand: any abstraction we add would have about 3 call sites and would in all likelihood never gain any additional uses either.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion here. This PR looks more straightforward to read and follow than it did before. I just proposed a more object-oriented solution to make it more eye-friendly.

var opsListener = context.indexShard().getSearchOperationListener();
opsListener.onPreQueryPhase(context);
try {
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
}
afterQueryTime = executor.success();
afterQueryTime = System.nanoTime();
opsListener.onQueryPhase(context, afterQueryTime - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(context);
}
tracer.stopTrace(task);
}
if (request.numberOfShards() == 1 && (request.source() == null || request.source().rankBuilder() == null)) {
Expand Down Expand Up @@ -824,15 +832,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (
Releasable scope = tracer.withScope(context.getTask());
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
) {
var opsListener = context.indexShard().getSearchOperationListener();
try (Releasable scope = tracer.withScope(context.getTask());) {
opsListener.onPreFetchPhase(context);
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null);
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
executor.success();
opsListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedFetchPhase(context);
}
}
// This will incRef the QuerySearchResult when it gets created
return QueryFetchSearchResult.of(context.queryResult(), context.fetchResult());
Expand All @@ -856,14 +868,21 @@ public void executeQueryPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
) {
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
QueryPhase.execute(searchContext);
executor.success();
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) {
var opsListener = searchContext.indexShard().getSearchOperationListener();
final long beforeQueryTime = System.nanoTime();
opsListener.onPreQueryPhase(searchContext);
try {
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
QueryPhase.execute(searchContext);
opsListener.onQueryPhase(searchContext, System.nanoTime() - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(searchContext);
}
}
readerContext.setRescoreDocIds(searchContext.rescoreDocIds());
// ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed.
return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget());
Expand Down Expand Up @@ -894,18 +913,26 @@ public void executeQueryPhase(
// fork the execution in the search thread pool
runAsync(getExecutor(readerContext.indexShard()), () -> {
readerContext.setAggregatedDfs(request.dfs());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
) {
searchContext.searcher().setAggregatedDfs(request.dfs());
QueryPhase.execute(searchContext);
final QuerySearchResult queryResult = searchContext.queryResult();
if (queryResult.hasSearchContext() == false && readerContext.singleSession()) {
// no hits, we can release the context since there will be no fetch phase
freeReaderContext(readerContext.id());
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) {
final QuerySearchResult queryResult;
var opsListener = searchContext.indexShard().getSearchOperationListener();
final long before = System.nanoTime();
opsListener.onPreQueryPhase(searchContext);
try {
searchContext.searcher().setAggregatedDfs(request.dfs());
QueryPhase.execute(searchContext);
queryResult = searchContext.queryResult();
if (queryResult.hasSearchContext() == false && readerContext.singleSession()) {
// no hits, we can release the context since there will be no fetch phase
freeReaderContext(readerContext.id());
}
opsListener.onQueryPhase(searchContext, System.nanoTime() - before);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(searchContext);
}
}
executor.success();
// Pass the rescoreDocIds to the queryResult to send them the coordinating node
// and receive them back in the fetch phase.
// We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
Expand Down Expand Up @@ -954,16 +981,25 @@ public void executeFetchPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
) {
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
searchContext.addQueryResult();
QueryPhase.execute(searchContext);
final long afterQueryTime = executor.success();
var opsListener = readerContext.indexShard().getSearchOperationListener();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To align with the rest of the cases, you can move the opsListener assignment inside the try block, just before the onPreQueryPhase call.

try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);) {
final long afterQueryTime;
try {
final long beforeQueryTime = System.nanoTime();
opsListener.onPreQueryPhase(searchContext);
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, searchContext);
searchContext.addQueryResult();
QueryPhase.execute(searchContext);
afterQueryTime = System.nanoTime();
opsListener.onQueryPhase(searchContext, afterQueryTime - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(searchContext);
}
}
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
} catch (Exception e) {
Expand All @@ -987,18 +1023,20 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
}
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds()));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs()));
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(
searchContext,
true,
System.nanoTime()
)
) {
final long startTime = System.nanoTime();
var opsListener = searchContext.indexShard().getSearchOperationListener();
opsListener.onPreFetchPhase(searchContext);
try {
fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks());
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
executor.success();
opsListener.onFetchPhase(searchContext, System.nanoTime() - startTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedFetchPhase(searchContext);
}
}
var fetchResult = searchContext.fetchResult();
// inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
Expand Down Expand Up @@ -1972,58 +2010,4 @@ public AggregationReduceContext forFinalReduction() {
}
};
}

/**
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
*/
private static final class SearchOperationListenerExecutor implements AutoCloseable {
private final SearchOperationListener listener;
private final SearchContext context;
private final long time;
private final boolean fetch;
private long afterQueryTime = -1;
private boolean closed = false;

SearchOperationListenerExecutor(SearchContext context) {
this(context, false, System.nanoTime());
}

SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
this.listener = context.indexShard().getSearchOperationListener();
this.context = context;
time = startTime;
this.fetch = fetch;
if (fetch) {
listener.onPreFetchPhase(context);
} else {
listener.onPreQueryPhase(context);
}
}

long success() {
return afterQueryTime = System.nanoTime();
}

@Override
public void close() {
assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
if (closed == false) {
closed = true;
if (afterQueryTime != -1) {
if (fetch) {
listener.onFetchPhase(context, afterQueryTime - time);
} else {
listener.onQueryPhase(context, afterQueryTime - time);
}
} else {
if (fetch) {
listener.onFailedFetchPhase(context);
} else {
listener.onFailedQueryPhase(context);
}
}
}
}
}
}