1414import org .elasticsearch .common .settings .Settings ;
1515import org .elasticsearch .common .util .CollectionUtils ;
1616import org .elasticsearch .common .xcontent .XContentHelper ;
17- import org .elasticsearch .core .TimeValue ;
1817import org .elasticsearch .plugins .Plugin ;
1918import org .elasticsearch .search .ErrorTraceHelper ;
2019import org .elasticsearch .search .SearchService ;
@@ -72,11 +71,11 @@ private void setupIndexWithDocs() {
7271 refresh ();
7372 }
7473
75- public void testAsyncSearchFailingQueryErrorTraceDefault () throws IOException , InterruptedException {
74+ public void testAsyncSearchFailingQueryErrorTraceDefault () throws Exception {
7675 setupIndexWithDocs ();
7776
78- Request searchRequest = new Request ("POST" , "/_async_search" );
79- searchRequest .setJsonEntity ("""
77+ Request createAsyncRequest = new Request ("POST" , "/_async_search" );
78+ createAsyncRequest .setJsonEntity ("""
8079 {
8180 "query": {
8281 "simple_query_string" : {
@@ -86,23 +85,23 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, I
8685 }
8786 }
8887 """ );
89- searchRequest .addParameter ("keep_on_completion" , "true" );
90- searchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
91- Map <String , Object > responseEntity = performRequestAndGetResponseEntityAfterDelay ( searchRequest , TimeValue . ZERO );
92- String asyncExecutionId = ( String ) responseEntity .get ("id" );
93- Request request = new Request ( "GET" , "/_async_search/" + asyncExecutionId );
94- while ( responseEntity . get ( "is_running" ) instanceof Boolean isRunning && isRunning ) {
95- responseEntity = performRequestAndGetResponseEntityAfterDelay ( request , TimeValue . timeValueSeconds ( 1L ) );
88+ createAsyncRequest .addParameter ("keep_on_completion" , "true" );
89+ createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
90+ Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity ( createAsyncRequest );
91+ if ( createAsyncResponseEntity .get ("is_running" ). equals ( "true" )) {
92+ String asyncExecutionId = ( String ) createAsyncResponseEntity . get ( "id" );
93+ Request getAsyncRequest = new Request ( "GET" , "/_async_search/" + asyncExecutionId );
94+ awaitAsyncRequestDoneRunning ( getAsyncRequest );
9695 }
9796 // check that the stack trace was not sent from the data node to the coordinating node
9897 assertFalse (transportMessageHasStackTrace .getAsBoolean ());
9998 }
10099
101- public void testAsyncSearchFailingQueryErrorTraceTrue () throws IOException , InterruptedException {
100+ public void testAsyncSearchFailingQueryErrorTraceTrue () throws Exception {
102101 setupIndexWithDocs ();
103102
104- Request searchRequest = new Request ("POST" , "/_async_search" );
105- searchRequest .setJsonEntity ("""
103+ Request createAsyncRequest = new Request ("POST" , "/_async_search" );
104+ createAsyncRequest .setJsonEntity ("""
106105 {
107106 "query": {
108107 "simple_query_string" : {
@@ -112,25 +111,25 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, Inte
112111 }
113112 }
114113 """ );
115- searchRequest .addParameter ("error_trace" , "true" );
116- searchRequest .addParameter ("keep_on_completion" , "true" );
117- searchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
118- Map <String , Object > responseEntity = performRequestAndGetResponseEntityAfterDelay ( searchRequest , TimeValue . ZERO );
119- String asyncExecutionId = ( String ) responseEntity .get ("id" );
120- Request request = new Request ( "GET" , "/_async_search/" + asyncExecutionId );
121- request . addParameter ( "error_trace " , "true" );
122- while ( responseEntity . get ( "is_running" ) instanceof Boolean isRunning && isRunning ) {
123- responseEntity = performRequestAndGetResponseEntityAfterDelay ( request , TimeValue . timeValueSeconds ( 1L ) );
114+ createAsyncRequest .addParameter ("error_trace" , "true" );
115+ createAsyncRequest .addParameter ("keep_on_completion" , "true" );
116+ createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
117+ Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity ( createAsyncRequest );
118+ if ( createAsyncResponseEntity .get ("is_running" ). equals ( "true" )) {
119+ String asyncExecutionId = ( String ) createAsyncResponseEntity . get ( "id" );
120+ Request getAsyncRequest = new Request ( "GET " , "/_async_search/" + asyncExecutionId );
121+ getAsyncRequest . addParameter ( "error_trace" , "true" );
122+ awaitAsyncRequestDoneRunning ( getAsyncRequest );
124123 }
125124 // check that the stack trace was sent from the data node to the coordinating node
126125 assertTrue (transportMessageHasStackTrace .getAsBoolean ());
127126 }
128127
129- public void testAsyncSearchFailingQueryErrorTraceFalse () throws IOException , InterruptedException {
128+ public void testAsyncSearchFailingQueryErrorTraceFalse () throws Exception {
130129 setupIndexWithDocs ();
131130
132- Request searchRequest = new Request ("POST" , "/_async_search" );
133- searchRequest .setJsonEntity ("""
131+ Request createAsyncRequest = new Request ("POST" , "/_async_search" );
132+ createAsyncRequest .setJsonEntity ("""
134133 {
135134 "query": {
136135 "simple_query_string" : {
@@ -140,28 +139,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, Int
140139 }
141140 }
142141 """ );
143- searchRequest .addParameter ("error_trace" , "false" );
144- searchRequest .addParameter ("keep_on_completion" , "true" );
145- searchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
146- Map <String , Object > responseEntity = performRequestAndGetResponseEntityAfterDelay ( searchRequest , TimeValue . ZERO );
147- String asyncExecutionId = ( String ) responseEntity .get ("id" );
148- Request request = new Request ( "GET" , "/_async_search/" + asyncExecutionId );
149- request . addParameter ( "error_trace " , "false" );
150- while ( responseEntity . get ( "is_running" ) instanceof Boolean isRunning && isRunning ) {
151- responseEntity = performRequestAndGetResponseEntityAfterDelay ( request , TimeValue . timeValueSeconds ( 1L ) );
142+ createAsyncRequest .addParameter ("error_trace" , "false" );
143+ createAsyncRequest .addParameter ("keep_on_completion" , "true" );
144+ createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
145+ Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity ( createAsyncRequest );
146+ if ( createAsyncResponseEntity .get ("is_running" ). equals ( "true" )) {
147+ String asyncExecutionId = ( String ) createAsyncResponseEntity . get ( "id" );
148+ Request getAsyncRequest = new Request ( "GET " , "/_async_search/" + asyncExecutionId );
149+ getAsyncRequest . addParameter ( "error_trace" , "false" );
150+ awaitAsyncRequestDoneRunning ( getAsyncRequest );
152151 }
153152 // check that the stack trace was not sent from the data node to the coordinating node
154153 assertFalse (transportMessageHasStackTrace .getAsBoolean ());
155154 }
156155
157- public void testDataNodeLogsStackTrace () throws IOException , InterruptedException {
156+ public void testDataNodeLogsStackTrace () throws Exception {
158157 setupIndexWithDocs ();
159158
160- // error_trace defaults to false so we can test both cases with some randomization
161- final boolean defineErrorTraceFalse = randomBoolean ();
162-
163- Request searchRequest = new Request ("POST" , "/_async_search" );
164- searchRequest .setJsonEntity ("""
159+ Request createAsyncRequest = new Request ("POST" , "/_async_search" );
160+ createAsyncRequest .setJsonEntity ("""
165161 {
166162 "query": {
167163 "simple_query_string" : {
@@ -175,43 +171,40 @@ public void testDataNodeLogsStackTrace() throws IOException, InterruptedExceptio
175171 // No matter the value of error_trace (empty, true, or false) we should see stack traces logged
176172 int errorTraceValue = randomIntBetween (0 , 2 );
177173 if (errorTraceValue == 0 ) {
178- searchRequest .addParameter ("error_trace" , "true" );
174+ createAsyncRequest .addParameter ("error_trace" , "true" );
179175 } else if (errorTraceValue == 1 ) {
180- searchRequest .addParameter ("error_trace" , "false" );
176+ createAsyncRequest .addParameter ("error_trace" , "false" );
181177 } // else empty
182178
183- searchRequest .addParameter ("keep_on_completion" , "true" );
184- searchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
179+ createAsyncRequest .addParameter ("keep_on_completion" , "true" );
180+ createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
185181
186182 String errorTriggeringIndex = "test2" ;
187183 int numShards = getNumShards (errorTriggeringIndex ).numPrimaries ;
188184 try (var mockLog = MockLog .capture (SearchService .class )) {
189185 ErrorTraceHelper .addSeenLoggingExpectations (numShards , mockLog , errorTriggeringIndex );
190-
191- Map <String , Object > responseEntity = performRequestAndGetResponseEntityAfterDelay (searchRequest , TimeValue .ZERO );
192- String asyncExecutionId = (String ) responseEntity .get ("id" );
193- Request request = new Request ("GET" , "/_async_search/" + asyncExecutionId );
194-
195- // Use the same value of error_trace as the search request
196- if (errorTraceValue == 0 ) {
197- request .addParameter ("error_trace" , "true" );
198- } else if (errorTraceValue == 1 ) {
199- request .addParameter ("error_trace" , "false" );
200- } // else empty
201-
202- while (responseEntity .get ("is_running" ) instanceof Boolean isRunning && isRunning ) {
203- responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
186+ Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
187+ if (createAsyncResponseEntity .get ("is_running" ).equals ("true" )) {
188+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
189+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
190+ // Use the same value of error_trace as the search request
191+ if (errorTraceValue == 0 ) {
192+ getAsyncRequest .addParameter ("error_trace" , "true" );
193+ } else if (errorTraceValue == 1 ) {
194+ getAsyncRequest .addParameter ("error_trace" , "false" );
195+ } // else empty
196+ awaitAsyncRequestDoneRunning (getAsyncRequest );
204197 }
205198
206199 mockLog .assertAllExpectationsMatched ();
207200 }
208201 }
209202
210- public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet () throws IOException , InterruptedException {
203+ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet () throws Exception {
211204 setupIndexWithDocs ();
212205
213- Request searchRequest = new Request ("POST" , "/_async_search" );
214- searchRequest .setJsonEntity ("""
206+ Request createAsyncSearchRequest = new Request ("POST" , "/_async_search" );
207+ createAsyncSearchRequest .setJsonEntity ("""
215208 {
216209 "query": {
217210 "simple_query_string" : {
@@ -221,25 +214,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
221214 }
222215 }
223216 """ );
224- searchRequest .addParameter ("error_trace" , "false" );
225- searchRequest .addParameter ("keep_on_completion" , "true" );
226- searchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
227- Map <String , Object > responseEntity = performRequestAndGetResponseEntityAfterDelay ( searchRequest , TimeValue . ZERO );
228- String asyncExecutionId = ( String ) responseEntity .get ("id" );
229- Request request = new Request ( "GET" , "/_async_search/" + asyncExecutionId );
230- request . addParameter ( "error_trace " , "true" );
231- while ( responseEntity . get ( "is_running" ) instanceof Boolean isRunning && isRunning ) {
232- responseEntity = performRequestAndGetResponseEntityAfterDelay ( request , TimeValue . timeValueSeconds ( 1L ) );
217+ createAsyncSearchRequest .addParameter ("error_trace" , "false" );
218+ createAsyncSearchRequest .addParameter ("keep_on_completion" , "true" );
219+ createAsyncSearchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
220+ Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity ( createAsyncSearchRequest );
221+ if ( createAsyncResponseEntity .get ("is_running" ). equals ( "true" )) {
222+ String asyncExecutionId = ( String ) createAsyncResponseEntity . get ( "id" );
223+ Request getAsyncRequest = new Request ( "GET " , "/_async_search/" + asyncExecutionId );
224+ getAsyncRequest . addParameter ( "error_trace" , "true" );
225+ awaitAsyncRequestDoneRunning ( getAsyncRequest );
233226 }
234227 // check that the stack trace was not sent from the data node to the coordinating node
235228 assertFalse (transportMessageHasStackTrace .getAsBoolean ());
236229 }
237230
238- public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet () throws IOException , InterruptedException {
231+ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet () throws Exception {
239232 setupIndexWithDocs ();
240233
241- Request searchRequest = new Request ("POST" , "/_async_search" );
242- searchRequest .setJsonEntity ("""
234+ Request createAsyncSearchRequest = new Request ("POST" , "/_async_search" );
235+ createAsyncSearchRequest .setJsonEntity ("""
243236 {
244237 "query": {
245238 "simple_query_string" : {
@@ -249,25 +242,30 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
249242 }
250243 }
251244 """ );
252- searchRequest .addParameter ("error_trace" , "true" );
253- searchRequest .addParameter ("keep_on_completion" , "true" );
254- searchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
255- Map <String , Object > responseEntity = performRequestAndGetResponseEntityAfterDelay ( searchRequest , TimeValue . ZERO );
256- String asyncExecutionId = ( String ) responseEntity .get ("id" );
257- Request request = new Request ( "GET" , "/_async_search/" + asyncExecutionId );
258- request . addParameter ( "error_trace " , "false" );
259- while ( responseEntity . get ( "is_running" ) instanceof Boolean isRunning && isRunning ) {
260- responseEntity = performRequestAndGetResponseEntityAfterDelay ( request , TimeValue . timeValueSeconds ( 1L ) );
245+ createAsyncSearchRequest .addParameter ("error_trace" , "true" );
246+ createAsyncSearchRequest .addParameter ("keep_on_completion" , "true" );
247+ createAsyncSearchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
248+ Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity ( createAsyncSearchRequest );
249+ if ( createAsyncResponseEntity .get ("is_running" ). equals ( "true" )) {
250+ String asyncExecutionId = ( String ) createAsyncResponseEntity . get ( "id" );
251+ Request getAsyncRequest = new Request ( "GET " , "/_async_search/" + asyncExecutionId );
252+ getAsyncRequest . addParameter ( "error_trace" , "false" );
253+ awaitAsyncRequestDoneRunning ( getAsyncRequest );
261254 }
262255 // check that the stack trace was sent from the data node to the coordinating node
263256 assertTrue (transportMessageHasStackTrace .getAsBoolean ());
264257 }
265258
266- private Map <String , Object > performRequestAndGetResponseEntityAfterDelay (Request r , TimeValue sleep ) throws IOException ,
267- InterruptedException {
268- Thread .sleep (sleep .millis ());
259+ private Map <String , Object > performRequestAndGetResponseEntity (Request r ) throws IOException {
269260 Response response = getRestClient ().performRequest (r );
270261 XContentType entityContentType = XContentType .fromMediaType (response .getEntity ().getContentType ().getValue ());
271262 return XContentHelper .convertToMap (entityContentType .xContent (), response .getEntity ().getContent (), false );
272263 }
264+
265+ private void awaitAsyncRequestDoneRunning (Request getAsyncRequest ) throws Exception {
266+ assertBusy (() -> {
267+ Map <String , Object > getAsyncResponseEntity = performRequestAndGetResponseEntity (getAsyncRequest );
268+ assertFalse ((Boolean ) getAsyncResponseEntity .get ("is_running" ));
269+ });
270+ }
273271}
0 commit comments