Commit a305288

[8.18] Log stack traces on data nodes before they are cleared for transport (elastic#125732) (elastic#126246)
* Log stack traces on data nodes before they are cleared for transport (elastic#125732)

We recently started clearing stack traces on data nodes before transport back to the coordinating node when error_trace=false, to reduce unnecessary data transfer and memory usage on the coordinating node (elastic#118266). However, all logging of exceptions happens on the coordinating node, so the stack traces disappeared from the logs entirely. This change logs stack traces directly on the data node when error_trace=false.

(cherry picked from commit 9f6eb1d)
1 parent ccb530e commit a305288
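Before the diffs: a minimal, hedged sketch of the pattern this commit applies, written against plain JDK types rather than Elasticsearch's ActionListener and logging internals. The Listener interface, wrapForStackTrace, and the log callback below are hypothetical stand-ins, and the real change additionally clears suppressed exceptions and picks debug vs. warn based on the HTTP status. The idea: when the client did not ask for error_trace, log the failure on the data node while the stack trace is still intact, then clear the traces before the exception travels back to the coordinating node.

import java.util.function.BiConsumer;

// Hypothetical stand-in for org.elasticsearch.action.ActionListener<T>.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

final class StackTraceLoggingSketch {

    // Wrap a listener so that, when error_trace was not requested, a failure is
    // logged locally with its full stack trace before the traces are cleared
    // for transport back to the coordinating node.
    static <T> Listener<T> wrapForStackTrace(Listener<T> delegate, boolean errorTraceRequested, BiConsumer<String, Exception> log) {
        if (errorTraceRequested) {
            return delegate; // traces are kept and shipped back untouched
        }
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                // 1. Log on the data node while the stack trace is still present.
                log.accept("failed to execute search request", e);
                // 2. Clear stack traces on the exception and its causes to shrink the
                //    transport payload (the real code also covers suppressed exceptions).
                for (Throwable t = e; t != null; t = t.getCause()) {
                    t.setStackTrace(new StackTraceElement[0]);
                }
                delegate.onFailure(e);
            }
        };
    }
}

In the actual commit this logging lives inside SearchService#maybeWrapListenerForStackTrace (see the SearchService.java diff below), which now also receives the node id, shard id, and task id so the log line identifies where the failure happened.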

File tree

7 files changed: +488 -6 lines changed


docs/changelog/125732.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 125732
+summary: Log stack traces on data nodes before they are cleared for transport
+area: Search
+type: bug
+issues: []

qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java

Lines changed: 123 additions & 0 deletions
@@ -11,15 +11,21 @@
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.nio.entity.NByteArrayEntity;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.search.ErrorTraceHelper;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.test.MockLog;
 import org.elasticsearch.transport.TransportMessageListener;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentType;
 import org.junit.Before;
+import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -31,6 +37,11 @@
 public class SearchErrorTraceIT extends HttpSmokeTestCase {
     private AtomicBoolean hasStackTrace;
 
+    @BeforeClass
+    public static void setDebugLogLevel() {
+        Configurator.setLevel(SearchService.class, Level.DEBUG);
+    }
+
     @Before
     private void setupMessageListener() {
         internalCluster().getDataNodeInstances(TransportService.class).forEach(ts -> {
@@ -119,6 +130,63 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException {
         assertFalse(hasStackTrace.get());
     }
 
+    public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException {
+        hasStackTrace = new AtomicBoolean();
+        setupIndexWithDocs();
+
+        Request searchRequest = new Request("POST", "/_search");
+        searchRequest.setJsonEntity("""
+            {
+                "query": {
+                    "simple_query_string" : {
+                        "query": "foo",
+                        "fields": ["field"]
+                    }
+                }
+            }
+            """);
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            searchRequest.addParameter("error_trace", "true");
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
+    public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException {
+        hasStackTrace = new AtomicBoolean();
+        setupIndexWithDocs();
+
+        Request searchRequest = new Request("POST", "/_search");
+        searchRequest.setJsonEntity("""
+            {
+                "query": {
+                    "simple_query_string" : {
+                        "query": "foo",
+                        "fields": ["field"]
+                    }
+                }
+            }
+            """);
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            // error_trace defaults to false so we can test both cases with some randomization
+            if (randomBoolean()) {
+                searchRequest.addParameter("error_trace", "false");
+            }
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
     public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException {
         hasStackTrace = new AtomicBoolean();
         setupIndexWithDocs();
@@ -172,4 +240,59 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
 
         assertFalse(hasStackTrace.get());
     }
+
+    public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrueMultiSearch() throws IOException {
+        hasStackTrace = new AtomicBoolean();
+        setupIndexWithDocs();
+
+        XContentType contentType = XContentType.JSON;
+        MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
+            new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
+        );
+        Request searchRequest = new Request("POST", "/_msearch");
+        byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
+        searchRequest.setEntity(
+            new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
+        );
+
+        searchRequest.addParameter("error_trace", "true");
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
+    public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() throws IOException {
+        hasStackTrace = new AtomicBoolean();
+        setupIndexWithDocs();
+
+        XContentType contentType = XContentType.JSON;
+        MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
+            new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
+        );
+        Request searchRequest = new Request("POST", "/_msearch");
+        byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
+        searchRequest.setEntity(
+            new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
+        );
+
+        // error_trace defaults to false so we can test both cases with some randomization
+        if (randomBoolean()) {
+            searchRequest.addParameter("error_trace", "false");
+        }
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
 }

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 59 additions & 5 deletions
@@ -158,6 +158,7 @@
 import java.util.function.Supplier;
 
 import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER;
+import static org.elasticsearch.common.Strings.format;
 import static org.elasticsearch.core.TimeValue.timeValueHours;
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
 import static org.elasticsearch.core.TimeValue.timeValueMinutes;
@@ -529,12 +530,18 @@ protected void doClose() {
      * @param <T> the type of the response
      * @param listener the action listener to be wrapped
      * @param version channel version of the request
+     * @param nodeId id of the current node
+     * @param shardId id of the shard being searched
+     * @param taskId id of the task being executed
      * @param threadPool with context where to write the new header
      * @return the wrapped action listener
      */
     static <T> ActionListener<T> maybeWrapListenerForStackTrace(
         ActionListener<T> listener,
         TransportVersion version,
+        String nodeId,
+        ShardId shardId,
+        long taskId,
         ThreadPool threadPool
     ) {
         boolean header = true;
@@ -543,6 +550,18 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace(
         }
         if (header == false) {
             return listener.delegateResponse((l, e) -> {
+                org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
+                    "[%s]%s: failed to execute search request for task [%d]",
+                    nodeId,
+                    shardId,
+                    taskId
+                );
+                // Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
+                if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
+                    logger.debug(messageSupplier, e);
+                } else {
+                    logger.warn(messageSupplier, e);
+                }
                 ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
                     err.setStackTrace(EMPTY_STACK_TRACE_ARRAY);
                     return false;
@@ -554,7 +573,14 @@
     }
 
     public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
-        listener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            request.getChannelVersion(),
+            clusterService.localNode().getId(),
+            request.shardId(),
+            task.getId(),
+            threadPool
+        );
         final IndexShard shard = getShard(request);
         rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
             // fork the execution in the search thread pool
@@ -592,7 +618,14 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
     }
 
     public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
-        ActionListener<SearchPhaseResult> finalListener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
+        ActionListener<SearchPhaseResult> finalListener = maybeWrapListenerForStackTrace(
+            listener,
+            request.getChannelVersion(),
+            clusterService.localNode().getId(),
+            request.shardId(),
+            task.getId(),
+            threadPool
+        );
         assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
             : "empty responses require more than one shard";
         final IndexShard shard = getShard(request);
@@ -785,9 +818,16 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
     }
 
     public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) {
-        listener = maybeWrapListenerForStackTrace(listener, request.getShardSearchRequest().getChannelVersion(), threadPool);
         final ReaderContext readerContext = findReaderContext(request.contextId(), request);
         final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            shardSearchRequest.getChannelVersion(),
+            clusterService.localNode().getId(),
+            shardSearchRequest.shardId(),
+            task.getId(),
+            threadPool
+        );
         final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.RANK_FEATURE, false)) {
@@ -832,8 +872,15 @@ public void executeQueryPhase(
         ActionListener<ScrollQuerySearchResult> listener,
         TransportVersion version
     ) {
-        listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
         final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            version,
+            clusterService.localNode().getId(),
+            readerContext.indexShard().shardId(),
+            task.getId(),
+            threadPool
+        );
         final Releasable markAsUsed;
         try {
             markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
@@ -874,9 +921,16 @@ public void executeQueryPhase(
         ActionListener<QuerySearchResult> listener,
         TransportVersion version
    ) {
-        listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
         final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
         final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            version,
+            clusterService.localNode().getId(),
+            shardSearchRequest.shardId(),
+            task.getId(),
+            threadPool
+        );
         final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
         rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> {
             // fork the execution in the search thread pool