Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Before;

import java.util.Collection;
import java.util.List;
Expand All @@ -39,9 +40,22 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;

/**
* Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs.
* <p>
* The CancellableBulkScorer we use to break the execution is called per search thread in the query.
* It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks.
* This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits.
* </p>
* <p>
* Also, if the search thread pool size is too high, it can lead to them trying to process too many documents anyway (pool size * 4096),
* eventually blocking the threads (And failing the test). So it's explicitly set to a small number to avoid this.
* </p>
*/
@ESIntegTestCase.SuiteScopeTestCase
public class FiltersCancellationIT extends ESIntegTestCase {

Expand All @@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
return CollectionUtils.appendToCopy(super.nodePlugins(), PauseScriptPlugin.class);
}

protected Class<? extends Plugin> pausableFieldPluginClass() {
return PauseScriptPlugin.class;
@Override
public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("thread_pool.search.size", 4).build();
}

@Override
Expand Down Expand Up @@ -99,6 +114,11 @@ public void setupSuiteScopeCluster() throws Exception {
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
}

@Before
public void reset() {
SCRIPT_SEMAPHORE.drainPermits();
}

public void testFiltersCountCancellation() throws Exception {
ensureProperCancellation(
client().prepareSearch(INDEX)
Expand Down Expand Up @@ -129,14 +149,14 @@ public void testFiltersSubAggsCancellation() throws Exception {

private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
assertFalse(searchRequestFuture.isCancelled());
assertFalse(searchRequestFuture.isDone());
assertThat(searchRequestFuture.isCancelled(), equalTo(false));
assertThat(searchRequestFuture.isDone(), equalTo(false));

// Check that there are search tasks running
assertThat(getSearchTasks(), not(empty()));

// Wait for the script field to get blocked
assertBusy(() -> { assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)); });
assertBusy(() -> assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)));

// Cancel the tasks
// Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
Expand All @@ -146,8 +166,8 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder)

// Ensure the search request finished and that there are no more search tasks
assertBusy(() -> {
assertTrue(searchRequestFuture.isDone());
assertThat(getSearchTasks(), empty());
assertThat("Search request didn't finish", searchRequestFuture.isDone(), equalTo(true));
assertThat("There are dangling search tasks", getSearchTasks(), empty());
});
}

Expand Down