Skip to content

Commit 4bd6449

Browse files
authored
[8.16] Fix BWC for ES|QL cluster request (#117864)
We identified a BWC bug in the cluster computer request. Specifically, the indices options were not properly selected for requests from an older querying cluster. This caused the search_shards API on the remote cluster to use restricted indices options, leading to failures when resolving wildcard index patterns. Our tests didn't catch this issue because the current BWC tests for cross-cluster queries only cover one direction: the querying cluster on the current version and the remote cluster on a compatible version. This PR fixes the issue and expands BWC tests to support both directions: the querying cluster on the current version with the remote cluster on a compatible version, and vice versa.
1 parent bfd66d6 commit 4bd6449

File tree

9 files changed

+345
-42
lines changed

9 files changed

+345
-42
lines changed

docs/changelog/117865.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117865
2+
summary: Fix BWC for ES|QL cluster request
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/qa/server/multi-clusters/build.gradle

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,22 @@ def supportedVersion = bwcVersion -> {
2323
}
2424

2525
buildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->
26-
tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
26+
tasks.register("${baseName}#newToOld", StandaloneRestIntegTestTask) {
27+
usesBwcDistribution(bwcVersion)
28+
systemProperty("tests.version.remote_cluster", bwcVersion)
29+
maxParallelForks = 1
30+
}
31+
32+
tasks.register("${baseName}#oldToNew", StandaloneRestIntegTestTask) {
2733
usesBwcDistribution(bwcVersion)
28-
systemProperty("tests.old_cluster_version", bwcVersion)
34+
systemProperty("tests.version.local_cluster", bwcVersion)
35+
maxParallelForks = 1
36+
}
37+
38+
// TODO: avoid running tests twice with the current version
39+
tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
40+
dependsOn tasks.named("${baseName}#oldToNew")
41+
dependsOn tasks.named("${baseName}#newToOld")
2942
maxParallelForks = 1
3043
}
3144
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public static ElasticsearchCluster remoteCluster() {
2020
return ElasticsearchCluster.local()
2121
.name(REMOTE_CLUSTER_NAME)
2222
.distribution(DistributionType.DEFAULT)
23-
.version(Version.fromString(System.getProperty("tests.old_cluster_version")))
23+
.version(distributionVersion("tests.version.remote_cluster"))
2424
.nodes(2)
2525
.setting("node.roles", "[data,ingest,master]")
2626
.setting("xpack.security.enabled", "false")
@@ -34,7 +34,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
3434
return ElasticsearchCluster.local()
3535
.name(LOCAL_CLUSTER_NAME)
3636
.distribution(DistributionType.DEFAULT)
37-
.version(Version.CURRENT)
37+
.version(distributionVersion("tests.version.local_cluster"))
3838
.nodes(2)
3939
.setting("xpack.security.enabled", "false")
4040
.setting("xpack.license.self_generated.type", "trial")
@@ -46,7 +46,18 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
4646
.build();
4747
}
4848

