Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130279.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130279
summary: Fix missing removal of query cancellation callback in QueryPhase
area: Search
type: bug
issues: [130071]
64 changes: 41 additions & 23 deletions server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,29 +209,47 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas
searcher.addQueryCancellation(timeoutRunnable);
}

QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
if (searchContext.getProfilers() != null) {
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
}
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
if (searcher.timeExceeded()) {
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
SearchTimeoutException.handleTimeout(
searchContext.request().allowPartialSearchResults(),
searchContext.shardTarget(),
searchContext.queryResult()
);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
}
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
try {
QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
if (searchContext.getProfilers() != null) {
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
}
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
if (searcher.timeExceeded()) {
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
SearchTimeoutException.handleTimeout(
searchContext.request().allowPartialSearchResults(),
searchContext.shardTarget(),
searchContext.queryResult()
);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
}
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}
} finally {
/*
* If the request ALLOWS PARTIAL RESULTS, we remove the query-cancellation callback
* **before** moving to later phases.
*
* Reason: When allowPartialSearchResults == true we want only the **query** phase
* to be interruptible by timeout, the fetch phase must run to completion so the client
* receives the hits already collected. Otherwise, the timeout cancellation could fire
* during fetch and return an empty SearchHits[] (see bug #130071).
*
* When allowPartialSearchResults==false, we keep the callback active,
* preserving the cross-phase timeout propagation introduced in PR #98715.
*/
if (searchContext.request().allowPartialSearchResults() && timeoutRunnable != null) {
searcher.removeQueryCancellation(timeoutRunnable);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is not wrong, but perhaps incomplete: we duplicate the low level cancellation runnable registered in DefaultSearchContext#preProcess as well, so I think that we should rather clear all the cancellation checks.

One way to do that would be to recreate the searcher before executing the fetch phase for the single shard case, but I see that requires quite some changes, as we don't want to rebuild the entire search context which is a rather heavy object.

Another way would be to call the existing ContextIndexSearcher#close method, but reusing the searcher after closing it sounds like an anti-pattern, although it would work in this case (relying on the fact that close only clears the cancellation runnables).

Maybe a better way would be to replace the current removeQueryCancellation method with a removeQueryCancellations that clears them all like close does. I would still call it tough only where needed, meaning in the only place where we effectively reuse the context/searcher. Otherwise, it is not evident why we need to do so in query phase as opposed to other places.

I prefer this explicit treatment before executing fetch for the single shard scenario, because it addresses the edge case, and I am not sure we want to handle it in a generic manner by removing cancellation checks where searchers are normally not reused across phases.

What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I don't think we need the conditional based on allowPartialSearchResults. If we do not allow partial search results, a hard error will be thrown at the end of the query phase before doing fetch if there's a timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer this explicit treatment before executing fetch for the single shard scenario, because it addresses the edge case, and I am not sure we want to handle it in a generic manner by removing cancellation checks where searchers are normally not reused across phases.

I updated the code with those changes.

By the way, I don't think we need the conditional based on allowPartialSearchResults. If we do not allow partial search results, a hard error will be thrown at the end of the query phase before doing fetch if there's a timeout.

Absolutely, I completely agree!

I believe the latest changes address the feedback, but let me know if you'd like anything refined further.

}
}
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,24 @@ public ShardSearchRequest request() {
return context;
}

public void testQueryCancellationCallbackRemovedWhenPartialAllowed() throws Exception {
TrackingSearcher searcher = new TrackingSearcher(reader);
try (TestSearchContext ctx = createSearchContextWithAllowPartialResults(searcher, true)) {
QueryPhase.executeQuery(ctx);
assertNotNull("callback should be registered", searcher.added);
}
assertTrue("callback should be removed", searcher.removed);
}

public void testQueryCancellationCallbackNotRemovedWhenPartialDisallowed() throws Exception {
TrackingSearcher searcher = new TrackingSearcher(reader);
try (TestSearchContext ctx = createSearchContextWithAllowPartialResults(searcher, false)) {
QueryPhase.executeQuery(ctx);
assertNotNull("callback should be registered", searcher.added);
}
assertFalse("callback must stay registered for later phases", searcher.removed);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could also have checked that there are no cancellation runnables using hasCancellations? Would that remove the need for the TrackingSearcher ? Anyways, I believe if you follow my guidance in the other comment, I think that this will have to become more of a fetch phase test than a query phase test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the previous test because, with the current changes, the logic now lives in SearchService, not in the query phase anymore.

I wasn't sure where exactly to test whether context.searcher().removeQueryCancellations() is called, maybe in SearchServiceSingleNodeTests, but I wasn't confident about the best approach.

Happy to add a test with some guidance, or of course feel free to adjust the PR directly if you'd like.

}

private static final class TestSuggester extends Suggester<TestSuggestionContext> {
private final ContextIndexSearcher contextIndexSearcher;

Expand Down Expand Up @@ -572,4 +590,45 @@ public final boolean isCacheable(LeafReaderContext ctx) {
return false;
}
}

private TestSearchContext createSearchContextWithAllowPartialResults(TrackingSearcher searcher, boolean allowPartial) {
TestSearchContext context = new TestSearchContext(null, indexShard, searcher) {

@Override
public ShardSearchRequest request() {
SearchRequest req = new SearchRequest();
req.allowPartialSearchResults(allowPartial);
return new ShardSearchRequest(OriginalIndices.NONE, req, indexShard.shardId(), 0, 1, AliasFilter.EMPTY, 1F, 0, null);
}
};

context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));

return context;
}

private class TrackingSearcher extends ContextIndexSearcher {
Runnable added;
boolean removed;

TrackingSearcher(IndexReader reader) throws IOException {
super(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
LuceneTestCase.MAYBE_CACHE_POLICY,
true
);
}

@Override
public void addQueryCancellation(Runnable r) {
added = r;
}

@Override
public void removeQueryCancellation(Runnable r) {
if (r == added) removed = true;
}
}
}
Loading