Skip to content

Commit 72bdb43

Browse files
committed
refactor test
1 parent b12c8a3 commit 72bdb43

File tree

3 files changed

+172
-157
lines changed

3 files changed

+172
-157
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.tasks.Task;
16+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
17+
import org.elasticsearch.test.SkipUnavailableRule;
18+
import org.elasticsearch.usage.UsageService;
19+
import org.junit.Assert;
20+
import org.junit.Rule;
21+
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.ExecutionException;
26+
27+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
29+
30+
public class AbstractCrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase {
31+
private static final Logger LOGGER = LogManager.getLogger(AbstractCrossClustersUsageTelemetryIT.class);
32+
private static final String REMOTE1 = "cluster-a";
33+
private static final String REMOTE2 = "cluster-b";
34+
private static final String LOCAL_INDEX = "logs-1";
35+
private static final String REMOTE_INDEX = "logs-2";
36+
37+
protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException {
38+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
39+
request.query(query);
40+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
41+
request.columnar(randomBoolean());
42+
request.includeCCSMetadata(randomBoolean());
43+
return getTelemetryFromQuery(request, client);
44+
}
45+
46+
protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException,
47+
InterruptedException {
48+
// We want to send search to a specific node (we don't care which one) so that we could
49+
// collect the CCS telemetry from it later
50+
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
51+
// We don't care here too much about the response, we just want to trigger the telemetry collection.
52+
// So we check it's not null and leave the rest to other tests.
53+
if (client != null) {
54+
assertResponse(
55+
cluster(LOCAL_CLUSTER).client(nodeName)
56+
.filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client))
57+
.execute(EsqlQueryAction.INSTANCE, request),
58+
Assert::assertNotNull
59+
);
60+
61+
} else {
62+
assertResponse(cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull);
63+
}
64+
return getTelemetrySnapshot(nodeName);
65+
}
66+
67+
protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception {
68+
// We want to send search to a specific node (we don't care which one) so that we could
69+
// collect the CCS telemetry from it later
70+
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
71+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
72+
request.query(query);
73+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
74+
request.columnar(randomBoolean());
75+
request.includeCCSMetadata(randomBoolean());
76+
77+
ExecutionException ee = expectThrows(
78+
ExecutionException.class,
79+
cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request)::get
80+
);
81+
assertNotNull(ee.getCause());
82+
83+
return getTelemetrySnapshot(nodeName);
84+
}
85+
86+
private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
87+
var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName);
88+
return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot();
89+
}
90+
91+
@Override
92+
protected boolean reuseClusters() {
93+
return false;
94+
}
95+
96+
@Override
97+
protected List<String> remoteClusterAlias() {
98+
return List.of(REMOTE1, REMOTE2);
99+
}
100+
101+
@Rule
102+
public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2);
103+
104+
protected Map<String, Object> setupClusters() {
105+
int numShardsLocal = randomIntBetween(1, 5);
106+
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
107+
108+
int numShardsRemote = randomIntBetween(1, 5);
109+
populateRemoteIndices(REMOTE1, REMOTE_INDEX, numShardsRemote);
110+
111+
Map<String, Object> clusterInfo = new HashMap<>();
112+
clusterInfo.put("local.num_shards", numShardsLocal);
113+
clusterInfo.put("local.index", LOCAL_INDEX);
114+
clusterInfo.put("remote.num_shards", numShardsRemote);
115+
clusterInfo.put("remote.index", REMOTE_INDEX);
116+
117+
int numShardsRemote2 = randomIntBetween(1, 5);
118+
populateRemoteIndices(REMOTE2, REMOTE_INDEX, numShardsRemote2);
119+
clusterInfo.put("remote2.index", REMOTE_INDEX);
120+
clusterInfo.put("remote2.num_shards", numShardsRemote2);
121+
122+
return clusterInfo;
123+
}
124+
125+
void populateLocalIndices(String indexName, int numShards) {
126+
Client localClient = client(LOCAL_CLUSTER);
127+
assertAcked(
128+
localClient.admin()
129+
.indices()
130+
.prepareCreate(indexName)
131+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
132+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
133+
);
134+
for (int i = 0; i < 10; i++) {
135+
localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
136+
}
137+
localClient.admin().indices().prepareRefresh(indexName).get();
138+
}
139+
140+
void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
141+
Client remoteClient = client(clusterAlias);
142+
assertAcked(
143+
remoteClient.admin()
144+
.indices()
145+
.prepareCreate(indexName)
146+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
147+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
148+
);
149+
for (int i = 0; i < 10; i++) {
150+
remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
151+
}
152+
remoteClient.admin().indices().prepareRefresh(indexName).get();
153+
}
154+
155+
@Override
156+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
157+
var map = skipOverride.getMap();
158+
LOGGER.info("Using skip_unavailable map: [{}]", map);
159+
return map;
160+
}
161+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java

Lines changed: 9 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,24 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10-
import org.apache.logging.log4j.LogManager;
11-
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
13-
import org.elasticsearch.client.internal.Client;
14-
import org.elasticsearch.common.settings.Settings;
1510
import org.elasticsearch.plugins.Plugin;
16-
import org.elasticsearch.tasks.Task;
17-
import org.elasticsearch.test.AbstractMultiClustersTestCase;
18-
import org.elasticsearch.test.SkipUnavailableRule;
19-
import org.elasticsearch.usage.UsageService;
20-
import org.junit.Assert;
21-
import org.junit.Rule;
2211

2312
import java.util.ArrayList;
2413
import java.util.Collection;
25-
import java.util.HashMap;
2614
import java.util.List;
27-
import java.util.Map;
28-
import java.util.concurrent.ExecutionException;
2915

