Skip to content

Commit 9eadfac

Browse files
authored
Fix Async/SearchErrorTraceIT to work with batched query execution (#132227)
1 parent 605262d commit 9eadfac

File tree

4 files changed

+92
-66
lines changed

4 files changed

+92
-66
lines changed

qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.action.search.MultiSearchRequest;
1717
import org.elasticsearch.action.search.SearchRequest;
1818
import org.elasticsearch.client.Request;
19-
import org.elasticsearch.common.settings.Settings;
2019
import org.elasticsearch.common.util.CollectionUtils;
2120
import org.elasticsearch.plugins.Plugin;
2221
import org.elasticsearch.search.ErrorTraceHelper;
@@ -25,19 +24,15 @@
2524
import org.elasticsearch.test.MockLog;
2625
import org.elasticsearch.test.transport.MockTransportService;
2726
import org.elasticsearch.xcontent.XContentType;
28-
import org.junit.After;
29-
import org.junit.Before;
3027
import org.junit.BeforeClass;
3128

3229
import java.io.IOException;
3330
import java.nio.charset.Charset;
3431
import java.util.Collection;
35-
import java.util.function.BooleanSupplier;
3632

3733
import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;
3834

3935
public class SearchErrorTraceIT extends HttpSmokeTestCase {
40-
private BooleanSupplier hasStackTrace;
4136

4237
@Override
4338
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -49,18 +44,6 @@ public static void setDebugLogLevel() {
4944
Configurator.setLevel(SearchService.class, Level.DEBUG);
5045
}
5146

52-
@Before
53-
public void setupMessageListener() {
54-
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
55-
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
56-
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
57-
}
58-
59-
@After
60-
public void resetSettings() {
61-
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
62-
}
63-
6447
private void setupIndexWithDocs() {
6548
createIndex("test1", "test2");
6649
indexRandom(
@@ -86,7 +69,7 @@ public void testSearchFailingQueryErrorTraceDefault() throws IOException {
8669
}
8770
""");
8871
getRestClient().performRequest(searchRequest);
89-
assertFalse(hasStackTrace.getAsBoolean());
72+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
9073
}
9174

9275
public void testSearchFailingQueryErrorTraceTrue() throws IOException {
@@ -105,7 +88,7 @@ public void testSearchFailingQueryErrorTraceTrue() throws IOException {
10588
""");
10689
searchRequest.addParameter("error_trace", "true");
10790
getRestClient().performRequest(searchRequest);
108-
assertTrue(hasStackTrace.getAsBoolean());
91+
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
10992
}
11093

11194
public void testSearchFailingQueryErrorTraceFalse() throws IOException {
@@ -124,7 +107,7 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException {
124107
""");
125108
searchRequest.addParameter("error_trace", "false");
126109
getRestClient().performRequest(searchRequest);
127-
assertFalse(hasStackTrace.getAsBoolean());
110+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
128111
}
129112

130113
public void testDataNodeLogsStackTrace() throws IOException {
@@ -173,7 +156,7 @@ public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException {
173156
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
174157
);
175158
getRestClient().performRequest(searchRequest);
176-
assertFalse(hasStackTrace.getAsBoolean());
159+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
177160
}
178161

179162
public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException {
@@ -190,7 +173,7 @@ public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException {
190173
);
191174
searchRequest.addParameter("error_trace", "true");
192175
getRestClient().performRequest(searchRequest);
193-
assertTrue(hasStackTrace.getAsBoolean());
176+
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
194177
}
195178

196179
public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
@@ -207,8 +190,7 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
207190
);
208191
searchRequest.addParameter("error_trace", "false");
209192
getRestClient().performRequest(searchRequest);
210-
211-
assertFalse(hasStackTrace.getAsBoolean());
193+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
212194
}
213195

214196
public void testDataNodeLogsStackTraceMultiSearch() throws IOException {

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public static final class NodeQueryResponse extends TransportResponse {
222222
private final SearchPhaseController.TopDocsStats topDocsStats;
223223
private final QueryPhaseResultConsumer.MergeResult mergeResult;
224224

225-
NodeQueryResponse(StreamInput in) throws IOException {
225+
public NodeQueryResponse(StreamInput in) throws IOException {
226226
this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new);
227227
this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);
228228
this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);

test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java

Lines changed: 80 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,47 +11,108 @@
1111

1212
import org.apache.logging.log4j.Level;
1313
import org.elasticsearch.ExceptionsHelper;
14+
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NodeQueryResponse;
15+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
16+
import org.elasticsearch.common.io.stream.StreamInput;
1417
import org.elasticsearch.index.query.QueryShardException;
1518
import org.elasticsearch.test.ESIntegTestCase;
1619
import org.elasticsearch.test.InternalTestCluster;
1720
import org.elasticsearch.test.MockLog;
1821
import org.elasticsearch.test.transport.MockTransportService;
19-
import org.elasticsearch.transport.TransportMessageListener;
22+
import org.elasticsearch.transport.BytesTransportResponse;
23+
import org.elasticsearch.transport.TransportChannel;
24+
import org.elasticsearch.transport.TransportResponse;
2025
import org.elasticsearch.transport.TransportService;
2126

27+
import java.io.IOException;
28+
import java.io.UncheckedIOException;
2229
import java.util.Arrays;
23-
import java.util.Optional;
24-
import java.util.concurrent.atomic.AtomicBoolean;
25-
import java.util.function.BooleanSupplier;
2630
import java.util.stream.Collectors;
2731

32+
import static org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME;
2833
import static org.elasticsearch.common.Strings.format;
2934
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
3035
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
36+
import static org.junit.Assert.assertEquals;
37+
import static org.junit.Assert.assertTrue;
3138

3239
/**
3340
* Utilities around testing the `error_trace` message header in search.
3441
*/
3542
public enum ErrorTraceHelper {
3643
;
3744

38-
public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
39-
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
45+
public static void assertStackTraceObserved(InternalTestCluster internalTestCluster) {
46+
assertStackTraceObserved(internalTestCluster, true);
47+
}
48+
49+
public static void assertStackTraceCleared(InternalTestCluster internalTestCluster) {
50+
assertStackTraceObserved(internalTestCluster, false);
51+
}
52+
53+
private static void assertStackTraceObserved(InternalTestCluster internalCluster, boolean shouldObserveStackTrace) {
4054
internalCluster.getDataNodeInstances(TransportService.class)
41-
.forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() {
42-
@Override
43-
public void onResponseSent(long requestId, String action, Exception error) {
44-
TransportMessageListener.super.onResponseSent(requestId, action, error);
45-
if (action.startsWith("indices:data/read/search")) {
46-
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
47-
error,
48-
t -> t.getStackTrace().length > 0
49-
);
50-
transportMessageHasStackTrace.set(throwable.isPresent());
55+
.forEach(
56+
ts -> asInstanceOf(MockTransportService.class, ts).addRequestHandlingBehavior(
57+
NODE_SEARCH_ACTION_NAME,
58+
(handler, request, channel, task) -> {
59+
TransportChannel wrappedChannel = new TransportChannel() {
60+
@Override
61+
public String getProfileName() {
62+
return channel.getProfileName();
63+
}
64+
65+
@Override
66+
public void sendResponse(TransportResponse response) {
67+
var bytes = asInstanceOf(BytesTransportResponse.class, response);
68+
NodeQueryResponse nodeQueryResponse = null;
69+
try (StreamInput in = bytes.bytes().streamInput()) {
70+
var namedWriteableAwareInput = new NamedWriteableAwareStreamInput(
71+
in,
72+
internalCluster.getNamedWriteableRegistry()
73+
);
74+
nodeQueryResponse = new NodeQueryResponse(namedWriteableAwareInput);
75+
for (Object result : nodeQueryResponse.getResults()) {
76+
if (result instanceof Exception error) {
77+
inspectStackTraceAndAssert(error);
78+
}
79+
}
80+
} catch (IOException e) {
81+
throw new UncheckedIOException(e);
82+
} finally {
83+
if (nodeQueryResponse != null) {
84+
nodeQueryResponse.decRef();
85+
}
86+
}
87+
88+
// Forward to the original channel
89+
channel.sendResponse(response);
90+
}
91+
92+
@Override
93+
public void sendResponse(Exception error) {
94+
inspectStackTraceAndAssert(error);
95+
96+
// Forward to the original channel
97+
channel.sendResponse(error);
98+
}
99+
100+
private void inspectStackTraceAndAssert(Exception error) {
101+
ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> {
102+
if (shouldObserveStackTrace) {
103+
assertTrue(t.getStackTrace().length > 0);
104+
} else {
105+
assertEquals(0, t.getStackTrace().length);
106+
}
107+
return true;
108+
});
109+
}
110+
};
111+
112+
handler.messageReceived(request, wrappedChannel, task);
51113
}
52-
}
53-
}));
54-
return transportMessageHasStackTrace::get;
114+
)
115+
);
55116
}
56117

57118
/**

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.logging.log4j.core.config.Configurator;
1212
import org.elasticsearch.client.Request;
1313
import org.elasticsearch.client.Response;
14-
import org.elasticsearch.common.settings.Settings;
1514
import org.elasticsearch.common.util.CollectionUtils;
1615
import org.elasticsearch.common.xcontent.XContentHelper;
1716
import org.elasticsearch.plugins.Plugin;
@@ -21,17 +20,13 @@
2120
import org.elasticsearch.test.MockLog;
2221
import org.elasticsearch.test.transport.MockTransportService;
2322
import org.elasticsearch.xcontent.XContentType;
24-
import org.junit.After;
25-
import org.junit.Before;
2623
import org.junit.BeforeClass;
2724

2825
import java.io.IOException;
2926
import java.util.Collection;
3027
import java.util.Map;
31-
import java.util.function.BooleanSupplier;
3228

3329
public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
34-
private BooleanSupplier transportMessageHasStackTrace;
3530

3631
@Override
3732
protected boolean addMockHttpTransport() {
@@ -49,18 +44,6 @@ public static void setDebugLogLevel() {
4944
Configurator.setLevel(SearchService.class, Level.DEBUG);
5045
}
5146

52-
@Before
53-
public void setupMessageListener() {
54-
transportMessageHasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
55-
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
56-
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
57-
}
58-
59-
@After
60-
public void resetSettings() {
61-
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
62-
}
63-
6447
private void setupIndexWithDocs() {
6548
createIndex("test1", "test2");
6649
indexRandom(
@@ -94,7 +77,7 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
9477
awaitAsyncRequestDoneRunning(getAsyncRequest);
9578
}
9679
// check that the stack trace was not sent from the data node to the coordinating node
97-
assertFalse(transportMessageHasStackTrace.getAsBoolean());
80+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
9881
}
9982

10083
public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
@@ -122,7 +105,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
122105
awaitAsyncRequestDoneRunning(getAsyncRequest);
123106
}
124107
// check that the stack trace was sent from the data node to the coordinating node
125-
assertTrue(transportMessageHasStackTrace.getAsBoolean());
108+
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
126109
}
127110

128111
public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
@@ -150,7 +133,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
150133
awaitAsyncRequestDoneRunning(getAsyncRequest);
151134
}
152135
// check that the stack trace was not sent from the data node to the coordinating node
153-
assertFalse(transportMessageHasStackTrace.getAsBoolean());
136+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
154137
}
155138

156139
public void testDataNodeLogsStackTrace() throws Exception {
@@ -225,7 +208,7 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
225208
awaitAsyncRequestDoneRunning(getAsyncRequest);
226209
}
227210
// check that the stack trace was not sent from the data node to the coordinating node
228-
assertFalse(transportMessageHasStackTrace.getAsBoolean());
211+
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
229212
}
230213

231214
public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws Exception {
@@ -253,7 +236,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
253236
awaitAsyncRequestDoneRunning(getAsyncRequest);
254237
}
255238
// check that the stack trace was sent from the data node to the coordinating node
256-
assertTrue(transportMessageHasStackTrace.getAsBoolean());
239+
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
257240
}
258241

259242
private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws IOException {

0 commit comments

Comments
 (0)