Skip to content

Commit 92f6878

Browse files
committed
Add remote cluster stats to _cluster/stats - phase 1
1 parent e484f09 commit 92f6878

File tree

7 files changed

+151
-6
lines changed

7 files changed

+151
-6
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,34 @@
1919
* A request to get cluster level stats.
2020
*/
2121
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {
22+
private final boolean doRemotes;
23+
2224
/**
2325
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
2426
* based on all nodes will be returned.
2527
*/
2628
public ClusterStatsRequest(String... nodesIds) {
29+
this(false, nodesIds);
30+
}
31+
32+
public ClusterStatsRequest(boolean doRemotes, String... nodesIds) {
2733
super(nodesIds);
34+
this.doRemotes = doRemotes;
2835
}
2936

3037
@Override
3138
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
3239
return new CancellableTask(id, type, action, "", parentTaskId, headers);
3340
}
41+
42+
/**
43+
* Should the remote cluster stats be included in the response.
44+
*/
45+
public boolean doRemotes() {
46+
return doRemotes;
47+
}
48+
49+
public ClusterStatsRequest subRequest() {
50+
return new ClusterStatsRequest(false, nodesIds());
51+
}
3452
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.IOException;
2424
import java.util.List;
2525
import java.util.Locale;
26+
import java.util.Map;
2627

2728
import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG;
2829

@@ -37,6 +38,7 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
3738
final CCSTelemetrySnapshot ccsMetrics;
3839
final long timestamp;
3940
final String clusterUUID;
41+
private final Map<String, RemoteClusterStats> remoteClustersStats;
4042

