Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Before;

import java.util.Collection;
import java.util.List;
Expand All @@ -39,9 +40,22 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;

/**
* Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs.
* <p>
* The CancellableBulkScorer we use to break the execution is called per search thread in the query.
* It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks.
* This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits.
* </p>
* <p>
* Also, if the search thread pool size is too high, the search threads can end up trying to process too many documents anyway
* (pool size * 4096), eventually blocking the threads (and failing the test). So it's explicitly set to a small number to avoid this.
* </p>
*/
@ESIntegTestCase.SuiteScopeTestCase
public class FiltersCancellationIT extends ESIntegTestCase {

Expand All @@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
return CollectionUtils.appendToCopy(super.nodePlugins(), PauseScriptPlugin.class);
}

protected Class<? extends Plugin> pausableFieldPluginClass() {
return PauseScriptPlugin.class;
@Override
public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("thread_pool.search.size", 4).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why 4?

Copy link
Contributor Author

@ivancea ivancea Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! That was the core of the issue:

Some context first:

  • The CancellableBulkScorer we use to break the execution is called per search thread in the query.
  • It breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks
  • The test uses 100,000 docs.
  • The test adds around 99000 permits to a semaphore, so only 99000 docs can be processed before the threads get blocked (and the test fails)
  • Which is what the test does: It expects cancelled queries to not reach that many processed docs, and to break earlier.

Now, if there are 25 threads, it would consume up to 25*4096 = 102400 docs (before checking for cancellation), which would be more than the semaphore permits, and threads would get blocked. Which is what happened sometimes.

To the specific question: 4 is just a small number that shouldn't trigger this case by a great margin.

Copy link
Contributor

@GalLalouche GalLalouche Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Either add a short version of this explanation as a comment, or replace 4 with some computation based on the total number of documents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test javadoc explaining what it does, and how it does it (Including those magic constants)

}

@Override
Expand Down Expand Up @@ -99,6 +114,11 @@ public void setupSuiteScopeCluster() throws Exception {
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
}

/**
 * Per-test setup hook: calls {@code drainPermits()} on {@code SCRIPT_SEMAPHORE} before each test method runs.
 * NOTE(review): presumably this keeps permits left over from a previous test from being reused by the next one;
 * the declaration of {@code SCRIPT_SEMAPHORE} is not visible here - confirm.
 */
@Before
public void reset() {
SCRIPT_SEMAPHORE.drainPermits();
}

public void testFiltersCountCancellation() throws Exception {
ensureProperCancellation(
client().prepareSearch(INDEX)
Expand Down Expand Up @@ -129,14 +149,14 @@ public void testFiltersSubAggsCancellation() throws Exception {

private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
assertFalse(searchRequestFuture.isCancelled());
assertFalse(searchRequestFuture.isDone());
assertThat(searchRequestFuture.isCancelled(), equalTo(false));
assertThat(searchRequestFuture.isDone(), equalTo(false));
Comment on lines +152 to +153
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message looks better this way IMO, no functional change here


// Check that there are search tasks running
assertThat(getSearchTasks(), not(empty()));

// Wait for the script field to get blocked
assertBusy(() -> { assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)); });
assertBusy(() -> assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)));

// Cancel the tasks
// Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
Expand All @@ -146,8 +166,8 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder)

// Ensure the search request finished and that there are no more search tasks
assertBusy(() -> {
assertTrue(searchRequestFuture.isDone());
assertThat(getSearchTasks(), empty());
assertThat("Search request didn't finish", searchRequestFuture.isDone(), equalTo(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there an assertTrue that also takes a String?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, I see now that you used it to get the hamcrest error message, and apparently assertTrue(msg, bool) is from junit, not hamcrest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The message with assertTrue/False was just "AssertionError: null", while the equalTo(true) was like "false != true". Which isn't much better, but at least tells you something. Anyway, with the message they should be similar, but I changed them anyway XD

assertThat("There are dangling search tasks", getSearchTasks(), empty());
});
}

Expand Down