Skip to content

Commit 4ade56c

Browse files
committed
Fix serialization
1 parent 037d2f1 commit 4ade56c

File tree

15 files changed

+183
-95
lines changed

15 files changed

+183
-95
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99

1010
package org.elasticsearch.action.admin.cluster.state;
1111

12-
import org.elasticsearch.TransportVersions;
1312
import org.elasticsearch.action.ActionRequestValidationException;
1413
import org.elasticsearch.action.IndicesRequest;
1514
import org.elasticsearch.action.support.IndicesOptions;
1615
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1716
import org.elasticsearch.common.Strings;
1817
import org.elasticsearch.common.io.stream.StreamInput;
19-
import org.elasticsearch.common.io.stream.StreamOutput;
2018
import org.elasticsearch.core.TimeValue;
2119
import org.elasticsearch.core.UpdateForV10;
2220
import org.elasticsearch.tasks.CancellableTask;
@@ -63,25 +61,6 @@ public ClusterStateRequest(StreamInput in) throws IOException {
6361
waitForMetadataVersion = in.readOptionalLong();
6462
}
6563

66-
@Override
67-
public void writeTo(StreamOutput out) throws IOException {
68-
getParentTask().writeTo(out);
69-
out.writeTimeValue(masterTimeout());
70-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
71-
out.writeVLong(0L); // Master term
72-
} // else no protection against routing loops in older versions
73-
out.writeBoolean(true); // Local
74-
out.writeBoolean(routingTable);
75-
out.writeBoolean(nodes);
76-
out.writeBoolean(metadata);
77-
out.writeBoolean(blocks);
78-
out.writeBoolean(customs);
79-
out.writeStringArray(indices);
80-
indicesOptions.writeIndicesOptions(out);
81-
out.writeTimeValue(waitForTimeout);
82-
out.writeOptionalLong(waitForMetadataVersion);
83-
}
84-
8564
@Override
8665
public ActionRequestValidationException validate() {
8766
return null;

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.cluster.node.DiscoveryNodes;
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.core.UpdateForV10;
1918

2019
import java.io.IOException;
2120
import java.util.Objects;
@@ -64,11 +63,6 @@ public boolean isWaitForTimedOut() {
6463
return waitForTimedOut;
6564
}
6665

67-
/**
68-
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
69-
* we no longer need to support calling this action remotely.
70-
*/
71-
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7266
@Override
7367
public void writeTo(StreamOutput out) throws IOException {
7468
clusterName.writeTo(out);
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.TransportVersions;
13+
import org.elasticsearch.action.ActionRequest;
14+
import org.elasticsearch.action.ActionRequestValidationException;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
20+
21+
import java.io.IOException;
22+
import java.util.Map;
23+
24+
public class RemoteClusterStateRequest extends ActionRequest {
25+
26+
private final ClusterStateRequest clusterStateRequest;
27+
28+
public RemoteClusterStateRequest(ClusterStateRequest clusterStateRequest) {
29+
this.clusterStateRequest = clusterStateRequest;
30+
}
31+
32+
public RemoteClusterStateRequest(StreamInput in) throws IOException {
33+
this.clusterStateRequest = new ClusterStateRequest(in);
34+
}
35+
36+
@Override
37+
public ActionRequestValidationException validate() {
38+
return clusterStateRequest.validate();
39+
}
40+
41+
@Override
42+
public void writeTo(StreamOutput out) throws IOException {
43+
getParentTask().writeTo(out);
44+
out.writeTimeValue(clusterStateRequest.masterTimeout());
45+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
46+
out.writeVLong(0L); // Master term
47+
} // else no protection against routing loops in older versions
48+
out.writeBoolean(true); // Local
49+
out.writeBoolean(clusterStateRequest.routingTable());
50+
out.writeBoolean(clusterStateRequest.nodes());
51+
out.writeBoolean(clusterStateRequest.metadata());
52+
out.writeBoolean(clusterStateRequest.blocks());
53+
out.writeBoolean(clusterStateRequest.customs());
54+
out.writeStringArray(clusterStateRequest.indices());
55+
clusterStateRequest.indicesOptions().writeIndicesOptions(out);
56+
out.writeTimeValue(clusterStateRequest.waitForTimeout());
57+
out.writeOptionalLong(clusterStateRequest.waitForMetadataVersion());
58+
}
59+
60+
@Override
61+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
62+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
63+
}
64+
65+
@Override
66+
public String getDescription() {
67+
return clusterStateRequest.getDescription();
68+
}
69+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.ActionRunnable;
1616
import org.elasticsearch.action.support.ActionFilters;
17-
import org.elasticsearch.action.support.ChannelActionListener;
1817
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1918
import org.elasticsearch.cluster.ClusterState;
2019
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -59,11 +58,6 @@ public class TransportClusterStateAction extends TransportLocalClusterStateActio
5958
private final IndexNameExpressionResolver indexNameExpressionResolver;
6059
private final ThreadPool threadPool;
6160

62-
/**
63-
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
64-
* we no longer need to support calling this action remotely.
65-
*/
66-
@SuppressWarnings("this-escape")
6761
@Inject
6862
public TransportClusterStateAction(
6963
TransportService transportService,
@@ -83,15 +77,6 @@ public TransportClusterStateAction(
8377
this.projectResolver = projectResolver;
8478
this.indexNameExpressionResolver = indexNameExpressionResolver;
8579
this.threadPool = threadPool;
86-
87-
transportService.registerRequestHandler(
88-
actionName,
89-
executor,
90-
false,
91-
true,
92-
ClusterStateRequest::new,
93-
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
94-
);
9580
}
9681

9782
@Override
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.HandledTransportAction;
15+
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.injection.guice.Inject;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
import org.elasticsearch.transport.TransportService;
20+
21+
public class TransportRemoteClusterStateAction extends HandledTransportAction<RemoteClusterStateRequest, ClusterStateResponse> {
22+
23+
private final Client client;
24+
25+
@Inject
26+
protected TransportRemoteClusterStateAction(
27+
TransportService transportService,
28+
ThreadPool threadPool,
29+
ActionFilters actionFilters,
30+
Client client
31+
) {
32+
super(
33+
ClusterStateAction.NAME,
34+
transportService,
35+
actionFilters,
36+
RemoteClusterStateRequest::new,
37+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
38+
);
39+
this.client = client;
40+
}
41+
42+
@Override
43+
protected void doExecute(Task task, RemoteClusterStateRequest request, ActionListener<ClusterStateResponse> listener) {
44+
client.execute(ClusterStateAction.INSTANCE, request, listener);
45+
}
46+
}

server/src/main/java/org/elasticsearch/action/support/local/LocalClusterStateRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected LocalClusterStateRequest(StreamInput in, boolean readLocal) throws IOE
6464
}
6565

6666
@Override
67-
public void writeTo(StreamOutput out) throws IOException {
67+
public final void writeTo(StreamOutput out) throws IOException {
6868
TransportAction.localOnly();
6969
}
7070

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1515
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
17+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1718
import org.elasticsearch.action.support.ContextPreservingActionListener;
1819
import org.elasticsearch.cluster.node.DiscoveryNode;
1920
import org.elasticsearch.common.settings.Settings;
@@ -158,7 +159,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
158159
transportService.sendRequest(
159160
connection,
160161
ClusterStateAction.NAME,
161-
request,
162+
new RemoteClusterStateRequest(request),
162163
TransportRequestOptions.EMPTY,
163164
new ActionListenerResponseHandler<>(
164165
contextPreservingActionListener.map(response -> response.getState().nodes()::get),

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1717
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1818
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
19+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1920
import org.elasticsearch.cluster.ClusterName;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -324,7 +325,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl
324325
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(SNIFF_REQUEST_TIMEOUT);
325326
clusterStateRequest.clear();
326327
clusterStateRequest.nodes(true);
327-
request = clusterStateRequest;
328+
request = new RemoteClusterStateRequest(clusterStateRequest);
328329
sniffResponseHandler = new ClusterStateSniffResponseHandler(connection, listener, seedNodesSuppliers);
329330
}
330331

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
17-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1817
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
18+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1919
import org.elasticsearch.action.search.SearchRequest;
2020
import org.elasticsearch.action.search.SearchShardsRequest;
2121
import org.elasticsearch.action.search.SearchShardsResponse;
@@ -163,7 +163,7 @@ public static MockTransportService startTransport(
163163
newService.registerRequestHandler(
164164
ClusterStateAction.NAME,
165165
EsExecutors.DIRECT_EXECUTOR_SERVICE,
166-
ClusterStateRequest::new,
166+
RemoteClusterStateRequest::new,
167167
(request, channel, task) -> {
168168
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
169169
for (DiscoveryNode node : knownNodes) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import org.elasticsearch.action.ActionType;
1515
import org.elasticsearch.action.RemoteClusterActionType;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
17-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1817
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
18+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1919
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
2020
import org.elasticsearch.action.admin.indices.stats.IndexStats;
2121
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@@ -134,7 +134,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
134134
client,
135135
clusterAlias,
136136
remoteClient,
137-
CcrRequests.metadataRequest(leaderIndex),
137+
new RemoteClusterStateRequest(CcrRequests.metadataRequest(leaderIndex)),
138138
onFailure,
139139
remoteClusterStateResponse -> {
140140
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
@@ -201,7 +201,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
201201
public static void checkRemoteClusterLicenseAndFetchClusterState(
202202
final Client client,
203203
final String clusterAlias,
204-
final ClusterStateRequest request,
204+
final RemoteClusterStateRequest request,
205205
final Consumer<Exception> onFailure,
206206
final Consumer<ClusterStateResponse> leaderClusterStateConsumer
207207
) {
@@ -250,7 +250,7 @@ private static void checkRemoteClusterLicenseAndFetchClusterState(
250250
final Client client,
251251
final String clusterAlias,
252252
final RemoteClusterClient remoteClient,
253-
final ClusterStateRequest request,
253+
final RemoteClusterStateRequest request,
254254
final Consumer<Exception> onFailure,
255255
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
256256
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,

0 commit comments

Comments
 (0)