Skip to content

Commit ad4ee05

Browse files
committed
CPS usage telemetry support
1 parent afd3a42 commit ad4ee05

File tree

7 files changed

+36
-13
lines changed

7 files changed

+36
-13
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,11 @@ public void add(CCSTelemetrySnapshot stats) {
300300
successCount += stats.successCount;
301301
skippedRemotes += stats.skippedRemotes;
302302
stats.failureReasons.forEach((k, v) -> failureReasons.merge(k, v, Long::sum));
303-
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
303+
stats.featureCounts.forEach((k, v) -> {
304+
if (useMRT || k.equals(CCSUsageTelemetry.MRT_FEATURE) == false) {
305+
featureCounts.merge(k, v, Long::sum);
306+
}
307+
});
304308
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
305309
took.add(stats.took);
306310
if (useMRT) {

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public class ClusterStatsRequest extends BaseNodesRequest {
2828
* Return stripped down stats for remote clusters.
2929
*/
3030
private boolean remoteStats;
31+
/**
32+
* Are we dealing with CPS environment?
33+
* In CPS, we do not display MRT stats.
34+
*/
35+
private boolean isCPS = false;
3136

3237
/**
3338
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
@@ -43,13 +48,19 @@ public ClusterStatsRequest(boolean doRemotes, String... nodesIds) {
4348
this.remoteStats = false;
4449
}
4550

51+
public static ClusterStatsRequest newServerlessRequest(String[] nodeIds) {
52+
final ClusterStatsRequest request = new ClusterStatsRequest(false, nodeIds);
53+
request.isCPS = true;
54+
return request;
55+
}
56+
4657
@Override
4758
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
4859
return new CancellableTask(id, type, action, "", parentTaskId, headers);
4960
}
5061

5162
public static ClusterStatsRequest newRemoteClusterStatsRequest() {
52-
final var request = new ClusterStatsRequest();
63+
final ClusterStatsRequest request = new ClusterStatsRequest();
5364
request.remoteStats = true;
5465
return request;
5566
}
@@ -67,4 +78,8 @@ public boolean doRemotes() {
6778
public boolean isRemoteStats() {
6879
return remoteStats;
6980
}
81+
82+
public boolean isCPS() {
83+
return isCPS;
84+
}
7085
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.io.stream.StreamInput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.common.unit.ByteSizeValue;
22+
import org.elasticsearch.rest.RestRequest;
2223
import org.elasticsearch.xcontent.ToXContentFragment;
2324
import org.elasticsearch.xcontent.XContentBuilder;
2425

@@ -55,14 +56,15 @@ public ClusterStatsResponse(
5556
AnalysisStats analysisStats,
5657
VersionStats versionStats,
5758
ClusterSnapshotStats clusterSnapshotStats,
58-
Map<String, RemoteClusterStats> remoteClustersStats
59+
Map<String, RemoteClusterStats> remoteClustersStats,
60+
boolean skipMRT
5961
) {
6062
super(clusterName, nodes, failures);
6163
this.clusterUUID = clusterUUID;
6264
this.timestamp = timestamp;
6365
nodesStats = new ClusterStatsNodes(nodes);
6466
indicesStats = new ClusterStatsIndices(nodes, mappingStats, analysisStats, versionStats);
65-
ccsMetrics = new CCSTelemetrySnapshot();
67+
ccsMetrics = new CCSTelemetrySnapshot().setUseMRT(skipMRT == false);
6668
esqlMetrics = new CCSTelemetrySnapshot().setUseMRT(false);
6769
ClusterHealthStatus status = null;
6870
for (ClusterStatsNodeResponse response : nodes) {

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ protected void newResponseAsync(
197197
null,
198198
null,
199199
null,
200-
Map.of()
200+
Map.of(),
201+
false
201202
)
202203
: new ClusterStatsResponse(
203204
System.currentTimeMillis(),
@@ -209,7 +210,8 @@ protected void newResponseAsync(
209210
additionalStats.analysisStats(),
210211
VersionStats.of(clusterService.state().metadata(), responses),
211212
additionalStats.clusterSnapshotStats(),
212-
additionalStats.getRemoteStats()
213+
additionalStats.getRemoteStats(),
214+
request.isCPS()
213215
)
214216
).addListener(listener);
215217
}
@@ -440,7 +442,7 @@ public Map<String, RemoteClusterStats> getRemoteStats() {
440442
}
441443

442444
private boolean doRemotes(ClusterStatsRequest request) {
443-
return SearchService.CCS_COLLECT_TELEMETRY.get(settings) && request.doRemotes();
445+
return SearchService.CCS_COLLECT_TELEMETRY.get(settings) && request.doRemotes() && request.isCPS() == false;
444446
}
445447

446448
private class RemoteStatsFanout extends CancellableFanOut<String, RemoteClusterStatsResponse, Map<String, RemoteClusterStats>> {

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,9 @@ public Set<String> supportedQueryParameters() {
5656

5757
@Override
5858
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
59-
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(
60-
request.paramAsBoolean("include_remotes", false),
61-
request.paramAsStringArray("nodeId", null)
62-
);
59+
ClusterStatsRequest clusterStatsRequest = request.isServerlessRequest()
60+
? ClusterStatsRequest.newServerlessRequest(request.paramAsStringArray("nodeId", null))
61+
: new ClusterStatsRequest(request.paramAsBoolean("include_remotes", false), request.paramAsStringArray("nodeId", null));
6362
clusterStatsRequest.setTimeout(getTimeout(request));
6463
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
6564
.cluster()

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
9292
// We use withLocal = false because if we have a remote join it will be forced into the fragment by the mapper anyway,
9393
// And the verifier checks that there are no non-synthetic limits before the join.
9494
// TODO: However, this means that the non-remote join will be always forced on the coordinator. We may want to revisit this.
95-
return duplicateLimitAsFirstGrandchild(limit, false);
95+
return duplicateLimitAsFirstGrandchild(limit, join.isRemote());
9696
}
9797
return limit;
9898
}

x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,8 @@ public void testToXContent() throws IOException {
434434
AnalysisStats.of(metadata, () -> {}),
435435
VersionStats.of(metadata, singletonList(mockNodeResponse)),
436436
ClusterSnapshotStats.EMPTY,
437-
null
437+
null,
438+
false
438439
);
439440

440441
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);

0 commit comments

Comments
 (0)