Skip to content

Commit fe0e5c4

Browse files
EQL: remove version limitations for CCS (#91409) (#91537)
1 parent 523b3d9 commit fe0e5c4

File tree

15 files changed

+301
-138
lines changed

15 files changed

+301
-138
lines changed

docs/changelog/91409.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 91409
2+
summary: Remove version limitations for CCS
3+
area: EQL
4+
type: enhancement
5+
issues: []
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
10+
import org.elasticsearch.gradle.Version
11+
import org.elasticsearch.gradle.internal.info.BuildParams
12+
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask
13+
14+
apply plugin: 'elasticsearch.internal-testclusters'
15+
apply plugin: 'elasticsearch.standalone-rest-test'
16+
apply plugin: 'elasticsearch.bwc-test'
17+
apply plugin: 'elasticsearch.rest-resources'
18+
19+
dependencies {
20+
testImplementation project(':client:rest-high-level')
21+
}
22+
23+
BuildParams.bwcVersions.withWireCompatible { bwcVersion, baseName ->
24+
25+
/**
26+
* We execute tests 3 times.
27+
* - The local cluster is unchanged and it consists of two new version nodes.
28+
* - Nodes in the remote cluster are upgraded one by one in three steps.
29+
* - Only node-0 and node-2 of the remote cluster can accept remote connections.
30+
*/
31+
def localCluster = testClusters.register("${baseName}-local") {
32+
testDistribution = 'DEFAULT'
33+
numberOfNodes = 2
34+
versions = [project.version, project.version]
35+
setting 'cluster.remote.node.attr', 'gateway'
36+
setting 'xpack.security.enabled', 'false'
37+
}
38+
def remoteCluster = testClusters.register("${baseName}-remote") {
39+
testDistribution = 'DEFAULT'
40+
numberOfNodes = 3
41+
versions = [bwcVersion.toString(), project.version]
42+
firstNode.setting 'node.attr.gateway', 'true'
43+
lastNode.setting 'node.attr.gateway', 'true'
44+
setting 'xpack.security.enabled', 'false'
45+
}
46+
47+
48+
tasks.withType(StandaloneRestIntegTestTask).matching { it.name.startsWith("${baseName}#") }.configureEach {
49+
useCluster localCluster
50+
useCluster remoteCluster
51+
systemProperty 'tests.upgrade_from_version', bwcVersion.toString().replace('-SNAPSHOT', '')
52+
53+
doFirst {
54+
nonInputProperties.systemProperty('tests.rest.cluster', localCluster.map(c -> c.allHttpSocketURI.join(",")))
55+
nonInputProperties.systemProperty('tests.rest.remote_cluster', remoteCluster.map(c -> c.allHttpSocketURI.join(",")))
56+
}
57+
}
58+
59+
tasks.register("${baseName}#oldClusterTest", StandaloneRestIntegTestTask) {
60+
dependsOn "processTestResources"
61+
mustRunAfter("precommit")
62+
doFirst {
63+
localCluster.get().nextNodeToNextVersion()
64+
}
65+
}
66+
67+
tasks.register("${baseName}#oneThirdUpgraded", StandaloneRestIntegTestTask) {
68+
dependsOn "${baseName}#oldClusterTest"
69+
doFirst {
70+
remoteCluster.get().nextNodeToNextVersion()
71+
}
72+
}
73+
74+
tasks.register("${baseName}#twoThirdUpgraded", StandaloneRestIntegTestTask) {
75+
dependsOn "${baseName}#oneThirdUpgraded"
76+
doFirst {
77+
remoteCluster.get().nextNodeToNextVersion()
78+
}
79+
}
80+
81+
tasks.register("${baseName}#fullUpgraded", StandaloneRestIntegTestTask) {
82+
dependsOn "${baseName}#twoThirdUpgraded"
83+
doFirst {
84+
remoteCluster.get().nextNodeToNextVersion()
85+
}
86+
}
87+
88+
tasks.register(bwcTaskName(bwcVersion)) {
89+
dependsOn tasks.named("${baseName}#fullUpgraded")
90+
}
91+
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.eql.qa.ccs_rolling_upgrade;
9+
10+
import org.apache.http.HttpHost;
11+
import org.apache.http.util.EntityUtils;
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.Version;
15+
import org.elasticsearch.action.index.IndexRequest;
16+
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.client.RequestOptions;
18+
import org.elasticsearch.client.Response;
19+
import org.elasticsearch.client.RestClient;
20+
import org.elasticsearch.client.RestHighLevelClient;
21+
import org.elasticsearch.cluster.metadata.IndexMetadata;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.test.rest.ESRestTestCase;
24+
import org.elasticsearch.test.rest.ObjectPath;
25+
26+
import java.io.IOException;
27+
import java.io.UncheckedIOException;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.stream.Collectors;
33+
34+
import static org.hamcrest.Matchers.empty;
35+
import static org.hamcrest.Matchers.hasSize;
36+
import static org.hamcrest.Matchers.not;
37+
38+
/**
39+
* This test ensures that EQL can process CCS requests correctly when the local and remote clusters
40+
* have different but compatible versions.
41+
*/
42+
@SuppressWarnings("removal")
43+
public class EqlCcsRollingUpgradeIT extends ESRestTestCase {
44+
45+
private static final Logger LOGGER = LogManager.getLogger(EqlCcsRollingUpgradeIT.class);
46+
private static final String CLUSTER_ALIAS = "remote_cluster";
47+
48+
record Node(String id, String name, Version version, String transportAddress, String httpAddress, Map<String, Object> attributes) {}
49+
50+
static List<Node> getNodes(RestClient restClient) throws IOException {
51+
Response response = restClient.performRequest(new Request("GET", "_nodes"));
52+
ObjectPath objectPath = ObjectPath.createFromResponse(response);
53+
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
54+
final List<Node> nodes = new ArrayList<>();
55+
for (String id : nodeMap.keySet()) {
56+
final String name = objectPath.evaluate("nodes." + id + ".name");
57+
final Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version"));
58+
final String transportAddress = objectPath.evaluate("nodes." + id + ".transport.publish_address");
59+
final String httpAddress = objectPath.evaluate("nodes." + id + ".http.publish_address");
60+
final Map<String, Object> attributes = objectPath.evaluate("nodes." + id + ".attributes");
61+
nodes.add(new Node(id, name, version, transportAddress, httpAddress, attributes));
62+
}
63+
return nodes;
64+
}
65+
66+
static List<HttpHost> parseHosts(String props) {
67+
final String address = System.getProperty(props);
68+
assertNotNull("[" + props + "] is not configured", address);
69+
String[] stringUrls = address.split(",");
70+
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
71+
for (String stringUrl : stringUrls) {
72+
int portSeparator = stringUrl.lastIndexOf(':');
73+
if (portSeparator < 0) {
74+
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
75+
}
76+
String host = stringUrl.substring(0, portSeparator);
77+
int port = Integer.parseInt(stringUrl.substring(portSeparator + 1));
78+
hosts.add(new HttpHost(host, port, "http"));
79+
}
80+
assertThat("[" + props + "] is empty", hosts, not(empty()));
81+
return hosts;
82+
}
83+
84+
public static void configureRemoteClusters(List<Node> remoteNodes) throws Exception {
85+
assertThat(remoteNodes, hasSize(3));
86+
final String remoteClusterSettingPrefix = "cluster.remote." + CLUSTER_ALIAS + ".";
87+
try (RestClient localClient = newLocalClient().getLowLevelClient()) {
88+
final Settings remoteConnectionSettings;
89+
if (randomBoolean()) {
90+
final List<String> seeds = remoteNodes.stream()
91+
.filter(n -> n.attributes.containsKey("gateway"))
92+
.map(n -> n.transportAddress)
93+
.collect(Collectors.toList());
94+
assertThat(seeds, hasSize(2));
95+
LOGGER.info("--> use sniff mode with seed [{}], remote nodes [{}]", seeds, remoteNodes);
96+
remoteConnectionSettings = Settings.builder()
97+
.putNull(remoteClusterSettingPrefix + "proxy_address")
98+
.put(remoteClusterSettingPrefix + "mode", "sniff")
99+
.putList(remoteClusterSettingPrefix + "seeds", seeds)
100+
.build();
101+
} else {
102+
final Node proxyNode = randomFrom(remoteNodes);
103+
LOGGER.info("--> use proxy node [{}], remote nodes [{}]", proxyNode, remoteNodes);
104+
remoteConnectionSettings = Settings.builder()
105+
.putNull(remoteClusterSettingPrefix + "seeds")
106+
.put(remoteClusterSettingPrefix + "mode", "proxy")
107+
.put(remoteClusterSettingPrefix + "proxy_address", proxyNode.transportAddress)
108+
.build();
109+
}
110+
updateClusterSettings(localClient, remoteConnectionSettings);
111+
assertBusy(() -> {
112+
final Response resp = localClient.performRequest(new Request("GET", "/_remote/info"));
113+
assertOK(resp);
114+
final ObjectPath objectPath = ObjectPath.createFromResponse(resp);
115+
assertNotNull(objectPath.evaluate(CLUSTER_ALIAS));
116+
assertTrue(objectPath.evaluate(CLUSTER_ALIAS + ".connected"));
117+
}, 60, TimeUnit.SECONDS);
118+
}
119+
}
120+
121+
static RestHighLevelClient newLocalClient() {
122+
final List<HttpHost> hosts = parseHosts("tests.rest.cluster");
123+
final int index = random().nextInt(hosts.size());
124+
LOGGER.info("Using client node {}", index);
125+
return new RestHighLevelClient(RestClient.builder(hosts.get(index)));
126+
}
127+
128+
static RestHighLevelClient newRemoteClient() {
129+
return new RestHighLevelClient(RestClient.builder(randomFrom(parseHosts("tests.rest.remote_cluster"))));
130+
}
131+
132+
static int indexDocs(RestHighLevelClient client, String index, int numDocs) throws IOException {
133+
for (int i = 0; i < numDocs; i++) {
134+
client.index(new IndexRequest(index).id("id_" + i).source("f", i, "@timestamp", i), RequestOptions.DEFAULT);
135+
}
136+
137+
refresh(client.getLowLevelClient(), index);
138+
return numDocs;
139+
}
140+
141+
void verify(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs) {
142+
try (RestClient localClient = newLocalClient().getLowLevelClient()) {
143+
144+
Request request = new Request("POST", "/" + randomFrom(remoteIndex, localIndex + "," + remoteIndex) + "/_eql/search");
145+
int size = between(1, 100);
146+
int id1 = between(0, 5);
147+
int id2 = between(6, Math.min(localNumDocs - 1, remoteNumDocs - 1));
148+
request.setJsonEntity(
149+
"{\"query\": \"sequence [any where f == " + id1 + "] [any where f == " + id2 + "] \", \"size\": " + size + "}"
150+
);
151+
Response response = localClient.performRequest(request);
152+
String responseText = EntityUtils.toString(response.getEntity());
153+
assertTrue(responseText.contains("\"sequences\":[{"));
154+
assertTrue(responseText.contains("\"_id\":\"id_" + id1 + "\""));
155+
assertTrue(responseText.contains("\"_id\":\"id_" + id2 + "\""));
156+
} catch (IOException e) {
157+
throw new UncheckedIOException(e);
158+
}
159+
}
160+
161+
public void testSequences() throws Exception {
162+
String localIndex = "test_bwc_search_states_index";
163+
String remoteIndex = "test_bwc_search_states_remote_index";
164+
try (RestHighLevelClient localClient = newLocalClient(); RestHighLevelClient remoteClient = newRemoteClient()) {
165+
createIndex(
166+
localClient.getLowLevelClient(),
167+
localIndex,
168+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build(),
169+
"{\"properties\": {\"@timestamp\": {\"type\": \"date\"}}}",
170+
null
171+
);
172+
int localNumDocs = indexDocs(localClient, localIndex, between(10, 100));
173+
createIndex(
174+
remoteClient.getLowLevelClient(),
175+
remoteIndex,
176+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build(),
177+
"{\"properties\": {\"@timestamp\": {\"type\": \"date\"}}}",
178+
null
179+
);
180+
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 100));
181+
182+
configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()));
183+
int iterations = between(1, 20);
184+
for (int i = 0; i < iterations; i++) {
185+
verify(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs);
186+
}
187+
deleteIndex(localClient.getLowLevelClient(), localIndex);
188+
deleteIndex(remoteClient.getLowLevelClient(), remoteIndex);
189+
}
190+
}
191+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public LogicalPlan analyze(LogicalPlan plan) {
6060
}
6161

