Skip to content

Commit 0bd3ee8

Browse files
committed
Extract remote cluster state action
1 parent a16f97f commit 0bd3ee8

File tree

20 files changed

+199
-89
lines changed

20 files changed

+199
-89
lines changed

qa/ccs-unavailable-clusters/src/javaRestTest/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import org.apache.http.nio.entity.NStringEntity;
1515
import org.elasticsearch.TransportVersion;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
17-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1817
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
18+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1919
import org.elasticsearch.action.search.SearchRequest;
2020
import org.elasticsearch.action.search.SearchShardsRequest;
2121
import org.elasticsearch.action.search.SearchShardsResponse;
@@ -117,7 +117,7 @@ private static MockTransportService startTransport(
117117
newService.registerRequestHandler(
118118
ClusterStateAction.NAME,
119119
EsExecutors.DIRECT_EXECUTOR_SERVICE,
120-
ClusterStateRequest::new,
120+
RemoteClusterStateRequest::new,
121121
(request, channel, task) -> {
122122
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
123123
for (DiscoveryNode node : knownNodes) {

server/src/internalClusterTest/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ public void testCreateIndexStopsWaitingWhenIndexDeleted() throws Exception {
135135
.execute();
136136

137137
logger.info("--> wait until the cluster state contains the new index");
138-
assertBusy(
139-
() -> assertTrue(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject().hasIndex(indexName))
140-
);
138+
awaitClusterState(state -> state.metadata().getProject().hasIndex(indexName));
141139

142140
logger.info("--> delete the index");
143141
assertAcked(indicesAdmin().prepareDelete(indexName));

server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1414
import org.elasticsearch.action.search.SearchPhaseExecutionException;
1515
import org.elasticsearch.action.search.ShardSearchFailure;
16-
import org.elasticsearch.cluster.ClusterState;
1716
import org.elasticsearch.cluster.health.ClusterHealthStatus;
1817
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
1918
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -120,10 +119,9 @@ private void buildRedIndex(int numShards) throws Exception {
120119

121120
clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForStatus(ClusterHealthStatus.RED).get();
122121

123-
assertBusy(() -> {
124-
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
122+
awaitClusterState(state -> {
125123
List<ShardRouting> unassigneds = RoutingNodesHelper.shardsWithState(state.getRoutingNodes(), ShardRoutingState.UNASSIGNED);
126-
assertThat(unassigneds.size(), greaterThan(0));
124+
return unassigneds.isEmpty() == false;
127125
});
128126

129127
}

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
6969
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
7070
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
71+
import org.elasticsearch.action.admin.cluster.state.TransportRemoteClusterStateAction;
7172
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
7273
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
7374
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
@@ -1057,7 +1058,7 @@ protected void configure() {
10571058
bind(action.getTransportAction()).asEagerSingleton();
10581059
transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton();
10591060
}
1060-
1061+
bind(TransportRemoteClusterStateAction.class).asEagerSingleton();
10611062
}
10621063

10631064
public ActionFilters getActionFilters() {

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.io.stream.StreamInput;
1919
import org.elasticsearch.common.io.stream.StreamOutput;
2020
import org.elasticsearch.core.TimeValue;
21-
import org.elasticsearch.core.UpdateForV10;
2221
import org.elasticsearch.tasks.CancellableTask;
2322
import org.elasticsearch.tasks.Task;
2423
import org.elasticsearch.tasks.TaskId;
@@ -46,10 +45,8 @@ public ClusterStateRequest(TimeValue masterNodeTimeout) {
4645
}
4746

4847
/**
49-
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
50-
* we no longer need to support calling this action remotely.
48+
* Even though this request is only executed on the local node, we still need to be able to serialize it for cross-cluster requests.
5149
*/
52-
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
5350
public ClusterStateRequest(StreamInput in) throws IOException {
5451
super(in);
5552
routingTable = in.readBoolean();

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.cluster.node.DiscoveryNodes;
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.core.UpdateForV10;
1918

2019
import java.io.IOException;
2120
import java.util.Objects;
@@ -64,11 +63,6 @@ public boolean isWaitForTimedOut() {
6463
return waitForTimedOut;
6564
}
6665

67-
/**
68-
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
69-
* we no longer need to support calling this action remotely.
70-
*/
71-
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7266
@Override
7367
public void writeTo(StreamOutput out) throws IOException {
7468
clusterName.writeTo(out);
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.TransportVersions;
13+
import org.elasticsearch.action.ActionRequest;
14+
import org.elasticsearch.action.ActionRequestValidationException;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
20+
21+
import java.io.IOException;
22+
import java.util.Map;
23+
24+
public class RemoteClusterStateRequest extends ActionRequest {
25+
26+
private final ClusterStateRequest clusterStateRequest;
27+
28+
public RemoteClusterStateRequest(ClusterStateRequest clusterStateRequest) {
29+
this.clusterStateRequest = clusterStateRequest;
30+
}
31+
32+
public RemoteClusterStateRequest(StreamInput in) throws IOException {
33+
this.clusterStateRequest = new ClusterStateRequest(in);
34+
}
35+
36+
@Override
37+
public ActionRequestValidationException validate() {
38+
return clusterStateRequest.validate();
39+
}
40+
41+
@Override
42+
public void writeTo(StreamOutput out) throws IOException {
43+
clusterStateRequest.getParentTask().writeTo(out);
44+
out.writeTimeValue(clusterStateRequest.masterTimeout());
45+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
46+
out.writeVLong(0L); // Master term
47+
} // else no protection against routing loops in older versions
48+
out.writeBoolean(true); // Local
49+
out.writeBoolean(clusterStateRequest.routingTable());
50+
out.writeBoolean(clusterStateRequest.nodes());
51+
out.writeBoolean(clusterStateRequest.metadata());
52+
out.writeBoolean(clusterStateRequest.blocks());
53+
out.writeBoolean(clusterStateRequest.customs());
54+
out.writeStringArray(clusterStateRequest.indices());
55+
clusterStateRequest.indicesOptions().writeIndicesOptions(out);
56+
out.writeTimeValue(clusterStateRequest.waitForTimeout());
57+
out.writeOptionalLong(clusterStateRequest.waitForMetadataVersion());
58+
}
59+
60+
@Override
61+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
62+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
63+
}
64+
65+
@Override
66+
public String getDescription() {
67+
return clusterStateRequest.getDescription();
68+
}
69+
70+
public ClusterStateRequest clusterStateRequest() {
71+
return clusterStateRequest;
72+
}
73+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.ActionRunnable;
1616
import org.elasticsearch.action.support.ActionFilters;
17-
import org.elasticsearch.action.support.ChannelActionListener;
1817
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1918
import org.elasticsearch.cluster.ClusterState;
2019
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -59,11 +58,6 @@ public class TransportClusterStateAction extends TransportLocalClusterStateActio
5958
private final IndexNameExpressionResolver indexNameExpressionResolver;
6059
private final ThreadPool threadPool;
6160

62-
/**
63-
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
64-
* we no longer need to support calling this action remotely.
65-
*/
66-
@SuppressWarnings("this-escape")
6761
@Inject
6862
public TransportClusterStateAction(
6963
TransportService transportService,
@@ -83,15 +77,6 @@ public TransportClusterStateAction(
8377
this.projectResolver = projectResolver;
8478
this.indexNameExpressionResolver = indexNameExpressionResolver;
8579
this.threadPool = threadPool;
86-
87-
transportService.registerRequestHandler(
88-
actionName,
89-
executor,
90-
false,
91-
true,
92-
ClusterStateRequest::new,
93-
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
94-
);
9580
}
9681

9782
@Override
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.HandledTransportAction;
15+
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.injection.guice.Inject;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
import org.elasticsearch.transport.TransportService;
20+
21+
/**
22+
* A remote-only version of {@link TransportClusterStateAction} that should be used for cross-cluster requests.
23+
* It simply exists to handle incoming remote requests and forward them to the local transport action.
24+
*/
25+
public class TransportRemoteClusterStateAction extends HandledTransportAction<RemoteClusterStateRequest, ClusterStateResponse> {
26+
27+
private final Client client;
28+
29+
@Inject
30+
public TransportRemoteClusterStateAction(
31+
TransportService transportService,
32+
ThreadPool threadPool,
33+
ActionFilters actionFilters,
34+
Client client
35+
) {
36+
super(
37+
ClusterStateAction.NAME,
38+
transportService,
39+
actionFilters,
40+
RemoteClusterStateRequest::new,
41+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
42+
);
43+
this.client = client;
44+
}
45+
46+
@Override
47+
protected void doExecute(Task task, RemoteClusterStateRequest request, ActionListener<ClusterStateResponse> listener) {
48+
client.execute(ClusterStateAction.INSTANCE, request.clusterStateRequest(), listener);
49+
}
50+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1515
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
17+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1718
import org.elasticsearch.action.support.ContextPreservingActionListener;
1819
import org.elasticsearch.cluster.node.DiscoveryNode;
1920
import org.elasticsearch.common.settings.Settings;
@@ -158,7 +159,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
158159
transportService.sendRequest(
159160
connection,
160161
ClusterStateAction.NAME,
161-
request,
162+
new RemoteClusterStateRequest(request),
162163
TransportRequestOptions.EMPTY,
163164
new ActionListenerResponseHandler<>(
164165
contextPreservingActionListener.map(response -> response.getState().nodes()::get),

0 commit comments

Comments
 (0)