Skip to content

Commit 9ff998e

Browse files
committed
PR feedback
1 parent 875a381 commit 9ff998e

File tree

16 files changed

+275
-133
lines changed

16 files changed

+275
-133
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ public void testNoMasterActions() throws Exception {
8282
internalCluster().setDisruptionScheme(disruptionScheme);
8383
disruptionScheme.startDisrupting();
8484

85-
final String masterLessNode = internalCluster().getRandomNodeName();
86-
final Client clientToMasterlessNode = client(masterLessNode);
85+
final String masterlessNode = internalCluster().getRandomNodeName();
86+
final Client clientToMasterlessNode = client(masterlessNode);
8787

88-
awaitClusterState(masterLessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
88+
awaitClusterState(masterlessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
8989

9090
assertRequestBuilderThrows(
9191
clientToMasterlessNode.prepareGet("test", "1"),
@@ -239,10 +239,10 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception {
239239
internalCluster().setDisruptionScheme(disruptionScheme);
240240
disruptionScheme.startDisrupting();
241241

242-
final String masterLessNode = internalCluster().getRandomNodeName();
243-
final Client clientToMasterlessNode = client(masterLessNode);
242+
final String masterlessNode = internalCluster().getRandomNodeName();
243+
final Client clientToMasterlessNode = client(masterlessNode);
244244

245-
awaitClusterState(masterLessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
245+
awaitClusterState(masterlessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
246246

247247
GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "1").get();
248248
assertExists(getResponse);

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@
1414
import org.elasticsearch.action.support.IndicesOptions;
1515
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1616
import org.elasticsearch.common.Strings;
17-
import org.elasticsearch.common.io.stream.StreamInput;
1817
import org.elasticsearch.core.TimeValue;
1918
import org.elasticsearch.tasks.CancellableTask;
2019
import org.elasticsearch.tasks.Task;
2120
import org.elasticsearch.tasks.TaskId;
2221

23-
import java.io.IOException;
2422
import java.util.Arrays;
2523
import java.util.Map;
2624

25+
/**
26+
* A local-only request for obtaining (parts of) the cluster state. {@link RemoteClusterStateRequest} can be used for obtaining cluster
27+
* states from remote clusters.
28+
*/
2729
public class ClusterStateRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
2830

2931
public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1);
@@ -42,22 +44,6 @@ public ClusterStateRequest(TimeValue masterNodeTimeout) {
4244
super(masterNodeTimeout);
4345
}
4446

45-
/**
46-
* Even though this request is only executed on the local node, we still need to be able to serialize it for cross-cluster requests.
47-
*/
48-
ClusterStateRequest(StreamInput in) throws IOException {
49-
super(in);
50-
routingTable = in.readBoolean();
51-
nodes = in.readBoolean();
52-
metadata = in.readBoolean();
53-
blocks = in.readBoolean();
54-
customs = in.readBoolean();
55-
indices = in.readStringArray();
56-
indicesOptions = IndicesOptions.readIndicesOptions(in);
57-
waitForTimeout = in.readTimeValue();
58-
waitForMetadataVersion = in.readOptionalLong();
59-
}
60-
6147
@Override
6248
public ActionRequestValidationException validate() {
6349
return null;
@@ -200,6 +186,7 @@ public String getDescription() {
200186
if (customs) {
201187
stringBuilder.append("customs, ");
202188
}
189+
stringBuilder.append("local, ");
203190
if (waitForMetadataVersion != null) {
204191
stringBuilder.append("wait for metadata version [")
205192
.append(waitForMetadataVersion)

server/src/main/java/org/elasticsearch/action/admin/cluster/state/RemoteClusterStateRequest.java

Lines changed: 182 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,56 +9,187 @@
99

1010
package org.elasticsearch.action.admin.cluster.state;
1111

12-
import org.elasticsearch.TransportVersions;
13-
import org.elasticsearch.action.ActionRequest;
1412
import org.elasticsearch.action.ActionRequestValidationException;
13+
import org.elasticsearch.action.IndicesRequest;
14+
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
16+
import org.elasticsearch.common.Strings;
1517
import org.elasticsearch.common.io.stream.StreamInput;
1618
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.core.TimeValue;
1720
import org.elasticsearch.tasks.CancellableTask;
1821
import org.elasticsearch.tasks.Task;
1922
import org.elasticsearch.tasks.TaskId;
2023

2124
import java.io.IOException;
25+
import java.util.Arrays;
2226
import java.util.Map;
2327

2428
/**
2529
* A remote-only version of {@link ClusterStateRequest} that should be used for cross-cluster requests.
2630
* It simply exists to handle incoming remote requests and forward them to the local transport action.
2731
*/
28-
public class RemoteClusterStateRequest extends ActionRequest {
32+
public class RemoteClusterStateRequest extends MasterNodeReadRequest<RemoteClusterStateRequest> implements IndicesRequest.Replaceable {
2933

30-
private final ClusterStateRequest clusterStateRequest;
34+
private boolean routingTable = true;
35+
private boolean nodes = true;
36+
private boolean metadata = true;
37+
private boolean blocks = true;
38+
private boolean customs = true;
39+
private Long waitForMetadataVersion;
40+
private TimeValue waitForTimeout = ClusterStateRequest.DEFAULT_WAIT_FOR_NODE_TIMEOUT;
41+
private String[] indices = Strings.EMPTY_ARRAY;
42+
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
3143

32-
public RemoteClusterStateRequest(ClusterStateRequest clusterStateRequest) {
33-
this.clusterStateRequest = clusterStateRequest;
44+
public RemoteClusterStateRequest(TimeValue masterNodeTimeout) {
45+
super(masterNodeTimeout);
3446
}
3547

3648
public RemoteClusterStateRequest(StreamInput in) throws IOException {
37-
this.clusterStateRequest = new ClusterStateRequest(in);
49+
super(in);
50+
routingTable = in.readBoolean();
51+
nodes = in.readBoolean();
52+
metadata = in.readBoolean();
53+
blocks = in.readBoolean();
54+
customs = in.readBoolean();
55+
indices = in.readStringArray();
56+
indicesOptions = IndicesOptions.readIndicesOptions(in);
57+
waitForTimeout = in.readTimeValue();
58+
waitForMetadataVersion = in.readOptionalLong();
3859
}
3960

4061
@Override
4162
public ActionRequestValidationException validate() {
42-
return clusterStateRequest.validate();
63+
// We defer validation to `ClusterStateRequest`, which will run on the local node.
64+
return null;
65+
}
66+
67+
public RemoteClusterStateRequest all() {
68+
routingTable = true;
69+
nodes = true;
70+
metadata = true;
71+
blocks = true;
72+
customs = true;
73+
indices = Strings.EMPTY_ARRAY;
74+
return this;
75+
}
76+
77+
public RemoteClusterStateRequest clear() {
78+
routingTable = false;
79+
nodes = false;
80+
metadata = false;
81+
blocks = false;
82+
customs = false;
83+
indices = Strings.EMPTY_ARRAY;
84+
return this;
85+
}
86+
87+
public boolean routingTable() {
88+
return routingTable;
89+
}
90+
91+
public RemoteClusterStateRequest routingTable(boolean routingTable) {
92+
this.routingTable = routingTable;
93+
return this;
94+
}
95+
96+
public boolean nodes() {
97+
return nodes;
98+
}
99+
100+
public RemoteClusterStateRequest nodes(boolean nodes) {
101+
this.nodes = nodes;
102+
return this;
103+
}
104+
105+
public boolean metadata() {
106+
return metadata;
107+
}
108+
109+
public RemoteClusterStateRequest metadata(boolean metadata) {
110+
this.metadata = metadata;
111+
return this;
112+
}
113+
114+
public boolean blocks() {
115+
return blocks;
116+
}
117+
118+
public RemoteClusterStateRequest blocks(boolean blocks) {
119+
this.blocks = blocks;
120+
return this;
121+
}
122+
123+
@Override
124+
public String[] indices() {
125+
return indices;
126+
}
127+
128+
@Override
129+
public RemoteClusterStateRequest indices(String... indices) {
130+
this.indices = indices;
131+
return this;
132+
}
133+
134+
@Override
135+
public IndicesOptions indicesOptions() {
136+
return this.indicesOptions;
137+
}
138+
139+
public final RemoteClusterStateRequest indicesOptions(IndicesOptions indicesOptions) {
140+
this.indicesOptions = indicesOptions;
141+
return this;
142+
}
143+
144+
@Override
145+
public boolean includeDataStreams() {
146+
return true;
147+
}
148+
149+
public RemoteClusterStateRequest customs(boolean customs) {
150+
this.customs = customs;
151+
return this;
152+
}
153+
154+
public boolean customs() {
155+
return customs;
156+
}
157+
158+
public TimeValue waitForTimeout() {
159+
return waitForTimeout;
160+
}
161+
162+
public RemoteClusterStateRequest waitForTimeout(TimeValue waitForTimeout) {
163+
this.waitForTimeout = waitForTimeout;
164+
return this;
165+
}
166+
167+
public Long waitForMetadataVersion() {
168+
return waitForMetadataVersion;
169+
}
170+
171+
public RemoteClusterStateRequest waitForMetadataVersion(long waitForMetadataVersion) {
172+
if (waitForMetadataVersion < 1) {
173+
throw new IllegalArgumentException(
174+
"provided waitForMetadataVersion should be >= 1, but instead is [" + waitForMetadataVersion + "]"
175+
);
176+
}
177+
this.waitForMetadataVersion = waitForMetadataVersion;
178+
return this;
43179
}
44180

45181
@Override
46182
public void writeTo(StreamOutput out) throws IOException {
47-
clusterStateRequest.getParentTask().writeTo(out);
48-
out.writeTimeValue(clusterStateRequest.masterTimeout());
49-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
50-
out.writeVLong(0L); // Master term
51-
} // else no protection against routing loops in older versions
52-
out.writeBoolean(true); // Local
53-
out.writeBoolean(clusterStateRequest.routingTable());
54-
out.writeBoolean(clusterStateRequest.nodes());
55-
out.writeBoolean(clusterStateRequest.metadata());
56-
out.writeBoolean(clusterStateRequest.blocks());
57-
out.writeBoolean(clusterStateRequest.customs());
58-
out.writeStringArray(clusterStateRequest.indices());
59-
clusterStateRequest.indicesOptions().writeIndicesOptions(out);
60-
out.writeTimeValue(clusterStateRequest.waitForTimeout());
61-
out.writeOptionalLong(clusterStateRequest.waitForMetadataVersion());
183+
super.writeTo(out);
184+
out.writeBoolean(routingTable);
185+
out.writeBoolean(nodes);
186+
out.writeBoolean(metadata);
187+
out.writeBoolean(blocks);
188+
out.writeBoolean(customs);
189+
out.writeStringArray(indices);
190+
indicesOptions.writeIndicesOptions(out);
191+
out.writeTimeValue(waitForTimeout);
192+
out.writeOptionalLong(waitForMetadataVersion);
62193
}
63194

64195
@Override
@@ -68,10 +199,34 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
68199

69200
@Override
70201
public String getDescription() {
71-
return clusterStateRequest.getDescription();
72-
}
202+
final StringBuilder stringBuilder = new StringBuilder("remote cluster state [");
203+
if (routingTable) {
204+
stringBuilder.append("routing table, ");
205+
}
206+
if (nodes) {
207+
stringBuilder.append("nodes, ");
208+
}
209+
if (metadata) {
210+
stringBuilder.append("metadata, ");
211+
}
212+
if (blocks) {
213+
stringBuilder.append("blocks, ");
214+
}
215+
if (customs) {
216+
stringBuilder.append("customs, ");
217+
}
218+
if (waitForMetadataVersion != null) {
219+
stringBuilder.append("wait for metadata version [")
220+
.append(waitForMetadataVersion)
221+
.append("] with timeout [")
222+
.append(waitForTimeout)
223+
.append("], ");
224+
}
225+
if (indices.length > 0) {
226+
stringBuilder.append("indices ").append(Arrays.toString(indices)).append(", ");
227+
}
228+
stringBuilder.append("master timeout [").append(masterNodeTimeout()).append("]]");
229+
return stringBuilder.toString();
73230

74-
public ClusterStateRequest clusterStateRequest() {
75-
return clusterStateRequest;
76231
}
77232
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportRemoteClusterStateAction.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@ public TransportRemoteClusterStateAction(
4545

4646
@Override
4747
protected void doExecute(Task task, RemoteClusterStateRequest request, ActionListener<ClusterStateResponse> listener) {
48-
client.execute(ClusterStateAction.INSTANCE, request.clusterStateRequest(), listener);
48+
final ClusterStateRequest localRequest = new ClusterStateRequest(request.masterNodeTimeout());
49+
localRequest.routingTable(request.routingTable());
50+
localRequest.nodes(request.nodes());
51+
localRequest.metadata(request.metadata());
52+
localRequest.blocks(request.blocks());
53+
localRequest.customs(request.customs());
54+
if (request.waitForMetadataVersion() != null) {
55+
localRequest.waitForMetadataVersion(request.waitForMetadataVersion());
56+
}
57+
localRequest.waitForTimeout(request.waitForTimeout());
58+
localRequest.indices(request.indices());
59+
localRequest.indicesOptions(request.indicesOptions());
60+
client.execute(ClusterStateAction.INSTANCE, localRequest, listener);
4961
}
5062
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
1313
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
1414
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
15-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1615
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1716
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1817
import org.elasticsearch.action.support.ContextPreservingActionListener;
@@ -149,7 +148,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
149148
}), RemoteClusterNodesAction.Response::new, TransportResponseHandler.TRANSPORT_WORKER)
150149
);
151150
} else {
152-
final ClusterStateRequest request = new ClusterStateRequest(
151+
final RemoteClusterStateRequest request = new RemoteClusterStateRequest(
153152
/* Timeout doesn't really matter with .local(true) */
154153
TimeValue.THIRTY_SECONDS
155154
);
@@ -159,7 +158,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
159158
transportService.sendRequest(
160159
connection,
161160
ClusterStateAction.NAME,
162-
new RemoteClusterStateRequest(request),
161+
request,
163162
TransportRequestOptions.EMPTY,
164163
new ActionListenerResponseHandler<>(
165164
contextPreservingActionListener.map(response -> response.getState().nodes()::get),

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
17-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1817
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1918
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
2019
import org.elasticsearch.cluster.ClusterName;
@@ -322,10 +321,10 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl
322321
sniffResponseHandler = new RemoteClusterNodesSniffResponseHandler(connection, listener, seedNodesSuppliers);
323322
} else {
324323
action = ClusterStateAction.NAME;
325-
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(SNIFF_REQUEST_TIMEOUT);
324+
final RemoteClusterStateRequest clusterStateRequest = new RemoteClusterStateRequest(SNIFF_REQUEST_TIMEOUT);
326325
clusterStateRequest.clear();
327326
clusterStateRequest.nodes(true);
328-
request = new RemoteClusterStateRequest(clusterStateRequest);
327+
request = clusterStateRequest;
329328
sniffResponseHandler = new ClusterStateSniffResponseHandler(connection, listener, seedNodesSuppliers);
330329
}
331330

0 commit comments

Comments
 (0)