Skip to content

Commit 0b95ab2

Browse files
drempapiselasticsearchmachine
andauthored
[9.2] Handle Query Timeouts During Collector Initialization in QueryPhase (elastic#138084) (elastic#138473)
* Handle Query Timeouts During Collector Initialization in QueryPhase (elastic#138084) (cherry picked from commit c699e67) # Conflicts: # server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java * Refactor QueryPhaseTimeoutTests for clarity * [CI] Auto commit changes from spotless * update imports --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 9204527 commit 0b95ab2

File tree

4 files changed

+613
-30
lines changed

4 files changed

+613
-30
lines changed

docs/changelog/138084.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138084
2+
summary: Handle Query Timeouts During Collector Initialization in `QueryPhase`
3+
area: Search
4+
type: bug
5+
issues: []
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.aggregations;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
15+
import org.elasticsearch.action.search.SearchResponse;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.index.query.QueryBuilders;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.plugins.SearchPlugin;
21+
import org.elasticsearch.search.aggregations.support.AggregationContext;
22+
import org.elasticsearch.search.internal.ContextIndexSearcher;
23+
import org.elasticsearch.search.query.SearchTimeoutException;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.elasticsearch.xcontent.XContentBuilder;
26+
import org.elasticsearch.xcontent.XContentParser;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
30+
import java.io.IOException;
31+
import java.util.Collection;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
36+
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.hamcrest.Matchers.instanceOf;
39+
import static org.hamcrest.Matchers.is;
40+
import static org.hamcrest.Matchers.notNullValue;
41+
42+
/**
43+
* Integration test verifying that a TimeExceededException thrown during collector
44+
* preparation (before the actual search executes) is caught in QueryPhase and
45+
* correctly transformed into a partial response with `timed_out=true`, empty hits,
46+
* and empty aggregations rather than an exception.
47+
*/
48+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
49+
public class QueryPhaseForcedTimeoutIT extends ESIntegTestCase {
50+
51+
private static final String INDEX = "index";
52+
53+
@Override
54+
protected Collection<Class<? extends Plugin>> nodePlugins() {
55+
return List.of(ForceTimeoutAggPlugin.class);
56+
}
57+
58+
@Before
59+
public void setupIndex() throws Exception {
60+
assertAcked(prepareCreate(INDEX).setMapping("""
61+
{
62+
"properties": {
63+
"kwd": { "type": "keyword" },
64+
"txt": { "type": "text" }
65+
}
66+
}
67+
"""));
68+
69+
for (int i = 0; i < 10; i++) {
70+
IndexRequest ir = new IndexRequest(INDEX).source(
71+
jsonBuilder().startObject().field("kwd", "value" + i).field("txt", "text " + i).endObject()
72+
);
73+
client().index(ir).actionGet();
74+
}
75+
indicesAdmin().prepareRefresh(INDEX).get();
76+
ensureGreen(INDEX);
77+
}
78+
79+
@After
80+
public void cleanup() {
81+
indicesAdmin().prepareDelete(INDEX).get();
82+
}
83+
84+
/**
85+
* Executes a search using the ForceTimeoutAggPlugin aggregation which throws
86+
* TimeExceededException during collector preparation, and asserts that:
87+
* - the response is returned without failure,
88+
* - the `timed_out` flag is true,
89+
* - hits are empty, and
90+
* - aggregations are non-null but empty.
91+
*/
92+
public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() {
93+
SearchResponse resp = null;
94+
try {
95+
resp = client().prepareSearch(INDEX)
96+
.setQuery(QueryBuilders.matchAllQuery())
97+
.setSize(10)
98+
.setAllowPartialSearchResults(true)
99+
.addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
100+
.get();
101+
102+
assertThat(resp, notNullValue());
103+
assertThat("search should be marked timed_out", resp.isTimedOut(), is(true));
104+
assertThat("no hits returned", resp.getHits().getHits().length, equalTo(0));
105+
assertThat(resp.getAggregations(), notNullValue());
106+
assertThat("no aggr returned", resp.getAggregations().asList().isEmpty(), is(true));
107+
assertThat("no shard failures expected", resp.getShardFailures() == null || resp.getShardFailures().length == 0, is(true));
108+
} finally {
109+
if (resp != null) {
110+
resp.decRef();
111+
}
112+
}
113+
}
114+
115+
/**
116+
* In this test we explicitly set allow_partial_search_results=false. Under this
117+
* setting, any shard-level failure in the query phase (including a timeout) is treated as
118+
* a hard failure for the whole search. The coordinating node does not return a response
119+
* with timed_out=true, instead it fails the phase and throws a
120+
* {@link SearchPhaseExecutionException} whose cause is the underlying
121+
* {@link SearchTimeoutException}. This test asserts that behavior.
122+
*/
123+
public void testTimeoutDuringCollectorPreparationDisallowPartialsThrowsException() {
124+
SearchPhaseExecutionException ex = expectThrows(
125+
SearchPhaseExecutionException.class,
126+
() -> client().prepareSearch(INDEX)
127+
.setQuery(QueryBuilders.matchAllQuery())
128+
.setSize(10)
129+
.setAllowPartialSearchResults(false)
130+
.addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
131+
.get()
132+
);
133+
134+
assertNotNull("expected a cause on SearchPhaseExecutionException", ex.getCause());
135+
assertThat("expected inner cause to be SearchTimeoutException", ex.getCause(), instanceOf(SearchTimeoutException.class));
136+
}
137+
138+
/**
139+
* A minimal plugin registering a custom aggregation (ForceTimeoutAggregationBuilder)
140+
* whose factory simulates a timeout during collector setup to test QueryPhase handling.
141+
*/
142+
public static class ForceTimeoutAggPlugin extends Plugin implements SearchPlugin {
143+
public static final String NAME = "force_timeout_plugin";
144+
145+
@Override
146+
public List<AggregationSpec> getAggregations() {
147+
return List.of(new AggregationSpec(NAME, ForceTimeoutAggregationBuilder::new, ForceTimeoutAggregationBuilder::parse));
148+
}
149+
}
150+
151+
/**
152+
* Aggregation builder for the ForceTimeoutAggPlugin aggregation.
153+
* It has no parameters and its factory immediately triggers a timeout exception
154+
* when the search collectors are being prepared.
155+
*/
156+
static class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder<ForceTimeoutAggregationBuilder> {
157+
158+
public static final String TYPE = ForceTimeoutAggPlugin.NAME;
159+
160+
private Map<String, Object> metadata;
161+
162+
ForceTimeoutAggregationBuilder(String name) {
163+
super(name);
164+
}
165+
166+
ForceTimeoutAggregationBuilder(StreamInput in) throws IOException {
167+
super(in);
168+
}
169+
170+
static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) {
171+
return new ForceTimeoutAggregationBuilder(name);
172+
}
173+
174+
@Override
175+
protected AggregatorFactory doBuild(
176+
AggregationContext context,
177+
AggregatorFactory parent,
178+
AggregatorFactories.Builder subfactoriesBuilder
179+
) throws IOException {
180+
return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata());
181+
}
182+
183+
@Override
184+
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
185+
ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName());
186+
copy.factoriesBuilder = factoriesBuilder;
187+
copy.setMetadata(metadata);
188+
return copy;
189+
}
190+
191+
@Override
192+
public Map<String, Object> getMetadata() {
193+
return metadata;
194+
}
195+
196+
@Override
197+
public BucketCardinality bucketCardinality() {
198+
return BucketCardinality.ONE;
199+
}
200+
201+
@Override
202+
public TransportVersion getMinimalSupportedVersion() {
203+
return TransportVersion.zero();
204+
}
205+
206+
@Override
207+
protected void doWriteTo(StreamOutput out) {
208+
// Empty
209+
}
210+
211+
@Override
212+
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) {
213+
return builder;
214+
}
215+
216+
@Override
217+
public String getType() {
218+
return TYPE;
219+
}
220+
221+
/**
222+
* Factory implementation for ForceTimeoutAggregationBuilder.
223+
* Its createInternal() method throws a TimeExceededException
224+
* before any actual collection occurs, simulating a timeout during setup.
225+
*/
226+
static class ForceTimeoutAggregatorFactory extends AggregatorFactory {
227+
228+
ForceTimeoutAggregatorFactory(
229+
String name,
230+
AggregationContext context,
231+
AggregatorFactory parent,
232+
AggregatorFactories.Builder subFactoriesBuilder,
233+
Map<String, Object> metadata
234+
) throws IOException {
235+
super(name, context, parent, subFactoriesBuilder, metadata);
236+
}
237+
238+
@Override
239+
protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata) {
240+
if (context.searcher() instanceof ContextIndexSearcher cis) {
241+
cis.throwTimeExceededException();
242+
}
243+
throw new AssertionError("unreachable");
244+
}
245+
}
246+
}
247+
}

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.search.SearchContextSourcePrinter;
3434
import org.elasticsearch.search.SearchService;
3535
import org.elasticsearch.search.aggregations.AggregationPhase;
36+
import org.elasticsearch.search.aggregations.InternalAggregations;
3637
import org.elasticsearch.search.internal.ContextIndexSearcher;
3738
import org.elasticsearch.search.internal.ScrollContext;
3839
import org.elasticsearch.search.internal.SearchContext;
@@ -197,47 +198,71 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas
197198
);
198199
}
199200

