2828import org .elasticsearch .test .ESIntegTestCase ;
2929import org .elasticsearch .xcontent .XContentBuilder ;
3030import org .elasticsearch .xcontent .json .JsonXContent ;
31+ import org .junit .Before ;
3132
3233import java .util .Collection ;
3334import java .util .List ;
3940import static org .elasticsearch .search .aggregations .AggregationBuilders .filters ;
4041import static org .elasticsearch .search .aggregations .AggregationBuilders .terms ;
4142import static org .hamcrest .Matchers .empty ;
43+ import static org .hamcrest .Matchers .equalTo ;
4244import static org .hamcrest .Matchers .greaterThan ;
4345import static org .hamcrest .Matchers .not ;
4446
47+ /**
48+ * Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs.
49+ * <p>
50+ * The CancellableBulkScorer we use to break the execution is called per search thread in the query.
51+ * It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks.
52+ * This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits.
53+ * </p>
54+ * <p>
55+ * Also, if the search thread pool size is too high, it can lead to them trying to process too many documents anyway (pool size * 4096),
56+ * eventually blocking the threads (And failing the test). So it's explicitly set to a small number to avoid this.
57+ * </p>
58+ */
4559@ ESIntegTestCase .SuiteScopeTestCase
4660public class FiltersCancellationIT extends ESIntegTestCase {
4761
@@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase {
5569
5670 @ Override
5771 protected Collection <Class <? extends Plugin >> nodePlugins () {
58- return CollectionUtils .appendToCopy (super .nodePlugins (), pausableFieldPluginClass () );
72+ return CollectionUtils .appendToCopy (super .nodePlugins (), PauseScriptPlugin . class );
5973 }
6074
61- protected Class <? extends Plugin > pausableFieldPluginClass () {
62- return PauseScriptPlugin .class ;
75+ @ Override
76+ public Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
77+ return Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings )).put ("thread_pool.search.size" , 4 ).build ();
6378 }
6479
6580 @ Override
@@ -99,6 +114,11 @@ public void setupSuiteScopeCluster() throws Exception {
99114 client ().admin ().indices ().prepareForceMerge (INDEX ).setMaxNumSegments (1 ).get ();
100115 }
101116
117+ @ Before
118+ public void reset () {
119+ SCRIPT_SEMAPHORE .drainPermits ();
120+ }
121+
102122 public void testFiltersCountCancellation () throws Exception {
103123 ensureProperCancellation (
104124 client ().prepareSearch (INDEX )
@@ -129,14 +149,14 @@ public void testFiltersSubAggsCancellation() throws Exception {
129149
130150 private void ensureProperCancellation (SearchRequestBuilder searchRequestBuilder ) throws Exception {
131151 var searchRequestFuture = searchRequestBuilder .setTimeout (TimeValue .timeValueSeconds (1 )).execute ();
132- assertFalse (searchRequestFuture .isCancelled ());
133- assertFalse (searchRequestFuture .isDone ());
152+ assertThat (searchRequestFuture .isCancelled (), equalTo ( false ));
153+ assertThat (searchRequestFuture .isDone (), equalTo ( false ));
134154
135155 // Check that there are search tasks running
136156 assertThat (getSearchTasks (), not (empty ()));
137157
138158 // Wait for the script field to get blocked
139- assertBusy (() -> { assertThat (SCRIPT_SEMAPHORE .getQueueLength (), greaterThan (0 )); } );
159+ assertBusy (() -> assertThat (SCRIPT_SEMAPHORE .getQueueLength (), greaterThan (0 )));
140160
141161 // Cancel the tasks
142162 // Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
@@ -146,8 +166,8 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder)
146166
147167 // Ensure the search request finished and that there are no more search tasks
148168 assertBusy (() -> {
149- assertTrue ( searchRequestFuture .isDone ());
150- assertThat (getSearchTasks (), empty ());
169+ assertThat ( "Search request didn't finish" , searchRequestFuture .isDone (), equalTo ( true ));
170+ assertThat ("There are dangling search tasks" , getSearchTasks (), empty ());
151171 });
152172 }
153173
0 commit comments