Skip to content

Commit 02f2c7f

Browse files
committed
More tests for telemetry
1 parent 1634ab4 commit 02f2c7f

File tree

4 files changed

+247
-23
lines changed

4 files changed

+247
-23
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ public boolean equals(Object o) {
275275
public int hashCode() {
276276
return Objects.hash(count, skippedCount, took);
277277
}
278+
279+
@Override
280+
public String toString() {
281+
return Strings.toString(this, true, true);
282+
}
278283
}
279284

280285
/**

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,43 @@
1212
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
1313
import org.elasticsearch.client.internal.Client;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.core.TimeValue;
1516
import org.elasticsearch.tasks.Task;
1617
import org.elasticsearch.test.AbstractMultiClustersTestCase;
1718
import org.elasticsearch.test.SkipUnavailableRule;
1819
import org.elasticsearch.usage.UsageService;
20+
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
1921
import org.junit.Assert;
22+
import org.junit.Before;
2023
import org.junit.Rule;
2124

2225
import java.util.HashMap;
2326
import java.util.List;
2427
import java.util.Map;
2528
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicReference;
2631

32+
import static org.elasticsearch.core.TimeValue.timeValueMillis;
2733
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2834
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
2935

3036
public class AbstractCrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase {
3137
private static final Logger LOGGER = LogManager.getLogger(AbstractCrossClustersUsageTelemetryIT.class);
32-
private static final String REMOTE1 = "cluster-a";
33-
private static final String REMOTE2 = "cluster-b";
34-
private static final String LOCAL_INDEX = "logs-1";
35-
private static final String REMOTE_INDEX = "logs-2";
38+
protected static final String REMOTE1 = "cluster-a";
39+
protected static final String REMOTE2 = "cluster-b";
40+
protected static final String LOCAL_INDEX = "logs-1";
41+
protected static final String REMOTE_INDEX = "logs-2";
42+
// We want to send the search to a specific node (we don't care which one) so that we can
43+
// collect the CCS telemetry from it later.
44+
protected String queryNode;
45+
46+
@Before
47+
public void setupQueryNode() {
48+
// The tests are set up in a way that all queries within a single test are sent to the same node,
49+
// thus enabling incremental collection of telemetry data, but the node is random for each test.
50+
queryNode = cluster(LOCAL_CLUSTER).getRandomNodeName();
51+
}
3652

3753
protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException {
3854
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
@@ -45,29 +61,57 @@ protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client
4561

4662
protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException,
4763
InterruptedException {
48-
// We want to send search to a specific node (we don't care which one) so that we could
49-
// collect the CCS telemetry from it later
50-
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
5164
// We don't care much about the response here; we just want to trigger the telemetry collection,
5265
// so we only check that it's not null and leave the rest to other tests.
5366
if (client != null) {
5467
assertResponse(
55-
cluster(LOCAL_CLUSTER).client(nodeName)
68+
cluster(LOCAL_CLUSTER).client(queryNode)
5669
.filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client))
5770
.execute(EsqlQueryAction.INSTANCE, request),
5871
Assert::assertNotNull
5972
);
6073

6174
} else {
62-
assertResponse(cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull);
75+
assertResponse(cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull);
76+
}
77+
return getTelemetrySnapshot(queryNode);
78+
}
79+
80+
protected CCSTelemetrySnapshot getTelemetryFromAsyncQuery(String query) throws Exception {
81+
EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest();
82+
request.query(query);
83+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
84+
request.columnar(randomBoolean());
85+
request.includeCCSMetadata(randomBoolean());
86+
request.waitForCompletionTimeout(TimeValue.timeValueMillis(100));
87+
request.keepOnCompletion(false);
88+
return getTelemetryFromAsyncQuery(request);
89+
}
90+
91+
protected CCSTelemetrySnapshot getTelemetryFromAsyncQuery(EsqlQueryRequest request) throws Exception {
92+
AtomicReference<String> asyncExecutionId = new AtomicReference<>();
93+
assertResponse(cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request), resp -> {
94+
if (resp.isRunning()) {
95+
assertNotNull("async execution id is null", resp.asyncExecutionId());
96+
asyncExecutionId.set(resp.asyncExecutionId().get());
97+
}
98+
});
99+
if (asyncExecutionId.get() != null) {
100+
assertBusy(() -> {
101+
var getResultsRequest = new GetAsyncResultRequest(asyncExecutionId.get()).setWaitForCompletionTimeout(timeValueMillis(1));
102+
try (
103+
var resp = cluster(LOCAL_CLUSTER).client(queryNode)
104+
.execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest)
105+
.actionGet(30, TimeUnit.SECONDS)
106+
) {
107+
assertFalse(resp.isRunning());
108+
}
109+
});
63110
}
64-
return getTelemetrySnapshot(nodeName);
111+
return getTelemetrySnapshot(queryNode);
65112
}
66113

67114
protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception {
68-
// We want to send search to a specific node (we don't care which one) so that we could
69-
// collect the CCS telemetry from it later
70-
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
71115
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
72116
request.query(query);
73117
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
@@ -76,11 +120,11 @@ protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws
76120

77121
ExecutionException ee = expectThrows(
78122
ExecutionException.class,
79-
cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request)::get
123+
cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request)::get
80124
);
81125
assertNotNull(ee.getCause());
82126

83-
return getTelemetrySnapshot(nodeName);
127+
return getTelemetrySnapshot(queryNode);
84128
}
85129

86130
private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java

Lines changed: 155 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
11+
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
1012
import org.elasticsearch.plugins.Plugin;
13+
import org.elasticsearch.test.SkipUnavailableRule;
1114

1215
import java.util.ArrayList;
1316
import java.util.Collection;
1417
import java.util.List;
18+
import java.util.Map;
1519

1620
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
1721
import static org.hamcrest.Matchers.equalTo;
@@ -26,6 +30,12 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
2630
return plugins;
2731
}
2832

33+
public void assertPerClusterCount(CCSTelemetrySnapshot.PerClusterCCSTelemetry perCluster, long count) {
34+
assertThat(perCluster.getCount(), equalTo(count));
35+
assertThat(perCluster.getSkippedCount(), equalTo(0L));
36+
assertThat(perCluster.getTook().count(), equalTo(count));
37+
}
38+
2939
public void testLocalRemote() throws Exception {
3040
setupClusters();
3141
var telemetry = getTelemetryFromQuery("from logs-*,c*:logs-* | stats sum (v)", "kibana");
@@ -46,11 +56,152 @@ public void testLocalRemote() throws Exception {
4656
var perCluster = telemetry.getByRemoteCluster();
4757
assertThat(perCluster.size(), equalTo(3));
4858
for (String clusterAlias : remoteClusterAlias()) {
49-
var clusterTelemetry = perCluster.get(clusterAlias);
50-
assertThat(clusterTelemetry.getCount(), equalTo(1L));
51-
assertThat(clusterTelemetry.getSkippedCount(), equalTo(0L));
52-
assertThat(clusterTelemetry.getTook().count(), equalTo(1L));
59+
assertPerClusterCount(perCluster.get(clusterAlias), 1L);
60+
}
61+
assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L);
62+
63+
telemetry = getTelemetryFromQuery("from logs-*,c*:logs-* | stats sum (v)", "kibana");
64+
assertThat(telemetry.getTotalCount(), equalTo(2L));
65+
assertThat(telemetry.getClientCounts().get("kibana"), equalTo(2L));
66+
perCluster = telemetry.getByRemoteCluster();
67+
assertThat(perCluster.size(), equalTo(3));
68+
for (String clusterAlias : remoteClusterAlias()) {
69+
assertPerClusterCount(perCluster.get(clusterAlias), 2L);
70+
}
71+
assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 2L);
72+
}
73+
74+
public void testLocalOnly() throws Exception {
75+
setupClusters();
76+
// Should not produce any usage info since it's a local search
77+
var telemetry = getTelemetryFromQuery("from logs-* | stats sum (v)", "kibana");
78+
79+
assertThat(telemetry.getTotalCount(), equalTo(0L));
80+
assertThat(telemetry.getSuccessCount(), equalTo(0L));
81+
assertThat(telemetry.getByRemoteCluster().size(), equalTo(0));
82+
}
83+
84+
@SkipUnavailableRule.NotSkipped(aliases = REMOTE1)
85+
public void testFailed() throws Exception {
86+
setupClusters();
87+
// A failed local-only query should not produce any usage info, since it's not a cross-cluster search
88+
var telemetry = getTelemetryFromFailedQuery("from no_such_index | stats sum (v)");
89+
90+
assertThat(telemetry.getTotalCount(), equalTo(0L));
91+
assertThat(telemetry.getSuccessCount(), equalTo(0L));
92+
assertThat(telemetry.getByRemoteCluster().size(), equalTo(0));
93+
94+
// One remote is skipped, one is not
95+
telemetry = getTelemetryFromFailedQuery("from logs-*,c*:no_such_index | stats sum (v)");
96+
97+
assertThat(telemetry.getTotalCount(), equalTo(1L));
98+
assertThat(telemetry.getSuccessCount(), equalTo(0L));
99+
assertThat(telemetry.getByRemoteCluster().size(), equalTo(1));
100+
assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
101+
assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
102+
assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L));
103+
Map<String, Long> expectedFailure = Map.of(CCSUsageTelemetry.Result.NOT_FOUND.getName(), 1L);
104+
assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure));
105+
// cluster-b should be skipped
106+
assertThat(telemetry.getByRemoteCluster().get(REMOTE2).getCount(), equalTo(0L));
107+
assertThat(telemetry.getByRemoteCluster().get(REMOTE2).getSkippedCount(), equalTo(1L));
108+
109+
// this is only for cluster-a so no skipped remotes
110+
telemetry = getTelemetryFromFailedQuery("from logs-*,cluster-a:no_such_index | stats sum (v)");
111+
assertThat(telemetry.getTotalCount(), equalTo(2L));
112+
assertThat(telemetry.getSuccessCount(), equalTo(0L));
113+
assertThat(telemetry.getByRemoteCluster().size(), equalTo(1));
114+
assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
115+
assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
116+
assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L));
117+
expectedFailure = Map.of(CCSUsageTelemetry.Result.NOT_FOUND.getName(), 2L);
118+
assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure));
119+
assertThat(telemetry.getByRemoteCluster().size(), equalTo(1));
120+
}
121+
122+
// TODO: enable when the skip_unavailable patch is merged
123+
// public void testSkipAllRemotes() throws Exception {
124+
// var telemetry = getTelemetryFromQuery("from logs-*,c*:no_such_index | stats sum (v)", "unknown");
125+
//
126+
// assertThat(telemetry.getTotalCount(), equalTo(1L));
127+
// assertThat(telemetry.getSuccessCount(), equalTo(1L));
128+
// assertThat(telemetry.getFailureReasons().size(), equalTo(0));
129+
// assertThat(telemetry.getTook().count(), equalTo(1L));
130+
// assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L));
131+
// assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L));
132+
// assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
133+
// assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
134+
// assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L));
135+
// assertThat(telemetry.getClientCounts().size(), equalTo(0));
136+
//
137+
// var perCluster = telemetry.getByRemoteCluster();
138+
// assertThat(perCluster.size(), equalTo(3));
139+
// for (String clusterAlias : remoteClusterAlias()) {
140+
// var clusterData = perCluster.get(clusterAlias);
141+
// assertThat(clusterData.getCount(), equalTo(0L));
142+
// assertThat(clusterData.getSkippedCount(), equalTo(1L));
143+
// assertThat(clusterData.getTook().count(), equalTo(0L));
144+
// }
145+
// assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L);
146+
// }
147+
148+
public void testRemoteOnly() throws Exception {
149+
setupClusters();
150+
var telemetry = getTelemetryFromQuery("from c*:logs-* | stats sum (v)", "kibana");
151+
152+
assertThat(telemetry.getTotalCount(), equalTo(1L));
153+
assertThat(telemetry.getSuccessCount(), equalTo(1L));
154+
assertThat(telemetry.getFailureReasons().size(), equalTo(0));
155+
assertThat(telemetry.getTook().count(), equalTo(1L));
156+
assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L));
157+
assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L));
158+
assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
159+
assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
160+
assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L));
161+
assertThat(telemetry.getClientCounts().size(), equalTo(1));
162+
assertThat(telemetry.getClientCounts().get("kibana"), equalTo(1L));
163+
assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(null));
164+
165+
var perCluster = telemetry.getByRemoteCluster();
166+
assertThat(perCluster.size(), equalTo(2));
167+
for (String clusterAlias : remoteClusterAlias()) {
168+
assertPerClusterCount(perCluster.get(clusterAlias), 1L);
169+
}
170+
assertThat(telemetry.getByRemoteCluster().size(), equalTo(2));
171+
}
172+
173+
public void testAsync() throws Exception {
174+
setupClusters();
175+
var telemetry = getTelemetryFromAsyncQuery("from logs-*,c*:logs-* | stats sum (v)");
176+
177+
assertThat(telemetry.getTotalCount(), equalTo(1L));
178+
assertThat(telemetry.getSuccessCount(), equalTo(1L));
179+
assertThat(telemetry.getFailureReasons().size(), equalTo(0));
180+
assertThat(telemetry.getTook().count(), equalTo(1L));
181+
assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L));
182+
assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L));
183+
assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
184+
assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
185+
assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L));
186+
assertThat(telemetry.getClientCounts().size(), equalTo(0));
187+
assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(1L));
188+
189+
var perCluster = telemetry.getByRemoteCluster();
190+
assertThat(perCluster.size(), equalTo(3));
191+
for (String clusterAlias : remoteClusterAlias()) {
192+
assertPerClusterCount(perCluster.get(clusterAlias), 1L);
53193
}
194+
assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L);
54195

196+
// Run the same async query again on the same node: the counts should now be incremented to 2
197+
telemetry = getTelemetryFromAsyncQuery("from logs-*,c*:logs-* | stats sum (v)");
198+
assertThat(telemetry.getTotalCount(), equalTo(2L));
199+
assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(2L));
200+
perCluster = telemetry.getByRemoteCluster();
201+
assertThat(perCluster.size(), equalTo(3));
202+
for (String clusterAlias : remoteClusterAlias()) {
203+
assertPerClusterCount(perCluster.get(clusterAlias), 2L);
204+
}
205+
assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 2L);
55206
}
56207
}

0 commit comments

Comments
 (0)