Skip to content

Commit 71c0a09

Browse files
authored
Add remote cluster server to nodes info response (#94444)
This PR adds remote cluster server info as a new metric for nodes info API. It also prevents the underlying _remote_cluster profile from being reported under the transport profile section. The fact that RCS uses transport profile underlyingly is an implementation detail and should not be exposed.
1 parent e126dc7 commit 71c0a09

File tree

20 files changed

+163
-47
lines changed

20 files changed

+163
-47
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@
2626
import org.elasticsearch.node.ReportingService;
2727
import org.elasticsearch.search.aggregations.support.AggregationInfo;
2828
import org.elasticsearch.threadpool.ThreadPoolInfo;
29+
import org.elasticsearch.transport.RemoteClusterServerInfo;
2930
import org.elasticsearch.transport.TransportInfo;
3031

3132
import java.io.IOException;
3233
import java.util.HashMap;
3334
import java.util.Map;
3435

36+
import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_REMOTE_CLUSTER_SECURITY;
37+
3538
/**
3639
* Node information (static, does not change over time).
3740
*/
@@ -76,6 +79,9 @@ public NodeInfo(StreamInput in) throws IOException {
7679
if (in.getTransportVersion().onOrAfter(TransportVersion.V_7_10_0)) {
7780
addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new));
7881
}
82+
if (in.getTransportVersion().onOrAfter(TRANSPORT_VERSION_REMOTE_CLUSTER_SECURITY)) {
83+
addInfoIfNonNull(RemoteClusterServerInfo.class, in.readOptionalWriteable(RemoteClusterServerInfo::new));
84+
}
7985
}
8086

8187
public NodeInfo(
@@ -89,6 +95,7 @@ public NodeInfo(
8995
@Nullable ThreadPoolInfo threadPool,
9096
@Nullable TransportInfo transport,
9197
@Nullable HttpInfo http,
98+
@Nullable RemoteClusterServerInfo remoteClusterServer,
9299
@Nullable PluginsAndModules plugins,
93100
@Nullable IngestInfo ingest,
94101
@Nullable AggregationInfo aggsInfo,
@@ -104,6 +111,7 @@ public NodeInfo(
104111
addInfoIfNonNull(ThreadPoolInfo.class, threadPool);
105112
addInfoIfNonNull(TransportInfo.class, transport);
106113
addInfoIfNonNull(HttpInfo.class, http);
114+
addInfoIfNonNull(RemoteClusterServerInfo.class, remoteClusterServer);
107115
addInfoIfNonNull(PluginsAndModules.class, plugins);
108116
addInfoIfNonNull(IngestInfo.class, ingest);
109117
addInfoIfNonNull(AggregationInfo.class, aggsInfo);
@@ -197,5 +205,8 @@ public void writeTo(StreamOutput out) throws IOException {
197205
if (out.getTransportVersion().onOrAfter(TransportVersion.V_7_10_0)) {
198206
out.writeOptionalWriteable(getInfo(AggregationInfo.class));
199207
}
208+
if (out.getTransportVersion().onOrAfter(TRANSPORT_VERSION_REMOTE_CLUSTER_SECURITY)) {
209+
out.writeOptionalWriteable(getInfo(RemoteClusterServerInfo.class));
210+
}
200211
}
201212
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.transport.TcpTransport;
1415

1516
import java.io.IOException;
1617
import java.util.Arrays;
@@ -139,6 +140,7 @@ public enum Metric {
139140
THREAD_POOL("thread_pool"),
140141
TRANSPORT("transport"),
141142
HTTP("http"),
143+
REMOTE_CLUSTER_SERVER("remote_cluster_server"),
142144
PLUGINS("plugins"),
143145
INGEST("ingest"),
144146
AGGREGATIONS("aggregations"),
@@ -155,7 +157,10 @@ public String metricName() {
155157
}
156158

157159
public static Set<String> allMetrics() {
158-
return Arrays.stream(values()).map(Metric::metricName).collect(Collectors.toSet());
160+
return Arrays.stream(values())
161+
.filter(metric -> TcpTransport.isUntrustedRemoteClusterEnabled() || metric != REMOTE_CLUSTER_SERVER)
162+
.map(Metric::metricName)
163+
.collect(Collectors.toSet());
159164
}
160165
}
161166
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.monitor.process.ProcessInfo;
2424
import org.elasticsearch.search.aggregations.support.AggregationInfo;
2525
import org.elasticsearch.threadpool.ThreadPoolInfo;
26+
import org.elasticsearch.transport.RemoteClusterServerInfo;
2627
import org.elasticsearch.transport.TransportInfo;
2728
import org.elasticsearch.xcontent.ToXContentFragment;
2829
import org.elasticsearch.xcontent.XContentBuilder;
@@ -110,6 +111,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
110111
if (nodeInfo.getInfo(HttpInfo.class) != null) {
111112
nodeInfo.getInfo(HttpInfo.class).toXContent(builder, params);
112113
}
114+
if (nodeInfo.getInfo(RemoteClusterServerInfo.class) != null) {
115+
nodeInfo.getInfo(RemoteClusterServerInfo.class).toXContent(builder, params);
116+
}
113117
if (nodeInfo.getInfo(PluginsAndModules.class) != null) {
114118
nodeInfo.getInfo(PluginsAndModules.class).toXContent(builder, params);
115119
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) {
8787
metrics.contains(NodesInfoRequest.Metric.THREAD_POOL.metricName()),
8888
metrics.contains(NodesInfoRequest.Metric.TRANSPORT.metricName()),
8989
metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()),
90+
metrics.contains(NodesInfoRequest.Metric.REMOTE_CLUSTER_SERVER.metricName()),
9091
metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()),
9192
metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()),
9293
metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()),

