diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index bf8576afc5d70..e2b76658e5246 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; import org.elasticsearch.search.SearchService; @@ -72,11 +71,11 @@ private void setupIndexWithDocs() { refresh(); } - public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -86,23 +85,23 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, I } } """); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -112,25 +111,25 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, Inte } } """); - searchRequest.addParameter("error_trace", "true"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "true"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncRequest.addParameter("error_trace", "true"); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "true"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was sent from the data node to the coordinating node assertTrue(transportMessageHasStackTrace.getAsBoolean()); } - public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -140,28 +139,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, Int } } """); - searchRequest.addParameter("error_trace", "false"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "false"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncRequest.addParameter("error_trace", "false"); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "false"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testDataNodeLogsStackTrace() throws IOException, InterruptedException { + public void testDataNodeLogsStackTrace() throws Exception { setupIndexWithDocs(); - // error_trace defaults to false so we can test both cases with some randomization - final boolean defineErrorTraceFalse = randomBoolean(); - - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -175,43 +171,40 @@ public void testDataNodeLogsStackTrace() throws IOException, InterruptedExceptio // No matter the value of error_trace (empty, true, or false) we should see stack traces logged int errorTraceValue = randomIntBetween(0, 2); if (errorTraceValue == 0) { - searchRequest.addParameter("error_trace", "true"); + createAsyncRequest.addParameter("error_trace", "true"); } else if (errorTraceValue == 1) { - searchRequest.addParameter("error_trace", "false"); + createAsyncRequest.addParameter("error_trace", "false"); } // else empty - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); String errorTriggeringIndex = "test2"; int numShards = getNumShards(errorTriggeringIndex).numPrimaries; try (var mockLog = MockLog.capture(SearchService.class)) { ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - - // Use the same value of error_trace as the search request - if (errorTraceValue == 0) { - request.addParameter("error_trace", "true"); - } else if (errorTraceValue == 1) { - request.addParameter("error_trace", "false"); - } // else empty - - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + // Use the same value of error_trace as the search request + if (errorTraceValue == 0) { + getAsyncRequest.addParameter("error_trace", "true"); + } else if (errorTraceValue == 1) { + getAsyncRequest.addParameter("error_trace", "false"); + } // else empty + awaitAsyncRequestDoneRunning(getAsyncRequest); } mockLog.assertAllExpectationsMatched(); } } - public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncSearchRequest = new Request("POST", "/_async_search"); + createAsyncSearchRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -221,25 +214,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr } } """); - searchRequest.addParameter("error_trace", "false"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "true"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncSearchRequest.addParameter("error_trace", "false"); + createAsyncSearchRequest.addParameter("keep_on_completion", "true"); + createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "true"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncSearchRequest = new Request("POST", "/_async_search"); + createAsyncSearchRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -249,25 +242,30 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr } } """); - searchRequest.addParameter("error_trace", "true"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "false"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncSearchRequest.addParameter("error_trace", "true"); + createAsyncSearchRequest.addParameter("keep_on_completion", "true"); + createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "false"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was sent from the data node to the coordinating node assertTrue(transportMessageHasStackTrace.getAsBoolean()); } - private Map performRequestAndGetResponseEntityAfterDelay(Request r, TimeValue sleep) throws IOException, - InterruptedException { - Thread.sleep(sleep.millis()); + private Map performRequestAndGetResponseEntity(Request r) throws IOException { Response response = getRestClient().performRequest(r); XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false); } + + private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception { + assertBusy(() -> { + Map getAsyncResponseEntity = performRequestAndGetResponseEntity(getAsyncRequest); + assertFalse((Boolean) getAsyncResponseEntity.get("is_running")); + }); + } }