3333import java .util .List ;
3434import java .util .Map ;
3535import java .util .Set ;
36- import java .util .concurrent .TimeUnit ;
36+ import java .util .concurrent .Semaphore ;
3737
3838import static org .elasticsearch .index .query .QueryBuilders .termQuery ;
3939import static org .elasticsearch .search .aggregations .AggregationBuilders .filters ;
4040import static org .elasticsearch .search .aggregations .AggregationBuilders .terms ;
4141import static org .hamcrest .Matchers .empty ;
42+ import static org .hamcrest .Matchers .greaterThan ;
4243import static org .hamcrest .Matchers .not ;
4344
4445@ ESIntegTestCase .SuiteScopeTestCase
46+ @ ESIntegTestCase .ClusterScope (numDataNodes = 1 )
4547public class FiltersCancellationIT extends ESIntegTestCase {
4648
// Name of the index created by setupSuiteScopeCluster() and targeted by the bulk and force-merge requests.
4749 private static final String INDEX = "idx" ;
48- private static final String SLEEP_FIELD = "sleep " ;
// Runtime field mapped with the pause script language; the filter term queries and the terms sub-aggregation use it.
50+ private static final String PAUSE_FIELD = "pause " ;
// Long field that every indexed document populates with its own document id.
4951 private static final String NUMERIC_FIELD = "value" ;
5052
// Upper bound of the indexing loop in setupSuiteScopeCluster().
5153 private static final int NUM_DOCS = 100_000 ;
52- /**
53- * The number of milliseconds to sleep in the script that simulates a long-running operation.
54- * <p>
55- * As CancellableBulkScorer does a minimum of 4096 docs per batch, this number must be low to avoid long test times.
56- * </p>
57- */
58- private static final long SLEEP_SCRIPT_MS = 1 ;
// Number of permits handed to SCRIPT_SEMAPHORE once the cancel request has been sent.
// NOTE(review): deliberately smaller than NUM_DOCS - presumably so the scripts cannot all run to completion
// unless the search is really cancelled; confirm the intent behind the 1000 margin.
54+ private static final int SEMAPHORE_PERMITS = NUM_DOCS - 1000 ;
// Starts with zero permits, so each script execution blocks in acquire() until the test releases permits.
55+ private static final Semaphore SCRIPT_SEMAPHORE = new Semaphore (0 );
5956
/**
 * Returns the plugins of the superclass with the class from
 * {@code pausableFieldPluginClass()} appended to a copy of that collection.
 */
6057 @ Override
6158 protected Collection <Class <? extends Plugin >> nodePlugins () {
6259 return CollectionUtils .appendToCopy (super .nodePlugins (), pausableFieldPluginClass ());
6360 }
6461
/**
 * Plugin class that nodePlugins() adds to the node; its PAUSE_SCRIPT_LANG constant is the
 * script language referenced by the runtime field mapping in setupSuiteScopeCluster().
 */
6562 protected Class <? extends Plugin > pausableFieldPluginClass () {
66- return SleepScriptPlugin .class ;
63+ return PauseScriptPlugin .class ;
6764 }
6865
/**
 * Prepares the shared test index: creates INDEX with a "runtime" mapping section declaring the
 * script-backed field and NUMERIC_FIELD, bulk-indexes documents whose id and NUMERIC_FIELD value
 * are the loop counter, and finally force-merges the index down to a single segment.
 *
 * @throws Exception if building the mapping or any of the client requests fails
 */
6966 @ Override
7067 public void setupSuiteScopeCluster () throws Exception {
71- XContentBuilder mapping = JsonXContent .contentBuilder ().startObject ();
72- mapping .startObject ("runtime" );
73- {
74- mapping .startObject (SLEEP_FIELD );
// try-with-resources: the builder is closed after the create-index request has been executed.
68+ try (XContentBuilder mapping = JsonXContent .contentBuilder ()) {
69+ mapping .startObject ();
70+ mapping .startObject ("runtime" );
7571 {
76- mapping .field ("type" , "long" );
77- mapping .startObject ("script" ).field ("source" , "" ).field ("lang" , SleepScriptPlugin .PAUSE_SCRIPT_LANG ).endObject ();
// Script-backed long field: empty script source, language taken from the plugin's PAUSE_SCRIPT_LANG.
72+ mapping .startObject (PAUSE_FIELD );
73+ {
74+ mapping .field ("type" , "long" );
75+ mapping .startObject ("script" ).field ("source" , "" ).field ("lang" , PauseScriptPlugin .PAUSE_SCRIPT_LANG ).endObject ();
76+ }
77+ mapping .endObject ();
// Plain long field declared in the same "runtime" section, without a script.
78+ mapping .startObject (NUMERIC_FIELD );
79+ {
80+ mapping .field ("type" , "long" );
81+ }
82+ mapping .endObject ();
7883 }
7984 mapping .endObject ();
80- mapping .startObject (NUMERIC_FIELD );
81- {
82- mapping .field ("type" , "long" );
83- }
8485 mapping .endObject ();
86+
87+ client ().admin ().indices ().prepareCreate (INDEX ).setMapping (mapping ).get ();
8588 }
86- mapping .endObject ();
87- client ().admin ().indices ().prepareCreate (INDEX ).setMapping (mapping .endObject ()).get ();
8889
89- BulkRequestBuilder bulk = client ().prepareBulk ().setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
90- for (int i = 0 ; i < NUM_DOCS ; i ++) {
91- bulk .add (prepareIndex (INDEX ).setId (Integer .toString (i )).setSource (NUMERIC_FIELD , i ));
// NOTE(review): this is a local variable but is named like a constant; its value equals NUM_DOCS,
// so the outer loop below currently performs exactly one iteration.
90+ int DOCS_PER_BULK = 100_000 ;
91+ for (int i = 0 ; i < NUM_DOCS ; i += DOCS_PER_BULK ) {
// One bulk request per batch, each with an immediate refresh policy.
92+ BulkRequestBuilder bulk = client ().prepareBulk ().setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
// NOTE(review): the inner loop is bounded by DOCS_PER_BULK only, not by NUM_DOCS, so the total
// document count matches NUM_DOCS only while NUM_DOCS is a multiple of DOCS_PER_BULK.
93+ for (int j = 0 ; j < DOCS_PER_BULK ; j ++) {
94+ int docId = i + j ;
// The document id and the NUMERIC_FIELD value are both the running document number.
95+ bulk .add (prepareIndex (INDEX ).setId (Integer .toString (docId )).setSource (NUMERIC_FIELD , docId ));
96+ }
97+ bulk .get ();
9298 }
93- bulk . get ();
99+
// Merge the index down to one segment before the tests run.
94100 client ().admin ().indices ().prepareForceMerge (INDEX ).setMaxNumSegments (1 ).get ();
95101 }
96102
@@ -101,8 +107,8 @@ public void testFiltersCountCancellation() throws Exception {
101107 filters (
102108 "filters" ,
103109 new KeyedFilter [] {
104- new KeyedFilter ("filter1" , termQuery (SLEEP_FIELD , 1 )),
105- new KeyedFilter ("filter2" , termQuery (SLEEP_FIELD , 2 )) }
110+ new KeyedFilter ("filter1" , termQuery (PAUSE_FIELD , 1 )),
111+ new KeyedFilter ("filter2" , termQuery (PAUSE_FIELD , 2 )) }
106112 )
107113 )
108114 );
@@ -115,9 +121,9 @@ public void testFiltersSubAggsCancellation() throws Exception {
115121 filters (
116122 "filters" ,
117123 new KeyedFilter [] {
118- new KeyedFilter ("filter1" , termQuery (SLEEP_FIELD , 1 )),
119- new KeyedFilter ("filter2" , termQuery (SLEEP_FIELD , 2 )) }
120- ).subAggregation (terms ("sub" ).field (SLEEP_FIELD ))
124+ new KeyedFilter ("filter1" , termQuery (PAUSE_FIELD , 1 )),
125+ new KeyedFilter ("filter2" , termQuery (PAUSE_FIELD , 2 )) }
126+ ).subAggregation (terms ("sub" ).field (PAUSE_FIELD ))
121127 )
122128 );
123129 }
@@ -130,27 +136,20 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder)
130136 // Check that there are search tasks running
131137 assertThat (getSearchTasks (), not (empty ()));
132138
133- // Wait to ensure scripts started executing and that we don't cancel too early
134- safeSleep (2000 );
135-
136- // CancellableBulkScorer does a starting batch of 4096 items, x2 after each iteration.
137- // That times SLEEP_SCRIPT_MS gives us the maximum time to wait (x2 to avoid flakiness)
138- long maxWaitMs = 3 * (4096 * SLEEP_SCRIPT_MS );
139+ // Wait for the script field to get blocked
140+ assertBusy (() -> { assertThat (SCRIPT_SEMAPHORE .getQueueLength (), greaterThan (0 )); });
139141
140142 // Cancel the tasks
141- client ().admin ()
142- .cluster ()
143- .prepareCancelTasks ()
144- .setActions (TransportSearchAction .NAME + "*" )
145- .waitForCompletion (true )
146- .setTimeout (TimeValue .timeValueMillis (maxWaitMs ))
147- .get ();
143+ // Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
144+ client ().admin ().cluster ().prepareCancelTasks ().setActions (TransportSearchAction .NAME + "*" ).get ();
145+
146+ SCRIPT_SEMAPHORE .release (SEMAPHORE_PERMITS );
148147
149148 // Ensure the search request finished and that there are no more search tasks
150149 assertBusy (() -> {
151- assertThat (getSearchTasks (), empty ());
152150 assertTrue (searchRequestFuture .isDone ());
153- }, maxWaitMs , TimeUnit .MILLISECONDS );
151+ assertThat (getSearchTasks (), empty ());
152+ });
154153 }
155154
156155 private List <TaskInfo > getSearchTasks () {
@@ -163,7 +162,7 @@ private List<TaskInfo> getSearchTasks() {
163162 .getTasks ();
164163 }
165164
166- public static class SleepScriptPlugin extends Plugin implements ScriptPlugin {
165+ public static class PauseScriptPlugin extends Plugin implements ScriptPlugin {
167166 public static final String PAUSE_SCRIPT_LANG = "pause" ;
168167
169168 @ Override
@@ -195,7 +194,7 @@ public LongFieldScript.LeafFactory newFactory(
195194 @ Override
196195 public void execute () {
197196 try {
198- Thread . sleep ( SLEEP_SCRIPT_MS );
197+ SCRIPT_SEMAPHORE . acquire ( );
199198 } catch (InterruptedException e ) {
200199 throw new AssertionError (e );
201200 }
0 commit comments