6262
private LogicalPlan verify(LogicalPlan plan) {
63-
Collection<Failure> failures = verifier.verify(plan, configuration.versionIncompatibleClusters());
63+
Collection<Failure> failures = verifier.verify(plan);
6464
if (failures.isEmpty() == false) {
6565
throw new VerificationException(failures);
6666
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.eql.analysis;
99

10-
import org.elasticsearch.Version;
1110
import org.elasticsearch.xpack.eql.plan.logical.Head;
1211
import org.elasticsearch.xpack.eql.plan.logical.Join;
1312
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
@@ -20,7 +19,6 @@
2019
import org.elasticsearch.xpack.ql.expression.Attribute;
2120
import org.elasticsearch.xpack.ql.expression.NamedExpression;
2221
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
23-
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
2422
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
2523
import org.elasticsearch.xpack.ql.type.DataTypes;
2624
import org.elasticsearch.xpack.ql.util.StringUtils;
@@ -31,7 +29,6 @@
3129
import java.util.LinkedHashSet;
3230
import java.util.List;
3331
import java.util.Set;
34-
import java.util.function.Function;
3532

3633
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.EVENT;
3734
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN;
@@ -69,7 +66,7 @@ public Verifier(Metrics metrics) {
6966
this.metrics = metrics;
7067
}
7168

72-
Collection<Failure> verify(LogicalPlan plan, Function<String, Collection<String>> versionIncompatibleClusters) {
69+
Collection<Failure> verify(LogicalPlan plan) {
7370
Set<Failure> failures = new LinkedHashSet<>();
7471

7572
// start bottom-up
@@ -151,7 +148,6 @@ Collection<Failure> verify(LogicalPlan plan, Function<String, Collection<String>
151148

152149
checkFilterConditionType(p, localFailures);
153150
checkJoinKeyTypes(p, localFailures);
154-
checkRemoteClusterOnSameVersion(p, versionIncompatibleClusters, localFailures);
155151
// mark the plan as analyzed
156152
// if everything checks out
157153
if (failures.isEmpty()) {
@@ -273,25 +269,4 @@ private static void doCheckKeyTypes(Join join, Set<Failure> localFailures, Named
273269
);
274270
}
275271
}
276-
277-
private void checkRemoteClusterOnSameVersion(
278-
LogicalPlan plan,
279-
Function<String, Collection<String>> versionIncompatibleClusters,
280-
Collection<Failure> localFailures
281-
) {
282-
if (plan instanceof EsRelation esRelation) {
283-
Collection<String> incompatibleClusters = versionIncompatibleClusters.apply(esRelation.index().name());
284-
if (incompatibleClusters.size() > 0) {
285-
localFailures.add(
286-
fail(
287-
esRelation,
288-
"the following remote cluster{} incompatible, being on a version different than local " + "cluster's [{}]: {}",
289-
incompatibleClusters.size() > 1 ? "s are" : " is",
290-
Version.CURRENT,
291-
incompatibleClusters
292-
)
293-
);
294-
}
295-
}
296-
}
297272
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ public static void operation(
232232
request.fetchSize(),
233233
clientId,
234234
new TaskId(nodeId, task.getId()),
235-
task,
236-
remoteClusterRegistry::versionIncompatibleClusters
235+
task
237236
);
238237
executeRequestWithRetryAttempt(
239238
clusterService,

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
1818

1919
import java.time.ZoneId;
20-
import java.util.Collection;
2120
import java.util.List;
2221
import java.util.Map;
23-
import java.util.function.Function;
2422

2523
public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configuration {
2624

@@ -52,10 +50,9 @@ public EqlConfiguration(
5250
int fetchSize,
5351
String clientId,
5452
TaskId taskId,
55-
EqlSearchTask task,
56-
Function<String, Collection<String>> versionIncompatibleClusters
53+
EqlSearchTask task
5754
) {
58-
super(zi, username, clusterName, versionIncompatibleClusters);
55+
super(zi, username, clusterName);
5956

6057
this.indices = indices;
6158
this.filter = filter;

0 commit comments

Comments
 (0)