49-
public static org.elasticsearch.Version oldVersion() {
50-
return org.elasticsearch.Version.fromString(System.getProperty("tests.old_cluster_version"));
49+
public static org.elasticsearch.Version localClusterVersion() {
50+
String prop = System.getProperty("tests.version.local_cluster");
51+
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
52+
}
53+
54+
public static org.elasticsearch.Version remoteClusterVersion() {
55+
String prop = System.getProperty("tests.version.remote_cluster");
56+
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
57+
}
58+
59+
private static Version distributionVersion(String key) {
60+
final String val = System.getProperty(key);
61+
return val != null ? Version.fromString(val) : Version.CURRENT;
5162
}
5263
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
1111

1212
import org.apache.http.HttpHost;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.client.RestClient;
1415
import org.elasticsearch.core.IOUtils;
1516
import org.elasticsearch.test.TestClustersThreadFilter;
1617
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1718
import org.elasticsearch.xpack.esql.qa.rest.EsqlRestValidationTestCase;
1819
import org.junit.AfterClass;
20+
import org.junit.Before;
1921
import org.junit.ClassRule;
2022
import org.junit.rules.RuleChain;
2123
import org.junit.rules.TestRule;
@@ -78,4 +80,9 @@ private RestClient remoteClusterClient() throws IOException {
7880
}
7981
return remoteClient;
8082
}
83+
84+
@Before
85+
public void skipTestOnOldVersions() {
86+
assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_16_0));
87+
}
8188
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import org.apache.http.HttpEntity;
1414
import org.apache.http.HttpHost;
15+
import org.elasticsearch.Version;
1516
import org.elasticsearch.client.Request;
1617
import org.elasticsearch.client.Response;
1718
import org.elasticsearch.client.RestClient;
@@ -105,10 +106,8 @@ protected void shouldSkipTest(String testName) throws IOException {
105106
super.shouldSkipTest(testName);
106107
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
107108
assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
108-
assumeTrue(
109-
"Test " + testName + " is skipped on " + Clusters.oldVersion(),
110-
isEnabled(testName, instructions, Clusters.oldVersion())
111-
);
109+
Version oldVersion = Version.min(Clusters.localClusterVersion(), Clusters.remoteClusterVersion());
110+
assumeTrue("Test " + testName + " is skipped on " + oldVersion, isEnabled(testName, instructions, oldVersion));
112111
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats"));
113112
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats_v2"));
114113
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
1111

1212
import org.apache.http.HttpHost;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.client.Request;
1415
import org.elasticsearch.client.RestClient;
1516
import org.elasticsearch.common.Strings;
@@ -29,7 +30,6 @@
2930
import java.io.IOException;
3031
import java.util.List;
3132
import java.util.Map;
32-
import java.util.Optional;
3333
import java.util.Set;
3434
import java.util.stream.Collectors;
3535
import java.util.stream.IntStream;
@@ -127,10 +127,12 @@ void indexDocs(RestClient client, String index, List<Doc> docs) throws IOExcepti
127127
}
128128

129129
private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
130-
Map<String, Object> resp = runEsql(
131-
new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(includeCCSMetadata).build()
132-
);
133-
logger.info("--> query {} response {}", query, resp);
130+
var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query);
131+
if (includeCCSMetadata) {
132+
queryBuilder.includeCCSMetadata(true);
133+
}
134+
Map<String, Object> resp = runEsql(queryBuilder.build());
135+
logger.info("--> query {} response {}", queryBuilder, resp);
134136
return resp;
135137
}
136138

@@ -156,7 +158,7 @@ private Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder reques
156158

