diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java index f63f09764621b..f45c37715f7cc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java @@ -9,36 +9,56 @@ package org.elasticsearch.search; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.BulkScorer; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.ScorerSupplier; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Bits; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.AbstractQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.MockScriptPlugin; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptType; +import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.xcontent.XContentBuilder; +import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.Map; +import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.index.query.QueryBuilders.scriptQuery; -import static org.elasticsearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) +@ESIntegTestCase.SuiteScopeTestCase public class SearchTimeoutIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Collections.singleton(ScriptedTimeoutPlugin.class); + return Collections.singleton(BulkScorerTimeoutQueryPlugin.class); } @Override @@ -46,75 +66,253 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).build(); } - private void indexDocs() { - for (int i = 0; i < 32; i++) { - prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get(); - } - refresh("test"); + @Override + protected void setupSuiteScopeCluster() throws Exception { + super.setupSuiteScopeCluster(); + indexRandom(true, "test", randomIntBetween(20, 50)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98369") - public void testTopHitsTimeout() { - indexDocs(); - SearchResponse searchResponse = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS)) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) - .get(); - assertThat(searchResponse.isTimedOut(), equalTo(true)); - assertEquals(0, searchResponse.getShardFailures().length); - assertEquals(0, searchResponse.getFailedShards()); - assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); - assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); - assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); - assertThat(searchResponse.getHits().getHits().length, greaterThan(0)); + public void testTopHitsTimeoutBeforeCollecting() { + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(false)); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + // timeout happened before we could collect any doc, total hits is 0 and no hits are returned + assertEquals(0, searchResponse.getHits().getTotalHits().value()); + assertEquals(0, searchResponse.getHits().getHits().length); + }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98053") - public void testAggsTimeout() { - indexDocs(); - SearchResponse searchResponse = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS)) - .setSize(0) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) - .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword")) - .get(); - assertThat(searchResponse.isTimedOut(), equalTo(true)); - assertEquals(0, searchResponse.getShardFailures().length); - assertEquals(0, searchResponse.getFailedShards()); - assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); - assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); - assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); - assertEquals(searchResponse.getHits().getHits().length, 0); - StringTerms terms = searchResponse.getAggregations().get("terms"); - assertEquals(1, terms.getBuckets().size()); - StringTerms.Bucket bucket = terms.getBuckets().get(0); - assertEquals("value", bucket.getKeyAsString()); - assertThat(bucket.getDocCount(), greaterThan(0L)); + public void testTopHitsTimeoutWhileCollecting() { + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(true)); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); + assertThat(searchResponse.getHits().getHits().length, greaterThan(0)); + }); } - public void testPartialResultsIntolerantTimeout() throws Exception { - prepareIndex("test").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + public void testAggsTimeoutBeforeCollecting() { + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setSize(0) + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(false)) + .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword")); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + assertEquals(0, searchResponse.getHits().getTotalHits().value()); + assertEquals(0, searchResponse.getHits().getHits().length); + StringTerms terms = searchResponse.getAggregations().get("terms"); + // timeout happened before we could collect any doc, total hits is 0 and no buckets are returned + assertEquals(0, terms.getBuckets().size()); + }); + } + public void testAggsTimeoutWhileCollecting() { + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setSize(0) + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(true)) + .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword")); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); + assertEquals(0, searchResponse.getHits().getHits().length); + StringTerms terms = searchResponse.getAggregations().get("terms"); + assertEquals(1, terms.getBuckets().size()); + StringTerms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("value", bucket.getKeyAsString()); + assertThat(bucket.getDocCount(), greaterThan(0L)); + }); + } + + public void testPartialResultsIntolerantTimeoutBeforeCollecting() { ElasticsearchException ex = expectThrows( ElasticsearchException.class, - prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS)) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) + prepareSearch("test") + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(false)) .setAllowPartialSearchResults(false) // this line causes timeouts to report failures ); assertTrue(ex.toString().contains("Time exceeded")); + assertEquals(429, ex.status().getStatus()); } - public static class ScriptedTimeoutPlugin extends MockScriptPlugin { - static final String SCRIPT_NAME = "search_timeout"; + public void testPartialResultsIntolerantTimeoutWhileCollecting() { + ElasticsearchException ex = expectThrows( + ElasticsearchException.class, + prepareSearch("test") + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(true)) + .setAllowPartialSearchResults(false) // this line causes timeouts to report failures + ); + assertTrue(ex.toString().contains("Time exceeded")); + assertEquals(429, ex.status().getStatus()); + } + + public static final class BulkScorerTimeoutQueryPlugin extends Plugin implements SearchPlugin { + @Override + public List> getQueries() { + return Collections.singletonList(new QuerySpec("timeout", BulkScorerTimeoutQuery::new, parser -> { + throw new UnsupportedOperationException(); + })); + } + } + + /** + * Query builder that produces a Lucene Query which throws a + * {@link org.elasticsearch.search.internal.ContextIndexSearcher.TimeExceededException} before or while scoring documents. + * This helps make this test not time dependent, otherwise it would be unpredictable when exactly the timeout happens, which is + * rather important if we want to test that we are able to return partial results on timeout. + */ + public static final class BulkScorerTimeoutQuery extends AbstractQueryBuilder { + + private final boolean partialResults; + + BulkScorerTimeoutQuery(boolean partialResults) { + this.partialResults = partialResults; + } + + BulkScorerTimeoutQuery(StreamInput in) throws IOException { + super(in); + this.partialResults = in.readBoolean(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeBoolean(partialResults); + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) {} @Override - public Map, Object>> pluginScripts() { - return Collections.singletonMap(SCRIPT_NAME, params -> { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); + protected Query doToQuery(SearchExecutionContext context) { + return new Query() { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) { + return new ConstantScoreWeight(this, boost) { + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + + @Override + public ScorerSupplier scorerSupplier(LeafReaderContext context) { + return new ScorerSupplier() { + @Override + public BulkScorer bulkScorer() { + if (partialResults == false) { + ((ContextIndexSearcher) searcher).throwTimeExceededException(); + } + final int maxDoc = context.reader().maxDoc(); + return new BulkScorer() { + @Override + public int score(LeafCollector collector, Bits acceptDocs, int min, int max) throws IOException { + max = Math.min(max, maxDoc); + collector.setScorer(new Scorable() { + @Override + public float score() { + return 1f; + } + }); + for (int doc = min; doc < max; ++doc) { + if (acceptDocs == null || acceptDocs.get(doc)) { + collector.collect(doc); + // collect one doc per segment, only then throw a timeout: this ensures partial + // results are returned + ((ContextIndexSearcher) searcher).throwTimeExceededException(); + } + } + // there is a slight chance that no docs are scored for a specific segment. + // other shards / slices will throw the timeout anyway, one is enough. + return max == maxDoc ? DocIdSetIterator.NO_MORE_DOCS : max; + } + + @Override + public long cost() { + return maxDoc; + } + }; + } + + @Override + public Scorer get(long leadCost) { + assert false; + return new ConstantScoreScorer(score(), scoreMode, DocIdSetIterator.all(context.reader().maxDoc())); + } + + @Override + public long cost() { + assert false; + return context.reader().maxDoc(); + } + }; + } + }; + } + + @Override + public String toString(String field) { + return "timeout query"; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); } - return true; - }); + }; + } + + @Override + protected boolean doEquals(BulkScorerTimeoutQuery other) { + return false; + } + + @Override + protected int doHashCode() { + return 0; + } + + @Override + public String getWriteableName() { + return "timeout"; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return null; } } }