Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,33 +127,7 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException {
assertFalse(hasStackTrace.getAsBoolean());
}

public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_search");
searchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
"query": "foo",
"fields": ["field"]
}
}
}
""");

String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

searchRequest.addParameter("error_trace", "true");
getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException {
public void testDataNodeLogsStackTrace() throws IOException {
setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_search");
Expand All @@ -173,10 +147,14 @@ public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOExce
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

// error_trace defaults to false so we can test both cases with some randomization
if (randomBoolean()) {
// No matter the value of error_trace (empty, true, or false) we should see stack traces logged
int errorTraceValue = randomIntBetween(0, 2);
if (errorTraceValue == 0) {
searchRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
searchRequest.addParameter("error_trace", "false");
}
} // else empty

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
Expand Down Expand Up @@ -233,32 +211,7 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
assertFalse(hasStackTrace.getAsBoolean());
}

public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrueMultiSearch() throws IOException {
setupIndexWithDocs();

XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
);
Request searchRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
searchRequest.setEntity(
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);

searchRequest.addParameter("error_trace", "true");

String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() throws IOException {
public void testDataNodeLogsStackTraceMultiSearch() throws IOException {
setupIndexWithDocs();

XContentType contentType = XContentType.JSON;
Expand All @@ -271,16 +224,19 @@ public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() th
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);

// error_trace defaults to false so we can test both cases with some randomization
if (randomBoolean()) {
searchRequest.addParameter("error_trace", "false");
}

String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

// No matter the value of error_trace (empty, true, or false) we should see stack traces logged
int errorTraceValue = randomIntBetween(0, 2);
if (errorTraceValue == 0) {
searchRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
searchRequest.addParameter("error_trace", "false");
} // else empty

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
Expand Down
64 changes: 34 additions & 30 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,11 @@ protected void doClose() {
}

/**
* Wraps the listener to avoid sending StackTraces back to the coordinating
* node if the `error_trace` header is set to {@code false}. Upon reading we
* default to {@code true} to maintain the same behavior as before the change,
* due to older nodes not being able to specify whether it needs stack traces.
* Wraps the listener to ensure errors are logged and to avoid sending
* StackTraces back to the coordinating node if the `error_trace` header is
* set to {@code false}. Upon reading, we default to {@code true} to maintain
* the same behavior as before the change, due to older nodes not being able
* to specify whether they need stack traces.
*
* @param <T> the type of the response
* @param listener the action listener to be wrapped
Expand All @@ -588,44 +589,47 @@ protected void doClose() {
* @param threadPool with context where to write the new header
* @return the wrapped action listener
*/
static <T> ActionListener<T> maybeWrapListenerForStackTrace(
static <T> ActionListener<T> wrapListenerForErrorHandling(
ActionListener<T> listener,
TransportVersion version,
String nodeId,
ShardId shardId,
long taskId,
ThreadPool threadPool
) {
boolean header = true;
final boolean header;
if (version.onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) {
header = Boolean.parseBoolean(threadPool.getThreadContext().getHeaderOrDefault("error_trace", "false"));
}
if (header == false) {
return listener.delegateResponse((l, e) -> {
org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
"[%s]%s: failed to execute search request for task [%d]",
nodeId,
shardId,
taskId
);
// Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
logger.debug(messageSupplier, e);
} else {
logger.warn(messageSupplier, e);
}
} else {
header = true;
}
return listener.delegateResponse((l, e) -> {
org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
"[%s]%s: failed to execute search request for task [%d]",
nodeId,
shardId,
taskId
);
// Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
logger.debug(messageSupplier, e);
} else {
logger.warn(messageSupplier, e);
}

if (header == false) {
ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
err.setStackTrace(EMPTY_STACK_TRACE_ARRAY);
return false;
});
l.onFailure(e);
});
}
return listener;
}

l.onFailure(e);
});
}

public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
request.getChannelVersion(),
clusterService.localNode().getId(),
Expand Down Expand Up @@ -676,7 +680,7 @@ public void executeQueryPhase(ShardSearchRequest request, CancellableTask task,
rewriteAndFetchShardRequest(
shard,
request,
maybeWrapListenerForStackTrace(
wrapListenerForErrorHandling(
listener,
request.getChannelVersion(),
clusterService.localNode().getId(),
Expand Down Expand Up @@ -913,7 +917,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
shardSearchRequest.getChannelVersion(),
clusterService.localNode().getId(),
Expand Down Expand Up @@ -970,7 +974,7 @@ public void executeQueryPhase(
TransportVersion version
) {
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
version,
clusterService.localNode().getId(),
Expand Down Expand Up @@ -1032,7 +1036,7 @@ public void executeQueryPhase(
) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
version,
clusterService.localNode().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor;
import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace;
import static org.elasticsearch.search.SearchService.wrapListenerForErrorHandling;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.not;

Expand Down Expand Up @@ -137,7 +137,7 @@ public Type getType() {
doTestCanMatch(searchRequest, sortField, true, null, false);
}

public void testMaybeWrapListenerForStackTrace() {
public void testWrapListenerForErrorHandling() {
ShardId shardId = new ShardId("index", "index", 0);
// Tests that the same listener has stack trace if is not wrapped or does not have stack trace if it is wrapped.
AtomicBoolean isWrapped = new AtomicBoolean(false);
Expand All @@ -160,12 +160,12 @@ public void onFailure(Exception e) {
e.fillInStackTrace();
assertThat(e.getStackTrace().length, is(not(0)));
listener.onFailure(e);
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), "node", shardId, 123L, threadPool);
listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), "node", shardId, 123L, threadPool);
isWrapped.set(true);
listener.onFailure(e);
}

public void testMaybeWrapListenerForStackTraceDebugLog() {
public void testWrapListenerForErrorHandlingDebugLog() {
final String nodeId = "node";
final String index = "index";
ShardId shardId = new ShardId(index, index, 0);
Expand Down Expand Up @@ -198,12 +198,12 @@ public void onFailure(Exception e) {
}
};
IllegalArgumentException e = new IllegalArgumentException(exceptionMessage); // 400-level exception
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener.onFailure(e);
}
}

public void testMaybeWrapListenerForStackTraceWarnLog() {
public void testWrapListenerForErrorHandlingWarnLog() {
final String nodeId = "node";
final String index = "index";
ShardId shardId = new ShardId(index, index, 0);
Expand Down Expand Up @@ -235,7 +235,7 @@ public void onFailure(Exception e) {
}
};
IllegalStateException e = new IllegalStateException(exceptionMessage); // 500-level exception
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.stream.Collectors;

import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.test.ESIntegTestCase.getNodeId;
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
import static org.elasticsearch.test.ESTestCase.asInstanceOf;

Expand Down Expand Up @@ -90,32 +89,4 @@ public static void addSeenLoggingExpectations(int numShards, MockLog mockLog, St
);
}
}

/**
* Adds expectations for the _absence_ of debug logging of a message. An unseen expectation is added for each
* combination of node in the internal cluster and shard in the index.
*
* @param numShards the number of shards in the index (an expectation will be added for each shard)
* @param mockLog the mock log
* @param errorTriggeringIndex the name of the index that will trigger the error
*/
public static void addUnseenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
for (String nodeName : internalCluster().getNodeNames()) {
for (int shard = 0; shard < numShards; shard++) {
mockLog.addExpectation(
new MockLog.UnseenEventExpectation(
format(
"\"[%s][%s][%d]: failed to execute search request\" and an exception logged",
getNodeId(nodeName),
errorTriggeringIndex,
shard
),
SearchService.class.getCanonicalName(),
Level.DEBUG,
format("[%s][%s][%d]: failed to execute search request", getNodeId(nodeName), errorTriggeringIndex, shard)
)
);
}
}
}
}
Loading