Skip to content

Commit cbb0884

Browse files
committed
Fix everything
1 parent d543f1c commit cbb0884

File tree

14 files changed

+221
-58
lines changed

14 files changed

+221
-58
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@
6767
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
6868
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
6969
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
70+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateAction;
7071
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
72+
import org.elasticsearch.action.admin.cluster.state.TransportRemoteClusterStateAction;
7173
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
7274
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
7375
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
@@ -653,6 +655,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
653655
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
654656
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
655657
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
658+
actions.register(RemoteClusterStateAction.INSTANCE, TransportRemoteClusterStateAction.class);
656659
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
657660
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
658661
actions.register(ClusterGetSettingsAction.INSTANCE, TransportClusterGetSettingsAction.class);

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class ClusterStateAction extends ActionType<ClusterStateResponse> {
1717
public static final ClusterStateAction INSTANCE = new ClusterStateAction();
1818
public static final String NAME = "cluster:monitor/state";
1919
public static final RemoteClusterActionType<ClusterStateResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
20-
NAME,
20+
RemoteClusterStateAction.NAME,
2121
ClusterStateResponse::new
2222
);
2323

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.RemoteClusterActionType;
14+
15+
public class RemoteClusterStateAction extends ActionType<ClusterStateResponse> {
16+
17+
public static final RemoteClusterStateAction INSTANCE = new RemoteClusterStateAction();
18+
public static final String NAME = "cluster:monitor/state_remote";
19+
public static final RemoteClusterActionType<ClusterStateResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
20+
NAME,
21+
ClusterStateResponse::new
22+
);
23+
24+
private RemoteClusterStateAction() {
25+
super(NAME);
26+
}
27+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.TransportVersions;
13+
import org.elasticsearch.action.ActionRequest;
14+
import org.elasticsearch.action.ActionRequestValidationException;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
20+
21+
import java.io.IOException;
22+
import java.util.Map;
23+
24+
public class RemoteClusterStateRequest extends ActionRequest {
25+
26+
private final ClusterStateRequest clusterStateRequest;
27+
28+
public RemoteClusterStateRequest(ClusterStateRequest clusterStateRequest) {
29+
this.clusterStateRequest = clusterStateRequest;
30+
}
31+
32+
public RemoteClusterStateRequest(StreamInput in) throws IOException {
33+
this.clusterStateRequest = new ClusterStateRequest(in);
34+
}
35+
36+
@Override
37+
public ActionRequestValidationException validate() {
38+
return clusterStateRequest.validate();
39+
}
40+
41+
@Override
42+
public void writeTo(StreamOutput out) throws IOException {
43+
clusterStateRequest.getParentTask().writeTo(out);
44+
out.writeTimeValue(clusterStateRequest.masterTimeout());
45+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
46+
out.writeVLong(0L); // Master term
47+
} // else no protection against routing loops in older versions
48+
out.writeBoolean(true); // Local
49+
out.writeBoolean(clusterStateRequest.routingTable());
50+
out.writeBoolean(clusterStateRequest.nodes());
51+
out.writeBoolean(clusterStateRequest.metadata());
52+
out.writeBoolean(clusterStateRequest.blocks());
53+
out.writeBoolean(clusterStateRequest.customs());
54+
out.writeStringArray(clusterStateRequest.indices());
55+
clusterStateRequest.indicesOptions().writeIndicesOptions(out);
56+
out.writeTimeValue(clusterStateRequest.waitForTimeout());
57+
out.writeOptionalLong(clusterStateRequest.waitForMetadataVersion());
58+
}
59+
60+
@Override
61+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
62+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
63+
}
64+
65+
@Override
66+
public String getDescription() {
67+
return clusterStateRequest.getDescription();
68+
}
69+
70+
public ClusterStateRequest clusterStateRequest() {
71+
return clusterStateRequest;
72+
}
73+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.HandledTransportAction;
15+
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.injection.guice.Inject;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
import org.elasticsearch.transport.TransportService;
20+
21+
public class TransportRemoteClusterStateAction extends HandledTransportAction<RemoteClusterStateRequest, ClusterStateResponse> {
22+
23+
private final Client client;
24+
25+
@Inject
26+
public TransportRemoteClusterStateAction(
27+
TransportService transportService,
28+
ThreadPool threadPool,
29+
ActionFilters actionFilters,
30+
Client client
31+
) {
32+
super(
33+
RemoteClusterStateAction.NAME,
34+
transportService,
35+
actionFilters,
36+
RemoteClusterStateRequest::new,
37+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
38+
);
39+
this.client = client;
40+
}
41+
42+
@Override
43+
protected void doExecute(Task task, RemoteClusterStateRequest request, ActionListener<ClusterStateResponse> listener) {
44+
client.execute(ClusterStateAction.INSTANCE, request.clusterStateRequest(), listener);
45+
}
46+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
1313
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
14-
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1514
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1615
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
16+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateAction;
17+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1718
import org.elasticsearch.action.support.ContextPreservingActionListener;
1819
import org.elasticsearch.cluster.node.DiscoveryNode;
1920
import org.elasticsearch.common.settings.Settings;
@@ -157,8 +158,8 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
157158

158159
transportService.sendRequest(
159160
connection,
160-
ClusterStateAction.NAME,
161-
request,
161+
RemoteClusterStateAction.NAME,
162+
new RemoteClusterStateRequest(request),
162163
TransportRequestOptions.EMPTY,
163164
new ActionListenerResponseHandler<>(
164165
contextPreservingActionListener.map(response -> response.getState().nodes()::get),

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
import org.elasticsearch.Version;
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
16-
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1716
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1817
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
18+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateAction;
19+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1920
import org.elasticsearch.cluster.ClusterName;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -320,11 +321,11 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl
320321
request = RemoteClusterNodesAction.Request.REMOTE_CLUSTER_SERVER_NODES;
321322
sniffResponseHandler = new RemoteClusterNodesSniffResponseHandler(connection, listener, seedNodesSuppliers);
322323
} else {
323-
action = ClusterStateAction.NAME;
324+
action = RemoteClusterStateAction.NAME;
324325
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(SNIFF_REQUEST_TIMEOUT);
325326
clusterStateRequest.clear();
326327
clusterStateRequest.nodes(true);
327-
request = clusterStateRequest;
328+
request = new RemoteClusterStateRequest(clusterStateRequest);
328329
sniffResponseHandler = new ClusterStateSniffResponseHandler(connection, listener, seedNodesSuppliers);
329330
}
330331

server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1818
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1919
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
20+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
2021
import org.elasticsearch.action.bulk.BulkRequest;
2122
import org.elasticsearch.action.search.ClearScrollRequest;
2223
import org.elasticsearch.action.search.SearchRequest;
@@ -119,7 +120,7 @@ public <Request extends ActionRequest> void getConnection(
119120
ClusterStateResponse.class,
120121
listener -> remoteClusterClient.execute(
121122
ClusterStateAction.REMOTE_TYPE,
122-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
123+
new RemoteClusterStateRequest(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)),
123124
listener
124125
)
125126
).getMessage()
@@ -133,7 +134,7 @@ public <Request extends ActionRequest> void getConnection(
133134
listener -> remoteClusterClient.execute(
134135
null,
135136
ClusterStateAction.REMOTE_TYPE,
136-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
137+
new RemoteClusterStateRequest(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)),
137138
listener
138139
)
139140
).getMessage()

