Skip to content

Commit c6e7c04

Browse files
committed
Implement remote cluster stats polling
1 parent 6092acf commit c6e7c04

File tree

9 files changed

+623
-216
lines changed

9 files changed

+623
-216
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
7373
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
7474
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
75+
import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction;
7576
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
7677
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
7778
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
@@ -641,6 +642,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
641642
actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class);
642643
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
643644
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
645+
actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class);
644646
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
645647
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
646648
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,23 @@
99
package org.elasticsearch.action.admin.cluster.stats;
1010

1111
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
1214
import org.elasticsearch.tasks.CancellableTask;
1315
import org.elasticsearch.tasks.Task;
1416
import org.elasticsearch.tasks.TaskId;
1517

18+
import java.io.IOException;
1619
import java.util.Map;
1720

1821
/**
1922
* A request to get cluster level stats.
23+
* This request can be used both to request stats from single cluster or from remote cluster.
2024
*/
2125
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {
26+
/**
27+
* Should the remote cluster stats be included in the response.
28+
*/
2229
private final boolean doRemotes;
2330

2431
/**
@@ -34,6 +41,12 @@ public ClusterStatsRequest(boolean doRemotes, String... nodesIds) {
3441
this.doRemotes = doRemotes;
3542
}
3643

44+
public ClusterStatsRequest(StreamInput in) throws IOException {
45+
super(in.readStringArray());
46+
// We will never ask the remote to collect remote stats
47+
doRemotes = false;
48+
}
49+
3750
@Override
3851
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
3952
return new CancellableTask(id, type, action, "", parentTaskId, headers);
@@ -49,4 +62,11 @@ public boolean doRemotes() {
4962
public ClusterStatsRequest subRequest() {
5063
return new ClusterStatsRequest(false, nodesIds());
5164
}
65+
66+
@Override
67+
public void writeTo(StreamOutput out) throws IOException {
68+
out.writeStringArrayNullable(nodesIds());
69+
// We will never ask remote to collect remote stats
70+
}
71+
5272
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.Locale;
2626
import java.util.Map;
27+
import java.util.Set;
2728

2829
import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG;
2930

@@ -160,24 +161,46 @@ public static class RemoteClusterStats implements ToXContentFragment {
160161
private final String clusterUUID;
161162
private final String mode;
162163
private final boolean skipUnavailable;
163-
private final boolean transportCompress;
164-
private final List<String> versions;
164+
private final String transportCompress;
165+
private final Set<String> versions;
165166
private final String status;
167+
private final long nodesCount;
168+
private final long shardsCount;
169+
private final long indicesCount;
170+
private final long indicesBytes;
171+
private final long heapBytes;
172+
private final long memBytes;
166173

167174
public RemoteClusterStats(
168-
String clusterUUID,
175+
RemoteClusterStatsResponse remoteResponse,
169176
String mode,
170177
boolean skipUnavailable,
171-
boolean transportCompress,
172-
List<String> versions,
173-
String status
178+
String transportCompress
174179
) {
175-
this.clusterUUID = clusterUUID;
176180
this.mode = mode;
177181
this.skipUnavailable = skipUnavailable;
178-
this.transportCompress = transportCompress;
179-
this.versions = versions;
180-
this.status = status;
182+
this.transportCompress = transportCompress.toLowerCase(Locale.ROOT);
183+
if (remoteResponse != null) {
184+
this.clusterUUID = remoteResponse.getClusterUUID();
185+
this.versions = remoteResponse.getVersions();
186+
this.status = remoteResponse.getStatus().name().toLowerCase(Locale.ROOT);
187+
this.nodesCount = remoteResponse.getNodesCount();
188+
this.shardsCount = remoteResponse.getShardsCount();
189+
this.indicesCount = remoteResponse.getIndicesCount();
190+
this.indicesBytes = remoteResponse.getIndicesBytes();
191+
this.heapBytes = remoteResponse.getHeapBytes();
192+
this.memBytes = remoteResponse.getMemBytes();
193+
} else {
194+
this.status = "unavailable";
195+
this.clusterUUID = "unavailable";
196+
this.versions = Set.of();
197+
this.nodesCount = 0;
198+
this.shardsCount = 0;
199+
this.indicesCount = 0;
200+
this.indicesBytes = 0;
201+
this.heapBytes = 0;
202+
this.memBytes = 0;
203+
}
181204
}
182205

183206
@Override
@@ -187,8 +210,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
187210
builder.field("mode", mode);
188211
builder.field("skip_unavailable", skipUnavailable);
189212
builder.field("transport.compress", transportCompress);
190-
builder.field("version", versions);
191213
builder.field("status", status);
214+
builder.field("version", versions);
215+
builder.field("nodes_count", nodesCount);
216+
builder.field("shards_count", shardsCount);
217+
builder.field("indices_count", indicesCount);
218+
builder.field("indices_total_size_bytes", indicesBytes);
219+
builder.field("max_heap_bytes", heapBytes);
220+
builder.field("mem_total_bytes", memBytes);
192221
builder.endObject();
193222
return builder;
194223
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.admin.cluster.stats;
10+
11+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
12+
import org.elasticsearch.cluster.ClusterName;
13+
import org.elasticsearch.cluster.health.ClusterHealthStatus;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
16+
17+
import java.io.IOException;
18+
import java.util.List;
19+
import java.util.Set;
20+
21+
/**
22+
* Trimmed down cluster stats response for reporting to a remote cluster.
23+
*/
24+
public class RemoteClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResponse> {
25+
final String clusterUUID;
26+
final ClusterHealthStatus status;
27+
private final Set<String> versions;
28+
private final long nodesCount;
29+
private final long shardsCount;
30+
private final long indicesCount;
31+
private final long indicesBytes;
32+
private final long heapBytes;
33+
private final long memBytes;
34+
private String remoteName;
35+
36+
public Set<String> getVersions() {
37+
return versions;
38+
}
39+
40+
public long getNodesCount() {
41+
return nodesCount;
42+
}
43+
44+
public long getShardsCount() {
45+
return shardsCount;
46+
}
47+
48+
public long getIndicesCount() {
49+
return indicesCount;
50+
}
51+
52+
public long getIndicesBytes() {
53+
return indicesBytes;
54+
}
55+
56+
public long getHeapBytes() {
57+
return heapBytes;
58+
}
59+
60+
public long getMemBytes() {
61+
return memBytes;
62+
}
63+
64+
public String getRemoteName() {
65+
return remoteName;
66+
}
67+
68+
public void setRemoteName(String remoteName) {
69+
this.remoteName = remoteName;
70+
}
71+
72+
public RemoteClusterStatsResponse(
73+
ClusterName clusterName,
74+
String clusterUUID,
75+
ClusterHealthStatus status,
76+
Set<String> versions,
77+
long nodesCount,
78+
long shardsCount,
79+
long indicesCount,
80+
long indicesBytes,
81+
long heapBytes,
82+
long memBytes
83+
) {
84+
super(clusterName, List.of(), List.of());
85+
this.clusterUUID = clusterUUID;
86+
this.status = status;
87+
this.versions = versions;
88+
this.nodesCount = nodesCount;
89+
this.shardsCount = shardsCount;
90+
this.indicesCount = indicesCount;
91+
this.indicesBytes = indicesBytes;
92+
this.heapBytes = heapBytes;
93+
this.memBytes = memBytes;
94+
}
95+
96+
public String getClusterUUID() {
97+
return this.clusterUUID;
98+
}
99+
100+
public ClusterHealthStatus getStatus() {
101+
return this.status;
102+
}
103+
104+
@Override
105+
public void writeTo(StreamOutput out) throws IOException {
106+
super.writeTo(out);
107+
out.writeString(clusterUUID);
108+
status.writeTo(out);
109+
out.writeStringCollection(versions);
110+
out.writeLong(nodesCount);
111+
out.writeLong(shardsCount);
112+
out.writeLong(indicesCount);
113+
out.writeLong(indicesBytes);
114+
out.writeLong(heapBytes);
115+
out.writeLong(memBytes);
116+
}
117+
118+
public RemoteClusterStatsResponse(StreamInput in) throws IOException {
119+
super(in);
120+
this.clusterUUID = in.readString();
121+
this.status = ClusterHealthStatus.readFrom(in);
122+
this.versions = in.readCollectionAsSet(StreamInput::readString);
123+
this.nodesCount = in.readLong();
124+
this.shardsCount = in.readLong();
125+
this.indicesCount = in.readLong();
126+
this.indicesBytes = in.readLong();
127+
this.heapBytes = in.readLong();
128+
this.memBytes = in.readLong();
129+
}
130+
131+
@Override
132+
protected List<ClusterStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
133+
return List.of();
134+
}
135+
136+
@Override
137+
protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException {}
138+
139+
/**
140+
* Default empty response, can be used in case the cluster did not respond.
141+
*/
142+
public static final RemoteClusterStatsResponse EMPTY = new RemoteClusterStatsResponse(
143+
ClusterName.DEFAULT,
144+
"",
145+
ClusterHealthStatus.RED,
146+
Set.of(),
147+
0,
148+
0,
149+
0,
150+
0,
151+
0,
152+
0
153+
);
154+
}

0 commit comments

Comments
 (0)