28
28
import org .elasticsearch .test .ESIntegTestCase ;
29
29
import org .elasticsearch .xcontent .XContentBuilder ;
30
30
import org .elasticsearch .xcontent .json .JsonXContent ;
31
+ import org .junit .Before ;
31
32
32
33
import java .util .Collection ;
33
34
import java .util .List ;
39
40
import static org .elasticsearch .search .aggregations .AggregationBuilders .filters ;
40
41
import static org .elasticsearch .search .aggregations .AggregationBuilders .terms ;
41
42
import static org .hamcrest .Matchers .empty ;
43
+ import static org .hamcrest .Matchers .equalTo ;
42
44
import static org .hamcrest .Matchers .greaterThan ;
43
45
import static org .hamcrest .Matchers .not ;
44
46
47
+ /**
48
+ * Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs.
49
+ * <p>
50
+ * The CancellableBulkScorer we use to break the execution is called per search thread in the query.
51
+ * It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks.
52
+ * This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits.
53
+ * </p>
54
+ * <p>
55
+ * Also, if the search thread pool size is too high, it can lead to them trying to process too many documents anyway (pool size * 4096),
56
+ * eventually blocking the threads (And failing the test). So it's explicitly set to a small number to avoid this.
57
+ * </p>
58
+ */
45
59
@ ESIntegTestCase .SuiteScopeTestCase
46
60
public class FiltersCancellationIT extends ESIntegTestCase {
47
61
@@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase {
55
69
56
70
@ Override
57
71
protected Collection <Class <? extends Plugin >> nodePlugins () {
58
- return CollectionUtils .appendToCopy (super .nodePlugins (), pausableFieldPluginClass () );
72
+ return CollectionUtils .appendToCopy (super .nodePlugins (), PauseScriptPlugin . class );
59
73
}
60
74
61
- protected Class <? extends Plugin > pausableFieldPluginClass () {
62
- return PauseScriptPlugin .class ;
75
+ @ Override
76
+ public Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
77
+ return Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings )).put ("thread_pool.search.size" , 4 ).build ();
63
78
}
64
79
65
80
@ Override
@@ -99,6 +114,11 @@ public void setupSuiteScopeCluster() throws Exception {
99
114
client ().admin ().indices ().prepareForceMerge (INDEX ).setMaxNumSegments (1 ).get ();
100
115
}
101
116
117
+ @ Before
118
+ public void reset () {
119
+ SCRIPT_SEMAPHORE .drainPermits ();
120
+ }
121
+
102
122
public void testFiltersCountCancellation () throws Exception {
103
123
ensureProperCancellation (
104
124
client ().prepareSearch (INDEX )
@@ -129,14 +149,14 @@ public void testFiltersSubAggsCancellation() throws Exception {
129
149
130
150
private void ensureProperCancellation (SearchRequestBuilder searchRequestBuilder ) throws Exception {
131
151
var searchRequestFuture = searchRequestBuilder .setTimeout (TimeValue .timeValueSeconds (1 )).execute ();
132
- assertFalse (searchRequestFuture .isCancelled ());
133
- assertFalse (searchRequestFuture .isDone ());
152
+ assertThat (searchRequestFuture .isCancelled (), equalTo ( false ));
153
+ assertThat (searchRequestFuture .isDone (), equalTo ( false ));
134
154
135
155
// Check that there are search tasks running
136
156
assertThat (getSearchTasks (), not (empty ()));
137
157
138
158
// Wait for the script field to get blocked
139
- assertBusy (() -> { assertThat (SCRIPT_SEMAPHORE .getQueueLength (), greaterThan (0 )); } );
159
+ assertBusy (() -> assertThat (SCRIPT_SEMAPHORE .getQueueLength (), greaterThan (0 )));
140
160
141
161
// Cancel the tasks
142
162
// Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
@@ -146,8 +166,8 @@ private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder)
146
166
147
167
// Ensure the search request finished and that there are no more search tasks
148
168
assertBusy (() -> {
149
- assertTrue ( searchRequestFuture .isDone ());
150
- assertThat (getSearchTasks (), empty ());
169
+ assertThat ( "Search request didn't finish" , searchRequestFuture .isDone (), equalTo ( true ));
170
+ assertThat ("There are dangling search tasks" , getSearchTasks (), empty ());
151
171
});
152
172
}
153
173
0 commit comments