4143
public ClusterStatsResponse(
4244
long timestamp,
@@ -47,7 +49,8 @@ public ClusterStatsResponse(
4749
MappingStats mappingStats,
4850
AnalysisStats analysisStats,
4951
VersionStats versionStats,
50-
ClusterSnapshotStats clusterSnapshotStats
52+
ClusterSnapshotStats clusterSnapshotStats,
53+
Map<String, RemoteClusterStats> remoteClustersStats
5154
) {
5255
super(clusterName, nodes, failures);
5356
this.clusterUUID = clusterUUID;
@@ -74,6 +77,7 @@ public ClusterStatsResponse(
7477
// stats should be the same on every node so just pick one of them
7578
.findAny()
7679
.orElse(RepositoryUsageStats.EMPTY);
80+
this.remoteClustersStats = remoteClustersStats;
7781
}
7882

7983
public String getClusterUUID() {
@@ -137,6 +141,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
137141

138142
if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) {
139143
builder.startObject("ccs");
144+
if (remoteClustersStats != null) {
145+
builder.field("clusters", remoteClustersStats);
146+
}
140147
ccsMetrics.toXContent(builder, params);
141148
builder.endObject();
142149
}
@@ -149,4 +156,41 @@ public String toString() {
149156
return Strings.toString(this, true, true);
150157
}
151158

159+
public static class RemoteClusterStats implements ToXContentFragment {
160+
private final String clusterUUID;
161+
private final String mode;
162+
private final boolean skipUnavailable;
163+
private final boolean transportCompress;
164+
private final List<String> versions;
165+
private final String status;
166+
167+
public RemoteClusterStats(
168+
String clusterUUID,
169+
String mode,
170+
boolean skipUnavailable,
171+
boolean transportCompress,
172+
List<String> versions,
173+
String status
174+
) {
175+
this.clusterUUID = clusterUUID;
176+
this.mode = mode;
177+
this.skipUnavailable = skipUnavailable;
178+
this.transportCompress = transportCompress;
179+
this.versions = versions;
180+
this.status = status;
181+
}
182+
183+
@Override
184+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
185+
builder.startObject();
186+
builder.field("cluster_uuid", clusterUUID);
187+
builder.field("mode", mode);
188+
builder.field("skip_unavailable", skipUnavailable);
189+
builder.field("transport.compress", transportCompress);
190+
builder.field("version", versions);
191+
builder.field("status", status);
192+
builder.endObject();
193+
return builder;
194+
}
195+
}
152196
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
2020
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2121
import org.elasticsearch.action.support.ActionFilters;
22+
import org.elasticsearch.action.support.GroupedActionListener;
23+
import org.elasticsearch.action.support.PlainActionFuture;
2224
import org.elasticsearch.action.support.nodes.TransportNodesAction;
2325
import org.elasticsearch.cluster.ClusterSnapshotStats;
2426
import org.elasticsearch.cluster.ClusterState;
@@ -46,6 +48,9 @@
4648
import org.elasticsearch.tasks.Task;
4749
import org.elasticsearch.tasks.TaskId;
4850
import org.elasticsearch.threadpool.ThreadPool;
51+
import org.elasticsearch.transport.RemoteClusterConnection;
52+
import org.elasticsearch.transport.RemoteClusterService;
53+
import org.elasticsearch.transport.RemoteConnectionInfo;
4954
import org.elasticsearch.transport.TransportRequest;
5055
import org.elasticsearch.transport.TransportService;
5156
import org.elasticsearch.transport.Transports;
@@ -54,8 +59,11 @@
5459

5560
import java.io.IOException;
5661
import java.util.ArrayList;
62+
import java.util.Collection;
63+
import java.util.HashMap;
5764
import java.util.List;
5865
import java.util.Map;
66+
import java.util.concurrent.ExecutionException;
5967
import java.util.function.BiFunction;
6068
import java.util.function.BooleanSupplier;
6169

@@ -85,6 +93,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
8593

8694
private final MetadataStatsCache<MappingStats> mappingStatsCache;
8795
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
96+
private final RemoteClusterService remoteClusterService;
8897

8998
@Inject
9099
public TransportClusterStatsAction(
@@ -112,6 +121,7 @@ public TransportClusterStatsAction(
112121
this.ccsUsageHolder = usageService.getCcsUsageHolder();
113122
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
114123
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
124+
this.remoteClusterService = transportService.getRemoteClusterService();
115125
}
116126

117127
@Override
@@ -136,6 +146,10 @@ protected void newResponseAsync(
136146
clusterService.threadPool().absoluteTimeInMillis()
137147
);
138148

149+
// TODO: this should not be happening here but leaving it here for now until we figure out proper
150+
// threading/async model for this
151+
var remoteClusterStats = getRemoteClusterStats(request);
152+
139153
final ListenableFuture<MappingStats> mappingStatsStep = new ListenableFuture<>();
140154
final ListenableFuture<AnalysisStats> analysisStatsStep = new ListenableFuture<>();
141155
mappingStatsCache.get(metadata, cancellableTask::isCancelled, mappingStatsStep);
@@ -155,7 +169,8 @@ protected void newResponseAsync(
155169
mappingStats,
156170
analysisStats,
157171
VersionStats.of(metadata, responses),
158-
clusterSnapshotStats
172+
clusterSnapshotStats,
173+
remoteClusterStats
159174
)
160175
)
161176
)
@@ -315,4 +330,68 @@ protected boolean isFresh(Long currentKey, Long newKey) {
315330
return newKey <= currentKey;
316331
}
317332
}
333+
334+
private Map<String, ClusterStatsResponse.RemoteClusterStats> getRemoteClusterStats(ClusterStatsRequest request) {
335+
if (request.doRemotes() == false) {
336+
return null;
337+
}
338+
Map<String, ClusterStatsResponse.RemoteClusterStats> remoteClustersStats = new HashMap<>();
339+
340+
for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
341+
RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
342+
RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo();
343+
var remoteClusterStats = new ClusterStatsResponse.RemoteClusterStats(
344+
"UUID", // TODO cluster_uuid
345+
remoteConnectionInfo.getModeInfo().modeName(),
346+
remoteConnection.isSkipUnavailable(),
347+
false, // TODO transport.compress
348+
List.of(), // TODO version
349+
"green" // TODO status
350+
);
351+
remoteClustersStats.put(clusterAlias, remoteClusterStats);
352+
}
353+
return remoteClustersStats;
354+
}
355+
356+
private Collection<ClusterStatsResponse> getStatsFromRemotes(ClusterStatsRequest request) {
357+
if (request.doRemotes() == false) {
358+
return null;
359+
}
360+
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();
361+
362+
var remotesListener = new PlainActionFuture< Collection<ClusterStatsResponse>>();
363+
GroupedActionListener<ClusterStatsResponse> groupListener = new GroupedActionListener<ClusterStatsResponse>(
364+
remotes.size(),
365+
remotesListener
366+
);
367+
368+
for (String clusterAlias : remotes) {
369+
ClusterStatsRequest remoteRequest = request.subRequest();
370+
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
371+
clusterAlias,
372+
remoteClientResponseExecutor,
373+
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
374+
);
375+
remoteClusterService.getConnection(clusterAlias).sendRequest(
376+
1,
377+
,
378+
remoteRequest,
379+
null
380+
);
381+
remoteClusterClient.execute(TransportClusterStatsAction.TYPE, remoteRequest, groupListener);
382+
}
383+
384+
Collection<ClusterStatsResponse> remoteStats = null;
385+
try {
386+
remoteStats = remotesListener.get();
387+
} catch (InterruptedException e) {
388+
return null;
389+
} catch (ExecutionException e) {
390+
return null;
391+
}
392+
393+
return remoteStats;
394+
395+
}
396+
318397
}

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ public String getName() {
4141

4242
@Override
4343
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
44-
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(request.paramAsStringArray("nodeId", null));
44+
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(
45+
request.paramAsBoolean("remotes", false),
46+
request.paramAsStringArray("nodeId", null)
47+
);
4548
clusterStatsRequest.timeout(getTimeout(request));
4649
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
4750
.cluster()

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
4343
* connections per cluster has been reached.
4444
*/
45-
final class RemoteClusterConnection implements Closeable {
45+
public final class RemoteClusterConnection implements Closeable {
4646

4747
private final TransportService transportService;
4848
private final RemoteConnectionManager remoteConnectionManager;
@@ -98,7 +98,7 @@ void setSkipUnavailable(boolean skipUnavailable) {
9898
/**
9999
* Returns whether this cluster is configured to be skipped when unavailable
100100
*/
101-
boolean isSkipUnavailable() {
101+
public boolean isSkipUnavailable() {
102102
return skipUnavailable;
103103
}
104104

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public void maybeEnsureConnectedAndGetConnection(
276276
}
277277
}
278278

279-
RemoteClusterConnection getRemoteClusterConnection(String cluster) {
279+
public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
280280
if (enabled == false) {
281281
throw new IllegalArgumentException(
282282
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"

server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
3030
final ModeInfo modeInfo;
3131
final TimeValue initialConnectionTimeout;
3232
final String clusterAlias;
33+
3334
final boolean skipUnavailable;
3435
final boolean hasClusterCredentials;
3536

0 commit comments

Comments
 (0)