Skip to content

Commit 9ad7868

Browse files
committed
Implement CCS telemetry export as part of _cluster/stats
1 parent 67d2380 commit 9ad7868

File tree

7 files changed

+43
-6
lines changed

7 files changed

+43
-6
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ static TransportVersion def(int id) {
202202
public static final TransportVersion REPOSITORIES_TELEMETRY = def(8_732_00_0);
203203
public static final TransportVersion ML_INFERENCE_ALIBABACLOUD_SEARCH_ADDED = def(8_733_00_0);
204204

205+
206+
public static final TransportVersion CCS_TELEMETRY_STATS = def(8_733_00_0);
205207
/*
206208
* STOP! READ THIS FIRST! No, really,
207209
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public int hashCode() {
277277
*/
278278
public void add(CCSTelemetrySnapshot stats) {
279279
// This should be called in ClusterStatsResponse ctor only, so we don't need to worry about concurrency
280-
if (stats.totalCount == 0) {
280+
if (stats == null || stats.totalCount == 0) {
281281
// Just ignore the empty stats.
282282
// This could happen if the node is brand new or if the stats are not available, e.g. because it runs an old version.
283283
return;
@@ -315,7 +315,7 @@ public void add(CCSTelemetrySnapshot stats) {
315315
* "p90": 2570
316316
* }
317317
*/
318-
public static void publishLatency(XContentBuilder builder, String name, LongMetricValue took) throws IOException {
318+
private static void publishLatency(XContentBuilder builder, String name, LongMetricValue took) throws IOException {
319319
builder.startObject(name);
320320
{
321321
builder.field("max", took.max());

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
3030
private final ClusterHealthStatus clusterStatus;
3131
private final SearchUsageStats searchUsageStats;
3232
private final RepositoryUsageStats repositoryUsageStats;
33+
private final CCSTelemetrySnapshot ccsMetrics;
3334

3435
public ClusterStatsNodeResponse(StreamInput in) throws IOException {
3536
super(in);
@@ -47,6 +48,11 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
4748
} else {
4849
repositoryUsageStats = RepositoryUsageStats.EMPTY;
4950
}
51+
if (in.getTransportVersion().onOrAfter(TransportVersions.CCS_TELEMETRY_STATS)) {
52+
ccsMetrics = new CCSTelemetrySnapshot(in);
53+
} else {
54+
ccsMetrics = new CCSTelemetrySnapshot();
55+
}
5056
}
5157

5258
public ClusterStatsNodeResponse(
@@ -56,7 +62,8 @@ public ClusterStatsNodeResponse(
5662
NodeStats nodeStats,
5763
ShardStats[] shardsStats,
5864
SearchUsageStats searchUsageStats,
59-
RepositoryUsageStats repositoryUsageStats
65+
RepositoryUsageStats repositoryUsageStats,
66+
CCSTelemetrySnapshot ccsTelemetrySnapshot
6067
) {
6168
super(node);
6269
this.nodeInfo = nodeInfo;
@@ -65,6 +72,7 @@ public ClusterStatsNodeResponse(
6572
this.clusterStatus = clusterStatus;
6673
this.searchUsageStats = Objects.requireNonNull(searchUsageStats);
6774
this.repositoryUsageStats = Objects.requireNonNull(repositoryUsageStats);
75+
this.ccsMetrics = ccsTelemetrySnapshot;
6876
}
6977

7078
public NodeInfo nodeInfo() {
@@ -95,6 +103,10 @@ public RepositoryUsageStats repositoryUsageStats() {
95103
return repositoryUsageStats;
96104
}
97105

106+
public CCSTelemetrySnapshot getCcsMetrics() {
107+
return ccsMetrics;
108+
}
109+
98110
@Override
99111
public void writeTo(StreamOutput out) throws IOException {
100112
super.writeTo(out);
@@ -108,5 +120,9 @@ public void writeTo(StreamOutput out) throws IOException {
108120
if (out.getTransportVersion().onOrAfter(TransportVersions.REPOSITORIES_TELEMETRY)) {
109121
repositoryUsageStats.writeTo(out);
110122
} // else just drop these stats, ok for bwc
123+
if (out.getTransportVersion().onOrAfter(TransportVersions.CCS_TELEMETRY_STATS)) {
124+
ccsMetrics.writeTo(out);
125+
}
111126
}
127+
112128
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424
import java.util.List;
2525
import java.util.Locale;
2626

27+
import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG;
28+
2729
public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResponse> implements ToXContentFragment {
2830

2931
final ClusterStatsNodes nodesStats;
3032
final ClusterStatsIndices indicesStats;
3133
final ClusterHealthStatus status;
3234
final ClusterSnapshotStats clusterSnapshotStats;
3335
final RepositoryUsageStats repositoryUsageStats;
36+
37+
final CCSTelemetrySnapshot ccsMetrics;
3438
final long timestamp;
3539
final String clusterUUID;
3640

@@ -50,6 +54,7 @@ public ClusterStatsResponse(
5054
this.timestamp = timestamp;
5155
nodesStats = new ClusterStatsNodes(nodes);
5256
indicesStats = new ClusterStatsIndices(nodes, mappingStats, analysisStats, versionStats);
57+
ccsMetrics = new CCSTelemetrySnapshot();
5358
ClusterHealthStatus status = null;
5459
for (ClusterStatsNodeResponse response : nodes) {
5560
// only the master node populates the status
@@ -58,6 +63,7 @@ public ClusterStatsResponse(
5863
break;
5964
}
6065
}
66+
nodes.forEach(node -> ccsMetrics.add(node.getCcsMetrics()));
6167
this.status = status;
6268
this.clusterSnapshotStats = clusterSnapshotStats;
6369

@@ -90,6 +96,10 @@ public ClusterStatsIndices getIndicesStats() {
9096
return indicesStats;
9197
}
9298

99+
public CCSTelemetrySnapshot getCcsMetrics() {
100+
return ccsMetrics;
101+
}
102+
93103
@Override
94104
public void writeTo(StreamOutput out) throws IOException {
95105
TransportAction.localOnly();
@@ -125,6 +135,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
125135
builder.field("repositories");
126136
repositoryUsageStats.toXContent(builder, params);
127137

138+
if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) {
139+
ccsMetrics.toXContent(builder, params);
140+
}
141+
128142
return builder;
129143
}
130144

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
8181
private final IndicesService indicesService;
8282
private final RepositoriesService repositoriesService;
8383
private final SearchUsageHolder searchUsageHolder;
84+
private final CCSUsageTelemetry ccsUsageHolder;
8485

8586
private final MetadataStatsCache<MappingStats> mappingStatsCache;
8687
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
@@ -108,6 +109,7 @@ public TransportClusterStatsAction(
108109
this.indicesService = indicesService;
109110
this.repositoriesService = repositoriesService;
110111
this.searchUsageHolder = usageService.getSearchUsageHolder();
112+
this.ccsUsageHolder = usageService.getCcsUsageHolder();
111113
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
112114
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
113115
}
@@ -249,6 +251,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
249251
final SearchUsageStats searchUsageStats = searchUsageHolder.getSearchUsageStats();
250252

251253
final RepositoryUsageStats repositoryUsageStats = repositoriesService.getUsageStats();
254+
final CCSTelemetrySnapshot ccsUsage = ccsUsageHolder.getCCSTelemetrySnapshot();
252255

253256
return new ClusterStatsNodeResponse(
254257
nodeInfo.getNode(),
@@ -257,7 +260,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
257260
nodeStats,
258261
shardsStats.toArray(new ShardStats[shardsStats.size()]),
259262
searchUsageStats,
260-
repositoryUsageStats
263+
repositoryUsageStats,
264+
ccsUsage
261265
);
262266
}
263267

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
127127
public static final String FROZEN_INDICES_DEPRECATION_MESSAGE = "Searching frozen indices [{}] is deprecated."
128128
+ " Consider cold or frozen tiers in place of frozen indices. The frozen feature will be removed in a feature release.";
129129

130-
private static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");
130+
public static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");
131131

132132
/** The maximum number of shards for a single search request. */
133133
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(

server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public void testCreation() {
128128
null,
129129
new ShardStats[] { shardStats },
130130
new SearchUsageStats(),
131-
RepositoryUsageStats.EMPTY
131+
RepositoryUsageStats.EMPTY,
132+
null
132133
);
133134

134135
stats = VersionStats.of(metadata, Collections.singletonList(nodeResponse));

0 commit comments

Comments
 (0)