Skip to content

Commit cebc1a8

Browse files
authored
Convert BytesTransportResponse when proxying response from/to local node (elastic#135873)
elastic#127112 introduced `BytesTransportResponse` to be used in search batched execution, so that `NodeQueryResponse` could be written as `BytesTransportResponse` as opposed to materializing the response object on heap. When a proxy node acts as a proxy to query its local data, and the coordinating node is on a different version than the proxy node, the response will fail to deserialize in the coord node because it was written with the version of the proxy node as opposed to that of the coord (target) node. This is because `DirectResponseChannel` skips the step of reading and writing back such response, which would lead to it being converted to the right format. This commit attempts to fix this problem by tracking the version used to write the binary response, and conditionally converting it in the `ProxyRequestHandler` when the version don't align.
1 parent b9f878e commit cebc1a8

File tree

12 files changed

+393
-50
lines changed

12 files changed

+393
-50
lines changed

docs/changelog/135873.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135873
2+
summary: Convert `BytesTransportResponse` when proxying response from/to local node
3+
area: "Network"
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,13 @@ static void registerNodeSearchAction(
607607
}
608608
}
609609
);
610-
TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new);
610+
TransportActionProxy.registerProxyAction(
611+
transportService,
612+
NODE_SEARCH_ACTION_NAME,
613+
true,
614+
NodeQueryResponse::new,
615+
namedWriteableRegistry
616+
);
611617
}
612618

613619
private static void releaseLocalContext(
@@ -845,7 +851,10 @@ void onShardDone() {
845851
out.close();
846852
}
847853
}
848-
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference()));
854+
ActionListener.respondAndRelease(
855+
channelListener,
856+
new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())
857+
);
849858
}
850859

851860
private void maybeFreeContext(

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.client.internal.OriginSettingClient;
2222
import org.elasticsearch.client.internal.node.NodeClient;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
24+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2425
import org.elasticsearch.common.io.stream.StreamInput;
2526
import org.elasticsearch.common.io.stream.StreamOutput;
2627
import org.elasticsearch.common.io.stream.Writeable;
@@ -384,7 +385,11 @@ public void writeTo(StreamOutput out) throws IOException {
384385
}
385386
}
386387

387-
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
388+
public static void registerRequestHandler(
389+
TransportService transportService,
390+
SearchService searchService,
391+
NamedWriteableRegistry namedWriteableRegistry
392+
) {
388393
final TransportRequestHandler<ScrollFreeContextRequest> freeContextHandler = (request, channel, task) -> {
389394
logger.trace("releasing search context [{}]", request.id());
390395
boolean freed = searchService.freeReaderContext(request.id());
@@ -401,7 +406,8 @@ public static void registerRequestHandler(TransportService transportService, Sea
401406
transportService,
402407
FREE_CONTEXT_SCROLL_ACTION_NAME,
403408
false,
404-
SearchFreeContextResponse::readFrom
409+
SearchFreeContextResponse::readFrom,
410+
namedWriteableRegistry
405411
);
406412

407413
// TODO: remove this handler once the lowest compatible version stops using it
@@ -411,7 +417,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
411417
OriginalIndices.readOriginalIndices(in);
412418
return res;
413419
}, freeContextHandler);
414-
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom);
420+
TransportActionProxy.registerProxyAction(
421+
transportService,
422+
FREE_CONTEXT_ACTION_NAME,
423+
false,
424+
SearchFreeContextResponse::readFrom,
425+
namedWriteableRegistry
426+
);
415427

416428
transportService.registerRequestHandler(
417429
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
@@ -426,7 +438,8 @@ public static void registerRequestHandler(TransportService transportService, Sea
426438
transportService,
427439
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
428440
false,
429-
(in) -> ActionResponse.Empty.INSTANCE
441+
(in) -> ActionResponse.Empty.INSTANCE,
442+
namedWriteableRegistry
430443
);
431444

432445
transportService.registerRequestHandler(
@@ -435,7 +448,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
435448
ShardSearchRequest::new,
436449
(request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel))
437450
);
438-
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new);
451+
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new, namedWriteableRegistry);
439452

440453
transportService.registerRequestHandler(
441454
QUERY_ACTION_NAME,
@@ -451,7 +464,8 @@ public static void registerRequestHandler(TransportService transportService, Sea
451464
transportService,
452465
QUERY_ACTION_NAME,
453466
true,
454-
(request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new
467+
(request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new,
468+
namedWriteableRegistry
455469
);
456470

457471
transportService.registerRequestHandler(
@@ -465,7 +479,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
465479
channel.getVersion()
466480
)
467481
);
468-
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new);
482+
TransportActionProxy.registerProxyAction(
483+
transportService,
484+
QUERY_ID_ACTION_NAME,
485+
true,
486+
QuerySearchResult::new,
487+
namedWriteableRegistry
488+
);
469489

470490
transportService.registerRequestHandler(
471491
QUERY_SCROLL_ACTION_NAME,
@@ -478,7 +498,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
478498
channel.getVersion()
479499
)
480500
);
481-
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new);
501+
TransportActionProxy.registerProxyAction(
502+
transportService,
503+
QUERY_SCROLL_ACTION_NAME,
504+
true,
505+
ScrollQuerySearchResult::new,
506+
namedWriteableRegistry
507+
);
482508

