Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119474.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119474
summary: "Add ES|QL cross-cluster query telemetry collection"
area: ES|QL
type: enhancement
issues: []
7 changes: 5 additions & 2 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ Returns cluster statistics.

* If the {es} {security-features} are enabled, you must have the `monitor` or
`manage` <<privileges-list-cluster,cluster privilege>> to use this API.

[[cluster-stats-api-desc]]
==== {api-description-title}

Expand Down Expand Up @@ -1397,7 +1396,7 @@ as a human-readable string.
`_search`:::
(object) Contains the information about the <<modules-cross-cluster-search, {ccs}>> usage in the cluster.
(object) Contains information about <<modules-cross-cluster-search, {ccs}>> usage.
+
.Properties of `_search`
[%collapsible%open]
Expand Down Expand Up @@ -1528,7 +1527,11 @@ This may include requests where partial results were returned, but not requests
=======


======
`_esql`:::
(object) Contains information about <<esql-cross-clusters,{esql} {ccs}>> usage.
The structure of the object is the same as the `_search` object above.
=====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,19 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SkipUnavailableRule;
import org.elasticsearch.test.SkipUnavailableRule.NotSkipped;
import org.elasticsearch.usage.UsageService;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.MRT_FEATURE;
Expand Down Expand Up @@ -498,7 +490,7 @@ public void testRemoteOnlyTimesOut() throws Exception {
assertThat(perCluster.get(REMOTE2), equalTo(null));
}

@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteTimesOutFailure() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
Expand Down Expand Up @@ -528,7 +520,7 @@ public void testRemoteTimesOutFailure() throws Exception {
/**
* Search when all the remotes failed and not skipped
*/
@SkipOverride(aliases = { REMOTE1, REMOTE2 })
@NotSkipped(aliases = { REMOTE1, REMOTE2 })
public void testFailedAllRemotesSearch() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String localIndex = (String) testClusterInfo.get("local.index");
Expand Down Expand Up @@ -577,7 +569,7 @@ public void testRemoteHasNoIndex() throws Exception {
/**
* Test that we're still counting remote search even if remote cluster has no such index
*/
@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteHasNoIndexFailure() throws Exception {
SearchRequest searchRequest = makeSearchRequest(REMOTE1 + ":no_such_index");
CCSTelemetrySnapshot telemetry = getTelemetryFromFailedSearch(searchRequest);
Expand Down Expand Up @@ -689,40 +681,4 @@ private int indexDocs(Client client, String index) {
return numDocs;
}

/**
* Annotation to mark specific clusters in a test as not to be skipped when unavailable.
* It is retained at runtime and applies to methods only; {@code SkipUnavailableRule#apply}
* reads it from the test description and sets the skip flag of every listed alias to {@code false}.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SkipOverride {
/** Aliases of the clusters whose skip flag is overridden for the annotated test. */
String[] aliases();
}

/**
 * Test rule that processes {@link SkipOverride} annotations.
 * <p>
 * Every cluster alias passed to the constructor starts out flagged {@code true}. When the
 * running test method carries {@link SkipOverride}, the flag of each alias listed there is set to
 * {@code false}. The rule never wraps the test statement; it only records the flags.
 */
static class SkipUnavailableRule implements TestRule {
    private final Map<String, Boolean> skipMap;

    /**
     * @param clusterAliases aliases of the clusters tracked by this rule; each is initially flagged {@code true}
     */
    SkipUnavailableRule(String... clusterAliases) {
        // apply() mutates this map, so it must be a map that is guaranteed to be mutable.
        // The two-argument Collectors.toMap makes no such guarantee and also rejects duplicate keys.
        this.skipMap = new HashMap<>();
        for (String alias : clusterAliases) {
            skipMap.put(alias, true);
        }
    }

    /**
     * @return the live alias-to-flag map; it reflects any overrides applied for the current test
     */
    public Map<String, Boolean> getMap() {
        return skipMap;
    }

    /**
     * Looks up {@link SkipOverride} on the described test and flags each listed alias as {@code false}.
     *
     * @return {@code base}, unchanged
     */
    @Override
    public Statement apply(Statement base, Description description) {
        SkipOverride override = description.getAnnotation(SkipOverride.class);
        if (override != null) {
            for (String alias : override.aliases()) {
                skipMap.put(alias, false);
            }
        }
        return base;
    }

}
}
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ static TransportVersion def(int id) {
public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0);
public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0);
public static final TransportVersion TRANSFORMS_UPGRADE_MODE = def(8_814_00_0);
public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0);
public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
* <br>
*/
public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment {
public static final String CCS_TELEMETRY_FIELD_NAME = "_search";
private long totalCount;
private long successCount;
private final Map<String, Long> failureReasons;
Expand All @@ -66,6 +65,9 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment

private final Map<String, Long> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Whether to use separate per-MRT (minimize roundtrips) metrics.
// ES|QL does not have a "minimize_roundtrips" option, so we don't collect those metrics for ES|QL usage.
private boolean useMRT = true;

/**
* Creates a new stats instance with the provided info.
Expand Down Expand Up @@ -191,6 +193,11 @@ public Map<String, PerClusterCCSTelemetry> getByRemoteCluster() {
return Collections.unmodifiableMap(byRemoteCluster);
}

/**
* Sets whether this snapshot uses the per-MRT (minimize roundtrips) latency metrics.
* The flag gates merging of the MRT-specific latencies in {@code add} and their output in {@code toXContent}.
*
* @param useMRT {@code true} to use the per-MRT metrics, {@code false} to leave them out
* @return this snapshot, so the call can be chained
*/
public CCSTelemetrySnapshot setUseMRT(boolean useMRT) {
this.useMRT = useMRT;
return this;
}

public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment {
private long count;
private long skippedCount;
Expand Down Expand Up @@ -270,6 +277,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(count, skippedCount, took);
}

/**
* Returns a string form of this object produced by {@code Strings.toString}.
* NOTE(review): the two {@code true} flags presumably request pretty-printed, human-readable output;
* confirm against the {@code Strings.toString} signature.
*/
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}

