Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
Expand All @@ -25,19 +24,15 @@
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.function.BooleanSupplier;

import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;

public class SearchErrorTraceIT extends HttpSmokeTestCase {
private BooleanSupplier hasStackTrace;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand All @@ -49,18 +44,6 @@ public static void setDebugLogLevel() {
Configurator.setLevel(SearchService.class, Level.DEBUG);
}

@Before
public void setupMessageListener() {
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

@After
public void resetSettings() {
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

private void setupIndexWithDocs() {
createIndex("test1", "test2");
indexRandom(
Expand All @@ -86,7 +69,7 @@ public void testSearchFailingQueryErrorTraceDefault() throws IOException {
}
""");
getRestClient().performRequest(searchRequest);
assertFalse(hasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testSearchFailingQueryErrorTraceTrue() throws IOException {
Expand All @@ -105,7 +88,7 @@ public void testSearchFailingQueryErrorTraceTrue() throws IOException {
""");
searchRequest.addParameter("error_trace", "true");
getRestClient().performRequest(searchRequest);
assertTrue(hasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
}

public void testSearchFailingQueryErrorTraceFalse() throws IOException {
Expand All @@ -124,7 +107,7 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException {
""");
searchRequest.addParameter("error_trace", "false");
getRestClient().performRequest(searchRequest);
assertFalse(hasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testDataNodeLogsStackTrace() throws IOException {
Expand Down Expand Up @@ -173,7 +156,7 @@ public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException {
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);
getRestClient().performRequest(searchRequest);
assertFalse(hasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException {
Expand All @@ -190,7 +173,7 @@ public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException {
);
searchRequest.addParameter("error_trace", "true");
getRestClient().performRequest(searchRequest);
assertTrue(hasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
}

public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
Expand All @@ -207,8 +190,7 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
);
searchRequest.addParameter("error_trace", "false");
getRestClient().performRequest(searchRequest);

assertFalse(hasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testDataNodeLogsStackTraceMultiSearch() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public static final class NodeQueryResponse extends TransportResponse {
private final SearchPhaseController.TopDocsStats topDocsStats;
private final QueryPhaseResultConsumer.MergeResult mergeResult;

NodeQueryResponse(StreamInput in) throws IOException {
public NodeQueryResponse(StreamInput in) throws IOException {
this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new);
this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);
this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,108 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NodeQueryResponse;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.BytesTransportResponse;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import static org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME;
import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Utilities around testing the `error_trace` message header in search.
*/
public enum ErrorTraceHelper {
;

public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
public static void assertStackTraceObserved(InternalTestCluster internalTestCluster) {
assertStackTraceObserved(internalTestCluster, true);
}

public static void assertStackTraceCleared(InternalTestCluster internalTestCluster) {
assertStackTraceObserved(internalTestCluster, false);
}

private static void assertStackTraceObserved(InternalTestCluster internalCluster, boolean shouldObserveStackTrace) {
internalCluster.getDataNodeInstances(TransportService.class)
.forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, Exception error) {
TransportMessageListener.super.onResponseSent(requestId, action, error);
if (action.startsWith("indices:data/read/search")) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
error,
t -> t.getStackTrace().length > 0
);
transportMessageHasStackTrace.set(throwable.isPresent());
.forEach(
ts -> asInstanceOf(MockTransportService.class, ts).addRequestHandlingBehavior(
NODE_SEARCH_ACTION_NAME,
(handler, request, channel, task) -> {
TransportChannel wrappedChannel = new TransportChannel() {
@Override
public String getProfileName() {
return channel.getProfileName();
}

@Override
public void sendResponse(TransportResponse response) {
var bytes = asInstanceOf(BytesTransportResponse.class, response);
NodeQueryResponse nodeQueryResponse = null;
try (StreamInput in = bytes.bytes().streamInput()) {
var namedWriteableAwareInput = new NamedWriteableAwareStreamInput(
in,
internalCluster.getNamedWriteableRegistry()
);
nodeQueryResponse = new NodeQueryResponse(namedWriteableAwareInput);
for (Object result : nodeQueryResponse.getResults()) {
if (result instanceof Exception error) {
inspectStackTraceAndAssert(error);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
if (nodeQueryResponse != null) {
nodeQueryResponse.decRef();
}
}

// Forward to the original channel
channel.sendResponse(response);
}

@Override
public void sendResponse(Exception error) {
inspectStackTraceAndAssert(error);

// Forward to the original channel
channel.sendResponse(error);
}

private void inspectStackTraceAndAssert(Exception error) {
ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> {
if (shouldObserveStackTrace) {
assertTrue(t.getStackTrace().length > 0);
} else {
assertEquals(0, t.getStackTrace().length);
}
return true;
});
}
};

handler.messageReceived(request, wrappedChannel, task);
}
}
}));
return transportMessageHasStackTrace::get;
)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.core.config.Configurator;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -21,17 +20,13 @@
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.BooleanSupplier;

public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
private BooleanSupplier transportMessageHasStackTrace;

@Override
protected boolean addMockHttpTransport() {
Expand All @@ -49,18 +44,6 @@ public static void setDebugLogLevel() {
Configurator.setLevel(SearchService.class, Level.DEBUG);
}

@Before
public void setupMessageListener() {
transportMessageHasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

@After
public void resetSettings() {
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

private void setupIndexWithDocs() {
createIndex("test1", "test2");
indexRandom(
Expand Down Expand Up @@ -94,7 +77,7 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was not sent from the data node to the coordinating node
assertFalse(transportMessageHasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
Expand Down Expand Up @@ -122,7 +105,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was sent from the data node to the coordinating node
assertTrue(transportMessageHasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
}

public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
Expand Down Expand Up @@ -150,7 +133,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was not sent from the data node to the coordinating node
assertFalse(transportMessageHasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testDataNodeLogsStackTrace() throws Exception {
Expand Down Expand Up @@ -225,7 +208,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was not sent from the data node to the coordinating node
assertFalse(transportMessageHasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
}

public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws Exception {
Expand Down Expand Up @@ -253,7 +236,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
// check that the stack trace was sent from the data node to the coordinating node
assertTrue(transportMessageHasStackTrace.getAsBoolean());
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
}

private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws IOException {
Expand Down