Skip to content

Commit 114d72b

Browse files
smalyshevfzowl
authored andcommitted
Refactor CCS tests (elastic#121547)
* Refactor CCS tests Clean up duplication, move stop tests to separate test
1 parent 417c28f commit 114d72b

File tree

8 files changed

+533
-735
lines changed

8 files changed

+533
-735
lines changed

muted-tests.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -408,15 +408,6 @@ tests:
408408
- class: org.elasticsearch.xpack.transform.checkpoint.TransformCCSCanMatchIT
409409
method: testTransformLifecycle_RangeQueryThatMatchesNoShards
410410
issue: https://github.com/elastic/elasticsearch/issues/121480
411-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT
412-
method: testStopQueryLocal
413-
issue: https://github.com/elastic/elasticsearch/issues/121487
414-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT
415-
method: testSuccessfulPathways
416-
issue: https://github.com/elastic/elasticsearch/issues/121488
417-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT
418-
method: testAsyncQueriesWithLimit0
419-
issue: https://github.com/elastic/elasticsearch/issues/121489
420411
- class: org.elasticsearch.xpack.security.profile.ProfileIntegTests
421412
method: testSuggestProfilesWithHint
422413
issue: https://github.com/elastic/elasticsearch/issues/121116
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
11+
import org.elasticsearch.action.index.IndexRequest;
12+
import org.elasticsearch.action.support.WriteRequest;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
21+
import org.elasticsearch.test.XContentTestUtils;
22+
import org.elasticsearch.transport.RemoteClusterAware;
23+
import org.elasticsearch.xcontent.XContentBuilder;
24+
import org.elasticsearch.xcontent.json.JsonXContent;
25+
import org.junit.Before;
26+
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.Collection;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicLong;
35+
36+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
39+
40+
public abstract class AbstractCrossClusterTestCase extends AbstractMultiClustersTestCase {
41+
protected static final String REMOTE_CLUSTER_1 = "cluster-a";
42+
protected static final String REMOTE_CLUSTER_2 = "remote-b";
43+
protected static final String LOCAL_INDEX = "logs-1";
44+
protected static final String REMOTE_INDEX = "logs-2";
45+
protected static final String INDEX_WITH_BLOCKING_MAPPING = "blocking";
46+
protected static final String INDEX_WITH_FAIL_MAPPING = "failing";
47+
48+
@Override
49+
protected List<String> remoteClusterAlias() {
50+
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
51+
}
52+
53+
@Override
54+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
55+
return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, randomBoolean());
56+
}
57+
58+
@Override
59+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
60+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
61+
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
62+
plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
63+
plugins.add(CrossClusterAsyncQueryIT.InternalExchangePlugin.class);
64+
plugins.add(SimplePauseFieldPlugin.class);
65+
plugins.add(FailingPauseFieldPlugin.class);
66+
plugins.add(CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.class);
67+
return plugins;
68+
}
69+
70+
public static class InternalExchangePlugin extends Plugin {
71+
@Override
72+
public List<Setting<?>> getSettings() {
73+
return List.of(
74+
Setting.timeSetting(
75+
ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
76+
TimeValue.timeValueSeconds(30),
77+
Setting.Property.NodeScope
78+
)
79+
);
80+
}
81+
}
82+
83+
public static class CountingPauseFieldPlugin extends SimplePauseFieldPlugin {
84+
public static AtomicLong count = new AtomicLong(0);
85+
86+
protected String scriptTypeName() {
87+
return "pause_count";
88+
}
89+
90+
public static void resetPlugin() {
91+
count.set(0);
92+
}
93+
94+
@Override
95+
public boolean onWait() throws InterruptedException {
96+
count.incrementAndGet();
97+
return allowEmitting.await(30, TimeUnit.SECONDS);
98+
}
99+
}
100+
101+
@Before
102+
public void resetPlugin() {
103+
SimplePauseFieldPlugin.resetPlugin();
104+
FailingPauseFieldPlugin.resetPlugin();
105+
CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.resetPlugin();
106+
}
107+
108+
protected void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster cluster, int numShards) {
109+
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
110+
assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
111+
assertThat(cluster.getTotalShards(), equalTo(numShards));
112+
assertThat(cluster.getSuccessfulShards(), equalTo(numShards));
113+
assertThat(cluster.getSkippedShards(), equalTo(0));
114+
assertThat(cluster.getFailedShards(), equalTo(0));
115+
assertThat(cluster.getFailures().size(), equalTo(0));
116+
}
117+
118+
protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta, int numClusters) {
119+
try {
120+
final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
121+
final Object clusters = esqlResponseAsMap.get("_clusters");
122+
if (responseExpectMeta) {
123+
assertNotNull(clusters);
124+
// test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response)
125+
@SuppressWarnings("unchecked")
126+
Map<String, Object> inner = (Map<String, Object>) clusters;
127+
assertTrue(inner.containsKey("total"));
128+
assertThat((int) inner.get("total"), equalTo(numClusters));
129+
assertTrue(inner.containsKey("details"));
130+
} else {
131+
assertNull(clusters);
132+
}
133+
} catch (IOException e) {
134+
fail("Could not convert ESQLQueryResponse to Map: " + e);
135+
}
136+
}
137+
138+
protected Map<String, Object> setupClusters(int numClusters) throws IOException {
139+
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
140+
int numShardsLocal = randomIntBetween(1, 5);
141+
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
142+
143+
int numShardsRemote = randomIntBetween(1, 5);
144+
populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
145+
146+
Map<String, Object> clusterInfo = new HashMap<>();
147+
clusterInfo.put("local.num_shards", numShardsLocal);
148+
clusterInfo.put("local.index", LOCAL_INDEX);
149+
clusterInfo.put("remote1.num_shards", numShardsRemote);
150+
clusterInfo.put("remote1.index", REMOTE_INDEX);
151+
clusterInfo.put("remote.num_shards", numShardsRemote);
152+
clusterInfo.put("remote.index", REMOTE_INDEX);
153+
154+
if (numClusters == 3) {
155+
int numShardsRemote2 = randomIntBetween(1, 5);
156+
populateRemoteIndices(REMOTE_CLUSTER_2, REMOTE_INDEX, numShardsRemote2);
157+
clusterInfo.put("remote2.index", REMOTE_INDEX);
158+
clusterInfo.put("remote2.num_shards", numShardsRemote2);
159+
}
160+
161+
String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER_1);
162+
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER_1).clusterService().getClusterSettings().get(skipUnavailableKey);
163+
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
164+
.getClusterSettings()
165+
.get(skipUnavailableSetting);
166+
clusterInfo.put("remote.skip_unavailable", skipUnavailable);
167+
168+
return clusterInfo;
169+
}
170+
171+
protected void populateLocalIndices(String indexName, int numShards) {
172+
Client localClient = client(LOCAL_CLUSTER);
173+
assertAcked(
174+
localClient.admin()
175+
.indices()
176+
.prepareCreate(indexName)
177+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
178+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
179+
);
180+
for (int i = 0; i < 10; i++) {
181+
localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
182+
}
183+
localClient.admin().indices().prepareRefresh(indexName).get();
184+
}
185+
186+
protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException {
187+
populateRuntimeIndex(clusterAlias, langName, indexName, 10);
188+
}
189+
190+
protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName, int count) throws IOException {
191+
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
192+
mapping.startObject("runtime");
193+
{
194+
mapping.startObject("const");
195+
{
196+
mapping.field("type", "long");
197+
mapping.startObject("script").field("source", "").field("lang", langName).endObject();
198+
}
199+
mapping.endObject();
200+
}
201+
mapping.endObject();
202+
mapping.endObject();
203+
client(clusterAlias).admin().indices().prepareCreate(indexName).setMapping(mapping).get();
204+
BulkRequestBuilder bulk = client(clusterAlias).prepareBulk(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
205+
for (int i = 0; i < count; i++) {
206+
bulk.add(new IndexRequest().source("foo", i));
207+
}
208+
bulk.get();
209+
}
210+
211+
protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException {
212+
Client remoteClient = client(clusterAlias);
213+
assertAcked(
214+
remoteClient.admin()
215+
.indices()
216+
.prepareCreate(indexName)
217+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
218+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
219+
);
220+
for (int i = 0; i < 10; i++) {
221+
remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
222+
}
223+
remoteClient.admin().indices().prepareRefresh(indexName).get();
224+
}
225+
226+
protected void setSkipUnavailable(String clusterAlias, boolean skip) {
227+
client(LOCAL_CLUSTER).admin()
228+
.cluster()
229+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
230+
.setPersistentSettings(Settings.builder().put("cluster.remote." + clusterAlias + ".skip_unavailable", skip).build())
231+
.get();
232+
}
233+
234+
protected void clearSkipUnavailable(int numClusters) {
235+
assert numClusters == 2 || numClusters == 3 : "Only 2 or 3 clusters supported";
236+
Settings.Builder settingsBuilder = Settings.builder().putNull("cluster.remote." + REMOTE_CLUSTER_1 + ".skip_unavailable");
237+
if (numClusters == 3) {
238+
settingsBuilder.putNull("cluster.remote." + REMOTE_CLUSTER_2 + ".skip_unavailable");
239+
}
240+
client(LOCAL_CLUSTER).admin()
241+
.cluster()
242+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
243+
.setPersistentSettings(settingsBuilder.build())
244+
.get();
245+
}
246+
247+
protected void clearSkipUnavailable() {
248+
clearSkipUnavailable(3);
249+
}
250+
251+
protected EsqlQueryResponse runQuery(EsqlQueryRequest request) {
252+
return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
253+
}
254+
255+
protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
256+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
257+
request.query(query);
258+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
259+
request.profile(randomInt(5) == 2);
260+
request.columnar(randomBoolean());
261+
if (ccsMetadataInResponse != null) {
262+
request.includeCCSMetadata(ccsMetadataInResponse);
263+
}
264+
return runQuery(request);
265+
}
266+
}

0 commit comments

Comments
 (0)