157159
public void testCount() throws Exception {
158160
{
159-
boolean includeCCSMetadata = randomBoolean();
161+
boolean includeCCSMetadata = includeCCSMetadata();
160162
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
161163
var columns = List.of(Map.of("name", "c", "type", "long"));
162164
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
@@ -165,13 +167,16 @@ public void testCount() throws Exception {
165167
if (includeCCSMetadata) {
166168
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
167169
}
168-
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
170+
if (ccsMetadataAvailable()) {
171+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
172+
}
173+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
169174
if (includeCCSMetadata) {
170175
assertClusterDetailsMap(result, false);
171176
}
172177
}
173178
{
174-
boolean includeCCSMetadata = randomBoolean();
179+
boolean includeCCSMetadata = includeCCSMetadata();
175180
Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
176181
var columns = List.of(Map.of("name", "c", "type", "long"));
177182
var values = List.of(List.of(remoteDocs.size()));
@@ -180,7 +185,10 @@ public void testCount() throws Exception {
180185
if (includeCCSMetadata) {
181186
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
182187
}
183-
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
188+
if (ccsMetadataAvailable()) {
189+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
190+
}
191+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
184192
if (includeCCSMetadata) {
185193
assertClusterDetailsMap(result, true);
186194
}
@@ -189,7 +197,7 @@ public void testCount() throws Exception {
189197

190198
public void testUngroupedAggs() throws Exception {
191199
{
192-
boolean includeCCSMetadata = randomBoolean();
200+
boolean includeCCSMetadata = includeCCSMetadata();
193201
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
194202
var columns = List.of(Map.of("name", "total", "type", "long"));
195203
long sum = Stream.concat(localDocs.stream(), remoteDocs.stream()).mapToLong(d -> d.data).sum();
@@ -200,13 +208,16 @@ public void testUngroupedAggs() throws Exception {
200208
if (includeCCSMetadata) {
201209
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
202210
}
203-
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
211+
if (ccsMetadataAvailable()) {
212+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
213+
}
214+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
204215
if (includeCCSMetadata) {
205216
assertClusterDetailsMap(result, false);
206217
}
207218
}
208219
{
209-
boolean includeCCSMetadata = randomBoolean();
220+
boolean includeCCSMetadata = includeCCSMetadata();
210221
Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
211222
var columns = List.of(Map.of("name", "total", "type", "long"));
212223
long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
@@ -216,12 +227,16 @@ public void testUngroupedAggs() throws Exception {
216227
if (includeCCSMetadata) {
217228
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
218229
}
219-
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
230+
if (ccsMetadataAvailable()) {
231+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
232+
}
233+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
220234
if (includeCCSMetadata) {
221235
assertClusterDetailsMap(result, true);
222236
}
223237
}
224238
{
239+
assumeTrue("requires ccs metadata", ccsMetadataAvailable());
225240
Map<String, Object> result = runWithColumnarAndIncludeCCSMetadata("FROM *:test-remote-index | STATS total = SUM(data)");
226241
var columns = List.of(Map.of("name", "total", "type", "long"));
227242
long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
@@ -293,7 +308,7 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO
293308

294309
public void testGroupedAggs() throws Exception {
295310
{
296-
boolean includeCCSMetadata = randomBoolean();
311+
boolean includeCCSMetadata = includeCCSMetadata();
297312
Map<String, Object> result = run(
298313
"FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color",
299314
includeCCSMetadata
@@ -311,13 +326,16 @@ public void testGroupedAggs() throws Exception {
311326
if (includeCCSMetadata) {
312327
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
313328
}
314-
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
329+
if (ccsMetadataAvailable()) {
330+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
331+
}
332+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
315333
if (includeCCSMetadata) {
316334
assertClusterDetailsMap(result, false);
317335
}
318336
}
319337
{
320-
boolean includeCCSMetadata = randomBoolean();
338+
boolean includeCCSMetadata = includeCCSMetadata();
321339
Map<String, Object> result = run(
322340
"FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color",
323341
includeCCSMetadata
@@ -336,29 +354,57 @@ public void testGroupedAggs() throws Exception {
336354
if (includeCCSMetadata) {
337355
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
338356
}
339-
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
357+
if (ccsMetadataAvailable()) {
358+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
359+
}
360+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
340361
if (includeCCSMetadata) {
341362
assertClusterDetailsMap(result, true);
342363
}
343364
}
344365
}
345366

367+
public void testIndexPattern() throws Exception {
368+
{
369+
String indexPattern = randomFrom(
370+
"test-local-index,*:test-remote-index",
371+
"test-local-index,*:test-remote-*",
372+
"test-local-index,*:test-*",
373+
"test-*,*:test-remote-index"
374+
);
375+
Map<String, Object> result = run("FROM " + indexPattern + " | STATS c = COUNT(*)", false);
376+
var columns = List.of(Map.of("name", "c", "type", "long"));
377+
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
378+
MapMatcher mapMatcher = matchesMap();
379+
if (ccsMetadataAvailable()) {
380+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
381+
}
382+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
383+
}
384+
{
385+
String indexPattern = randomFrom("*:test-remote-index", "*:test-remote-*", "*:test-*");
386+
Map<String, Object> result = run("FROM " + indexPattern + " | STATS c = COUNT(*)", false);
387+
var columns = List.of(Map.of("name", "c", "type", "long"));
388+
var values = List.of(List.of(remoteDocs.size()));
389+
390+
MapMatcher mapMatcher = matchesMap();
391+
if (ccsMetadataAvailable()) {
392+
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
393+
}
394+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
395+
}
396+
}
397+
346398
private RestClient remoteClusterClient() throws IOException {
347399
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
348400
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
349401
}
350402

351-
private TestFeatureService remoteFeaturesService() throws IOException {
352-
if (remoteFeaturesService == null) {
353-
try (RestClient remoteClient = remoteClusterClient()) {
354-
var remoteNodeVersions = readVersionsFromNodesInfo(remoteClient);
355-
var semanticNodeVersions = remoteNodeVersions.stream()
356-
.map(ESRestTestCase::parseLegacyVersion)
357-
.flatMap(Optional::stream)
358-
.collect(Collectors.toSet());
359-
remoteFeaturesService = createTestFeatureService(getClusterStateFeatures(remoteClient), semanticNodeVersions);
360-
}
361-
}
362-
return remoteFeaturesService;
403+
private static boolean ccsMetadataAvailable() {
404+
return Clusters.localClusterVersion().onOrAfter(Version.V_8_16_0);
405+
}
406+
407+
private static boolean includeCCSMetadata() {
408+
return ccsMetadataAvailable() && randomBoolean();
363409
}
364410
}

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ public void testBasicEsql() throws IOException {
7676
indexTimestampData(1);
7777

7878
RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)");
79-
requestObjectBuilder().includeCCSMetadata(randomBoolean());
8079
if (Build.current().isSnapshot()) {
8180
builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
8281
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteClusterPlan.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99

1010
import org.elasticsearch.TransportVersions;
1111
import org.elasticsearch.action.OriginalIndices;
12-
import org.elasticsearch.action.support.IndicesOptions;
12+
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
1414
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1515
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1616

1717
import java.io.IOException;
18+
import java.util.Arrays;
19+
import java.util.Objects;
1820

1921
record RemoteClusterPlan(PhysicalPlan plan, String[] targetIndices, OriginalIndices originalIndices) {
2022
static RemoteClusterPlan from(PlanStreamInput planIn) throws IOException {
@@ -24,7 +26,8 @@ static RemoteClusterPlan from(PlanStreamInput planIn) throws IOException {
2426
if (planIn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES)) {
2527
originalIndices = OriginalIndices.readOriginalIndices(planIn);
2628
} else {
27-
originalIndices = new OriginalIndices(planIn.readStringArray(), IndicesOptions.strictSingleIndexNoExpandForbidClosed());
29+
// fallback to the previous behavior
30+
originalIndices = new OriginalIndices(planIn.readStringArray(), SearchRequest.DEFAULT_INDICES_OPTIONS);
2831
}
2932
return new RemoteClusterPlan(plan, targetIndices, originalIndices);
3033
}
@@ -38,4 +41,18 @@ public void writeTo(PlanStreamOutput out) throws IOException {
3841
out.writeStringArray(originalIndices.indices());
3942
}
4043
}
44+
45+
@Override
46+
public boolean equals(Object o) {
47+
if (o == null || getClass() != o.getClass()) return false;
48+
RemoteClusterPlan that = (RemoteClusterPlan) o;
49+
return Objects.equals(plan, that.plan)
50+
&& Objects.deepEquals(targetIndices, that.targetIndices)
51+
&& Objects.equals(originalIndices, that.originalIndices);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(plan, Arrays.hashCode(targetIndices), originalIndices);
57+
}
4158
}

0 commit comments

Comments
 (0)