server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesAction.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,15 @@
2323
import org.elasticsearch.common.inject.Inject;
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.io.stream.StreamOutput;
26-
import org.elasticsearch.common.transport.BoundTransportAddress;
2726
import org.elasticsearch.common.util.concurrent.ThreadContext;
2827
import org.elasticsearch.tasks.Task;
29-
import org.elasticsearch.transport.TransportInfo;
28+
import org.elasticsearch.transport.RemoteClusterServerInfo;
3029
import org.elasticsearch.transport.TransportService;
3130

3231
import java.io.IOException;
3332
import java.util.List;
34-
import java.util.Map;
3533
import java.util.Objects;
3634

37-
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
38-
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;
39-
4035
public class RemoteClusterNodesAction extends ActionType<RemoteClusterNodesAction.Response> {
4136

4237
public static final RemoteClusterNodesAction INSTANCE = new RemoteClusterNodesAction();
@@ -99,7 +94,7 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
9994
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
10095
final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
10196
nodesInfoRequest.clear();
102-
nodesInfoRequest.addMetrics(NodesInfoRequest.Metric.SETTINGS.metricName(), NodesInfoRequest.Metric.TRANSPORT.metricName());
97+
nodesInfoRequest.addMetrics(NodesInfoRequest.Metric.REMOTE_CLUSTER_SERVER.metricName());
10398
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
10499
try (var ignore = threadContext.stashContext()) {
105100
threadContext.markAsSystemContext();
@@ -109,15 +104,11 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
109104
nodesInfoRequest,
110105
new ActionListenerResponseHandler<>(ActionListener.wrap(response -> {
111106
final List<DiscoveryNode> remoteClusterNodes = response.getNodes().stream().map(nodeInfo -> {
112-
if (false == REMOTE_CLUSTER_SERVER_ENABLED.get(nodeInfo.getSettings())) {
107+
final RemoteClusterServerInfo remoteClusterServerInfo = nodeInfo.getInfo(RemoteClusterServerInfo.class);
108+
if (remoteClusterServerInfo == null) {
113109
return null;
114110
}
115-
final Map<String, BoundTransportAddress> profileAddresses = nodeInfo.getInfo(TransportInfo.class)
116-
.getProfileAddresses();
117-
final BoundTransportAddress remoteClusterServerAddress = profileAddresses.get(REMOTE_CLUSTER_PROFILE);
118-
assert remoteClusterServerAddress != null
119-
: "remote cluster server is enabled but corresponding transport profile is missing";
120-
return nodeInfo.getNode().withTransportAddress(remoteClusterServerAddress.publishAddress());
111+
return nodeInfo.getNode().withTransportAddress(remoteClusterServerInfo.getAddress().publishAddress());
121112
}).filter(Objects::nonNull).toList();
122113
listener.onResponse(new Response(remoteClusterNodes));
123114
}, listener::onFailure), NodesInfoResponse::new)

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in, DiscoveryNode
178178
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) {
179179
assert task instanceof CancellableTask;
180180
final CancellableTask cancellableTask = (CancellableTask) task;
181-
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false);
181+
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, false, true, false, false, false);
182182
NodeStats nodeStats = nodeService.stats(
183183
CommonStatsFlags.NONE,
184184
true,

server/src/main/java/org/elasticsearch/node/NodeService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public NodeInfo info(
101101
boolean threadPool,
102102
boolean transport,
103103
boolean http,
104+
boolean remoteClusterServer,
104105
boolean plugin,
105106
boolean ingest,
106107
boolean aggs,
@@ -117,6 +118,7 @@ public NodeInfo info(
117118
threadPool ? this.threadPool.info() : null,
118119
transport ? transportService.info() : null,
119120
http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
121+
remoteClusterServer ? transportService.getRemoteClusterService().info() : null,
120122
plugin ? (pluginService == null ? null : pluginService.info()) : null,
121123
ingest ? (ingestService == null ? null : ingestService.info()) : null,
122124
aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null,
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.transport;
10+
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.network.InetAddresses;
14+
import org.elasticsearch.common.transport.BoundTransportAddress;
15+
import org.elasticsearch.common.transport.TransportAddress;
16+
import org.elasticsearch.node.ReportingService;
17+
import org.elasticsearch.xcontent.XContentBuilder;
18+
19+
import java.io.IOException;
20+
21+
public class RemoteClusterServerInfo implements ReportingService.Info {
22+
23+
private final BoundTransportAddress address;
24+
25+
public RemoteClusterServerInfo(StreamInput in) throws IOException {
26+
this(new BoundTransportAddress(in));
27+
}
28+
29+
public RemoteClusterServerInfo(BoundTransportAddress address) {
30+
this.address = address;
31+
}
32+
33+
@Override
34+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
35+
builder.startObject("remote_cluster_server");
36+
builder.array("bound_address", (Object[]) address.boundAddresses());
37+
TransportAddress publishAddress = address.publishAddress();
38+
String publishAddressString = publishAddress.toString();
39+
String hostString = publishAddress.address().getHostString();
40+
if (InetAddresses.isInetAddress(hostString) == false) {
41+
publishAddressString = hostString + '/' + publishAddress;
42+
}
43+
builder.field("publish_address", publishAddressString);
44+
builder.endObject();
45+
return builder;
46+
}
47+
48+
@Override
49+
public void writeTo(StreamOutput out) throws IOException {
50+
address.writeTo(out);
51+
}
52+
53+
public BoundTransportAddress getAddress() {
54+
return address;
55+
}
56+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.util.concurrent.CountDown;
2828
import org.elasticsearch.core.IOUtils;
2929
import org.elasticsearch.core.TimeValue;
30+
import org.elasticsearch.node.ReportingService;
3031
import org.elasticsearch.threadpool.ThreadPool;
3132

3233
import java.io.Closeable;
@@ -48,11 +49,12 @@
4849
import static org.elasticsearch.common.settings.Setting.boolSetting;
4950
import static org.elasticsearch.common.settings.Setting.enumSetting;
5051
import static org.elasticsearch.common.settings.Setting.timeSetting;
52+
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;
5153

5254
/**
5355
* Basic service for accessing remote clusters via gateway nodes
5456
*/
55-
public final class RemoteClusterService extends RemoteClusterAware implements Closeable {
57+
public final class RemoteClusterService extends RemoteClusterAware implements Closeable, ReportingService<RemoteClusterServerInfo> {
5658

5759
private static final Logger logger = LogManager.getLogger(RemoteClusterService.class);
5860

@@ -129,17 +131,23 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
129131
public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake";
130132

131133
private final boolean enabled;
134+
private final boolean remoteClusterServerEnabled;
132135

133136
public boolean isEnabled() {
134137
return enabled;
135138
}
136139

140+
public boolean isRemoteClusterServerEnabled() {
141+
return remoteClusterServerEnabled;
142+
}
143+
137144
private final TransportService transportService;
138145
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
139146

140147
RemoteClusterService(Settings settings, TransportService transportService) {
141148
super(settings);
142149
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
150+
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
143151
this.transportService = transportService;
144152

145153
if (RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.get(settings)) {
@@ -375,6 +383,15 @@ public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
375383
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
376384
}
377385

386+
@Override
387+
public RemoteClusterServerInfo info() {
388+
if (remoteClusterServerEnabled) {
389+
return new RemoteClusterServerInfo(transportService.boundRemoteAccessAddress());
390+
} else {
391+
return null;
392+
}
393+
}
394+
378395
/**
379396
* Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode}
380397
* function on success.

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.function.Function;
6161
import java.util.function.Predicate;
6262
import java.util.function.Supplier;
63+
import java.util.stream.Collectors;
6364

6465
import static org.elasticsearch.core.Strings.format;
6566

@@ -406,7 +407,16 @@ public TransportInfo info() {
406407
if (boundTransportAddress == null) {
407408
return null;
408409
}
409-
return new TransportInfo(boundTransportAddress, transport.profileBoundAddresses());
410+
final Map<String, BoundTransportAddress> profileAddresses = transport.profileBoundAddresses();
411+
if (remoteClusterService.isRemoteClusterServerEnabled()) {
412+
final Map<String, BoundTransportAddress> filteredProfileAddress = profileAddresses.entrySet()
413+
.stream()
414+
.filter(entry -> false == RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE.equals(entry.getKey()))
415+
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
416+
return new TransportInfo(boundTransportAddress, filteredProfileAddress);
417+
} else {
418+
return new TransportInfo(boundTransportAddress, profileAddresses);
419+
}
410420
}
411421

412422
public TransportStats stats() {

0 commit comments

Comments
 (0)