Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
59cfd29
Add ESQL telemetry collection
smalyshev Nov 21, 2024
095f342
Move object name out of CCSTelemetrySnapshot
smalyshev Nov 21, 2024
7233eda
Add esql telemetry to stats output
smalyshev Nov 21, 2024
4b2882f
Update docs/changelog/117282.yaml
smalyshev Nov 21, 2024
bbf6dca
Spotless
smalyshev Nov 22, 2024
f193a4d
Add wrappings to capture more errors
smalyshev Nov 22, 2024
5137537
Add test (more to come)
smalyshev Nov 26, 2024
20f91c5
Merge branch 'main' into esql-telemetry
smalyshev Dec 12, 2024
5a84912
Add comment
smalyshev Dec 12, 2024
9d42179
Fix test
smalyshev Dec 12, 2024
fd812df
Merge branch 'main' into esql-telemetry
smalyshev Dec 13, 2024
b12c8a3
More tests
smalyshev Dec 13, 2024
72bdb43
refactor test
smalyshev Dec 14, 2024
a4e9f01
Unit tests
smalyshev Dec 17, 2024
1634ab4
Merge branch 'main' into esql-telemetry
smalyshev Dec 23, 2024
02f2c7f
More tests for telemetry
smalyshev Dec 23, 2024
5bd9438
More tests
smalyshev Dec 23, 2024
550107e
Update changelog
smalyshev Dec 23, 2024
7cd62b0
Update docs
smalyshev Dec 23, 2024
fce04d5
Add REST YAML test
smalyshev Dec 23, 2024
f1e7ed8
Merge branch 'main' into esql-telemetry
smalyshev Dec 23, 2024
7ea40d8
fix test
smalyshev Dec 23, 2024
402b9a6
Move REST test
smalyshev Dec 26, 2024
59ccda8
Merge branch 'main' into esql-telemetry
smalyshev Dec 26, 2024
58a72e1
Merge branch 'main' into esql-telemetry
smalyshev Dec 26, 2024
8ea56da
Merge branch 'main' into esql-telemetry
smalyshev Dec 27, 2024
0ac116e
Pull feedback
smalyshev Dec 27, 2024
ef59b3d
Add capabilities check for rest test
smalyshev Dec 27, 2024
d96401e
Update docs/changelog/119474.yaml
smalyshev Jan 2, 2025
c2f6932
Update 119474.yaml
smalyshev Jan 2, 2025
11afa6c
Delete docs/changelog/117282.yaml
smalyshev Jan 2, 2025
5c10594
Update docs/reference/cluster/stats.asciidoc
smalyshev Jan 2, 2025
537267a
Pull feedback
smalyshev Jan 2, 2025
be9e6ae
Merge branch 'main' into esql-telemetry
smalyshev Jan 2, 2025
4bfb1c7
Update docs/reference/cluster/stats.asciidoc
smalyshev Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119474.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119474
summary: "Add ES|QL cross-cluster query telemetry collection"
area: ES|QL
type: enhancement
issues: []
7 changes: 5 additions & 2 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ Returns cluster statistics.

* If the {es} {security-features} are enabled, you must have the `monitor` or
`manage` <<privileges-list-cluster,cluster privilege>> to use this API.

[[cluster-stats-api-desc]]
==== {api-description-title}

Expand Down Expand Up @@ -1397,7 +1396,7 @@ as a human-readable string.


`_search`:::
(object) Contains the information about the <<modules-cross-cluster-search, {ccs}>> usage in the cluster.
(object) Contains the information about the <<modules-cross-cluster-search, {ccs}>> usage.
+
.Properties of `_search`
[%collapsible%open]
Expand Down Expand Up @@ -1528,7 +1527,11 @@ This may include requests where partial results were returned, but not requests

=======


======
`_esql`:::
(object) Contains information about <<esql-cross-clusters,{esql} {ccs}>> usage.
The structure of the object is the same as the `_search` object above.

=====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,19 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SkipUnavailableRule;
import org.elasticsearch.test.SkipUnavailableRule.NotSkipped;
import org.elasticsearch.usage.UsageService;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.MRT_FEATURE;
Expand Down Expand Up @@ -498,7 +490,7 @@ public void testRemoteOnlyTimesOut() throws Exception {
assertThat(perCluster.get(REMOTE2), equalTo(null));
}

