From 69bfa0a6ce5eb7196bc239aebc15428e3147ab49 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 22 Apr 2025 15:45:15 +0200 Subject: [PATCH 1/2] Fix SearchErrorTraceIT and friends to work with batched query execution Make this work with batched execution and fix a memory leak: * Fix the memory leak by removing the listener on the first message. With batched execution in the mix there is only ever a single message per node: either the data node holds a single shard and we get a single query message, or it holds multiple shards and we get a single batched message. It is therefore fine to remove the listener after the first message, since all tests only send a single request anyway. * Add a new hook that allows inspection of the actual response. This is needed for batched execution, since it sends a non-error response even if the data node failed all searches. We had this before in the `onResponseSent` hook, but checking the instance after it has been sent over the wire causes needless overhead in the production code, so this moves to a "before-style" hook instead. 
--- .../http/SearchErrorTraceIT.java | 2 -- .../SearchQueryThenFetchAsyncAction.java | 2 +- .../transport/OutboundHandler.java | 2 ++ .../transport/TransportMessageListener.java | 8 +++++ .../search/ErrorTraceHelper.java | 34 ++++++++++++++----- .../test/transport/MockTransportService.java | 14 ++++++++ .../xpack/search/AsyncSearchErrorTraceIT.java | 2 -- 7 files changed, 51 insertions(+), 13 deletions(-) diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java index baf7cc183afd2..fdb51872bc127 100644 --- a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -52,8 +52,6 @@ public static void setDebugLogLevel() { @Before public void setupMessageListener() { hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); - // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); } @After diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index e552d9c9606c8..a577cb8727913 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -543,7 +543,7 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP } } - private static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; + public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; static void 
registerNodeSearchAction( SearchTransportService searchTransportService, diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index a83d1019e7c64..56fda3543f6d9 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -145,6 +145,8 @@ void sendResponse( ) { assert assertValidTransportVersion(transportVersion); assert response.hasReferences(); + var messageListener = this.messageListener; + messageListener.onBeforeResponseSent(requestId, action, response); try { sendMessage( channel, diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java index e751dd7e0aab4..f16a76c255333 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java @@ -30,6 +30,14 @@ default void onRequestReceived(long requestId, String action) {} */ default void onResponseSent(long requestId, String action) {} + /** + * Called for every action response sent before the response has been passed to the underlying network implementation. + * @param requestId the request ID (unique per client) + * @param action the request action + * @param response response instance + */ + default void onBeforeResponseSent(long requestId, String action, TransportResponse response) {} + /*** * Called for every failed action response after the response has been passed to the underlying network implementation. 
* @param requestId the request ID (unique per client) diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index 90f2ddd00acee..568b7f36ea400 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -11,12 +11,14 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction; import org.elasticsearch.index.query.QueryShardException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.util.Arrays; @@ -38,20 +40,36 @@ public enum ErrorTraceHelper { public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) { final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false); - internalCluster.getDataNodeInstances(TransportService.class) - .forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() { + internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> { + var mockTs = asInstanceOf(MockTransportService.class, ts); + mockTs.addMessageListener(new TransportMessageListener() { @Override public void onResponseSent(long requestId, String action, Exception error) { TransportMessageListener.super.onResponseSent(requestId, action, error); if (action.startsWith("indices:data/read/search")) { - Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( - error, - t -> t.getStackTrace().length > 0 - ); - 
transportMessageHasStackTrace.set(throwable.isPresent()); + checkStacktraceStateAndRemove(error, mockTs); } } - })); + + @Override + public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { + if (SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME.equals(action)) { + var r = asInstanceOf(SearchQueryThenFetchAsyncAction.NodeQueryResponse.class, response); + for (Object result : r.getResults()) { + if (result instanceof Exception error) { + checkStacktraceStateAndRemove(error, mockTs); + } + } + } + } + + private void checkStacktraceStateAndRemove(Exception error, MockTransportService mockTs) { + Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0); + transportMessageHasStackTrace.set(throwable.isPresent()); + mockTs.removeMessageListener(this); + } + }); + }); return transportMessageHasStackTrace::get; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 3f6cf453fd0d1..3459b1ec1c269 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -60,6 +60,7 @@ import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; @@ -849,6 +850,12 @@ public void onResponseSent(long requestId, String action) { messageListener.onResponseSent(requestId, action); } + @Override + public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { + 
super.onBeforeResponseSent(requestId, action, response); + messageListener.onBeforeResponseSent(requestId, action, response); + } + @Override public void onResponseSent(long requestId, String action, Exception e) { super.onResponseSent(requestId, action, e); @@ -901,6 +908,13 @@ public void onRequestSent( } } + @Override + public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { + for (TransportMessageListener listener : listeners) { + listener.onBeforeResponseSent(requestId, action, response); + } + } + @Override @SuppressWarnings("rawtypes") public void onResponseReceived(long requestId, Transport.ResponseContext holder) { diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index ef05307743a46..341549b5679f1 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -53,8 +53,6 @@ public static void setDebugLogLevel() { @Before public void setupMessageListener() { transportMessageHasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); - // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); } @After From b185eb11d61bf480795cbd38deecf0194a65dcb9 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 29 Jul 2025 14:53:18 -0400 Subject: [PATCH 2/2] Convert BytesTransportResponse --- .../SearchQueryThenFetchAsyncAction.java | 2 +- .../search/ErrorTraceHelper.java | 36 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index b2d5693762dca..8aec4b68b11be 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -212,7 +212,7 @@ public static final class NodeQueryResponse extends TransportResponse { private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; - NodeQueryResponse(StreamInput in) throws IOException { + public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index 97a877e3bc586..438214eb679fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -12,15 +12,21 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction; +import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NodeQueryResponse; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.query.QueryShardException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockLog; import 
org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +48,7 @@ public static BooleanSupplier setupErrorTraceListener(InternalTestCluster intern internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> { var mockTs = asInstanceOf(MockTransportService.class, ts); mockTs.addMessageListener(new TransportMessageListener() { + // This is called when error_trace is false @Override public void onResponseSent(long requestId, String action, Exception error) { TransportMessageListener.super.onResponseSent(requestId, action, error); @@ -50,13 +57,28 @@ public void onResponseSent(long requestId, String action, Exception error) { } } + // This is called when error_trace is true @Override public void onBeforeResponseSent(long requestId, String action, TransportResponse response) { if (SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME.equals(action)) { - var r = asInstanceOf(SearchQueryThenFetchAsyncAction.NodeQueryResponse.class, response); - for (Object result : r.getResults()) { - if (result instanceof Exception error) { - checkStacktraceStateAndRemove(error, mockTs); + var bytes = asInstanceOf(BytesTransportResponse.class, response); + NodeQueryResponse nodeQueryResponse = null; + try (StreamInput in = bytes.bytes().streamInput()) { + var namedWriteableAwareInput = new NamedWriteableAwareStreamInput( + in, + internalCluster.getNamedWriteableRegistry() + ); + nodeQueryResponse = new NodeQueryResponse(namedWriteableAwareInput); + for (Object result : nodeQueryResponse.getResults()) { + if (result instanceof Exception error) { + 
checkStacktraceStateAndRemove(error, mockTs); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (nodeQueryResponse != null) { + nodeQueryResponse.decRef(); } } } @@ -75,9 +97,9 @@ private void checkStacktraceStateAndRemove(Exception error, MockTransportService /** * Adds expectations for debug logging of a message and exception on each shard of the given index. * - * @param numShards the number of shards in the index (an expectation will be added for each shard) - * @param mockLog the mock log - * @param errorTriggeringIndex the name of the index that will trigger the error + * @param numShards the number of shards in the index (an expectation will be added for each shard) + * @param mockLog the mock log + * @param errorTriggeringIndex the name of the index that will trigger the error */ public static void addSeenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) { String nodesDisjunction = format(