Skip to content

Commit b185eb1

Browse files
committed
Convert BytesTransportResponse
1 parent ce6435c commit b185eb1

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public static final class NodeQueryResponse extends TransportResponse {
212212
private final SearchPhaseController.TopDocsStats topDocsStats;
213213
private final QueryPhaseResultConsumer.MergeResult mergeResult;
214214

215-
NodeQueryResponse(StreamInput in) throws IOException {
215+
public NodeQueryResponse(StreamInput in) throws IOException {
216216
this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new);
217217
this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);
218218
this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);

test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,21 @@
1212
import org.apache.logging.log4j.Level;
1313
import org.elasticsearch.ExceptionsHelper;
1414
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction;
15+
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NodeQueryResponse;
16+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
17+
import org.elasticsearch.common.io.stream.StreamInput;
1518
import org.elasticsearch.index.query.QueryShardException;
1619
import org.elasticsearch.test.ESIntegTestCase;
1720
import org.elasticsearch.test.InternalTestCluster;
1821
import org.elasticsearch.test.MockLog;
1922
import org.elasticsearch.test.transport.MockTransportService;
23+
import org.elasticsearch.transport.BytesTransportResponse;
2024
import org.elasticsearch.transport.TransportMessageListener;
2125
import org.elasticsearch.transport.TransportResponse;
2226
import org.elasticsearch.transport.TransportService;
2327

28+
import java.io.IOException;
29+
import java.io.UncheckedIOException;
2430
import java.util.Arrays;
2531
import java.util.Optional;
2632
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +48,7 @@ public static BooleanSupplier setupErrorTraceListener(InternalTestCluster intern
4248
internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> {
4349
var mockTs = asInstanceOf(MockTransportService.class, ts);
4450
mockTs.addMessageListener(new TransportMessageListener() {
51+
// This is called when error_trace is false
4552
@Override
4653
public void onResponseSent(long requestId, String action, Exception error) {
4754
TransportMessageListener.super.onResponseSent(requestId, action, error);
@@ -50,13 +57,28 @@ public void onResponseSent(long requestId, String action, Exception error) {
5057
}
5158
}
5259

60+
// This is called when error_trace is true
5361
@Override
5462
public void onBeforeResponseSent(long requestId, String action, TransportResponse response) {
5563
if (SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME.equals(action)) {
56-
var r = asInstanceOf(SearchQueryThenFetchAsyncAction.NodeQueryResponse.class, response);
57-
for (Object result : r.getResults()) {
58-
if (result instanceof Exception error) {
59-
checkStacktraceStateAndRemove(error, mockTs);
64+
var bytes = asInstanceOf(BytesTransportResponse.class, response);
65+
NodeQueryResponse nodeQueryResponse = null;
66+
try (StreamInput in = bytes.bytes().streamInput()) {
67+
var namedWriteableAwareInput = new NamedWriteableAwareStreamInput(
68+
in,
69+
internalCluster.getNamedWriteableRegistry()
70+
);
71+
nodeQueryResponse = new NodeQueryResponse(namedWriteableAwareInput);
72+
for (Object result : nodeQueryResponse.getResults()) {
73+
if (result instanceof Exception error) {
74+
checkStacktraceStateAndRemove(error, mockTs);
75+
}
76+
}
77+
} catch (IOException e) {
78+
throw new UncheckedIOException(e);
79+
} finally {
80+
if (nodeQueryResponse != null) {
81+
nodeQueryResponse.decRef();
6082
}
6183
}
6284
}
@@ -75,9 +97,9 @@ private void checkStacktraceStateAndRemove(Exception error, MockTransportService
7597
/**
7698
* Adds expectations for debug logging of a message and exception on each shard of the given index.
7799
*
78-
* @param numShards the number of shards in the index (an expectation will be added for each shard)
79-
* @param mockLog the mock log
80-
* @param errorTriggeringIndex the name of the index that will trigger the error
100+
* @param numShards the number of shards in the index (an expectation will be added for each shard)
101+
* @param mockLog the mock log
102+
* @param errorTriggeringIndex the name of the index that will trigger the error
81103
*/
82104
public static void addSeenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
83105
String nodesDisjunction = format(

0 commit comments

Comments
 (0)