@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteTimesOutFailure() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
Expand Down Expand Up @@ -528,7 +520,7 @@ public void testRemoteTimesOutFailure() throws Exception {
/**
* Search when all the remotes failed and were not skipped
*/
@SkipOverride(aliases = { REMOTE1, REMOTE2 })
@NotSkipped(aliases = { REMOTE1, REMOTE2 })
public void testFailedAllRemotesSearch() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String localIndex = (String) testClusterInfo.get("local.index");
Expand Down Expand Up @@ -577,7 +569,7 @@ public void testRemoteHasNoIndex() throws Exception {
/**
* Test that we're still counting remote search even if remote cluster has no such index
*/
@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteHasNoIndexFailure() throws Exception {
SearchRequest searchRequest = makeSearchRequest(REMOTE1 + ":no_such_index");
CCSTelemetrySnapshot telemetry = getTelemetryFromFailedSearch(searchRequest);
Expand Down Expand Up @@ -695,40 +687,4 @@ private void indexDocs(Client client, String index, ActionListener<Void> listene
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute(listener.safeMap(r -> null));
}

/**
 * Annotation to mark specific clusters in a test as not to be skipped when unavailable.
 * It is retained at runtime and may only be placed on methods; {@link SkipUnavailableRule}
 * reads it from the test description and sets the skip flag of every listed alias to {@code false}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SkipOverride {
// Aliases of the clusters whose skip flag should be turned off for the annotated test method.
String[] aliases();
}

/**
 * Test rule to process skip annotations.
 * <p>
 * The rule keeps a map from cluster alias to a skip flag. Every alias passed to the constructor
 * starts out as {@code true}; when the rule is applied to a test method annotated with
 * {@link SkipOverride}, the flag of each alias listed in the annotation is set to {@code false}.
 */
static class SkipUnavailableRule implements TestRule {
    private final Map<String, Boolean> skipMap;

    /**
     * @param clusterAliases aliases of the clusters to track; each one is initially flagged {@code true}.
     *                       Repeated aliases are tolerated and collapse into a single entry.
     */
    SkipUnavailableRule(String... clusterAliases) {
        // apply() must be able to update entries, so build an explicit HashMap: the map returned by
        // the two-argument Collectors.toMap has no mutability guarantee and rejects duplicate keys.
        this.skipMap = new HashMap<>();
        for (String alias : clusterAliases) {
            skipMap.put(alias, true);
        }
    }

    /**
     * @return the live alias-to-skip-flag map maintained by this rule (not a copy)
     */
    public Map<String, Boolean> getMap() {
        return skipMap;
    }

    /**
     * Applies the overrides declared by {@link SkipOverride} on the described test, if any.
     *
     * @param base        the statement to run; returned unchanged
     * @param description description of the test, used only to look up the annotation
     * @return {@code base}
     */
    @Override
    public Statement apply(Statement base, Description description) {
        // Check for annotation named "SkipOverride" and set the overrides accordingly
        var aliases = description.getAnnotation(SkipOverride.class);
        if (aliases != null) {
            for (String alias : aliases.aliases()) {
                skipMap.put(alias, false);
            }
        }
        return base;
    }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0);
public static final TransportVersion TRANSFORMS_UPGRADE_MODE = def(8_814_00_0);
public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0);
public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
* <br>
*/
public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment {
public static final String CCS_TELEMETRY_FIELD_NAME = "_search";
private long totalCount;
private long successCount;
private final Map<String, Long> failureReasons;
Expand All @@ -66,6 +65,9 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment

private final Map<String, Long> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Whether we should use per-MRT (minimize roundtrips) metrics.
// ES|QL does not have "minimize_roundtrips" option, so we don't collect those metrics for ES|QL usage.
private boolean useMRT = true;

/**
* Creates a new stats instance with the provided info.
Expand Down Expand Up @@ -191,6 +193,11 @@ public Map<String, PerClusterCCSTelemetry> getByRemoteCluster() {
return Collections.unmodifiableMap(byRemoteCluster);
}

/**
 * Sets whether this snapshot uses the per-MRT (minimize roundtrips) metrics.
 *
 * @param useMRT the new value of the flag
 * @return this snapshot, so the call can be chained
 */
public CCSTelemetrySnapshot setUseMRT(boolean useMRT) {
this.useMRT = useMRT;
return this;
}

public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment {
private long count;
private long skippedCount;
Expand Down Expand Up @@ -270,6 +277,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(count, skippedCount, took);
}

@Override
public String toString() {
// Delegates to Strings.toString for this object's string form.
// NOTE(review): the two boolean flags are presumably "pretty" and "human" - confirm against Strings.toString.
return Strings.toString(this, true, true);
}
}

/**
Expand All @@ -291,8 +303,10 @@ public void add(CCSTelemetrySnapshot stats) {
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
took.add(stats.took);
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
if (useMRT) {
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
}
remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax);
if (totalCount > 0 && oldCount > 0) {
// Weighted average
Expand Down Expand Up @@ -328,30 +342,28 @@ private static void publishLatency(XContentBuilder builder, String name, LongMet

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(CCS_TELEMETRY_FIELD_NAME);
{
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
if (useMRT) {
publishLatency(builder, "took_mrt_true", tookMrtTrue);
publishLatency(builder, "took_mrt_false", tookMrtFalse);
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.endObject();
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ShardOperationFailedException;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.query.SearchTimeoutException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.Arrays;
Expand Down Expand Up @@ -84,6 +86,15 @@ public Builder setClient(String client) {
return this;
}

/**
 * Sets the client from the task's {@code Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER} header.
 * When the task carries no such header the builder is left untouched.
 *
 * @param task the task whose header is inspected
 * @return this builder
 */
public Builder setClientFromTask(Task task) {
    final String origin = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
    if (origin == null) {
        // No product origin on the task: nothing to record.
        return this;
    }
    return setClient(origin);
}

public Builder skippedRemote(String remote) {
this.skippedRemotes.add(remote);
return this;
Expand Down Expand Up @@ -133,6 +144,10 @@ public static Result getFailureType(Exception e) {
if (ExceptionsHelper.unwrapCorruption(e) != null) {
return Result.CORRUPTION;
}
ElasticsearchStatusException se = (ElasticsearchStatusException) ExceptionsHelper.unwrap(e, ElasticsearchStatusException.class);
if (se != null && se.getDetailedMessage().contains("license")) {
return Result.LICENSE;
}
// This is kind of last resort check - if we still don't know the reason but all shard failures are remote,
// we assume it's remote's fault somehow.
if (e instanceof SearchPhaseExecutionException spe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public enum Result {
TIMEOUT("timeout"),
CORRUPTION("corruption"),
SECURITY("security"),
LICENSE("license"),
// May be helpful if there are a lot of other reasons, and it may be hard to calculate the unknowns for some clients.
UNKNOWN("other");

Expand Down Expand Up @@ -106,8 +107,14 @@ public String getName() {

private final Map<String, LongAdder> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Should we calculate separate metrics per MRT?
private final boolean useMRT;

/**
 * Creates a telemetry holder with the {@code useMRT} flag set to {@code true},
 * by delegating to {@link #CCSUsageTelemetry(boolean)}.
 */
public CCSUsageTelemetry() {
this(true);
}

public CCSUsageTelemetry(boolean useMRT) {
this.byRemoteCluster = new ConcurrentHashMap<>();
totalCount = new LongAdder();
successCount = new LongAdder();
Expand All @@ -119,6 +126,7 @@ public CCSUsageTelemetry() {
skippedRemotes = new LongAdder();
featureCounts = new ConcurrentHashMap<>();
clientCounts = new ConcurrentHashMap<>();
this.useMRT = useMRT;
}

public void updateUsage(CCSUsage ccsUsage) {
Expand All @@ -134,10 +142,12 @@ private void doUpdate(CCSUsage ccsUsage) {
if (isSuccess(ccsUsage)) {
successCount.increment();
took.record(searchTook);
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
if (useMRT) {
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
}
}
ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u));
} else {
Expand Down Expand Up @@ -243,6 +253,6 @@ public CCSTelemetrySnapshot getCCSTelemetrySnapshot() {
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot))
);
).setUseMRT(useMRT);
}
}
Loading