diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java index 76948bad7fccb..1ef832c57ce2f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; import java.util.Collection; import java.util.List; @@ -39,9 +40,22 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.filters; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; +/** + * Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs. + *

+ * The CancellableBulkScorer we use to break the execution is called per search thread in the query. + * It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks. + * This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits. + *

+ *

+ * Also, if the search thread pool size is too high, it can lead to them trying to process too many documents anyway (pool size * 4096), + * eventually blocking the threads (And failing the test). So it's explicitly set to a small number to avoid this. + *

+ */ @ESIntegTestCase.SuiteScopeTestCase public class FiltersCancellationIT extends ESIntegTestCase { @@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass()); + return CollectionUtils.appendToCopy(super.nodePlugins(), PauseScriptPlugin.class); } - protected Class pausableFieldPluginClass() { - return PauseScriptPlugin.class; + @Override + public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("thread_pool.search.size", 4).build(); } @Override @@ -99,6 +114,11 @@ public void setupSuiteScopeCluster() throws Exception { client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get(); } + @Before + public void reset() { + SCRIPT_SEMAPHORE.drainPermits(); + } + public void testFiltersCountCancellation() throws Exception { ensureProperCancellation( client().prepareSearch(INDEX) @@ -129,14 +149,14 @@ public void testFiltersSubAggsCancellation() throws Exception { private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception { var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute(); - assertFalse(searchRequestFuture.isCancelled()); - assertFalse(searchRequestFuture.isDone()); + assertThat(searchRequestFuture.isCancelled(), equalTo(false)); + assertThat(searchRequestFuture.isDone(), equalTo(false)); // Check that there are search tasks running assertThat(getSearchTasks(), not(empty())); // Wait for the script field to get blocked - assertBusy(() -> { assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)); }); + assertBusy(() -> assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0))); // Cancel the tasks // Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck @@ -146,8 +166,8 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) // Ensure the search request finished and that there are no more search tasks assertBusy(() -> { - assertTrue(searchRequestFuture.isDone()); - assertThat(getSearchTasks(), empty()); + assertThat("Search request didn't finish", searchRequestFuture.isDone(), equalTo(true)); + assertThat("There are dangling search tasks", getSearchTasks(), empty()); }); }