77
88package org .elasticsearch .xpack .search ;
99
10+ import org .apache .http .HttpEntity ;
11+ import org .apache .http .util .EntityUtils ;
1012import org .apache .logging .log4j .Level ;
1113import org .apache .logging .log4j .core .config .Configurator ;
1214import org .elasticsearch .client .Request ;
1618import org .elasticsearch .plugins .Plugin ;
1719import org .elasticsearch .search .ErrorTraceHelper ;
1820import org .elasticsearch .search .SearchService ;
19- import org .elasticsearch .test .ESIntegTestCase ;
2021import org .elasticsearch .test .MockLog ;
2122import org .elasticsearch .test .junit .annotations .TestLogging ;
2223import org .elasticsearch .test .transport .MockTransportService ;
3132 reason = "testing debug log output to identify race condition" ,
3233 value = "org.elasticsearch.xpack.search.MutableSearchResponse:DEBUG,org.elasticsearch.xpack.search.AsyncSearchTask:DEBUG"
3334)
34- public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
35+ public class AsyncSearchErrorTraceIT extends AsyncSearchIntegTestCase {
3536
3637 @ Override
3738 protected boolean addMockHttpTransport () {
@@ -77,10 +78,15 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
7778 createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
7879 ErrorTraceHelper .expectStackTraceCleared (internalCluster ());
7980 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
80- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
81- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
82- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
83- awaitAsyncRequestDoneRunning (getAsyncRequest );
81+
82+ try {
83+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
84+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
85+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
86+ awaitAsyncRequestDoneRunning (getAsyncRequest );
87+ }
88+ } finally {
89+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
8490 }
8591 }
8692
@@ -103,11 +109,16 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
103109 createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
104110 ErrorTraceHelper .expectStackTraceObserved (internalCluster ());
105111 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
106- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
107- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
108- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
109- getAsyncRequest .addParameter ("error_trace" , "true" );
110- awaitAsyncRequestDoneRunning (getAsyncRequest );
112+
113+ try {
114+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
115+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
116+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
117+ getAsyncRequest .addParameter ("error_trace" , "true" );
118+ awaitAsyncRequestDoneRunning (getAsyncRequest );
119+ }
120+ } finally {
121+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
111122 }
112123 }
113124
@@ -130,11 +141,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
130141 createAsyncRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
131142 ErrorTraceHelper .expectStackTraceCleared (internalCluster ());
132143 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
133- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
134- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
135- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
136- getAsyncRequest .addParameter ("error_trace" , "false" );
137- awaitAsyncRequestDoneRunning (getAsyncRequest );
144+
145+ try {
146+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
147+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
148+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
149+ getAsyncRequest .addParameter ("error_trace" , "false" );
150+ awaitAsyncRequestDoneRunning (getAsyncRequest );
151+ }
152+ } finally {
153+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
138154 }
139155 }
140156
@@ -169,19 +185,24 @@ public void testDataNodeLogsStackTrace() throws Exception {
169185 try (var mockLog = MockLog .capture (SearchService .class )) {
170186 ErrorTraceHelper .addSeenLoggingExpectations (numShards , mockLog , errorTriggeringIndex );
171187 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncRequest );
172- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
173- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
174- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
175- // Use the same value of error_trace as the search request
176- if (errorTraceValue == 0 ) {
177- getAsyncRequest .addParameter ("error_trace" , "true" );
178- } else if (errorTraceValue == 1 ) {
179- getAsyncRequest .addParameter ("error_trace" , "false" );
180- } // else empty
181- awaitAsyncRequestDoneRunning (getAsyncRequest );
182- }
183188
184- mockLog .assertAllExpectationsMatched ();
189+ try {
190+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
191+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
192+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
193+ // Use the same value of error_trace as the search request
194+ if (errorTraceValue == 0 ) {
195+ getAsyncRequest .addParameter ("error_trace" , "true" );
196+ } else if (errorTraceValue == 1 ) {
197+ getAsyncRequest .addParameter ("error_trace" , "false" );
198+ } // else empty
199+ awaitAsyncRequestDoneRunning (getAsyncRequest );
200+ }
201+
202+ mockLog .assertAllExpectationsMatched ();
203+ } finally {
204+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
205+ }
185206 }
186207 }
187208
@@ -204,11 +225,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
204225 createAsyncSearchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
205226 ErrorTraceHelper .expectStackTraceCleared (internalCluster ());
206227 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncSearchRequest );
207- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
208- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
209- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
210- getAsyncRequest .addParameter ("error_trace" , "true" );
211- awaitAsyncRequestDoneRunning (getAsyncRequest );
228+
229+ try {
230+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
231+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
232+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
233+ getAsyncRequest .addParameter ("error_trace" , "true" );
234+ awaitAsyncRequestDoneRunning (getAsyncRequest );
235+ }
236+ } finally {
237+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
212238 }
213239 }
214240
@@ -231,11 +257,15 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
231257 createAsyncSearchRequest .addParameter ("wait_for_completion_timeout" , "0ms" );
232258 ErrorTraceHelper .expectStackTraceObserved (internalCluster ());
233259 Map <String , Object > createAsyncResponseEntity = performRequestAndGetResponseEntity (createAsyncSearchRequest );
234- if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
235- String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
236- Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
237- getAsyncRequest .addParameter ("error_trace" , "false" );
238- awaitAsyncRequestDoneRunning (getAsyncRequest );
260+ try {
261+ if (Boolean .TRUE .equals (createAsyncResponseEntity .get ("is_running" ))) {
262+ String asyncExecutionId = (String ) createAsyncResponseEntity .get ("id" );
263+ Request getAsyncRequest = new Request ("GET" , "/_async_search/" + asyncExecutionId );
264+ getAsyncRequest .addParameter ("error_trace" , "false" );
265+ awaitAsyncRequestDoneRunning (getAsyncRequest );
266+ }
267+ } finally {
268+ deleteAsyncSearchIfPresent (createAsyncResponseEntity );
239269 }
240270 }
241271
@@ -245,6 +275,26 @@ private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws
245275 return XContentHelper .convertToMap (entityContentType .xContent (), response .getEntity ().getContent (), false );
246276 }
247277
278+ private void deleteAsyncSearchIfPresent (Map <String , Object > map ) throws IOException {
279+ String id = (String ) map .get ("id" );
280+ if (id == null ) {
281+ return ;
282+ }
283+
284+ // Make sure the .async-search system index is green before deleting it
285+ try {
286+ ensureGreen (".async-search" );
287+ } catch (Exception ignore ) {
288+ // the index may not exist
289+ }
290+
291+ Response response = getRestClient ().performRequest (new Request ("DELETE" , "/_async_search/" + id ));
292+ HttpEntity entity = response .getEntity ();
293+ if (entity != null ) {
294+ EntityUtils .consumeQuietly (entity );
295+ }
296+ }
297+
248298 private void awaitAsyncRequestDoneRunning (Request getAsyncRequest ) throws Exception {
249299 assertBusy (() -> {
250300 Map <String , Object > getAsyncResponseEntity = performRequestAndGetResponseEntity (getAsyncRequest );
0 commit comments