diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java index 5329128dfdeba..3ddcd795bfc52 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java @@ -9,22 +9,20 @@ package org.elasticsearch.action.admin.cluster.shards; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; import java.io.IOException; import java.util.Objects; -public final class ClusterSearchShardsRequest extends MasterNodeReadRequest - implements - IndicesRequest.Replaceable { +public final class ClusterSearchShardsRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable { private String[] indices = Strings.EMPTY_ARRAY; @Nullable @@ -38,6 +36,11 @@ public ClusterSearchShardsRequest(TimeValue masterNodeTimeout, String... indices indices(indices); } + /** + * AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction} + * so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION) public ClusterSearchShardsRequest(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); @@ -46,20 +49,6 @@ public ClusterSearchShardsRequest(StreamInput in) throws IOException { indicesOptions = IndicesOptions.readIndicesOptions(in); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArray(indices); - out.writeOptionalString(routing); - out.writeOptionalString(preference); - indicesOptions.writeIndicesOptions(out); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - /** * Sets the indices the search will be executed on. */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index 78e6b5da9230c..110c381148b65 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -12,6 +12,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalClusterStateAction; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ProjectState; @@ -25,6 +27,7 @@ import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; @@ -39,7 +42,7 @@ import java.util.Map; import java.util.Set; -public class TransportClusterSearchShardsAction extends TransportMasterNodeReadAction< +public class TransportClusterSearchShardsAction extends TransportLocalClusterStateAction< ClusterSearchShardsRequest, ClusterSearchShardsResponse> { @@ -49,6 +52,12 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadA private final ProjectResolver projectResolver; private final IndexNameExpressionResolver indexNameExpressionResolver; + /** + * AP prior to 9.3 this was a {@link TransportMasterNodeReadAction} so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION) + @SuppressWarnings("this-escape") @Inject public TransportClusterSearchShardsAction( TransportService transportService, @@ -61,17 +70,23 @@ public TransportClusterSearchShardsAction( ) { super( TYPE.name(), - transportService, - clusterService, - threadPool, actionFilters, - ClusterSearchShardsRequest::new, - ClusterSearchShardsResponse::new, + transportService.getTaskManager(), + clusterService, threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) ); this.indicesService = indicesService; this.projectResolver = projectResolver; this.indexNameExpressionResolver = indexNameExpressionResolver; + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + ClusterSearchShardsRequest::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override @@ -86,7 +101,7 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, final ClusterSearchShardsRequest request, final ClusterState state, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardsResponse.java index 5425be4e6229c..76263ff6cf6d3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardsResponse.java @@ -14,11 +14,14 @@ import org.elasticsearch.action.ResolvedIndexExpressions; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.internal.AliasFilter; @@ -63,6 +66,11 @@ public SearchShardsResponse( this(groups, nodes, aliasFilters, null); } + /** + * AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction} + * so for BwC we must remain able to read these responses until we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION) public SearchShardsResponse(StreamInput in) throws IOException { this.groups = in.readCollectionAsList(SearchShardsGroup::new); this.nodes = in.readCollectionAsList(DiscoveryNode::new); @@ -74,6 +82,11 @@ public SearchShardsResponse(StreamInput in) throws IOException { } } + /** + * AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction} + * so for BwC we must remain able to write these responses until we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(groups); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 077c45eeafc70..9d051d1a18beb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1160,7 +1160,7 @@ Map createFinalResponse() { ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest( MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, indices - ).indicesOptions(searchShardsIdxOpts).local(true).preference(preference).routing(routing); + ).indicesOptions(searchShardsIdxOpts).preference(preference).routing(routing); transportService.sendRequest( connection, TransportClusterSearchShardsAction.TYPE.name(), diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java index 4b2fc41c0ccba..35e4c9773cd1d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java @@ -48,10 +48,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC RestUtils.getMasterNodeTimeout(request), Strings.splitStringByCommaToArray(request.param("index")) ); - clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local())); clusterSearchShardsRequest.routing(request.param("routing")); clusterSearchShardsRequest.preference(request.param("preference")); clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions())); + RestUtils.consumeDeprecatedLocalParameter(request); return channel -> client.execute( TransportClusterSearchShardsAction.TYPE, clusterSearchShardsRequest, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java index d4a3d29ea68f2..d18cd4a80af4d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java @@ -10,14 +10,28 @@ package org.elasticsearch.action.admin.cluster.shards; import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; +import java.io.IOException; + public class ClusterSearchShardsRequestTests extends ESTestCase { + /** + * AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction} + * so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely. + * This test method can be removed once we no longer need to keep serialization and deserialization around. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION) public void testSerialization() throws Exception { ClusterSearchShardsRequest request = new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT); if (randomBoolean()) { @@ -48,7 +62,7 @@ public void testSerialization() throws Exception { TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random()); try (BytesStreamOutput out = new BytesStreamOutput()) { out.setTransportVersion(version); - request.writeTo(out); + new WriteToWrapper(request).writeTo(out); try (StreamInput in = out.bytes().streamInput()) { in.setTransportVersion(version); ClusterSearchShardsRequest deserialized = new ClusterSearchShardsRequest(in); @@ -67,4 +81,33 @@ public void testIndicesMustNotBeNull() { expectThrows(NullPointerException.class, () -> request.indices((String) null)); expectThrows(NullPointerException.class, () -> request.indices(new String[] { "index1", null, "index3" })); } + + /** + * AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction} + * so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely. + * This class preserves the {@link Writeable#writeTo(StreamOutput)} functionality of the request so that + * the necessary deserialization can be tested. + */ + private static class WriteToWrapper extends MasterNodeReadRequest { + private final ClusterSearchShardsRequest request; + + WriteToWrapper(ClusterSearchShardsRequest request) { + super(request.masterTimeout()); + this.request = request; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(request.indices()); + out.writeOptionalString(request.routing()); + out.writeOptionalString(request.preference()); + request.indicesOptions().writeIndicesOptions(out); + } + } }