From 8f2769904bf1ed956e82a1b16ee19aec655950e7 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 10 Feb 2025 17:06:43 +0100 Subject: [PATCH] Fix SearchTimeoutIT (#120390) Two of the timeout tests have been muted for several months. The reason is that we tightened the assertions to cover partial results being returned, but there were edge cases in which partial results were not actually returned. The timeout used in the test was time dependent, hence exactly when the timeout is thrown is unpredictable, because we have timeout checks in different places in the codebase: when iterating through the leaves, before scoring any document, or while scoring documents. The edge case that caused failures is a typical timing issue where the initial check for timeout in CancellableBulkScorer already triggers the timeout, before any document has been collected. I made several adjustments to the test to make it more robust: - use indexRandom to index documents, which speeds indexing up - share indexing across test methods, so that it happens once at the suite level - replace the custom query that triggers a timeout: instead of a script query, use a Lucene query that is not time dependent and throws a time exceeded exception precisely where we expect it, so that we can test how the system reacts to that. That allows us to test that partial results are always returned when a timeout happens while scoring documents, and that partial results are never returned when a timeout happens before we have even started to score documents. 
Closes #98369 Closes #98053 --- .../elasticsearch/search/SearchTimeoutIT.java | 322 ++++++++++++++---- 1 file changed, 260 insertions(+), 62 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java index f63f09764621b..f45c37715f7cc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java @@ -9,36 +9,56 @@ package org.elasticsearch.search; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.BulkScorer; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.ScorerSupplier; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Bits; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.AbstractQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.MockScriptPlugin; -import 
org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptType; +import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.xcontent.XContentBuilder; +import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.Map; +import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.index.query.QueryBuilders.scriptQuery; -import static org.elasticsearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) +@ESIntegTestCase.SuiteScopeTestCase public class SearchTimeoutIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Collections.singleton(ScriptedTimeoutPlugin.class); + return Collections.singleton(BulkScorerTimeoutQueryPlugin.class); } @Override @@ -46,75 +66,253 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).build(); } - private void indexDocs() { - for (int i = 0; i < 32; i++) { - prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get(); - } - refresh("test"); + @Override + protected void setupSuiteScopeCluster() throws Exception { + super.setupSuiteScopeCluster(); + indexRandom(true, "test", randomIntBetween(20, 50)); } - @AwaitsFix(bugUrl = 
"https://github.com/elastic/elasticsearch/issues/98369") - public void testTopHitsTimeout() { - indexDocs(); - SearchResponse searchResponse = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS)) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) - .get(); - assertThat(searchResponse.isTimedOut(), equalTo(true)); - assertEquals(0, searchResponse.getShardFailures().length); - assertEquals(0, searchResponse.getFailedShards()); - assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); - assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); - assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); - assertThat(searchResponse.getHits().getHits().length, greaterThan(0)); + public void testTopHitsTimeoutBeforeCollecting() { + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(false)); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + // timeout happened before we could collect any doc, total hits is 0 and no hits are returned + assertEquals(0, searchResponse.getHits().getTotalHits().value()); + assertEquals(0, searchResponse.getHits().getHits().length); + }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98053") - public void testAggsTimeout() { - indexDocs(); - SearchResponse searchResponse = prepareSearch("test").setTimeout(new 
TimeValue(10, TimeUnit.MILLISECONDS)) - .setSize(0) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) - .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword")) - .get(); - assertThat(searchResponse.isTimedOut(), equalTo(true)); - assertEquals(0, searchResponse.getShardFailures().length); - assertEquals(0, searchResponse.getFailedShards()); - assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); - assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); - assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); - assertEquals(searchResponse.getHits().getHits().length, 0); - StringTerms terms = searchResponse.getAggregations().get("terms"); - assertEquals(1, terms.getBuckets().size()); - StringTerms.Bucket bucket = terms.getBuckets().get(0); - assertEquals("value", bucket.getKeyAsString()); - assertThat(bucket.getDocCount(), greaterThan(0L)); + public void testTopHitsTimeoutWhileCollecting() { + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(true)); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); + assertThat(searchResponse.getHits().getHits().length, greaterThan(0)); + }); } - public void testPartialResultsIntolerantTimeout() throws Exception { - 
prepareIndex("test").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + public void testAggsTimeoutBeforeCollecting() { + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setSize(0) + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(false)) + .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword")); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + assertEquals(0, searchResponse.getHits().getTotalHits().value()); + assertEquals(0, searchResponse.getHits().getHits().length); + StringTerms terms = searchResponse.getAggregations().get("terms"); + // timeout happened before we could collect any doc, total hits is 0 and no buckets are returned + assertEquals(0, terms.getBuckets().size()); + }); + } + public void testAggsTimeoutWhileCollecting() { + SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setSize(0) + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(true)) + .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword")); + ElasticsearchAssertions.assertResponse(searchRequestBuilder, searchResponse -> { + assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(0, searchResponse.getFailedShards()); + 
assertThat(searchResponse.getSuccessfulShards(), greaterThan(0)); + assertEquals(searchResponse.getSuccessfulShards(), searchResponse.getTotalShards()); + assertThat(searchResponse.getHits().getTotalHits().value(), greaterThan(0L)); + assertEquals(0, searchResponse.getHits().getHits().length); + StringTerms terms = searchResponse.getAggregations().get("terms"); + assertEquals(1, terms.getBuckets().size()); + StringTerms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("value", bucket.getKeyAsString()); + assertThat(bucket.getDocCount(), greaterThan(0L)); + }); + } + + public void testPartialResultsIntolerantTimeoutBeforeCollecting() { ElasticsearchException ex = expectThrows( ElasticsearchException.class, - prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS)) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) + prepareSearch("test") + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(false)) .setAllowPartialSearchResults(false) // this line causes timeouts to report failures ); assertTrue(ex.toString().contains("Time exceeded")); + assertEquals(429, ex.status().getStatus()); } - public static class ScriptedTimeoutPlugin extends MockScriptPlugin { - static final String SCRIPT_NAME = "search_timeout"; + public void testPartialResultsIntolerantTimeoutWhileCollecting() { + ElasticsearchException ex = expectThrows( + ElasticsearchException.class, + prepareSearch("test") + // setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set + .setTimeout(new TimeValue(10, TimeUnit.SECONDS)) + .setQuery(new BulkScorerTimeoutQuery(true)) + .setAllowPartialSearchResults(false) // this line causes timeouts to report failures + ); + assertTrue(ex.toString().contains("Time exceeded")); 
+ assertEquals(429, ex.status().getStatus()); + } + + public static final class BulkScorerTimeoutQueryPlugin extends Plugin implements SearchPlugin { + @Override + public List> getQueries() { + return Collections.singletonList(new QuerySpec("timeout", BulkScorerTimeoutQuery::new, parser -> { + throw new UnsupportedOperationException(); + })); + } + } + + /** + * Query builder that produces a Lucene Query which throws a + * {@link org.elasticsearch.search.internal.ContextIndexSearcher.TimeExceededException} before or while scoring documents. + * This helps make this test not time dependent, otherwise it would be unpredictable when exactly the timeout happens, which is + * rather important if we want to test that we are able to return partial results on timeout. + */ + public static final class BulkScorerTimeoutQuery extends AbstractQueryBuilder { + + private final boolean partialResults; + + BulkScorerTimeoutQuery(boolean partialResults) { + this.partialResults = partialResults; + } + + BulkScorerTimeoutQuery(StreamInput in) throws IOException { + super(in); + this.partialResults = in.readBoolean(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeBoolean(partialResults); + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) {} @Override - public Map, Object>> pluginScripts() { - return Collections.singletonMap(SCRIPT_NAME, params -> { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); + protected Query doToQuery(SearchExecutionContext context) { + return new Query() { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) { + return new ConstantScoreWeight(this, boost) { + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + + @Override + public ScorerSupplier scorerSupplier(LeafReaderContext context) { + return new ScorerSupplier() { + @Override + public BulkScorer 
bulkScorer() { + if (partialResults == false) { + ((ContextIndexSearcher) searcher).throwTimeExceededException(); + } + final int maxDoc = context.reader().maxDoc(); + return new BulkScorer() { + @Override + public int score(LeafCollector collector, Bits acceptDocs, int min, int max) throws IOException { + max = Math.min(max, maxDoc); + collector.setScorer(new Scorable() { + @Override + public float score() { + return 1f; + } + }); + for (int doc = min; doc < max; ++doc) { + if (acceptDocs == null || acceptDocs.get(doc)) { + collector.collect(doc); + // collect one doc per segment, only then throw a timeout: this ensures partial + // results are returned + ((ContextIndexSearcher) searcher).throwTimeExceededException(); + } + } + // there is a slight chance that no docs are scored for a specific segment. + // other shards / slices will throw the timeout anyway, one is enough. + return max == maxDoc ? DocIdSetIterator.NO_MORE_DOCS : max; + } + + @Override + public long cost() { + return maxDoc; + } + }; + } + + @Override + public Scorer get(long leadCost) { + assert false; + return new ConstantScoreScorer(score(), scoreMode, DocIdSetIterator.all(context.reader().maxDoc())); + } + + @Override + public long cost() { + assert false; + return context.reader().maxDoc(); + } + }; + } + }; + } + + @Override + public String toString(String field) { + return "timeout query"; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); } - return true; - }); + }; + } + + @Override + protected boolean doEquals(BulkScorerTimeoutQuery other) { + return false; + } + + @Override + protected int doHashCode() { + return 0; + } + + @Override + public String getWriteableName() { + return "timeout"; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return null; } } }