3016
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
31-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
32-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
3317
import static org.hamcrest.Matchers.equalTo;
3418

35-
public class CrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase {
36-
private static final Logger LOGGER = LogManager.getLogger(CrossClustersUsageTelemetryIT.class);
37-
private static final String REMOTE1 = "cluster-a";
38-
private static final String REMOTE2 = "cluster-b";
39-
private static final String LOCAL_INDEX = "logs-1";
40-
private static final String REMOTE_INDEX = "logs-2";
19+
public class CrossClustersUsageTelemetryIT extends AbstractCrossClustersUsageTelemetryIT {
20+
21+
@Override
22+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
23+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
24+
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
25+
plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
26+
return plugins;
27+
}
4128

4229
public void testLocalRemote() throws Exception {
4330
setupClusters();
@@ -66,137 +53,4 @@ public void testLocalRemote() throws Exception {
6653
}
6754

6855
}
69-
70-
protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException {
71-
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
72-
request.query(query);
73-
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
74-
request.columnar(randomBoolean());
75-
request.includeCCSMetadata(randomBoolean());
76-
return getTelemetryFromQuery(request, client);
77-
}
78-
79-
protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException,
80-
InterruptedException {
81-
// We want to send search to a specific node (we don't care which one) so that we could
82-
// collect the CCS telemetry from it later
83-
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
84-
// We don't care here too much about the response, we just want to trigger the telemetry collection.
85-
// So we check it's not null and leave the rest to other tests.
86-
if (client != null) {
87-
assertResponse(
88-
cluster(LOCAL_CLUSTER).client(nodeName)
89-
.filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client))
90-
.execute(EsqlQueryAction.INSTANCE, request),
91-
Assert::assertNotNull
92-
);
93-
94-
} else {
95-
assertResponse(cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull);
96-
}
97-
return getTelemetrySnapshot(nodeName);
98-
}
99-
100-
protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception {
101-
// We want to send search to a specific node (we don't care which one) so that we could
102-
// collect the CCS telemetry from it later
103-
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
104-
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
105-
request.query(query);
106-
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
107-
request.columnar(randomBoolean());
108-
request.includeCCSMetadata(randomBoolean());
109-
110-
ExecutionException ee = expectThrows(
111-
ExecutionException.class,
112-
cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request)::get
113-
);
114-
assertNotNull(ee.getCause());
115-
116-
return getTelemetrySnapshot(nodeName);
117-
}
118-
119-
private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
120-
var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName);
121-
return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot();
122-
}
123-
124-
@Override
125-
protected boolean reuseClusters() {
126-
return false;
127-
}
128-
129-
@Override
130-
protected List<String> remoteClusterAlias() {
131-
return List.of(REMOTE1, REMOTE2);
132-
}
133-
134-
@Rule
135-
public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2);
136-
137-
protected Map<String, Object> setupClusters() {
138-
int numShardsLocal = randomIntBetween(1, 5);
139-
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
140-
141-
int numShardsRemote = randomIntBetween(1, 5);
142-
populateRemoteIndices(REMOTE1, REMOTE_INDEX, numShardsRemote);
143-
144-
Map<String, Object> clusterInfo = new HashMap<>();
145-
clusterInfo.put("local.num_shards", numShardsLocal);
146-
clusterInfo.put("local.index", LOCAL_INDEX);
147-
clusterInfo.put("remote.num_shards", numShardsRemote);
148-
clusterInfo.put("remote.index", REMOTE_INDEX);
149-
150-
int numShardsRemote2 = randomIntBetween(1, 5);
151-
populateRemoteIndices(REMOTE2, REMOTE_INDEX, numShardsRemote2);
152-
clusterInfo.put("remote2.index", REMOTE_INDEX);
153-
clusterInfo.put("remote2.num_shards", numShardsRemote2);
154-
155-
return clusterInfo;
156-
}
157-
158-
void populateLocalIndices(String indexName, int numShards) {
159-
Client localClient = client(LOCAL_CLUSTER);
160-
assertAcked(
161-
localClient.admin()
162-
.indices()
163-
.prepareCreate(indexName)
164-
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
165-
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
166-
);
167-
for (int i = 0; i < 10; i++) {
168-
localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
169-
}
170-
localClient.admin().indices().prepareRefresh(indexName).get();
171-
}
172-
173-
void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
174-
Client remoteClient = client(clusterAlias);
175-
assertAcked(
176-
remoteClient.admin()
177-
.indices()
178-
.prepareCreate(indexName)
179-
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
180-
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
181-
);
182-
for (int i = 0; i < 10; i++) {
183-
remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
184-
}
185-
remoteClient.admin().indices().prepareRefresh(indexName).get();
186-
}
187-
188-
@Override
189-
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
190-
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
191-
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
192-
plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
193-
return plugins;
194-
}
195-
196-
@Override
197-
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
198-
var map = skipOverride.getMap();
199-
LOGGER.info("Using skip_unavailable map: [{}]", map);
200-
return map;
201-
}
20256
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
import static org.hamcrest.Matchers.equalTo;
1919

20-
public class CrossClustersUsageTelemetryNoLicenseIT extends CrossClustersUsageTelemetryIT {
20+
public class CrossClustersUsageTelemetryNoLicenseIT extends AbstractCrossClustersUsageTelemetryIT {
2121

2222
@Override
2323
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
2424
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
25-
plugins.remove(EsqlPluginWithEnterpriseOrTrialLicense.class);
2625
plugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class);
26+
plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
2727
return plugins;
2828
}
2929

0 commit comments

Comments
 (0)