/**
Expand All @@ -291,8 +303,10 @@ public void add(CCSTelemetrySnapshot stats) {
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
took.add(stats.took);
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
if (useMRT) {
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
}
remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax);
if (totalCount > 0 && oldCount > 0) {
// Weighted average
Expand Down Expand Up @@ -328,30 +342,28 @@ private static void publishLatency(XContentBuilder builder, String name, LongMet

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(CCS_TELEMETRY_FIELD_NAME);
{
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
if (useMRT) {
publishLatency(builder, "took_mrt_true", tookMrtTrue);
publishLatency(builder, "took_mrt_false", tookMrtFalse);
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.endObject();
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ShardOperationFailedException;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.query.SearchTimeoutException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.Arrays;
Expand Down Expand Up @@ -84,6 +86,15 @@ public Builder setClient(String client) {
return this;
}

/**
 * Sets the client from the product-origin header of the given task, when that header is present.
 *
 * @param task the task whose {@code Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER} header is read
 * @return the result of {@code setClient} when the header has a value, otherwise this builder untouched
 */
public Builder setClientFromTask(Task task) {
    final String productOrigin = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
    // No header on the task: leave the builder as it is.
    if (productOrigin == null) {
        return this;
    }
    return setClient(productOrigin);
}

public Builder skippedRemote(String remote) {
this.skippedRemotes.add(remote);
return this;
Expand Down Expand Up @@ -133,6 +144,10 @@ public static Result getFailureType(Exception e) {
if (ExceptionsHelper.unwrapCorruption(e) != null) {
return Result.CORRUPTION;
}
ElasticsearchStatusException se = (ElasticsearchStatusException) ExceptionsHelper.unwrap(e, ElasticsearchStatusException.class);
if (se != null && se.getDetailedMessage().contains("license")) {
return Result.LICENSE;
}
// This is kind of last resort check - if we still don't know the reason but all shard failures are remote,
// we assume it's remote's fault somehow.
if (e instanceof SearchPhaseExecutionException spe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public enum Result {
TIMEOUT("timeout"),
CORRUPTION("corruption"),
SECURITY("security"),
LICENSE("license"),
// May be helpful if there are a lot of other reasons, and it may be hard to calculate the unknowns for some clients.
UNKNOWN("other");

Expand Down Expand Up @@ -106,8 +107,14 @@ public String getName() {

private final Map<String, LongAdder> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Should we calculate separate metrics per MRT?
private final boolean useMRT;

/**
* Creates a telemetry holder with per-MRT metrics enabled; equivalent to {@code new CCSUsageTelemetry(true)}.
*/
public CCSUsageTelemetry() {
this(true);
}

public CCSUsageTelemetry(boolean useMRT) {
this.byRemoteCluster = new ConcurrentHashMap<>();
totalCount = new LongAdder();
successCount = new LongAdder();
Expand All @@ -119,6 +126,7 @@ public CCSUsageTelemetry() {
skippedRemotes = new LongAdder();
featureCounts = new ConcurrentHashMap<>();
clientCounts = new ConcurrentHashMap<>();
this.useMRT = useMRT;
}

public void updateUsage(CCSUsage ccsUsage) {
Expand All @@ -134,10 +142,12 @@ private void doUpdate(CCSUsage ccsUsage) {
if (isSuccess(ccsUsage)) {
successCount.increment();
took.record(searchTook);
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
if (useMRT) {
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
}
}
ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u));
} else {
Expand Down Expand Up @@ -243,6 +253,6 @@ public CCSTelemetrySnapshot getCCSTelemetrySnapshot() {
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot))
);
).setUseMRT(useMRT);
}
}
Loading
Loading