From 992ff51741098e54974f63ce4387133133f8c34b Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 15 Jul 2025 13:45:46 -0400 Subject: [PATCH 1/2] Refactor AsyncSearchErrorTraceIT to use assertBusy --- .../xpack/search/AsyncSearchErrorTraceIT.java | 177 ++++++++++-------- 1 file changed, 95 insertions(+), 82 deletions(-) diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index bf8576afc5d70..545694c706265 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -72,11 +72,11 @@ private void setupIndexWithDocs() { refresh(); } - public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -86,23 +86,23 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, I } } """); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay(createAsyncRequest, TimeValue.ZERO); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -112,25 +112,25 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, Inte } } """); - searchRequest.addParameter("error_trace", "true"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "true"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncRequest.addParameter("error_trace", "true"); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay(createAsyncRequest, TimeValue.ZERO); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "true"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was sent from the data node to the coordinating node assertTrue(transportMessageHasStackTrace.getAsBoolean()); } - public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -140,28 +140,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, Int } } """); - searchRequest.addParameter("error_trace", "false"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "false"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncRequest.addParameter("error_trace", "false"); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay(createAsyncRequest, TimeValue.ZERO); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "false"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testDataNodeLogsStackTrace() throws IOException, InterruptedException { + public void testDataNodeLogsStackTrace() throws Exception { setupIndexWithDocs(); - // error_trace defaults to false so we can test both cases with some randomization - final boolean defineErrorTraceFalse = randomBoolean(); - - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncRequest = new Request("POST", "/_async_search"); + createAsyncRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -175,43 +172,43 @@ public void testDataNodeLogsStackTrace() throws IOException, InterruptedExceptio // No matter the value of error_trace (empty, true, or false) we should see stack traces logged int errorTraceValue = randomIntBetween(0, 2); if (errorTraceValue == 0) { - searchRequest.addParameter("error_trace", "true"); + createAsyncRequest.addParameter("error_trace", "true"); } else if (errorTraceValue == 1) { - searchRequest.addParameter("error_trace", "false"); + createAsyncRequest.addParameter("error_trace", "false"); } // else empty - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + createAsyncRequest.addParameter("keep_on_completion", "true"); + createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); String errorTriggeringIndex = "test2"; int numShards = getNumShards(errorTriggeringIndex).numPrimaries; try (var mockLog = MockLog.capture(SearchService.class)) { ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - - // Use the same value of error_trace as the search request - if (errorTraceValue == 0) { - request.addParameter("error_trace", "true"); - } else if (errorTraceValue == 1) { - request.addParameter("error_trace", "false"); - } // else empty - - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( + createAsyncRequest, + TimeValue.ZERO + ); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + // Use the same value of error_trace as the search request + if (errorTraceValue == 0) { + getAsyncRequest.addParameter("error_trace", "true"); + } else if (errorTraceValue == 1) { + getAsyncRequest.addParameter("error_trace", "false"); + } // else empty + awaitAsyncRequestDoneRunning(getAsyncRequest); } mockLog.assertAllExpectationsMatched(); } } - public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncSearchRequest = new Request("POST", "/_async_search"); + createAsyncSearchRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -221,25 +218,28 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr } } """); - searchRequest.addParameter("error_trace", "false"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "true"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncSearchRequest.addParameter("error_trace", "false"); + createAsyncSearchRequest.addParameter("keep_on_completion", "true"); + createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( + createAsyncSearchRequest, + TimeValue.ZERO + ); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "true"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws IOException, InterruptedException { + public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws Exception { setupIndexWithDocs(); - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" + Request createAsyncSearchRequest = new Request("POST", "/_async_search"); + createAsyncSearchRequest.setJsonEntity(""" { "query": { "simple_query_string" : { @@ -249,15 +249,18 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr } } """); - searchRequest.addParameter("error_trace", "true"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "false"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + createAsyncSearchRequest.addParameter("error_trace", "true"); + createAsyncSearchRequest.addParameter("keep_on_completion", "true"); + createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( + createAsyncSearchRequest, + TimeValue.ZERO + ); + if (createAsyncResponseEntity.get("is_running").equals("true")) { + String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); + Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); + getAsyncRequest.addParameter("error_trace", "false"); + awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was sent from the data node to the coordinating node assertTrue(transportMessageHasStackTrace.getAsBoolean()); @@ -270,4 +273,14 @@ private Map performRequestAndGetResponseEntityAfterDelay(Request XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false); } + + private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception { + assertBusy(() -> { + Map getAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( + getAsyncRequest, + TimeValue.timeValueSeconds(1L) + ); + assertFalse((Boolean) getAsyncResponseEntity.get("is_running")); + }); + } } From c43d41249bb0302bde7e7ecf2b47e704d60391eb Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 15 Jul 2025 17:11:40 -0400 Subject: [PATCH 2/2] Remove thread sleep delay --- .../xpack/search/AsyncSearchErrorTraceIT.java | 31 +++++-------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index 545694c706265..e2b76658e5246 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; import org.elasticsearch.search.SearchService; @@ -88,7 +87,7 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception { """); createAsyncRequest.addParameter("keep_on_completion", "true"); createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay(createAsyncRequest, TimeValue.ZERO); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); if (createAsyncResponseEntity.get("is_running").equals("true")) { String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); @@ -115,7 +114,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception { createAsyncRequest.addParameter("error_trace", "true"); createAsyncRequest.addParameter("keep_on_completion", "true"); createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay(createAsyncRequest, TimeValue.ZERO); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); if (createAsyncResponseEntity.get("is_running").equals("true")) { String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); @@ -143,7 +142,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception { createAsyncRequest.addParameter("error_trace", "false"); createAsyncRequest.addParameter("keep_on_completion", "true"); createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay(createAsyncRequest, TimeValue.ZERO); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); if (createAsyncResponseEntity.get("is_running").equals("true")) { String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); @@ -184,10 +183,7 @@ public void testDataNodeLogsStackTrace() throws Exception { int numShards = getNumShards(errorTriggeringIndex).numPrimaries; try (var mockLog = MockLog.capture(SearchService.class)) { ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( - createAsyncRequest, - TimeValue.ZERO - ); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest); if (createAsyncResponseEntity.get("is_running").equals("true")) { String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); @@ -221,10 +217,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr createAsyncSearchRequest.addParameter("error_trace", "false"); createAsyncSearchRequest.addParameter("keep_on_completion", "true"); createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( - createAsyncSearchRequest, - TimeValue.ZERO - ); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest); if (createAsyncResponseEntity.get("is_running").equals("true")) { String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); @@ -252,10 +245,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr createAsyncSearchRequest.addParameter("error_trace", "true"); createAsyncSearchRequest.addParameter("keep_on_completion", "true"); createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms"); - Map createAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( - createAsyncSearchRequest, - TimeValue.ZERO - ); + Map createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest); if (createAsyncResponseEntity.get("is_running").equals("true")) { String asyncExecutionId = (String) createAsyncResponseEntity.get("id"); Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId); @@ -266,9 +256,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr assertTrue(transportMessageHasStackTrace.getAsBoolean()); } - private Map performRequestAndGetResponseEntityAfterDelay(Request r, TimeValue sleep) throws IOException, - InterruptedException { - Thread.sleep(sleep.millis()); + private Map performRequestAndGetResponseEntity(Request r) throws IOException { Response response = getRestClient().performRequest(r); XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false); @@ -276,10 +264,7 @@ private Map performRequestAndGetResponseEntityAfterDelay(Request private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception { assertBusy(() -> { - Map getAsyncResponseEntity = performRequestAndGetResponseEntityAfterDelay( - getAsyncRequest, - TimeValue.timeValueSeconds(1L) - ); + Map getAsyncResponseEntity = performRequestAndGetResponseEntity(getAsyncRequest); assertFalse((Boolean) getAsyncResponseEntity.get("is_running")); }); }