483509
transportService.registerRequestHandler(
484510
QUERY_FETCH_SCROLL_ACTION_NAME,
@@ -490,7 +516,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
490516
new ChannelActionListener<>(channel)
491517
)
492518
);
493-
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new);
519+
TransportActionProxy.registerProxyAction(
520+
transportService,
521+
QUERY_FETCH_SCROLL_ACTION_NAME,
522+
true,
523+
ScrollQueryFetchSearchResult::new,
524+
namedWriteableRegistry
525+
);
494526

495527
final TransportRequestHandler<RankFeatureShardRequest> rankShardFeatureRequest = (request, channel, task) -> searchService
496528
.executeRankFeaturePhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
@@ -500,7 +532,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
500532
RankFeatureShardRequest::new,
501533
rankShardFeatureRequest
502534
);
503-
TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new);
535+
TransportActionProxy.registerProxyAction(
536+
transportService,
537+
RANK_FEATURE_SHARD_ACTION_NAME,
538+
true,
539+
RankFeatureResult::new,
540+
namedWriteableRegistry
541+
);
504542

505543
final TransportRequestHandler<ShardFetchRequest> shardFetchRequestHandler = (request, channel, task) -> searchService
506544
.executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
@@ -510,7 +548,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
510548
ShardFetchRequest::new,
511549
shardFetchRequestHandler
512550
);
513-
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new);
551+
TransportActionProxy.registerProxyAction(
552+
transportService,
553+
FETCH_ID_SCROLL_ACTION_NAME,
554+
true,
555+
FetchSearchResult::new,
556+
namedWriteableRegistry
557+
);
514558

515559
transportService.registerRequestHandler(
516560
FETCH_ID_ACTION_NAME,
@@ -520,15 +564,27 @@ public static void registerRequestHandler(TransportService transportService, Sea
520564
ShardFetchSearchRequest::new,
521565
shardFetchRequestHandler
522566
);
523-
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new);
567+
TransportActionProxy.registerProxyAction(
568+
transportService,
569+
FETCH_ID_ACTION_NAME,
570+
true,
571+
FetchSearchResult::new,
572+
namedWriteableRegistry
573+
);
524574

525575
transportService.registerRequestHandler(
526576
QUERY_CAN_MATCH_NODE_NAME,
527577
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION),
528578
CanMatchNodeRequest::new,
529579
(request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
530580
);
531-
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new);
581+
TransportActionProxy.registerProxyAction(
582+
transportService,
583+
QUERY_CAN_MATCH_NODE_NAME,
584+
true,
585+
CanMatchNodeResponse::new,
586+
namedWriteableRegistry
587+
);
532588
}
533589

534590
private static Executor buildFreeContextExecutor(TransportService transportService) {

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,13 @@ public TransportOpenPointInTimeAction(
9494
ShardOpenReaderRequest::new,
9595
new ShardOpenReaderRequestHandler()
9696
);
97-
TransportActionProxy.registerProxyAction(transportService, OPEN_SHARD_READER_CONTEXT_NAME, false, ShardOpenReaderResponse::new);
97+
TransportActionProxy.registerProxyAction(
98+
transportService,
99+
OPEN_SHARD_READER_CONTEXT_NAME,
100+
false,
101+
ShardOpenReaderResponse::new,
102+
namedWriteableRegistry
103+
);
98104
}
99105

