diff --git a/docs/changelog/135873.yaml b/docs/changelog/135873.yaml new file mode 100644 index 0000000000000..11311425296d7 --- /dev/null +++ b/docs/changelog/135873.yaml @@ -0,0 +1,5 @@ +pr: 135873 +summary: Convert `BytesTransportResponse` when proxying response from/to local node +area: "Network" +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a8f22eb1cc572..4aced264baa61 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -607,7 +607,13 @@ static void registerNodeSearchAction( } } ); - TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + NODE_SEARCH_ACTION_NAME, + true, + NodeQueryResponse::new, + namedWriteableRegistry + ); } private static void releaseLocalContext( @@ -845,7 +851,10 @@ void onShardDone() { out.close(); } } - ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference())); + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); } private void maybeFreeContext( diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1e0fa28889c97..fc885f7562a80 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -384,7 +385,11 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static void registerRequestHandler(TransportService transportService, SearchService searchService) { + public static void registerRequestHandler( + TransportService transportService, + SearchService searchService, + NamedWriteableRegistry namedWriteableRegistry + ) { final TransportRequestHandler freeContextHandler = (request, channel, task) -> { logger.trace("releasing search context [{}]", request.id()); boolean freed = searchService.freeReaderContext(request.id()); @@ -401,7 +406,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, false, - SearchFreeContextResponse::readFrom + SearchFreeContextResponse::readFrom, + namedWriteableRegistry ); // TODO: remove this handler once the lowest compatible version stops using it @@ -411,7 +417,13 @@ public static void registerRequestHandler(TransportService transportService, Sea OriginalIndices.readOriginalIndices(in); return res; }, freeContextHandler); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom); + TransportActionProxy.registerProxyAction( + transportService, + FREE_CONTEXT_ACTION_NAME, + false, + SearchFreeContextResponse::readFrom, + namedWriteableRegistry + ); transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, @@ -426,7 +438,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, false, - (in) -> ActionResponse.Empty.INSTANCE + (in) -> ActionResponse.Empty.INSTANCE, + namedWriteableRegistry ); transportService.registerRequestHandler( @@ -435,7 +448,7 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)) ); - TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new); + TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new, namedWriteableRegistry); transportService.registerRequestHandler( QUERY_ACTION_NAME, @@ -451,7 +464,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, QUERY_ACTION_NAME, true, - (request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new + (request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new, + namedWriteableRegistry ); transportService.registerRequestHandler( @@ -465,7 +479,13 @@ public static void registerRequestHandler(TransportService transportService, Sea channel.getVersion() ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_ID_ACTION_NAME, + true, + QuerySearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_SCROLL_ACTION_NAME, @@ -478,7 +498,13 @@ public static void registerRequestHandler(TransportService transportService, Sea channel.getVersion() ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_SCROLL_ACTION_NAME, + true, + ScrollQuerySearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_FETCH_SCROLL_ACTION_NAME, @@ -490,7 +516,13 @@ public static void registerRequestHandler(TransportService transportService, Sea new ChannelActionListener<>(channel) ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_FETCH_SCROLL_ACTION_NAME, + true, + ScrollQueryFetchSearchResult::new, + namedWriteableRegistry + ); final TransportRequestHandler rankShardFeatureRequest = (request, channel, task) -> searchService .executeRankFeaturePhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); @@ -500,7 +532,13 @@ public static void registerRequestHandler(TransportService transportService, Sea RankFeatureShardRequest::new, rankShardFeatureRequest ); - TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new); + TransportActionProxy.registerProxyAction( + transportService, + RANK_FEATURE_SHARD_ACTION_NAME, + true, + RankFeatureResult::new, + namedWriteableRegistry + ); final TransportRequestHandler shardFetchRequestHandler = (request, channel, task) -> searchService .executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); @@ -510,7 +548,13 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardFetchRequest::new, shardFetchRequestHandler ); - TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + FETCH_ID_SCROLL_ACTION_NAME, + true, + FetchSearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( FETCH_ID_ACTION_NAME, @@ -520,7 +564,13 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardFetchSearchRequest::new, shardFetchRequestHandler ); - TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + FETCH_ID_ACTION_NAME, + true, + FetchSearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_CAN_MATCH_NODE_NAME, @@ -528,7 +578,13 @@ public static void registerRequestHandler(TransportService transportService, Sea CanMatchNodeRequest::new, (request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel)) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_CAN_MATCH_NODE_NAME, + true, + CanMatchNodeResponse::new, + namedWriteableRegistry + ); } private static Executor buildFreeContextExecutor(TransportService transportService) { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 1038308bc6bf3..f9ae44c473014 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -94,7 +94,13 @@ public TransportOpenPointInTimeAction( ShardOpenReaderRequest::new, new ShardOpenReaderRequestHandler() ); - TransportActionProxy.registerProxyAction(transportService, OPEN_SHARD_READER_CONTEXT_NAME, false, ShardOpenReaderResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + OPEN_SHARD_READER_CONTEXT_NAME, + false, + ShardOpenReaderResponse::new, + namedWriteableRegistry + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index cd8c561a4ad75..3385f53c9ed51 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -196,7 +196,7 @@ public TransportSearchAction( this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.remoteClusterService = searchTransportService.getRemoteClusterService(); - SearchTransportService.registerRequestHandler(transportService, searchService); + SearchTransportService.registerRequestHandler(transportService, searchService, namedWriteableRegistry); SearchQueryThenFetchAsyncAction.registerNodeSearchAction( searchTransportService, searchService, diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java index 571d0d4008e24..528199b335373 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java @@ -9,10 +9,13 @@ package org.elasticsearch.transport; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Objects; /** * A specialized, bytes only response, that can potentially be optimized on the network layer. @@ -20,9 +23,27 @@ public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage { private final ReleasableBytesReference bytes; + private final TransportVersion version; - public BytesTransportResponse(ReleasableBytesReference bytes) { + public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) { this.bytes = bytes; + this.version = Objects.requireNonNull(version); + } + + /** + * Does the binary response need conversion before being sent to the provided target version? + */ + public boolean mustConvertResponseForVersion(TransportVersion targetVersion) { + return version.equals(targetVersion) == false; + } + + /** + * Returns a {@link StreamInput} configured to read the underlying bytes that this response holds. + */ + public StreamInput streamInput() throws IOException { + StreamInput streamInput = bytes.streamInput(); + streamInput.setTransportVersion(version); + return streamInput; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index bc5dab2074a6e..d53ab209c3960 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -9,6 +9,8 @@ package org.elasticsearch.transport; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -18,6 +20,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Function; @@ -36,15 +39,18 @@ private static class ProxyRequestHandler> responseFunction; + private final NamedWriteableRegistry namedWriteableRegistry; ProxyRequestHandler( TransportService service, String action, - Function> responseFunction + Function> responseFunction, + NamedWriteableRegistry namedWriteableRegistry ) { this.service = service; this.action = action; this.responseFunction = responseFunction; + this.namedWriteableRegistry = namedWriteableRegistry; } @Override @@ -62,7 +68,28 @@ public Executor executor() { @Override public void handleResponse(TransportResponse response) { - channel.sendResponse(response); + // This is a short term solution to ensure data node responses for batched search go back to the coordinating + // node in the expected format when a proxy data node proxies the request to itself. The response would otherwise + // be sent directly via DirectResponseChannel, skipping the read and write step that this handler normally performs. + if (response instanceof BytesTransportResponse btr && btr.mustConvertResponseForVersion(channel.getVersion())) { + try ( + NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( + btr.streamInput(), + namedWriteableRegistry + ) + ) { + TransportResponse convertedResponse = responseFunction.apply(wrappedRequest).read(in); + try { + channel.sendResponse(convertedResponse); + } finally { + convertedResponse.decRef(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + channel.sendResponse(response); + } } @Override @@ -73,7 +100,7 @@ public void handleException(TransportException exp) { @Override public TransportResponse read(StreamInput in) throws IOException { if (in.getTransportVersion().equals(channel.getVersion()) && in.supportReadAllToReleasableBytesReference()) { - return new BytesTransportResponse(in.readAllToReleasableBytesReference()); + return new BytesTransportResponse(in.readAllToReleasableBytesReference(), in.getTransportVersion()); } else { return responseFunction.apply(wrappedRequest).read(in); } @@ -144,7 +171,9 @@ public static void registerProxyActionWithDynamicResponseType( TransportService service, String action, boolean cancellable, - Function> responseFunction + Function> responseFunction, + NamedWriteableRegistry namedWriteableRegistry + ) { RequestHandlerRegistry requestHandler = service.getRequestHandler(action); service.registerRequestHandler( @@ -155,7 +184,7 @@ public static void registerProxyActionWithDynamicResponseType( in -> cancellable ? new CancellableProxyRequest<>(in, requestHandler::newRequest) : new ProxyRequest<>(in, requestHandler::newRequest), - new ProxyRequestHandler<>(service, action, responseFunction) + new ProxyRequestHandler<>(service, action, responseFunction, namedWriteableRegistry) ); } @@ -167,9 +196,10 @@ public static void registerProxyAction( TransportService service, String action, boolean cancellable, - Writeable.Reader reader + Writeable.Reader reader, + NamedWriteableRegistry namedWriteableRegistry ) { - registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader); + registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader, namedWriteableRegistry); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index e1a525cab3f52..c4146634f10c6 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -14,6 +14,8 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -110,6 +112,7 @@ private MockTransportService buildService(VersionInformation version, TransportV } public void testSendMessage() throws InterruptedException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); serviceA.registerRequestHandler( "internal:test", EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -123,7 +126,7 @@ public void testSendMessage() throws InterruptedException { } ); final boolean cancellable = randomBoolean(); - TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); serviceB.registerRequestHandler( @@ -139,7 +142,7 @@ public void testSendMessage() throws InterruptedException { assertThat(response.hasReferences(), equalTo(false)); } ); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC); serviceC.registerRequestHandler( "internal:test", @@ -155,7 +158,7 @@ public void testSendMessage() throws InterruptedException { } ); - TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); // Node A -> Node B -> Node C: different versions - serialize the response { final List responses = Collections.synchronizedList(new ArrayList<>()); @@ -277,7 +280,165 @@ public void testSendLocalRequest() throws Exception { latch.countDown(); } }); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction( + serviceB, + "internal:test", + cancellable, + // For a proxy node proxying to itself, the response is sent directly, without it being read by the proxy layer + r -> { throw new AssertionError(); }, + new NamedWriteableRegistry(Collections.emptyList()) + ); + AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); + + // Node A -> Proxy Node B (Local execution) + serviceA.sendRequest( + nodeB, + TransportActionProxy.getProxyAction("internal:test"), + TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request + new TransportResponseHandler() { + @Override + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); + } + + @Override + public Executor executor() { + return TransportResponseHandler.TRANSPORT_WORKER; + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_B", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + + final var responseInstance = response.get(); + assertThat(responseInstance, notNullValue()); + responseInstance.decRef(); + assertBusy(() -> assertThat(responseInstance.hasReferences(), equalTo(false))); + } + + public void testSendLocalRequestBytesTransportResponseSameVersion() throws Exception { + final AtomicReference response = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(2); + + final boolean cancellable = randomBoolean(); + serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> { + try { + assertThat(task instanceof CancellableTask, equalTo(cancellable)); + assertEquals(request.sourceNode, "TS_A"); + + SimpleTestResponse tsB = new SimpleTestResponse("TS_B"); + try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) { + out.setTransportVersion(transportVersion1); + tsB.writeTo(out); + // simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse + final BytesTransportResponse responseB = new BytesTransportResponse(out.moveToBytesReference(), transportVersion1); + channel.sendResponse(responseB); + response.set(responseB); + } finally { + tsB.decRef(); + } + } finally { + latch.countDown(); + } + }); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, in -> { + throw new AssertionError("read should not be called for local proxying when versions align"); + }, new NamedWriteableRegistry(Collections.emptyList())); + AbstractSimpleTransportTestCase.connectToNode(serviceC, nodeB); + + // Node C -> Proxy Node B (Local execution) + serviceC.sendRequest( + nodeB, + TransportActionProxy.getProxyAction("internal:test"), + TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request + new TransportResponseHandler() { + @Override + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); + } + + @Override + public Executor executor() { + return TransportResponseHandler.TRANSPORT_WORKER; + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_B", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + + final var responseInstance = response.get(); + assertThat(responseInstance, notNullValue()); + responseInstance.decRef(); + assertBusy(() -> assertThat(responseInstance.hasReferences(), equalTo(false))); + } + + public void testSendLocalRequestBytesTransportResponseDifferentVersions() throws Exception { + final AtomicReference response = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(2); + + final boolean cancellable = randomBoolean(); + serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> { + try { + assertThat(task instanceof CancellableTask, equalTo(cancellable)); + assertEquals(request.sourceNode, "TS_A"); + + SimpleTestResponse tsB = new SimpleTestResponse("TS_B"); + try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) { + out.setTransportVersion(transportVersion1); + tsB.writeTo(out); + // simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse + final BytesTransportResponse responseB = new BytesTransportResponse(out.moveToBytesReference(), transportVersion1); + channel.sendResponse(responseB); + response.set(responseB); + } finally { + tsB.decRef(); + } + } finally { + latch.countDown(); + } + }); + TransportActionProxy.registerProxyAction( + serviceB, + "internal:test", + cancellable, + // this is called by the conversion layer in ProxyRequestHandler + SimpleTestResponse::new, + new NamedWriteableRegistry(Collections.emptyList()) + ); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); // Node A -> Proxy Node B (Local execution) @@ -324,6 +485,7 @@ public void handleException(TransportException exp) { } public void testException() throws InterruptedException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); boolean cancellable = randomBoolean(); serviceA.registerRequestHandler( "internal:test", @@ -335,7 +497,7 @@ public void testException() throws InterruptedException { channel.sendResponse(response); } ); - TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); serviceB.registerRequestHandler( @@ -348,7 +510,7 @@ public void testException() throws InterruptedException { channel.sendResponse(response); } ); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC); serviceC.registerRequestHandler( "internal:test", @@ -358,7 +520,7 @@ public void testException() throws InterruptedException { throw new ElasticsearchException("greetings from TS_C"); } ); - TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); CountDownLatch latch = new CountDownLatch(1); serviceA.sendRequest( @@ -450,11 +612,17 @@ protected void closeInternal() {} SimpleTestResponse(StreamInput in) throws IOException { this.targetNode = in.readString(); + if (in.getTransportVersion().supports(transportVersion1)) { + in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(targetNode); + if (out.getTransportVersion().supports(transportVersion1)) { + out.writeBoolean(true); + } } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 66210d43f2f7a..e586ed8679062 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -51,7 +52,8 @@ private TransportDeleteCcrRestoreSessionAction( String actionName, ActionFilters actionFilters, TransportService transportService, - CcrRestoreSourceService ccrRestoreService + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry ) { super( actionName, @@ -60,7 +62,13 @@ private TransportDeleteCcrRestoreSessionAction( ClearCcrRestoreSessionRequest::new, transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) ); - TransportActionProxy.registerProxyAction(transportService, actionName, false, in -> ActionResponse.Empty.INSTANCE); + TransportActionProxy.registerProxyAction( + transportService, + actionName, + false, + in -> ActionResponse.Empty.INSTANCE, + namedWriteableRegistry + ); this.ccrRestoreService = ccrRestoreService; } @@ -80,16 +88,22 @@ public static class InternalTransportAction extends TransportDeleteCcrRestoreSes public InternalTransportAction( ActionFilters actionFilters, TransportService transportService, - CcrRestoreSourceService ccrRestoreService + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService); + super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService, namedWriteableRegistry); } } public static class TransportAction extends TransportDeleteCcrRestoreSessionAction { @Inject - public TransportAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, actionFilters, transportService, ccrRestoreService); + public TransportAction( + ActionFilters actionFilters, + TransportService transportService, + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry + ) { + super(NAME, actionFilters, transportService, ccrRestoreService, namedWriteableRegistry); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index 1fb4fb6f0fb72..035eb88fb11b8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; @@ -64,7 +65,8 @@ private TransportGetCcrRestoreFileChunkAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { super( actionName, @@ -73,7 +75,13 @@ private TransportGetCcrRestoreFileChunkAction( GetCcrRestoreFileChunkRequest::new, transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) ); - TransportActionProxy.registerProxyAction(transportService, actionName, false, GetCcrRestoreFileChunkResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + actionName, + false, + GetCcrRestoreFileChunkResponse::new, + namedWriteableRegistry + ); this.restoreSourceService = restoreSourceService; this.bigArrays = bigArrays; } @@ -111,9 +119,10 @@ public InternalTransportAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService); + super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService, namedWriteableRegistry); } } @@ -123,9 +132,10 @@ public TransportAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(NAME, bigArrays, transportService, actionFilters, restoreSourceService); + super(NAME, bigArrays, transportService, actionFilters, restoreSourceService, namedWriteableRegistry); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java index 540a6a8f7bcb5..f2bdda160bbf6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -23,6 +24,8 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -54,21 +57,29 @@ public void testPrivilegeForActions() { } public void testActionNames() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); - final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + final var action = new ClearCcrRestoreSessionAction.TransportAction( + actionFilters, + transportService, + ccrRestoreSourceService, + namedWriteableRegistry + ); assertThat(action.actionName, equalTo(ClearCcrRestoreSessionAction.NAME)); final var internalAction = new ClearCcrRestoreSessionAction.InternalTransportAction( actionFilters, transportService, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(internalAction.actionName, equalTo(ClearCcrRestoreSessionAction.INTERNAL_NAME)); } public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); @@ -87,7 +98,12 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { } }).when(ccrRestoreSourceService).ensureSessionShardIdConsistency(anyString(), any()); - final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + final var action = new ClearCcrRestoreSessionAction.TransportAction( + actionFilters, + transportService, + ccrRestoreSourceService, + namedWriteableRegistry + ); final String sessionUUID = UUIDs.randomBase64UUID(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java index 61866dbf2029f..1b938b9150c6f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -27,6 +28,8 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -58,6 +61,7 @@ public void testPrivilegeForActions() { } public void testActionNames() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final BigArrays bigArrays = mock(BigArrays.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); @@ -67,7 +71,8 @@ public void testActionNames() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(action.actionName, equalTo(GetCcrRestoreFileChunkAction.NAME)); @@ -75,12 +80,14 @@ public void testActionNames() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(internalAction.actionName, equalTo(GetCcrRestoreFileChunkAction.INTERNAL_NAME)); } public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), ByteSizeValue.ofBytes(1024)); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); @@ -108,7 +115,8 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); final String expectedFileName = randomAlphaOfLengthBetween(3, 12);