Skip to content

Commit 606cf66

Browse files
committed
Refactor mapper, allow SORT and ban LIMIT pre-remote join
1 parent 3855484 commit 606cf66

File tree

5 files changed

+19
-18
lines changed

5 files changed

+19
-18
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ public MultiClusterSpecIT(
119119
"StatsAndLookupMessageFromIndex",
120120
"MvJoinKeyOnTheLookupIndexAfterStats",
121121
"MvJoinKeyOnFromAfterStats",
122-
// Lookup join after SORT is not supported in CCS yet
123-
"SortBeforeAndAfterJoin",
124-
"SortEvalBeforeLookup",
125-
"NullifiedJoinKeyToPurgeTheJoin",
126-
"SortBeforeAndAfterMultipleJoinAndMvExpand"
122+
// Lookup join after SORT+LIMIT is not supported in CCS yet
123+
"NullifiedJoinKeyToPurgeTheJoin"
124+
// "SortBeforeAndAfterJoin",
125+
// "SortEvalBeforeLookup",
126+
// "SortBeforeAndAfterMultipleJoinAndMvExpand"
127127
);
128128

129129
@Override
@@ -148,7 +148,7 @@ protected void shouldSkipTest(String testName) throws IOException {
148148
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName()));
149149
if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) {
150150
assumeTrue("LOKUP JOIN not supported", supportsIndexModeLookup());
151-
assumeTrue("LOOKUP JOIN not yet supported in CCS", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.1.0")));
151+
assumeTrue("LOOKUP JOIN not yet supported in CCS", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.2.0")));
152152
}
153153
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
154154
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ private static boolean checkVersion(org.elasticsearch.Version version) {
150150
}
151151

152152
public void testIndicesDontExistWithRemotePattern() throws IOException {
153-
// TODO: add 8.19 if this is merged to 8.x
154-
assumeTrue("Only works with remote LOOKUP JOIN support", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.1.0")));
153+
assumeTrue("Only works with remote LOOKUP JOIN support", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.2.0")));
155154

156155
int docsTest1 = randomIntBetween(1, 5);
157156
indexTimestampData(docsTest1, "test1", "2024-11-26", "id1");

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ emp_no:integer
331331
10001
332332
;
333333

334-
dropAllFieldsUsedInLookupOnTheCoordinator
334+
dropAllFieldsUsedInLookupOnMaybeTheCoordinator
335335
required_capability: join_lookup_v12
336336

337337
FROM employees
@@ -367,7 +367,7 @@ language_code:integer | language_name:keyword | country:text
367367
1 | English | null
368368
;
369369

370-
nonUniqueRightKeyOnTheCoordinatorLateLimit
370+
nonUniqueRightKeyOnMaybeTheCoordinatorLateLimit
371371
required_capability: join_lookup_v12
372372
required_capability: join_lookup_fix_limit_pushdown
373373

@@ -714,7 +714,7 @@ emp_no:integer | language_code:integer | language_name:keyword
714714
10003 | 4 | German
715715
;
716716

717-
filterOnTheDataNodeThenFilterOnTheCoordinator
717+
filterOnTheDataNodeThenFilterOnMaybeTheCoordinator
718718
required_capability: join_lookup_v12
719719

720720
FROM employees

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.xpack.esql.core.tree.Source;
1616
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1717
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
18+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
1819
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1920
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
2021
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
@@ -98,15 +99,15 @@ public void postAnalysisVerification(Failures failures) {
9899
private void checkRemoteJoin(Failures failures) {
99100
boolean[] agg = { false };
100101
boolean[] enrichCoord = { false };
101-
boolean[] sort = { false };
102+
boolean[] limit = { false };
102103

103104
this.forEachUp(UnaryPlan.class, u -> {
104105
if (u instanceof Aggregate) {
105106
agg[0] = true;
106107
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
107108
enrichCoord[0] = true;
108-
} else if (u instanceof OrderBy) {
109-
sort[0] = true;
109+
} else if (u instanceof Limit) {
110+
limit[0] = true;
110111
}
111112
});
112113
if (agg[0]) {
@@ -115,8 +116,8 @@ private void checkRemoteJoin(Failures failures) {
115116
if (enrichCoord[0]) {
116117
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after ENRICH with coordinator policy"));
117118
}
118-
if (sort[0]) {
119-
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after SORT"));
119+
if (limit[0]) {
120+
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after LIMIT"));
120121
}
121122
}
122123

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,13 @@ private PhysicalPlan mapToFragmentExec(Join logical, PhysicalPlan child) {
230230
}
231231
}
232232
if (f instanceof UnaryExec unaryExec) {
233-
if (f instanceof AggregateExec || f instanceof TopNExec) {
233+
if (f instanceof AggregateExec) {
234234
// We can't make a fragment here...
235+
assert logical.isRemote() == false : "Unexpected Aggregate inside remote join";
235236
forceLocal.set(true);
236237
return f;
237238
}
238-
if (f instanceof LimitExec || f instanceof ExchangeExec) {
239+
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) {
239240
return f;
240241
} else {
241242
return unaryExec.child();

0 commit comments

Comments
 (0)