Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.SearchService;
Expand Down Expand Up @@ -72,11 +71,11 @@ private void setupIndexWithDocs() {
refresh();
}

public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, InterruptedException {
public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
Request createAsyncRequest = new Request("POST", "/_async_search");
createAsyncRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
Expand All @@ -86,23 +85,23 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, I
}
}
""");
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (createAsyncResponseEntity.get("is_running").equals("true")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where the the search might have already completed here? I guess that would be fine since we wouldn't need to wait any longer then, just wondering if that while conditional is needed any longer with the switch to busy waiting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the conditional is necessary, it's fine to always send the GET _async_search request. However I think it is possible that the search has already completed, so that check should speed up the test a bit.

String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was not sent from the data node to the coordinating node
assertFalse(transportMessageHasStackTrace.getAsBoolean());
}

public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, InterruptedException {
public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
Request createAsyncRequest = new Request("POST", "/_async_search");
createAsyncRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
Expand All @@ -112,25 +111,25 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, Inte
}
}
""");
searchRequest.addParameter("error_trace", "true");
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
request.addParameter("error_trace", "true");
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
createAsyncRequest.addParameter("error_trace", "true");
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (createAsyncResponseEntity.get("is_running").equals("true")) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was sent from the data node to the coordinating node
assertTrue(transportMessageHasStackTrace.getAsBoolean());
}

public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, InterruptedException {
public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
Request createAsyncRequest = new Request("POST", "/_async_search");
createAsyncRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
Expand All @@ -140,28 +139,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, Int
}
}
""");
searchRequest.addParameter("error_trace", "false");
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
request.addParameter("error_trace", "false");
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
createAsyncRequest.addParameter("error_trace", "false");
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (createAsyncResponseEntity.get("is_running").equals("true")) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was not sent from the data node to the coordinating node
assertFalse(transportMessageHasStackTrace.getAsBoolean());
}

public void testDataNodeLogsStackTrace() throws IOException, InterruptedException {
public void testDataNodeLogsStackTrace() throws Exception {
setupIndexWithDocs();

// error_trace defaults to false so we can test both cases with some randomization
final boolean defineErrorTraceFalse = randomBoolean();

Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
Request createAsyncRequest = new Request("POST", "/_async_search");
createAsyncRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
Expand All @@ -175,43 +171,40 @@ public void testDataNodeLogsStackTrace() throws IOException, InterruptedExceptio
// No matter the value of error_trace (empty, true, or false) we should see stack traces logged
int errorTraceValue = randomIntBetween(0, 2);
if (errorTraceValue == 0) {
searchRequest.addParameter("error_trace", "true");
createAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
searchRequest.addParameter("error_trace", "false");
createAsyncRequest.addParameter("error_trace", "false");
} // else empty

searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");

String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);

// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
request.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
request.addParameter("error_trace", "false");
} // else empty

while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (createAsyncResponseEntity.get("is_running").equals("true")) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
}
}

public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws IOException, InterruptedException {
public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws Exception {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
Request createAsyncSearchRequest = new Request("POST", "/_async_search");
createAsyncSearchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
Expand All @@ -221,25 +214,25 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
}
}
""");
searchRequest.addParameter("error_trace", "false");
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
request.addParameter("error_trace", "true");
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
createAsyncSearchRequest.addParameter("error_trace", "false");
createAsyncSearchRequest.addParameter("keep_on_completion", "true");
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (createAsyncResponseEntity.get("is_running").equals("true")) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was not sent from the data node to the coordinating node
assertFalse(transportMessageHasStackTrace.getAsBoolean());
}

public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws IOException, InterruptedException {
public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws Exception {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
Request createAsyncSearchRequest = new Request("POST", "/_async_search");
createAsyncSearchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
Expand All @@ -249,25 +242,30 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
}
}
""");
searchRequest.addParameter("error_trace", "true");
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
request.addParameter("error_trace", "false");
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
createAsyncSearchRequest.addParameter("error_trace", "true");
createAsyncSearchRequest.addParameter("keep_on_completion", "true");
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (createAsyncResponseEntity.get("is_running").equals("true")) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was sent from the data node to the coordinating node
assertTrue(transportMessageHasStackTrace.getAsBoolean());
}

private Map<String, Object> performRequestAndGetResponseEntityAfterDelay(Request r, TimeValue sleep) throws IOException,
InterruptedException {
Thread.sleep(sleep.millis());
private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws IOException {
Response response = getRestClient().performRequest(r);
XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue());
return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false);
}

private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception {
assertBusy(() -> {
Map<String, Object> getAsyncResponseEntity = performRequestAndGetResponseEntity(getAsyncRequest);
assertFalse((Boolean) getAsyncResponseEntity.get("is_running"));
});
}
}