Skip to content

Commit 86287a4

Browse files
committed
more tests
1 parent f369146 commit 86287a4

File tree

5 files changed

+148
-3
lines changed

5 files changed

+148
-3
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ public MultiClusterSpecIT(
115115
"StatsAndLookupIPAndMessageFromIndex",
116116
"JoinMaskingRegex",
117117
"StatsAndLookupIPFromIndex",
118-
"StatsAndLookupMessageFromIndex"
118+
"StatsAndLookupMessageFromIndex",
119+
"MvJoinKeyOnTheLookupIndexAfterStats",
120+
"MvJoinKeyOnFromAfterStats"
119121
);
120122

121123
@Override

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ protected void populateRuntimeIndex(String clusterAlias, String langName, String
255255
bulk.get();
256256
}
257257

258-
protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException {
258+
protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
259259
Client remoteClient = client(clusterAlias);
260260
assertAcked(
261261
remoteClient.admin()
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.test.junit.annotations.TestLogging;
14+
15+
import java.io.IOException;
16+
import java.util.List;
17+
import java.util.Map;
18+
19+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
20+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
21+
import static org.hamcrest.Matchers.equalTo;
22+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
23+
import static org.hamcrest.Matchers.hasItems;
24+
import static org.hamcrest.Matchers.hasSize;
25+
26+
@TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning")
27+
public class CrossClusterLookupJoinIT extends AbstractCrossClusterTestCase {
28+
29+
public void testLookupJoinAcrossClusters() throws IOException {
30+
setupClustersAndLookups();
31+
32+
try (
33+
EsqlQueryResponse resp = runQuery(
34+
"FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
35+
randomBoolean()
36+
)
37+
) {
38+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
39+
assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag"));
40+
int vIndex = columns.indexOf("v");
41+
int lookupNameIndex = columns.indexOf("lookup_name");
42+
int tagIndex = columns.indexOf("tag");
43+
int lookupTagIndex = columns.indexOf("lookup_tag");
44+
45+
List<List<Object>> values = getValuesList(resp);
46+
assertThat(values, hasSize(20));
47+
for (var row : values) {
48+
assertThat(row, hasSize(7));
49+
Long v = (Long) row.get(vIndex);
50+
assertThat(v, greaterThanOrEqualTo(0L));
51+
if (v < 25) {
52+
assertThat((String) row.get(lookupNameIndex), equalTo("lookup_" + v));
53+
String tag = (String) row.get(tagIndex);
54+
if (tag.equals("local")) {
55+
assertThat(row.get(lookupTagIndex), equalTo("local"));
56+
} else {
57+
assertThat(row.get(lookupTagIndex), equalTo(REMOTE_CLUSTER_1));
58+
}
59+
} else {
60+
assertNull(row.get(lookupNameIndex));
61+
assertNull(row.get(lookupTagIndex));
62+
}
63+
}
64+
65+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
66+
assertCCSExecutionInfoDetails(executionInfo);
67+
}
68+
}
69+
70+
protected Map<String, Object> setupClustersAndLookups() throws IOException {
71+
var setupData = setupClusters(2);
72+
populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10);
73+
populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 25);
74+
return setupData;
75+
}
76+
77+
protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs) {
78+
Client client = client(clusterAlias);
79+
assertAcked(
80+
client.admin()
81+
.indices()
82+
.prepareCreate(indexName)
83+
.setSettings(Settings.builder().put("index.mode", "lookup"))
84+
.setMapping("lookup_key", "type=long", "lookup_name", "type=keyword", "lookup_tag", "type=keyword")
85+
);
86+
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
87+
for (int i = 0; i < numDocs; i++) {
88+
client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag).get();
89+
}
90+
client.admin().indices().prepareRefresh(indexName).get();
91+
}
92+
93+
private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
94+
assertNotNull(executionInfo);
95+
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
96+
assertTrue(executionInfo.isCrossClusterSearch());
97+
List<EsqlExecutionInfo.Cluster> clusters = executionInfo.clusterAliases().stream().map(executionInfo::getCluster).toList();
98+
99+
for (EsqlExecutionInfo.Cluster cluster : clusters) {
100+
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
101+
assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
102+
assertThat(cluster.getSkippedShards(), equalTo(0));
103+
assertThat(cluster.getFailedShards(), equalTo(0));
104+
}
105+
}
106+
107+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,37 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
200200
return MapperUtils.mapUnary(unary, mappedChild);
201201
}
202202

203+
private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child) {
204+
Holder<Boolean> hasFragment = new Holder<>(false);
205+
206+
var childTransformed = child.transformUp(f -> {
207+
// Once we reached FragmentExec, we stuff our Enrich under it
208+
if (f instanceof FragmentExec) {
209+
hasFragment.set(true);
210+
return new FragmentExec(logical);
211+
}
212+
if (f instanceof EnrichExec enrichExec) {
213+
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
214+
assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
215+
return enrichExec.child();
216+
}
217+
if (f instanceof UnaryExec unaryExec) {
218+
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) {
219+
return f;
220+
} else {
221+
return unaryExec.child();
222+
}
223+
}
224+
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
225+
return f;
226+
});
227+
228+
if (hasFragment.get()) {
229+
return childTransformed;
230+
}
231+
return null;
232+
}
233+
203234
private PhysicalPlan mapBinary(BinaryPlan bp) {
204235
if (bp instanceof Join join) {
205236
JoinConfig config = join.config();
@@ -218,6 +249,11 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
218249
return new FragmentExec(bp);
219250
}
220251

252+
var leftPlan = mapToFragmentExec(bp, left);
253+
if (leftPlan != null) {
254+
return leftPlan;
255+
}
256+
221257
PhysicalPlan right = map(bp.right());
222258
// if the right is data we can use a hash join directly
223259
if (right instanceof LocalSourceExec localData) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ private PreAnalysisResult receiveLookupIndexResolution(
479479
skipClusterOrError(
480480
clusterAlias,
481481
executionInfo,
482-
"Lookup index [" + index + "] is not available in cluster " + EsqlCCSUtils.inClusterName(clusterAlias)
482+
"lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias)
483483
);
484484
}
485485
});

0 commit comments

Comments
 (0)