200-
CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager(
201-
postFilterWeight,
202-
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
203-
searchContext,
204-
hasFilterCollector
205-
);
206-
207201
final Runnable timeoutRunnable = getTimeoutCheck(searchContext);
208202
if (timeoutRunnable != null) {
209203
searcher.addQueryCancellation(timeoutRunnable);
210204
}
211205

212-
QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
213-
if (searchContext.getProfilers() != null) {
214-
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
215-
}
216-
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
217-
if (searcher.timeExceeded()) {
218-
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
219-
SearchTimeoutException.handleTimeout(
220-
searchContext.request().allowPartialSearchResults(),
221-
searchContext.shardTarget(),
222-
searchContext.queryResult()
223-
);
224-
}
225-
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
226-
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
227-
}
228-
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
229-
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
230-
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
231-
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
232-
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
233-
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
234-
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
206+
try {
207+
CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager
208+
.createQueryPhaseCollectorManager(
209+
postFilterWeight,
210+
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
211+
searchContext,
212+
hasFilterCollector
213+
);
214+
215+
QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
216+
217+
if (searchContext.getProfilers() != null) {
218+
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
219+
}
220+
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
221+
222+
if (searcher.timeExceeded()) {
223+
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
224+
SearchTimeoutException.handleTimeout(
225+
searchContext.request().allowPartialSearchResults(),
226+
searchContext.shardTarget(),
227+
searchContext.queryResult()
228+
);
229+
}
230+
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
231+
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
232+
}
233+
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
234+
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
235+
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
236+
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
237+
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
238+
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
239+
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
240+
}
241+
} catch (ContextIndexSearcher.TimeExceededException tee) {
242+
finalizeAsTimedOutResult(searchContext);
235243
}
236244
} catch (Exception e) {
237245
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
238246
}
239247
}
240248

249+
/**
250+
* Marks the current search as timed out and finalizes the {@link QuerySearchResult}
251+
* with a well-formed empty response. This ensures that even when a timeout occurs
252+
* (e.g., during collector setup or search execution), the shard still returns a
253+
* valid result object with empty top docs and aggregations instead of throwing.
254+
*/
255+
private static void finalizeAsTimedOutResult(SearchContext searchContext) {
256+
QuerySearchResult queryResult = searchContext.queryResult();
257+
SearchTimeoutException.handleTimeout(searchContext.request().allowPartialSearchResults(), searchContext.shardTarget(), queryResult);
258+
259+
queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
260+
261+
if (searchContext.aggregations() != null) {
262+
queryResult.aggregations(InternalAggregations.EMPTY);
263+
}
264+
}
265+
241266
/**
242267
* Returns whether collection within the provided <code>reader</code> can be early-terminated if it sorts
243268
* with <code>sortAndFormats</code>.

0 commit comments

Comments
 (0)