100106
@Override

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public TransportSearchAction(
196196
this.searchPhaseController = searchPhaseController;
197197
this.searchTransportService = searchTransportService;
198198
this.remoteClusterService = searchTransportService.getRemoteClusterService();
199-
SearchTransportService.registerRequestHandler(transportService, searchService);
199+
SearchTransportService.registerRequestHandler(transportService, searchService, namedWriteableRegistry);
200200
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(
201201
searchTransportService,
202202
searchService,

server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,41 @@
99

1010
package org.elasticsearch.transport;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.common.bytes.ReleasableBytesReference;
14+
import org.elasticsearch.common.io.stream.StreamInput;
1315
import org.elasticsearch.common.io.stream.StreamOutput;
1416

1517
import java.io.IOException;
18+
import java.util.Objects;
1619

1720
/**
1821
* A specialized, bytes only response, that can potentially be optimized on the network layer.
1922
*/
2023
public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage {
2124

2225
private final ReleasableBytesReference bytes;
26+
private final TransportVersion version;
2327

24-
public BytesTransportResponse(ReleasableBytesReference bytes) {
28+
public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) {
2529
this.bytes = bytes;
30+
this.version = Objects.requireNonNull(version);
31+
}
32+
33+
/**
34+
* Does the binary response need conversion before being sent to the provided target version?
35+
*/
36+
public boolean mustConvertResponseForVersion(TransportVersion targetVersion) {
37+
return version.equals(targetVersion) == false;
38+
}
39+
40+
/**
41+
* Returns a {@link StreamInput} configured to read the underlying bytes that this response holds.
42+
*/
43+
public StreamInput streamInput() throws IOException {
44+
StreamInput streamInput = bytes.streamInput();
45+
streamInput.setTransportVersion(version);
46+
return streamInput;
2647
}
2748

2849
@Override

server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
package org.elasticsearch.transport;
1010

1111
import org.elasticsearch.cluster.node.DiscoveryNode;
12+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1214
import org.elasticsearch.common.io.stream.StreamInput;
1315
import org.elasticsearch.common.io.stream.StreamOutput;
1416
import org.elasticsearch.common.io.stream.Writeable;
@@ -18,6 +20,7 @@
1820
import org.elasticsearch.tasks.TaskId;
1921

2022
import java.io.IOException;
23+
import java.io.UncheckedIOException;
2124
import java.util.Map;
2225
import java.util.concurrent.Executor;
2326
import java.util.function.Function;
@@ -36,15 +39,18 @@ private static class ProxyRequestHandler<T extends ProxyRequest<TransportRequest
3639
private final TransportService service;
3740
private final String action;
3841
private final Function<TransportRequest, Writeable.Reader<? extends TransportResponse>> responseFunction;
42+
private final NamedWriteableRegistry namedWriteableRegistry;
3943

4044
ProxyRequestHandler(
4145
TransportService service,
4246
String action,
43-
Function<TransportRequest, Writeable.Reader<? extends TransportResponse>> responseFunction
47+
Function<TransportRequest, Writeable.Reader<? extends TransportResponse>> responseFunction,
48+
NamedWriteableRegistry namedWriteableRegistry
4449
) {
4550
this.service = service;
4651
this.action = action;
4752
this.responseFunction = responseFunction;
53+
this.namedWriteableRegistry = namedWriteableRegistry;
4854
}
4955

5056
@Override
@@ -62,7 +68,28 @@ public Executor executor() {
6268

6369
@Override
6470
public void handleResponse(TransportResponse response) {
65-
channel.sendResponse(response);
71+
// This is a short term solution to ensure data node responses for batched search go back to the coordinating
72+
// node in the expected format when a proxy data node proxies the request to itself. The response would otherwise
73+
// be sent directly via DirectResponseChannel, skipping the read and write step that this handler normally performs.
74+
if (response instanceof BytesTransportResponse btr && btr.mustConvertResponseForVersion(channel.getVersion())) {
75+
try (
76+
NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(
77+
btr.streamInput(),
78+
namedWriteableRegistry
79+
)
80+
) {
81+
TransportResponse convertedResponse = responseFunction.apply(wrappedRequest).read(in);
82+
try {
83+
channel.sendResponse(convertedResponse);
84+
} finally {
85+
convertedResponse.decRef();
86+
}
87+
} catch (IOException e) {
88+
throw new UncheckedIOException(e);
89+
}
90+
} else {
91+
channel.sendResponse(response);
92+
}
6693
}
6794

6895
@Override
@@ -73,7 +100,7 @@ public void handleException(TransportException exp) {
73100
@Override
74101
public TransportResponse read(StreamInput in) throws IOException {
75102
if (in.getTransportVersion().equals(channel.getVersion()) && in.supportReadAllToReleasableBytesReference()) {
76-
return new BytesTransportResponse(in.readAllToReleasableBytesReference());
103+
return new BytesTransportResponse(in.readAllToReleasableBytesReference(), in.getTransportVersion());
77104
} else {
78105
return responseFunction.apply(wrappedRequest).read(in);
79106
}
@@ -144,7 +171,9 @@ public static void registerProxyActionWithDynamicResponseType(
144171
TransportService service,
145172
String action,
146173
boolean cancellable,
147-
Function<TransportRequest, Writeable.Reader<? extends TransportResponse>> responseFunction
174+
Function<TransportRequest, Writeable.Reader<? extends TransportResponse>> responseFunction,
175+
NamedWriteableRegistry namedWriteableRegistry
176+
148177
) {
149178
RequestHandlerRegistry<? extends TransportRequest> requestHandler = service.getRequestHandler(action);
150179
service.registerRequestHandler(
@@ -155,7 +184,7 @@ public static void registerProxyActionWithDynamicResponseType(
155184
in -> cancellable
156185
? new CancellableProxyRequest<>(in, requestHandler::newRequest)
157186
: new ProxyRequest<>(in, requestHandler::newRequest),
158-
new ProxyRequestHandler<>(service, action, responseFunction)
187+
new ProxyRequestHandler<>(service, action, responseFunction, namedWriteableRegistry)
159188
);
160189
}
161190

@@ -167,9 +196,10 @@ public static void registerProxyAction(
167196
TransportService service,
168197
String action,
169198
boolean cancellable,
170-
Writeable.Reader<? extends TransportResponse> reader
199+
Writeable.Reader<? extends TransportResponse> reader,
200+
NamedWriteableRegistry namedWriteableRegistry
171201
) {
172-
registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader);
202+
registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader, namedWriteableRegistry);
173203
}
174204

175205
private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/";

0 commit comments

Comments
 (0)