Skip to content

Commit 5137537

Browse files
committed
Add test (more to come)
1 parent f193a4d commit 5137537

File tree

6 files changed

+280
-12
lines changed

6 files changed

+280
-12
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
6565

6666
private final Map<String, Long> clientCounts;
6767
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
68+
private boolean useMRT = true;
6869

6970
/**
7071
* Creates a new stats instance with the provided info.
@@ -190,6 +191,11 @@ public Map<String, PerClusterCCSTelemetry> getByRemoteCluster() {
190191
return Collections.unmodifiableMap(byRemoteCluster);
191192
}
192193

194+
public CCSTelemetrySnapshot setUseMRT(boolean useMRT) {
195+
this.useMRT = useMRT;
196+
return this;
197+
}
198+
193199
public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment {
194200
private long count;
195201
private long skippedCount;
@@ -290,8 +296,10 @@ public void add(CCSTelemetrySnapshot stats) {
290296
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
291297
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
292298
took.add(stats.took);
293-
tookMrtTrue.add(stats.tookMrtTrue);
294-
tookMrtFalse.add(stats.tookMrtFalse);
299+
if (useMRT) {
300+
tookMrtTrue.add(stats.tookMrtTrue);
301+
tookMrtFalse.add(stats.tookMrtFalse);
302+
}
295303
remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax);
296304
if (totalCount > 0 && oldCount > 0) {
297305
// Weighted average
@@ -331,8 +339,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
331339
builder.field("success", successCount);
332340
builder.field("skipped", skippedRemotes);
333341
publishLatency(builder, "took", took);
334-
publishLatency(builder, "took_mrt_true", tookMrtTrue);
335-
publishLatency(builder, "took_mrt_false", tookMrtFalse);
342+
if (useMRT) {
343+
publishLatency(builder, "took_mrt_true", tookMrtTrue);
344+
publishLatency(builder, "took_mrt_false", tookMrtFalse);
345+
}
336346
builder.field("remotes_per_search_max", remotesPerSearchMax);
337347
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
338348
builder.field("failure_reasons", failureReasons);

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public String getName() {
106106

107107
private final Map<String, LongAdder> clientCounts;
108108
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
109+
// Should we calculate separate metrics per MRT?
110+
private boolean useMRT = true;
109111

110112
public CCSUsageTelemetry() {
111113
this.byRemoteCluster = new ConcurrentHashMap<>();
@@ -121,6 +123,11 @@ public CCSUsageTelemetry() {
121123
clientCounts = new ConcurrentHashMap<>();
122124
}
123125

126+
public CCSUsageTelemetry(boolean useMRT) {
127+
this();
128+
this.useMRT = useMRT;
129+
}
130+
124131
public void updateUsage(CCSUsage ccsUsage) {
125132
assert ccsUsage.getRemotesCount() > 0 : "Expected at least one remote cluster in CCSUsage";
126133
// TODO: fork this to a background thread?
@@ -134,10 +141,12 @@ private void doUpdate(CCSUsage ccsUsage) {
134141
if (isSuccess(ccsUsage)) {
135142
successCount.increment();
136143
took.record(searchTook);
137-
if (isMRT(ccsUsage)) {
138-
tookMrtTrue.record(searchTook);
139-
} else {
140-
tookMrtFalse.record(searchTook);
144+
if (useMRT) {
145+
if (isMRT(ccsUsage)) {
146+
tookMrtTrue.record(searchTook);
147+
} else {
148+
tookMrtFalse.record(searchTook);
149+
}
141150
}
142151
ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u));
143152
} else {
@@ -243,6 +252,6 @@ public CCSTelemetrySnapshot getCCSTelemetrySnapshot() {
243252
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)),
244253
Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)),
245254
Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot))
246-
);
255+
).setUseMRT(useMRT);
247256
}
248257
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public ClusterStatsResponse(
6262
nodesStats = new ClusterStatsNodes(nodes);
6363
indicesStats = new ClusterStatsIndices(nodes, mappingStats, analysisStats, versionStats);
6464
ccsMetrics = new CCSTelemetrySnapshot();
65-
esqlMetrics = new CCSTelemetrySnapshot();
65+
esqlMetrics = new CCSTelemetrySnapshot().setUseMRT(false);
6666
ClusterHealthStatus status = null;
6767
for (ClusterStatsNodeResponse response : nodes) {
6868
// only the master node populates the status

server/src/main/java/org/elasticsearch/usage/UsageService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public UsageService() {
3232
this.handlers = new HashMap<>();
3333
this.searchUsageHolder = new SearchUsageHolder();
3434
this.ccsUsageHolder = new CCSUsageTelemetry();
35-
this.esqlUsageHolder = new CCSUsageTelemetry();
35+
this.esqlUsageHolder = new CCSUsageTelemetry(false);
3636
}
3737

3838
/**
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.common.settings.Setting;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.plugins.Plugin;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
21+
import org.elasticsearch.usage.UsageService;
22+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
23+
import org.junit.Assert;
24+
import org.junit.Rule;
25+
import org.junit.rules.TestRule;
26+
import org.junit.runner.Description;
27+
import org.junit.runners.model.Statement;
28+
29+
import java.lang.annotation.ElementType;
30+
import java.lang.annotation.Retention;
31+
import java.lang.annotation.RetentionPolicy;
32+
import java.lang.annotation.Target;
33+
import java.util.ArrayList;
34+
import java.util.Arrays;
35+
import java.util.Collection;
36+
import java.util.HashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.concurrent.ExecutionException;
40+
import java.util.function.Function;
41+
import java.util.stream.Collectors;
42+
43+
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
44+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
45+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
46+
import static org.hamcrest.Matchers.equalTo;
47+
48+
public class CrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase {
49+
private static final Logger LOGGER = LogManager.getLogger(CrossClustersUsageTelemetryIT.class);
50+
private static final String REMOTE1 = "cluster-a";
51+
private static final String REMOTE2 = "cluster-b";
52+
private static final String LOCAL_INDEX = "logs-1";
53+
private static final String REMOTE_INDEX = "logs-2";
54+
55+
@Override
56+
protected boolean reuseClusters() {
57+
return false;
58+
}
59+
60+
@Override
61+
protected Collection<String> remoteClusterAlias() {
62+
return List.of(REMOTE1, REMOTE2);
63+
}
64+
65+
@Rule
66+
public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2);
67+
68+
public void testLocalRemote() throws Exception {
69+
setupClusters();
70+
var telemetry = getTelemetryFromQuery("from logs-*,c*:logs-* | stats sum (v)", "kibana");
71+
72+
assertThat(telemetry.getTotalCount(), equalTo(1L));
73+
assertThat(telemetry.getSuccessCount(), equalTo(1L));
74+
assertThat(telemetry.getFailureReasons().size(), equalTo(0));
75+
assertThat(telemetry.getTook().count(), equalTo(1L));
76+
assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L));
77+
assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L));
78+
assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
79+
assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
80+
assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L));
81+
assertThat(telemetry.getClientCounts().size(), equalTo(1));
82+
assertThat(telemetry.getClientCounts().get("kibana"), equalTo(1L));
83+
assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(null));
84+
85+
var perCluster = telemetry.getByRemoteCluster();
86+
assertThat(perCluster.size(), equalTo(3));
87+
for (String clusterAlias : remoteClusterAlias()) {
88+
var clusterTelemetry = perCluster.get(clusterAlias);
89+
assertThat(clusterTelemetry.getCount(), equalTo(1L));
90+
assertThat(clusterTelemetry.getSkippedCount(), equalTo(0L));
91+
assertThat(clusterTelemetry.getTook().count(), equalTo(1L));
92+
}
93+
94+
}
95+
96+
private CCSTelemetrySnapshot getTelemetryFromQuery(String query) throws ExecutionException, InterruptedException {
97+
return getTelemetryFromQuery(query, null);
98+
}
99+
100+
private CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException {
101+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
102+
request.query(query);
103+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
104+
request.columnar(randomBoolean());
105+
request.includeCCSMetadata(randomBoolean());
106+
return getTelemetryFromQuery(request, client);
107+
}
108+
109+
private CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException,
110+
InterruptedException {
111+
// We want to send search to a specific node (we don't care which one) so that we could
112+
// collect the CCS telemetry from it later
113+
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
114+
// We don't care here too much about the response, we just want to trigger the telemetry collection.
115+
// So we check it's not null and leave the rest to other tests.
116+
if (client != null) {
117+
assertResponse(
118+
cluster(LOCAL_CLUSTER).client(nodeName)
119+
.filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, "kibana"))
120+
.execute(EsqlQueryAction.INSTANCE, request),
121+
Assert::assertNotNull
122+
);
123+
124+
} else {
125+
assertResponse(cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull);
126+
}
127+
return getTelemetrySnapshot(nodeName);
128+
}
129+
130+
private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
131+
var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName);
132+
return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot();
133+
}
134+
135+
Map<String, Object> setupClusters() {
136+
int numShardsLocal = randomIntBetween(1, 5);
137+
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
138+
139+
int numShardsRemote = randomIntBetween(1, 5);
140+
populateRemoteIndices(REMOTE1, REMOTE_INDEX, numShardsRemote);
141+
142+
Map<String, Object> clusterInfo = new HashMap<>();
143+
clusterInfo.put("local.num_shards", numShardsLocal);
144+
clusterInfo.put("local.index", LOCAL_INDEX);
145+
clusterInfo.put("remote.num_shards", numShardsRemote);
146+
clusterInfo.put("remote.index", REMOTE_INDEX);
147+
148+
int numShardsRemote2 = randomIntBetween(1, 5);
149+
populateRemoteIndices(REMOTE2, REMOTE_INDEX, numShardsRemote2);
150+
clusterInfo.put("remote2.index", REMOTE_INDEX);
151+
clusterInfo.put("remote2.num_shards", numShardsRemote2);
152+
153+
return clusterInfo;
154+
}
155+
156+
void populateLocalIndices(String indexName, int numShards) {
157+
Client localClient = client(LOCAL_CLUSTER);
158+
assertAcked(
159+
localClient.admin()
160+
.indices()
161+
.prepareCreate(indexName)
162+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
163+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
164+
);
165+
for (int i = 0; i < 10; i++) {
166+
localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
167+
}
168+
localClient.admin().indices().prepareRefresh(indexName).get();
169+
}
170+
171+
void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
172+
Client remoteClient = client(clusterAlias);
173+
assertAcked(
174+
remoteClient.admin()
175+
.indices()
176+
.prepareCreate(indexName)
177+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
178+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
179+
);
180+
for (int i = 0; i < 10; i++) {
181+
remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
182+
}
183+
remoteClient.admin().indices().prepareRefresh(indexName).get();
184+
}
185+
186+
@Override
187+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
188+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
189+
plugins.add(EsqlPlugin.class);
190+
plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
191+
return plugins;
192+
}
193+
194+
public static class InternalExchangePlugin extends Plugin {
195+
@Override
196+
public List<Setting<?>> getSettings() {
197+
return List.of(
198+
Setting.timeSetting(
199+
ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
200+
TimeValue.timeValueSeconds(30),
201+
Setting.Property.NodeScope
202+
)
203+
);
204+
}
205+
}
206+
207+
@Override
208+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
209+
var map = skipOverride.getMap();
210+
LOGGER.info("Using skip_unavailable map: [{}]", map);
211+
return map;
212+
}
213+
214+
/**
215+
* Annotation to mark specific cluster in a test as not to be skipped when unavailable
216+
*/
217+
@Retention(RetentionPolicy.RUNTIME)
218+
@Target(ElementType.METHOD)
219+
@interface SkipOverride {
220+
String[] aliases();
221+
}
222+
223+
/**
224+
* Test rule to process skip annotations
225+
*/
226+
static class SkipUnavailableRule implements TestRule {
227+
private final Map<String, Boolean> skipMap;
228+
229+
SkipUnavailableRule(String... clusterAliases) {
230+
this.skipMap = Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), alias -> true));
231+
}
232+
233+
public Map<String, Boolean> getMap() {
234+
return skipMap;
235+
}
236+
237+
@Override
238+
public Statement apply(Statement base, Description description) {
239+
// Check for annotation named "SkipOverride" and set the overrides accordingly
240+
var aliases = description.getAnnotation(SkipOverride.class);
241+
if (aliases != null) {
242+
for (String alias : aliases.aliases()) {
243+
skipMap.put(alias, false);
244+
}
245+
}
246+
return base;
247+
}
248+
249+
}
250+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ private void recordCCSTelemetry(Task task, EsqlExecutionInfo executionInfo, Esql
223223
if (request.async()) {
224224
usageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
225225
}
226-
usageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE);
227226

228227
AtomicInteger count = new AtomicInteger();
229228
executionInfo.getClusters().forEach((clusterAlias, cluster) -> {

0 commit comments

Comments
 (0)