Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@

package org.elasticsearch.action.admin.cluster.shards;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;

import java.io.IOException;
import java.util.Objects;

public final class ClusterSearchShardsRequest extends MasterNodeReadRequest<ClusterSearchShardsRequest>
implements
IndicesRequest.Replaceable {
public final class ClusterSearchShardsRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable {

private String[] indices = Strings.EMPTY_ARRAY;
@Nullable
Expand All @@ -38,6 +36,11 @@ public ClusterSearchShardsRequest(TimeValue masterNodeTimeout, String... indices
indices(indices);
}

/**
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
* so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
public ClusterSearchShardsRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
Expand All @@ -46,20 +49,6 @@ public ClusterSearchShardsRequest(StreamInput in) throws IOException {
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);
indicesOptions.writeIndicesOptions(out);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* Sets the indices the search will be executed on.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
Expand All @@ -25,6 +27,7 @@
import org.elasticsearch.cluster.routing.SearchShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
Expand All @@ -39,7 +42,7 @@
import java.util.Map;
import java.util.Set;

public class TransportClusterSearchShardsAction extends TransportMasterNodeReadAction<
public class TransportClusterSearchShardsAction extends TransportLocalClusterStateAction<
ClusterSearchShardsRequest,
ClusterSearchShardsResponse> {

Expand All @@ -49,6 +52,12 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadA
private final ProjectResolver projectResolver;
private final IndexNameExpressionResolver indexNameExpressionResolver;

/**
* AP prior to 9.3 this was a {@link TransportMasterNodeReadAction} so for BwC it must be registered with the TransportService until
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for the AP - we can use git annotate to find you if we need to :)

* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
@SuppressWarnings("this-escape")
@Inject
public TransportClusterSearchShardsAction(
TransportService transportService,
Expand All @@ -61,17 +70,23 @@ public TransportClusterSearchShardsAction(
) {
super(
TYPE.name(),
transportService,
clusterService,
threadPool,
actionFilters,
ClusterSearchShardsRequest::new,
ClusterSearchShardsResponse::new,
transportService.getTaskManager(),
clusterService,
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION)
);
this.indicesService = indicesService;
this.projectResolver = projectResolver;
this.indexNameExpressionResolver = indexNameExpressionResolver;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
ClusterSearchShardsRequest::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
Expand All @@ -86,7 +101,7 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
final ClusterSearchShardsRequest request,
final ClusterState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -63,6 +66,11 @@ public SearchShardsResponse(
this(groups, nodes, aliasFilters, null);
}

/**
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
* so for BwC we must remain able to read these responses until we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
public SearchShardsResponse(StreamInput in) throws IOException {
this.groups = in.readCollectionAsList(SearchShardsGroup::new);
this.nodes = in.readCollectionAsList(DiscoveryNode::new);
Expand All @@ -74,6 +82,11 @@ public SearchShardsResponse(StreamInput in) throws IOException {
}
}

/**
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
* so for BwC we must remain able to write these responses until we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(groups);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
indices
).indicesOptions(searchShardsIdxOpts).local(true).preference(preference).routing(routing);
).indicesOptions(searchShardsIdxOpts).preference(preference).routing(routing);
transportService.sendRequest(
connection,
TransportClusterSearchShardsAction.TYPE.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
RestUtils.getMasterNodeTimeout(request),
Strings.splitStringByCommaToArray(request.param("index"))
);
clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local()));
clusterSearchShardsRequest.routing(request.param("routing"));
clusterSearchShardsRequest.preference(request.param("preference"));
clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
RestUtils.consumeDeprecatedLocalParameter(request);
return channel -> client.execute(
TransportClusterSearchShardsAction.TYPE,
clusterSearchShardsRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,28 @@
package org.elasticsearch.action.admin.cluster.shards;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;

import java.io.IOException;

public class ClusterSearchShardsRequestTests extends ESTestCase {

/**
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
* so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely.
* This test method can be removed once we no longer need to keep serialization and deserialization around.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
public void testSerialization() throws Exception {
ClusterSearchShardsRequest request = new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT);
if (randomBoolean()) {
Expand Down Expand Up @@ -48,7 +62,7 @@ public void testSerialization() throws Exception {
TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setTransportVersion(version);
request.writeTo(out);
new WriteToWrapper(request).writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
in.setTransportVersion(version);
ClusterSearchShardsRequest deserialized = new ClusterSearchShardsRequest(in);
Expand All @@ -67,4 +81,33 @@ public void testIndicesMustNotBeNull() {
expectThrows(NullPointerException.class, () -> request.indices((String) null));
expectThrows(NullPointerException.class, () -> request.indices(new String[] { "index1", null, "index3" }));
}

/**
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
* so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely.
* This class preserves the {@link Writeable#writeTo(StreamOutput)} functionality of the request so that
* the necessary deserialization can be tested.
*/
private static class WriteToWrapper extends MasterNodeReadRequest<WriteToWrapper> {
private final ClusterSearchShardsRequest request;

WriteToWrapper(ClusterSearchShardsRequest request) {
super(request.masterTimeout());
this.request = request;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(request.indices());
out.writeOptionalString(request.routing());
out.writeOptionalString(request.preference());
request.indicesOptions().writeIndicesOptions(out);
}
}
}