Skip to content

Commit baae609

Browse files
committed
Add ESQL telemetry collection (#119474)
* Add ESQL telemetry collection (cherry picked from commit 0292905) # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java
1 parent edfdd07 commit baae609

File tree

28 files changed

+865
-126
lines changed

28 files changed

+865
-126
lines changed

docs/changelog/119474.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119474
2+
summary: "Add ES|QL cross-cluster query telemetry collection"
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/reference/cluster/stats.asciidoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ Returns cluster statistics.
2525

2626
* If the {es} {security-features} are enabled, you must have the `monitor` or
2727
`manage` <<privileges-list-cluster,cluster privilege>> to use this API.
28-
2928
[[cluster-stats-api-desc]]
3029
==== {api-description-title}
3130

@@ -1397,7 +1396,7 @@ as a human-readable string.
13971396
13981397
13991398
`_search`:::
1400-
(object) Contains the information about the <<modules-cross-cluster-search, {ccs}>> usage in the cluster.
1399+
(object) Contains information about <<modules-cross-cluster-search, {ccs}>> usage.
14011400
+
14021401
.Properties of `_search`
14031402
[%collapsible%open]
@@ -1528,7 +1527,11 @@ This may include requests where partial results were returned, but not requests
15281527
15291528
=======
15301529

1530+
15311531
======
1532+
`_esql`:::
1533+
(object) Contains information about <<esql-cross-clusters,{esql} {ccs}>> usage.
1534+
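The structure of the object is the same as the `_search` object above, except that the `took_mrt_true` and `took_mrt_false` metrics are omitted, because {esql} does not have a `minimize_roundtrips` option.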
15321535
15331536
=====
15341537

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java

Lines changed: 6 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -37,27 +37,19 @@
3737
import org.elasticsearch.tasks.Task;
3838
import org.elasticsearch.test.AbstractMultiClustersTestCase;
3939
import org.elasticsearch.test.InternalTestCluster;
40+
import org.elasticsearch.test.SkipUnavailableRule;
41+
import org.elasticsearch.test.SkipUnavailableRule.NotSkipped;
4042
import org.elasticsearch.usage.UsageService;
4143
import org.junit.Assert;
4244
import org.junit.Rule;
43-
import org.junit.rules.TestRule;
44-
import org.junit.runner.Description;
45-
import org.junit.runners.model.Statement;
46-
47-
import java.lang.annotation.ElementType;
48-
import java.lang.annotation.Retention;
49-
import java.lang.annotation.RetentionPolicy;
50-
import java.lang.annotation.Target;
51-
import java.util.Arrays;
45+
5246
import java.util.Collection;
5347
import java.util.Collections;
5448
import java.util.HashMap;
5549
import java.util.List;
5650
import java.util.Map;
5751
import java.util.concurrent.ExecutionException;
5852
import java.util.concurrent.TimeUnit;
59-
import java.util.function.Function;
60-
import java.util.stream.Collectors;
6153

6254
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
6355
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.MRT_FEATURE;
@@ -498,7 +490,7 @@ public void testRemoteOnlyTimesOut() throws Exception {
498490
assertThat(perCluster.get(REMOTE2), equalTo(null));
499491
}
500492

501-
@SkipOverride(aliases = { REMOTE1 })
493+
@NotSkipped(aliases = { REMOTE1 })
502494
public void testRemoteTimesOutFailure() throws Exception {
503495
Map<String, Object> testClusterInfo = setupClusters();
504496
String remoteIndex = (String) testClusterInfo.get("remote.index");
@@ -528,7 +520,7 @@ public void testRemoteTimesOutFailure() throws Exception {
528520
/**
529521
* Search when all the remotes failed and none of them is marked to be skipped when unavailable
530522
*/
531-
@SkipOverride(aliases = { REMOTE1, REMOTE2 })
523+
@NotSkipped(aliases = { REMOTE1, REMOTE2 })
532524
public void testFailedAllRemotesSearch() throws Exception {
533525
Map<String, Object> testClusterInfo = setupClusters();
534526
String localIndex = (String) testClusterInfo.get("local.index");
@@ -577,7 +569,7 @@ public void testRemoteHasNoIndex() throws Exception {
577569
/**
578570
* Test that we're still counting remote search even if remote cluster has no such index
579571
*/
580-
@SkipOverride(aliases = { REMOTE1 })
572+
@NotSkipped(aliases = { REMOTE1 })
581573
public void testRemoteHasNoIndexFailure() throws Exception {
582574
SearchRequest searchRequest = makeSearchRequest(REMOTE1 + ":no_such_index");
583575
CCSTelemetrySnapshot telemetry = getTelemetryFromFailedSearch(searchRequest);
@@ -689,40 +681,4 @@ private int indexDocs(Client client, String index) {
689681
return numDocs;
690682
}
691683

692-
/**
693-
* Annotation to mark specific cluster in a test as not to be skipped when unavailable
694-
*/
695-
@Retention(RetentionPolicy.RUNTIME)
696-
@Target(ElementType.METHOD)
697-
@interface SkipOverride {
698-
String[] aliases();
699-
}
700-
701-
/**
702-
* Test rule to process skip annotations
703-
*/
704-
static class SkipUnavailableRule implements TestRule {
705-
private final Map<String, Boolean> skipMap;
706-
707-
SkipUnavailableRule(String... clusterAliases) {
708-
this.skipMap = Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), alias -> true));
709-
}
710-
711-
public Map<String, Boolean> getMap() {
712-
return skipMap;
713-
}
714-
715-
@Override
716-
public Statement apply(Statement base, Description description) {
717-
// Check for annotation named "SkipOverride" and set the overrides accordingly
718-
var aliases = description.getAnnotation(SkipOverride.class);
719-
if (aliases != null) {
720-
for (String alias : aliases.aliases()) {
721-
skipMap.put(alias, false);
722-
}
723-
}
724-
return base;
725-
}
726-
727-
}
728684
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ static TransportVersion def(int id) {
152152
public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0);
153153
public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0);
154154
public static final TransportVersion TRANSFORMS_UPGRADE_MODE = def(8_814_00_0);
155+
public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0);
156+
public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0);
155157

156158
/*
157159
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
* <br>
4242
*/
4343
public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment {
44-
public static final String CCS_TELEMETRY_FIELD_NAME = "_search";
4544
private long totalCount;
4645
private long successCount;
4746
private final Map<String, Long> failureReasons;
@@ -66,6 +65,9 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
6665

6766
private final Map<String, Long> clientCounts;
6867
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
68+
// Whether we should use per-MRT (minimize roundtrips) metrics.
69+
// ES|QL does not have "minimize_roundtrips" option, so we don't collect those metrics for ES|QL usage.
70+
private boolean useMRT = true;
6971

7072
/**
7173
* Creates a new stats instance with the provided info.
@@ -191,6 +193,11 @@ public Map<String, PerClusterCCSTelemetry> getByRemoteCluster() {
191193
return Collections.unmodifiableMap(byRemoteCluster);
192194
}
193195

196+
public CCSTelemetrySnapshot setUseMRT(boolean useMRT) {
197+
this.useMRT = useMRT;
198+
return this;
199+
}
200+
194201
public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment {
195202
private long count;
196203
private long skippedCount;
@@ -270,6 +277,11 @@ public boolean equals(Object o) {
270277
public int hashCode() {
271278
return Objects.hash(count, skippedCount, took);
272279
}
280+
281+
@Override
282+
public String toString() {
283+
return Strings.toString(this, true, true);
284+
}
273285
}
274286

275287
/**
@@ -291,8 +303,10 @@ public void add(CCSTelemetrySnapshot stats) {
291303
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
292304
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
293305
took.add(stats.took);
294-
tookMrtTrue.add(stats.tookMrtTrue);
295-
tookMrtFalse.add(stats.tookMrtFalse);
306+
if (useMRT) {
307+
tookMrtTrue.add(stats.tookMrtTrue);
308+
tookMrtFalse.add(stats.tookMrtFalse);
309+
}
296310
remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax);
297311
if (totalCount > 0 && oldCount > 0) {
298312
// Weighted average
@@ -328,30 +342,28 @@ private static void publishLatency(XContentBuilder builder, String name, LongMet
328342

329343
@Override
330344
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
331-
builder.startObject(CCS_TELEMETRY_FIELD_NAME);
332-
{
333-
builder.field("total", totalCount);
334-
builder.field("success", successCount);
335-
builder.field("skipped", skippedRemotes);
336-
publishLatency(builder, "took", took);
345+
builder.field("total", totalCount);
346+
builder.field("success", successCount);
347+
builder.field("skipped", skippedRemotes);
348+
publishLatency(builder, "took", took);
349+
if (useMRT) {
337350
publishLatency(builder, "took_mrt_true", tookMrtTrue);
338351
publishLatency(builder, "took_mrt_false", tookMrtFalse);
339-
builder.field("remotes_per_search_max", remotesPerSearchMax);
340-
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
341-
builder.field("failure_reasons", failureReasons);
342-
builder.field("features", featureCounts);
343-
builder.field("clients", clientCounts);
344-
builder.startObject("clusters");
345-
{
346-
for (var entry : byRemoteCluster.entrySet()) {
347-
String remoteName = entry.getKey();
348-
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
349-
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
350-
}
351-
builder.field(remoteName, entry.getValue());
352+
}
353+
builder.field("remotes_per_search_max", remotesPerSearchMax);
354+
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
355+
builder.field("failure_reasons", failureReasons);
356+
builder.field("features", featureCounts);
357+
builder.field("clients", clientCounts);
358+
builder.startObject("clusters");
359+
{
360+
for (var entry : byRemoteCluster.entrySet()) {
361+
String remoteName = entry.getKey();
362+
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
363+
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
352364
}
365+
builder.field(remoteName, entry.getValue());
353366
}
354-
builder.endObject();
355367
}
356368
builder.endObject();
357369
return builder;

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.action.admin.cluster.stats;
1111

1212
import org.elasticsearch.ElasticsearchSecurityException;
13+
import org.elasticsearch.ElasticsearchStatusException;
1314
import org.elasticsearch.ExceptionsHelper;
1415
import org.elasticsearch.ResourceNotFoundException;
1516
import org.elasticsearch.action.ShardOperationFailedException;
@@ -20,6 +21,7 @@
2021
import org.elasticsearch.core.TimeValue;
2122
import org.elasticsearch.search.SearchShardTarget;
2223
import org.elasticsearch.search.query.SearchTimeoutException;
24+
import org.elasticsearch.tasks.Task;
2325
import org.elasticsearch.tasks.TaskCancelledException;
2426

2527
import java.util.Arrays;
@@ -84,6 +86,15 @@ public Builder setClient(String client) {
8486
return this;
8587
}
8688

89+
public Builder setClientFromTask(Task task) {
90+
String client = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
91+
if (client != null) {
92+
return setClient(client);
93+
} else {
94+
return this;
95+
}
96+
}
97+
8798
public Builder skippedRemote(String remote) {
8899
this.skippedRemotes.add(remote);
89100
return this;
@@ -133,6 +144,10 @@ public static Result getFailureType(Exception e) {
133144
if (ExceptionsHelper.unwrapCorruption(e) != null) {
134145
return Result.CORRUPTION;
135146
}
147+
ElasticsearchStatusException se = (ElasticsearchStatusException) ExceptionsHelper.unwrap(e, ElasticsearchStatusException.class);
148+
if (se != null && se.getDetailedMessage().contains("license")) {
149+
return Result.LICENSE;
150+
}
136151
// This is a last-resort check - if we still don't know the reason but all shard failures are remote,
137152
// we assume it is somehow the remote's fault.
138153
if (e instanceof SearchPhaseExecutionException spe) {

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public enum Result {
4747
TIMEOUT("timeout"),
4848
CORRUPTION("corruption"),
4949
SECURITY("security"),
50+
LICENSE("license"),
5051
// May be helpful if there are a lot of other reasons, and it may be hard to calculate the unknowns for some clients.
5152
UNKNOWN("other");
5253

@@ -106,8 +107,14 @@ public String getName() {
106107

107108
private final Map<String, LongAdder> clientCounts;
108109
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
110+
// Should we calculate separate metrics per MRT?
111+
private final boolean useMRT;
109112

110113
public CCSUsageTelemetry() {
114+
this(true);
115+
}
116+
117+
public CCSUsageTelemetry(boolean useMRT) {
111118
this.byRemoteCluster = new ConcurrentHashMap<>();
112119
totalCount = new LongAdder();
113120
successCount = new LongAdder();
@@ -119,6 +126,7 @@ public CCSUsageTelemetry() {
119126
skippedRemotes = new LongAdder();
120127
featureCounts = new ConcurrentHashMap<>();
121128
clientCounts = new ConcurrentHashMap<>();
129+
this.useMRT = useMRT;
122130
}
123131

124132
public void updateUsage(CCSUsage ccsUsage) {
@@ -134,10 +142,12 @@ private void doUpdate(CCSUsage ccsUsage) {
134142
if (isSuccess(ccsUsage)) {
135143
successCount.increment();
136144
took.record(searchTook);
137-
if (isMRT(ccsUsage)) {
138-
tookMrtTrue.record(searchTook);
139-
} else {
140-
tookMrtFalse.record(searchTook);
145+
if (useMRT) {
146+
if (isMRT(ccsUsage)) {
147+
tookMrtTrue.record(searchTook);
148+
} else {
149+
tookMrtFalse.record(searchTook);
150+
}
141151
}
142152
ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u));
143153
} else {
@@ -243,6 +253,6 @@ public CCSTelemetrySnapshot getCCSTelemetrySnapshot() {
243253
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)),
244254
Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)),
245255
Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot))
246-
);
256+
).setUseMRT(useMRT);
247257
}
248258
}

0 commit comments

Comments
 (0)