77
88package org .elasticsearch .xpack .search ;
99
10+ import org .apache .http .HttpEntity ;
11+ import org .apache .http .util .EntityUtils ;
1012import org .apache .logging .log4j .Level ;
1113import org .apache .logging .log4j .core .config .Configurator ;
1214import org .elasticsearch .client .Request ;
1618import org .elasticsearch .plugins .Plugin ;
1719import org .elasticsearch .search .ErrorTraceHelper ;
1820import org .elasticsearch .search .SearchService ;
19- import org .elasticsearch .test .ESIntegTestCase ;
2021import org .elasticsearch .test .MockLog ;
2122import org .elasticsearch .test .junit .annotations .TestLogging ;
2223import org .elasticsearch .test .transport .MockTransportService ;
3132 reason = "testing debug log output to identify race condition" ,
3233 value = "org.elasticsearch.xpack.search.MutableSearchResponse:DEBUG,org.elasticsearch.xpack.search.AsyncSearchTask:DEBUG"
3334)
34- public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
35+ public class AsyncSearchErrorTraceIT extends AsyncSearchIntegTestCase {
3536
3637 @ Override
3738 protected boolean addMockHttpTransport () {
@@ -76,10 +77,15 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
7677 createAsyncRequest .addParameter ("keep_on_completion" , "true" );
7778 createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
7879 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
79- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
80- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
81- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
82- awaitAsyncRequestDoneRunning (getAsyncRequest );
80+
81+ try {
82+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
83+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
84+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
85+ awaitAsyncRequestDoneRunning (getAsyncRequest );
86+ }
87+ } finally {
88+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
8389 }
8490 // check that the stack trace was not sent from the data node to the coordinating node
8591 ErrorTraceHelper .assertStackTraceCleared (internalCluster ());
@@ -103,11 +109,16 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
103109 createAsyncRequest .addParameter ("keep_on_completion" , "true" );
104110 createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
105111 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
106- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
107- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
108- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
109- getAsyncRequest .addParameter ("error_trace" , "true" );
110- awaitAsyncRequestDoneRunning (getAsyncRequest );
112+
113+ try {
114+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
115+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
116+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
117+ getAsyncRequest .addParameter ("error_trace" , "true" );
118+ awaitAsyncRequestDoneRunning (getAsyncRequest );
119+ }
120+ } finally {
121+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
111122 }
112123 // check that the stack trace was sent from the data node to the coordinating node
113124 ErrorTraceHelper .assertStackTraceObserved (internalCluster ());
@@ -131,11 +142,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
131142 createAsyncRequest .addParameter ("keep_on_completion" , "true" );
132143 createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
133144 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
134- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
135- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
136- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
137- getAsyncRequest .addParameter ("error_trace" , "false" );
138- awaitAsyncRequestDoneRunning (getAsyncRequest );
145+
146+ try {
147+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
148+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
149+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
150+ getAsyncRequest .addParameter ("error_trace" , "false" );
151+ awaitAsyncRequestDoneRunning (getAsyncRequest );
152+ }
153+ } finally {
154+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
139155 }
140156 // check that the stack trace was not sent from the data node to the coordinating node
141157 ErrorTraceHelper .assertStackTraceCleared (internalCluster ());
@@ -172,19 +188,24 @@ public void testDataNodeLogsStackTrace() throws Exception {
172188 try (var mockLog = MockLog .capture (SearchService .class )) {
173189 ErrorTraceHelper .addSeenLoggingExpectations (numShards , mockLog , errorTriggeringIndex );
174190 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
175- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
176- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
177- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
178- // Use the same value of error_trace as the search request
179- if (errorTraceValue == 0 ) {
180- getAsyncRequest .addParameter ("error_trace" , "true" );
181- } else if (errorTraceValue == 1 ) {
182- getAsyncRequest .addParameter ("error_trace" , "false" );
183- } // else empty
184- awaitAsyncRequestDoneRunning (getAsyncRequest );
185- }
186191
187- mockLog .assertAllExpectationsMatched ();
192+ try {
193+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
194+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
195+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
196+ // Use the same value of error_trace as the search request
197+ if (errorTraceValue == 0 ) {
198+ getAsyncRequest .addParameter ("error_trace" , "true" );
199+ } else if (errorTraceValue == 1 ) {
200+ getAsyncRequest .addParameter ("error_trace" , "false" );
201+ } // else empty
202+ awaitAsyncRequestDoneRunning (getAsyncRequest );
203+ }
204+
205+ mockLog .assertAllExpectationsMatched ();
206+ } finally {
207+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
208+ }
188209 }
189210 }
190211
@@ -206,11 +227,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
206227 createAsyncSearchRequest .addParameter ("keep_on_completion" , "true" );
207228 createAsyncSearchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
208229 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncSearchRequest );
209- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
210- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
211- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
212- getAsyncRequest .addParameter ("error_trace" , "true" );
213- awaitAsyncRequestDoneRunning (getAsyncRequest );
230+
231+ try {
232+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
233+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
234+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
235+ getAsyncRequest .addParameter ("error_trace" , "true" );
236+ awaitAsyncRequestDoneRunning (getAsyncRequest );
237+ }
238+ } finally {
239+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
214240 }
215241 // check that the stack trace was not sent from the data node to the coordinating node
216242 ErrorTraceHelper .assertStackTraceCleared (internalCluster ());
@@ -234,11 +260,15 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
234260 createAsyncSearchRequest .addParameter ("keep_on_completion" , "true" );
235261 createAsyncSearchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
236262 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncSearchRequest );
237- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
238- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
239- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
240- getAsyncRequest .addParameter ("error_trace" , "false" );
241- awaitAsyncRequestDoneRunning (getAsyncRequest );
263+ try {
264+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
265+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
266+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
267+ getAsyncRequest .addParameter ("error_trace" , "false" );
268+ awaitAsyncRequestDoneRunning (getAsyncRequest );
269+ }
270+ } finally {
271+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
242272 }
243273 // check that the stack trace was sent from the data node to the coordinating node
244274 ErrorTraceHelper .assertStackTraceObserved (internalCluster ());
@@ -250,6 +280,26 @@ private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws
250280 return XContentHelper .convertToMap (entityContentType .xContent (), response .getEntity ().getContent (), false );
251281 }
252282
283+ private void deleteAsyncSearchIfPresent (Map <String , Object > map ) throws IOException {
284+ String id = (String ) map .get ("id" );
285+ if (id == null ) {
286+ return ;
287+ }
288+
289+ // Make sure the .async-search system index is green before deleting it
290+ try {
291+ ensureGreen (".async-search" );
292+ } catch (Exception ignore ) {
293+ // the index may not exist
294+ }
295+
296+ Response response = getRestClient ().performRequest (new Request ("DELETE" , "/_async_search/" + id ));
297+ HttpEntity entity = response .getEntity ();
298+ if (entity != null ) {
299+ EntityUtils .consumeQuietly (entity );
300+ }
301+ }
302+
253303 private void awaitAsyncRequestDoneRunning (Request getAsyncRequest ) throws Exception {
254304 assertBusy (() -> {
255305 Map <String , Object > getAsyncResponseEntity = performRequestAndGetResponseEntity (getAsyncRequest );
0 commit comments