server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1414
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1515
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
16+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1617
import org.elasticsearch.action.search.SearchResponse;
1718
import org.elasticsearch.action.search.SearchScrollRequest;
1819
import org.elasticsearch.action.search.TransportSearchScrollAction;
@@ -108,7 +109,7 @@ public void testConnectAndExecuteRequest() throws Exception {
108109
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
109110
),
110111
clusterStateResponseListener -> {
111-
final var request = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
112+
final var request = new RemoteClusterStateRequest(new ClusterStateRequest(TEST_REQUEST_TIMEOUT));
112113
if (randomBoolean()) {
113114
client.execute(ClusterStateAction.REMOTE_TYPE, request, clusterStateResponseListener);
114115
} else {
@@ -197,7 +198,7 @@ public void testEnsureWeReconnect() throws Exception {
197198
final ClusterStateResponse clusterStateResponse = safeAwait(
198199
listener -> client.execute(
199200
ClusterStateAction.REMOTE_TYPE,
200-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
201+
new RemoteClusterStateRequest(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)),
201202
listener
202203
)
203204
);
@@ -295,7 +296,7 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
295296
ClusterStateResponse.class,
296297
listener -> client.execute(
297298
ClusterStateAction.REMOTE_TYPE,
298-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
299+
new RemoteClusterStateRequest(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)),
299300
listener
300301
)
301302
),
@@ -318,7 +319,7 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
318319
() -> safeAwait(
319320
listener -> client.execute(
320321
ClusterStateAction.REMOTE_TYPE,
321-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
322+
new RemoteClusterStateRequest(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)),
322323
listener.map(v -> v)
323324
)
324325
),

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
import org.elasticsearch.TransportVersion;
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
16-
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
17-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1816
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
17+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateAction;
18+
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
1919
import org.elasticsearch.action.search.SearchRequest;
2020
import org.elasticsearch.action.search.SearchShardsRequest;
2121
import org.elasticsearch.action.search.SearchShardsResponse;
@@ -161,9 +161,9 @@ public static MockTransportService startTransport(
161161
}
162162
);
163163
newService.registerRequestHandler(
164-
ClusterStateAction.NAME,
164+
RemoteClusterStateAction.NAME,
165165
EsExecutors.DIRECT_EXECUTOR_SERVICE,
166-
ClusterStateRequest::new,
166+
RemoteClusterStateRequest::new,
167167
(request, channel, task) -> {
168168
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
169169
for (DiscoveryNode node : knownNodes) {
@@ -647,7 +647,7 @@ private void doTestCollectNodes(boolean hasClusterCredentials) throws Exception
647647
oneOf(RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME, RemoteClusterNodesAction.TYPE.name())
648648
);
649649
} else {
650-
assertThat(action, oneOf(TransportService.HANDSHAKE_ACTION_NAME, ClusterStateAction.NAME));
650+
assertThat(action, oneOf(TransportService.HANDSHAKE_ACTION_NAME, RemoteClusterStateAction.NAME));
651651
}
652652
connection.sendRequest(requestId, action, request, options);
653653
});

0 commit comments

Comments
 (0)