diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java
index 76948bad7fccb..1ef832c57ce2f 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/FiltersCancellationIT.java
@@ -28,6 +28,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
+import org.junit.Before;
import java.util.Collection;
import java.util.List;
@@ -39,9 +40,22 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
+/**
+ * Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs.
+ *
+ * The CancellableBulkScorer we use to break the execution is called per search thread in the query.
+ * It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks.
+ * This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits.
+ *
+ *
+ * Also, if the search thread pool size is too high, it can lead to them trying to process too many documents anyway (pool size * 4096),
+ * eventually blocking the threads (And failing the test). So it's explicitly set to a small number to avoid this.
+ *
+ */
@ESIntegTestCase.SuiteScopeTestCase
public class FiltersCancellationIT extends ESIntegTestCase {
@@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase {
@Override
protected Collection> nodePlugins() {
- return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
+ return CollectionUtils.appendToCopy(super.nodePlugins(), PauseScriptPlugin.class);
}
- protected Class extends Plugin> pausableFieldPluginClass() {
- return PauseScriptPlugin.class;
+ @Override
+ public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+ return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("thread_pool.search.size", 4).build();
}
@Override
@@ -99,6 +114,11 @@ public void setupSuiteScopeCluster() throws Exception {
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
}
+ @Before
+ public void reset() {
+ SCRIPT_SEMAPHORE.drainPermits();
+ }
+
public void testFiltersCountCancellation() throws Exception {
ensureProperCancellation(
client().prepareSearch(INDEX)
@@ -129,14 +149,14 @@ public void testFiltersSubAggsCancellation() throws Exception {
private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
- assertFalse(searchRequestFuture.isCancelled());
- assertFalse(searchRequestFuture.isDone());
+ assertThat(searchRequestFuture.isCancelled(), equalTo(false));
+ assertThat(searchRequestFuture.isDone(), equalTo(false));
// Check that there are search tasks running
assertThat(getSearchTasks(), not(empty()));
// Wait for the script field to get blocked
- assertBusy(() -> { assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)); });
+ assertBusy(() -> assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)));
// Cancel the tasks
// Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
@@ -146,8 +166,8 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder)
// Ensure the search request finished and that there are no more search tasks
assertBusy(() -> {
- assertTrue(searchRequestFuture.isDone());
- assertThat(getSearchTasks(), empty());
+ assertThat("Search request didn't finish", searchRequestFuture.isDone(), equalTo(true));
+ assertThat("There are dangling search tasks", getSearchTasks(), empty());
});
}