diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java index 398e6d5c94773..f407592f63132 100644 --- a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Request; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; @@ -25,19 +24,15 @@ import org.elasticsearch.test.MockLog; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; -import org.junit.After; -import org.junit.Before; import org.junit.BeforeClass; import java.io.IOException; import java.nio.charset.Charset; import java.util.Collection; -import java.util.function.BooleanSupplier; import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; public class SearchErrorTraceIT extends HttpSmokeTestCase { - private BooleanSupplier hasStackTrace; @Override protected Collection> nodePlugins() { @@ -49,18 +44,6 @@ public static void setDebugLogLevel() { Configurator.setLevel(SearchService.class, Level.DEBUG); } - @Before - public void setupMessageListener() { - hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); - // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); - } - - @After - public void resetSettings() { - updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey())); - } - private void setupIndexWithDocs() { createIndex("test1", "test2"); indexRandom( @@ -86,7 +69,7 @@ public void testSearchFailingQueryErrorTraceDefault() throws IOException { } """); getRestClient().performRequest(searchRequest); - assertFalse(hasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testSearchFailingQueryErrorTraceTrue() throws IOException { @@ -105,7 +88,7 @@ public void testSearchFailingQueryErrorTraceTrue() throws IOException { """); searchRequest.addParameter("error_trace", "true"); getRestClient().performRequest(searchRequest); - assertTrue(hasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceObserved(internalCluster()); } public void testSearchFailingQueryErrorTraceFalse() throws IOException { @@ -124,7 +107,7 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException { """); searchRequest.addParameter("error_trace", "false"); getRestClient().performRequest(searchRequest); - assertFalse(hasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testDataNodeLogsStackTrace() throws IOException { @@ -173,7 +156,7 @@ public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException { new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null)) ); getRestClient().performRequest(searchRequest); - assertFalse(hasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException { @@ -190,7 +173,7 @@ public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException { ); searchRequest.addParameter("error_trace", "true"); getRestClient().performRequest(searchRequest); - assertTrue(hasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceObserved(internalCluster()); } public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException { @@ -207,8 +190,7 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException { ); searchRequest.addParameter("error_trace", "false"); getRestClient().performRequest(searchRequest); - - assertFalse(hasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testDataNodeLogsStackTraceMultiSearch() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 51406d8c9ad19..a8f22eb1cc572 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -222,7 +222,7 @@ public static final class NodeQueryResponse extends TransportResponse { private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; - NodeQueryResponse(StreamInput in) throws IOException { + public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index b33fb852c52db..ab8308391b202 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -11,23 +11,30 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NodeQueryResponse; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.query.QueryShardException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.BytesTransportResponse; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; +import static org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME; import static org.elasticsearch.common.Strings.format; import static org.elasticsearch.test.ESIntegTestCase.internalCluster; import static org.elasticsearch.test.ESTestCase.asInstanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Utilities around testing the `error_trace` message header in search. @@ -35,23 +42,77 @@ public enum ErrorTraceHelper { ; - public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) { - final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false); + public static void assertStackTraceObserved(InternalTestCluster internalTestCluster) { + assertStackTraceObserved(internalTestCluster, true); + } + + public static void assertStackTraceCleared(InternalTestCluster internalTestCluster) { + assertStackTraceObserved(internalTestCluster, false); + } + + private static void assertStackTraceObserved(InternalTestCluster internalCluster, boolean shouldObserveStackTrace) { internalCluster.getDataNodeInstances(TransportService.class) - .forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() { - @Override - public void onResponseSent(long requestId, String action, Exception error) { - TransportMessageListener.super.onResponseSent(requestId, action, error); - if (action.startsWith("indices:data/read/search")) { - Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( - error, - t -> t.getStackTrace().length > 0 - ); - transportMessageHasStackTrace.set(throwable.isPresent()); + .forEach( + ts -> asInstanceOf(MockTransportService.class, ts).addRequestHandlingBehavior( + NODE_SEARCH_ACTION_NAME, + (handler, request, channel, task) -> { + TransportChannel wrappedChannel = new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public void sendResponse(TransportResponse response) { + var bytes = asInstanceOf(BytesTransportResponse.class, response); + NodeQueryResponse nodeQueryResponse = null; + try (StreamInput in = bytes.bytes().streamInput()) { + var namedWriteableAwareInput = new NamedWriteableAwareStreamInput( + in, + internalCluster.getNamedWriteableRegistry() + ); + nodeQueryResponse = new NodeQueryResponse(namedWriteableAwareInput); + for (Object result : nodeQueryResponse.getResults()) { + if (result instanceof Exception error) { + inspectStackTraceAndAssert(error); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (nodeQueryResponse != null) { + nodeQueryResponse.decRef(); + } + } + + // Forward to the original channel + channel.sendResponse(response); + } + + @Override + public void sendResponse(Exception error) { + inspectStackTraceAndAssert(error); + + // Forward to the original channel + channel.sendResponse(error); + } + + private void inspectStackTraceAndAssert(Exception error) { + ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> { + if (shouldObserveStackTrace) { + assertTrue(t.getStackTrace().length > 0); + } else { + assertEquals(0, t.getStackTrace().length); + } + return true; + }); + } + }; + + handler.messageReceived(request, wrappedChannel, task); } - } - })); - return transportMessageHasStackTrace::get; + ) + ); } /** diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index e2b76658e5246..cb1b4997be578 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.core.config.Configurator; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.plugins.Plugin; @@ -21,17 +20,13 @@ import org.elasticsearch.test.MockLog; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; -import org.junit.After; -import org.junit.Before; import org.junit.BeforeClass; import java.io.IOException; import java.util.Collection; import java.util.Map; -import java.util.function.BooleanSupplier; public class AsyncSearchErrorTraceIT extends ESIntegTestCase { - private BooleanSupplier transportMessageHasStackTrace; @Override protected boolean addMockHttpTransport() { @@ -49,18 +44,6 @@ public static void setDebugLogLevel() { Configurator.setLevel(SearchService.class, Level.DEBUG); } - @Before - public void setupMessageListener() { - transportMessageHasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); - // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); - } - - @After - public void resetSettings() { - updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey())); - } - private void setupIndexWithDocs() { createIndex("test1", "test2"); indexRandom( @@ -94,7 +77,7 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception { awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node - assertFalse(transportMessageHasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception { @@ -122,7 +105,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception { awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was sent from the data node to the coordinating node - assertTrue(transportMessageHasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceObserved(internalCluster()); } public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception { @@ -150,7 +133,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception { awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node - assertFalse(transportMessageHasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testDataNodeLogsStackTrace() throws Exception { @@ -225,7 +208,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was not sent from the data node to the coordinating node - assertFalse(transportMessageHasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceCleared(internalCluster()); } public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws Exception { @@ -253,7 +236,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr awaitAsyncRequestDoneRunning(getAsyncRequest); } // check that the stack trace was sent from the data node to the coordinating node - assertTrue(transportMessageHasStackTrace.getAsBoolean()); + ErrorTraceHelper.assertStackTraceObserved(internalCluster()); } private Map performRequestAndGetResponseEntity(Request r) throws IOException {