diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java index 398e6d5c94773..4fe894728ee5b 100644 --- a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -52,8 +52,6 @@ public static void setDebugLogLevel() { @Before public void setupMessageListener() { hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); - // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); } @After diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 8d763698c63c0..8aec4b68b11be 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -212,7 +212,7 @@ public static final class NodeQueryResponse extends TransportResponse { private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; - NodeQueryResponse(StreamInput in) throws IOException { + public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); @@ -552,7 +552,7 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP } } - private static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; + public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; static void registerNodeSearchAction( SearchTransportService searchTransportService, diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index b0408ac3c60cc..450259b1bd399 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -145,6 +145,8 @@ void sendResponse( ) { assert assertValidTransportVersion(transportVersion); assert response.hasReferences(); + var messageListener = this.messageListener; + messageListener.onBeforeResponseSent(requestId, action, response); try { sendMessage( channel, diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java index e751dd7e0aab4..f16a76c255333 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java @@ -30,6 +30,14 @@ default void onRequestReceived(long requestId, String action) {} */ default void onResponseSent(long requestId, String action) {} + /** + * Called for every action response sent before the response has been passed to the underlying network implementation. + * @param requestId the request ID (unique per client) + * @param action the request action + * @param response response instance + */ + default void onBeforeResponseSent(long requestId, String action, TransportResponse response) {} + /*** * Called for every failed action response after the response has been passed to the underlying network implementation. * @param requestId the request ID (unique per client) diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index b33fb852c52db..438214eb679fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -11,14 +11,22 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction; +import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NodeQueryResponse; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.query.QueryShardException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,29 +45,61 @@ public enum ErrorTraceHelper { public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) { final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false); - internalCluster.getDataNodeInstances(TransportService.class) - .forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() { + internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> { + var mockTs = asInstanceOf(MockTransportService.class, ts); + mockTs.addMessageListener(new TransportMessageListener() { + // This is called when error_trace is false @Override public void onResponseSent(long requestId, String action, Exception error) { TransportMessageListener.super.onResponseSent(requestId, action, error); if (action.startsWith("indices:data/read/search")) { - Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( - error, - t -> t.getStackTrace().length > 0 - ); - transportMessageHasStackTrace.set(throwable.isPresent()); + checkStacktraceStateAndRemove(error, mockTs); } } - })); + + // This is called when error_trace is true + @Override + public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { + if (SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME.equals(action)) { + var bytes = asInstanceOf(BytesTransportResponse.class, response); + NodeQueryResponse nodeQueryResponse = null; + try (StreamInput in = bytes.bytes().streamInput()) { + var namedWriteableAwareInput = new NamedWriteableAwareStreamInput( + in, + internalCluster.getNamedWriteableRegistry() + ); + nodeQueryResponse = new NodeQueryResponse(namedWriteableAwareInput); + for (Object result : nodeQueryResponse.getResults()) { + if (result instanceof Exception error) { + checkStacktraceStateAndRemove(error, mockTs); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (nodeQueryResponse != null) { + nodeQueryResponse.decRef(); + } + } + } + } + + private void checkStacktraceStateAndRemove(Exception error, MockTransportService mockTs) { + Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0); + transportMessageHasStackTrace.set(throwable.isPresent()); + mockTs.removeMessageListener(this); + } + }); + }); return transportMessageHasStackTrace::get; } /** * Adds expectations for debug logging of a message and exception on each shard of the given index. * - * @param numShards the number of shards in the index (an expectation will be added for each shard) - * @param mockLog the mock log - * @param errorTriggeringIndex the name of the index that will trigger the error + * @param numShards the number of shards in the index (an expectation will be added for each shard) + * @param mockLog the mock log + * @param errorTriggeringIndex the name of the index that will trigger the error */ public static void addSeenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) { String nodesDisjunction = format( diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index a70250335ede7..f823cc12ef72e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -60,6 +60,7 @@ import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; @@ -846,6 +847,12 @@ public void onResponseSent(long requestId, String action) { messageListener.onResponseSent(requestId, action); } + @Override + public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { + super.onBeforeResponseSent(requestId, action, response); + messageListener.onBeforeResponseSent(requestId, action, response); + } + @Override public void onResponseSent(long requestId, String action, Exception e) { super.onResponseSent(requestId, action, e); @@ -898,6 +905,13 @@ public void onRequestSent( } } + @Override + public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { + for (TransportMessageListener listener : listeners) { + listener.onBeforeResponseSent(requestId, action, response); + } + } + @Override @SuppressWarnings("rawtypes") public void onResponseReceived(long requestId, Transport.ResponseContext holder) { diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index e2b76658e5246..71750a9f4c00b 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -52,8 +52,6 @@ public static void setDebugLogLevel() { @Before public void setupMessageListener() { transportMessageHasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); - // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); } @After