Skip to content

Commit 69f8e87

Browse files
committed
TransportClusterSearchShardsAction runs locally
This action only need cluster state and can be run any node. Make sure to keep necessary serialization and deserialization for BwC.
1 parent 81ff1a7 commit 69f8e87

File tree

6 files changed

+89
-30
lines changed

6 files changed

+89
-30
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,20 @@
99

1010
package org.elasticsearch.action.admin.cluster.shards;
1111

12-
import org.elasticsearch.action.ActionRequestValidationException;
1312
import org.elasticsearch.action.IndicesRequest;
1413
import org.elasticsearch.action.support.IndicesOptions;
15-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
14+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
15+
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
1616
import org.elasticsearch.common.Strings;
1717
import org.elasticsearch.common.io.stream.StreamInput;
18-
import org.elasticsearch.common.io.stream.StreamOutput;
1918
import org.elasticsearch.core.Nullable;
2019
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.core.UpdateForV10;
2121

2222
import java.io.IOException;
2323
import java.util.Objects;
2424

25-
public final class ClusterSearchShardsRequest extends MasterNodeReadRequest<ClusterSearchShardsRequest>
26-
implements
27-
IndicesRequest.Replaceable {
25+
public final class ClusterSearchShardsRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
2826

2927
private String[] indices = Strings.EMPTY_ARRAY;
3028
@Nullable
@@ -38,6 +36,11 @@ public ClusterSearchShardsRequest(TimeValue masterNodeTimeout, String... indices
3836
indices(indices);
3937
}
4038

39+
/**
40+
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
41+
* so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely.
42+
*/
43+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
4144
public ClusterSearchShardsRequest(StreamInput in) throws IOException {
4245
super(in);
4346
indices = in.readStringArray();
@@ -46,20 +49,6 @@ public ClusterSearchShardsRequest(StreamInput in) throws IOException {
4649
indicesOptions = IndicesOptions.readIndicesOptions(in);
4750
}
4851

49-
@Override
50-
public void writeTo(StreamOutput out) throws IOException {
51-
super.writeTo(out);
52-
out.writeStringArray(indices);
53-
out.writeOptionalString(routing);
54-
out.writeOptionalString(preference);
55-
indicesOptions.writeIndicesOptions(out);
56-
}
57-
58-
@Override
59-
public ActionRequestValidationException validate() {
60-
return null;
61-
}
62-
6352
/**
6453
* Sets the indices the search will be executed on.
6554
*/

server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.support.ActionFilters;
15+
import org.elasticsearch.action.support.ChannelActionListener;
16+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1517
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
1618
import org.elasticsearch.cluster.ClusterState;
1719
import org.elasticsearch.cluster.ProjectState;
@@ -25,6 +27,7 @@
2527
import org.elasticsearch.cluster.routing.SearchShardRouting;
2628
import org.elasticsearch.cluster.routing.ShardRouting;
2729
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.core.UpdateForV10;
2831
import org.elasticsearch.index.shard.ShardId;
2932
import org.elasticsearch.indices.IndicesService;
3033
import org.elasticsearch.injection.guice.Inject;
@@ -39,7 +42,7 @@
3942
import java.util.Map;
4043
import java.util.Set;
4144

42-
public class TransportClusterSearchShardsAction extends TransportMasterNodeReadAction<
45+
public class TransportClusterSearchShardsAction extends TransportLocalClusterStateAction<
4346
ClusterSearchShardsRequest,
4447
ClusterSearchShardsResponse> {
4548

@@ -49,6 +52,11 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadA
4952
private final ProjectResolver projectResolver;
5053
private final IndexNameExpressionResolver indexNameExpressionResolver;
5154

55+
/**
56+
* AP prior to 9.3 this was a {@link TransportMasterNodeReadAction} so for BwC it must be registered with the TransportService until
57+
* we no longer need to support calling this action remotely.
58+
*/
59+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
5260
@Inject
5361
public TransportClusterSearchShardsAction(
5462
TransportService transportService,
@@ -61,17 +69,23 @@ public TransportClusterSearchShardsAction(
6169
) {
6270
super(
6371
TYPE.name(),
64-
transportService,
65-
clusterService,
66-
threadPool,
6772
actionFilters,
68-
ClusterSearchShardsRequest::new,
69-
ClusterSearchShardsResponse::new,
73+
transportService.getTaskManager(),
74+
clusterService,
7075
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION)
7176
);
7277
this.indicesService = indicesService;
7378
this.projectResolver = projectResolver;
7479
this.indexNameExpressionResolver = indexNameExpressionResolver;
80+
81+
transportService.registerRequestHandler(
82+
actionName,
83+
executor,
84+
false,
85+
true,
86+
ClusterSearchShardsRequest::new,
87+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
88+
);
7589
}
7690

7791
@Override
@@ -86,7 +100,7 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C
86100
}
87101

88102
@Override
89-
protected void masterOperation(
103+
protected void localClusterStateOperation(
90104
Task task,
91105
final ClusterSearchShardsRequest request,
92106
final ClusterState state,

server/src/main/java/org/elasticsearch/action/search/SearchShardsResponse.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import org.elasticsearch.action.ResolvedIndexExpressions;
1515
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
1616
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
17+
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
18+
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
1719
import org.elasticsearch.cluster.node.DiscoveryNode;
1820
import org.elasticsearch.common.io.stream.StreamInput;
1921
import org.elasticsearch.common.io.stream.StreamOutput;
2022
import org.elasticsearch.common.util.Maps;
2123
import org.elasticsearch.core.Nullable;
24+
import org.elasticsearch.core.UpdateForV10;
2225
import org.elasticsearch.index.Index;
2326
import org.elasticsearch.index.shard.ShardId;
2427
import org.elasticsearch.search.internal.AliasFilter;
@@ -63,6 +66,11 @@ public SearchShardsResponse(
6366
this(groups, nodes, aliasFilters, null);
6467
}
6568

69+
/**
70+
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
71+
* so for BwC we must remain able to read these responses until we no longer need to support calling this action remotely.
72+
*/
73+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
6674
public SearchShardsResponse(StreamInput in) throws IOException {
6775
this.groups = in.readCollectionAsList(SearchShardsGroup::new);
6876
this.nodes = in.readCollectionAsList(DiscoveryNode::new);
@@ -74,6 +82,11 @@ public SearchShardsResponse(StreamInput in) throws IOException {
7482
}
7583
}
7684

85+
/**
86+
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
87+
* so for BwC we must remain able to write these responses until we no longer need to support calling this action remotely.
88+
*/
89+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
7790
@Override
7891
public void writeTo(StreamOutput out) throws IOException {
7992
out.writeCollection(groups);

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1160,7 +1160,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
11601160
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(
11611161
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
11621162
indices
1163-
).indicesOptions(searchShardsIdxOpts).local(true).preference(preference).routing(routing);
1163+
).indicesOptions(searchShardsIdxOpts).preference(preference).routing(routing);
11641164
transportService.sendRequest(
11651165
connection,
11661166
TransportClusterSearchShardsAction.TYPE.name(),

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
4848
RestUtils.getMasterNodeTimeout(request),
4949
Strings.splitStringByCommaToArray(request.param("index"))
5050
);
51-
clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local()));
5251
clusterSearchShardsRequest.routing(request.param("routing"));
5352
clusterSearchShardsRequest.preference(request.param("preference"));
5453
clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
54+
RestUtils.consumeDeprecatedLocalParameter(request);
5555
return channel -> client.execute(
5656
TransportClusterSearchShardsAction.TYPE,
5757
clusterSearchShardsRequest,

server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,28 @@
1010
package org.elasticsearch.action.admin.cluster.shards;
1111

1212
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
16+
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
1417
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1518
import org.elasticsearch.common.io.stream.StreamInput;
19+
import org.elasticsearch.common.io.stream.StreamOutput;
20+
import org.elasticsearch.common.io.stream.Writeable;
21+
import org.elasticsearch.core.UpdateForV10;
1622
import org.elasticsearch.test.ESTestCase;
1723
import org.elasticsearch.test.TransportVersionUtils;
1824

25+
import java.io.IOException;
26+
1927
public class ClusterSearchShardsRequestTests extends ESTestCase {
2028

29+
/**
30+
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
31+
* so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely.
32+
* This test method can be removed once we no longer need to keep serialization and deserialization around.
33+
*/
34+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
2135
public void testSerialization() throws Exception {
2236
ClusterSearchShardsRequest request = new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT);
2337
if (randomBoolean()) {
@@ -48,7 +62,7 @@ public void testSerialization() throws Exception {
4862
TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random());
4963
try (BytesStreamOutput out = new BytesStreamOutput()) {
5064
out.setTransportVersion(version);
51-
request.writeTo(out);
65+
new WriteToWrapper(request).writeTo(out);
5266
try (StreamInput in = out.bytes().streamInput()) {
5367
in.setTransportVersion(version);
5468
ClusterSearchShardsRequest deserialized = new ClusterSearchShardsRequest(in);
@@ -67,4 +81,33 @@ public void testIndicesMustNotBeNull() {
6781
expectThrows(NullPointerException.class, () -> request.indices((String) null));
6882
expectThrows(NullPointerException.class, () -> request.indices(new String[] { "index1", null, "index3" }));
6983
}
84+
85+
/**
86+
* AP prior to 9.3 {@link TransportClusterSearchShardsAction} was a {@link TransportMasterNodeReadAction}
87+
* so for BwC we must remain able to read these requests until we no longer need to support calling this action remotely.
88+
* This class preserves the {@link Writeable#writeTo(StreamOutput)} functionality of the request so that
89+
* the necessary deserialization can be tested.
90+
*/
91+
private static class WriteToWrapper extends MasterNodeReadRequest<WriteToWrapper> {
92+
private final ClusterSearchShardsRequest request;
93+
94+
WriteToWrapper(ClusterSearchShardsRequest request) {
95+
super(request.masterTimeout());
96+
this.request = request;
97+
}
98+
99+
@Override
100+
public ActionRequestValidationException validate() {
101+
return null;
102+
}
103+
104+
@Override
105+
public void writeTo(StreamOutput out) throws IOException {
106+
super.writeTo(out);
107+
out.writeStringArray(request.indices());
108+
out.writeOptionalString(request.routing());
109+
out.writeOptionalString(request.preference());
110+
request.indicesOptions().writeIndicesOptions(out);
111+
}
112+
}
70113
}

0 commit comments

Comments
 (0)