Skip to content

Commit ca904ce

Browse files
authored
ES|QL: Fix ProjectAwayColumns for FORK (#128839)
1 parent 43a9e6f commit ca904ce

File tree

7 files changed

+142
-34
lines changed

7 files changed

+142
-34
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
4747
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
4848
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
49-
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V5;
49+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V6;
5050
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
5151
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
5252
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7;
@@ -132,7 +132,7 @@ protected void shouldSkipTest(String testName) throws IOException {
132132
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
133133
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
134134
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
135-
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V5.capabilityName()));
135+
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V6.capabilityName()));
136136
}
137137

138138
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44

55
simpleFork
6-
required_capability: fork_v5
6+
required_capability: fork_v6
77

88
FROM employees
99
| FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
1818
;
1919

2020
forkWithWhereSortAndLimit
21-
required_capability: fork_v5
21+
required_capability: fork_v6
2222

2323
FROM employees
2424
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
3838
;
3939

4040
fiveFork
41-
required_capability: fork_v5
41+
required_capability: fork_v6
4242

4343
FROM employees
4444
| FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
5959
;
6060

6161
forkWithWhereSortDescAndLimit
62-
required_capability: fork_v5
62+
required_capability: fork_v6
6363

6464
FROM employees
6565
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
7676
;
7777

7878
forkWithCommonPrefilter
79-
required_capability: fork_v5
79+
required_capability: fork_v6
8080

8181
FROM employees
8282
| WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
9494
;
9595

9696
forkWithSemanticSearchAndScore
97-
required_capability: fork_v5
97+
required_capability: fork_v6
9898
required_capability: semantic_text_field_caps
9999
required_capability: metadata_score
100100

@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
114114
;
115115

116116
forkWithEvals
117-
required_capability: fork_v5
117+
required_capability: fork_v6
118118

119119
FROM employees
120120
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
131131
;
132132

133133
forkWithStats
134-
required_capability: fork_v5
134+
required_capability: fork_v6
135135

136136
FROM employees
137137
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
152152
;
153153

154154
forkWithDissect
155-
required_capability: fork_v5
155+
required_capability: fork_v6
156156

157157
FROM employees
158158
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
172172
;
173173

174174
forkWithMixOfCommands
175-
required_capability: fork_v5
175+
required_capability: fork_v6
176176

177177
FROM employees
178178
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -197,7 +197,7 @@ fork4 | 10081 | abc | aaa | null | null
197197
;
198198

199199
forkWithFiltersOnConstantValues
200-
required_capability: fork_v5
200+
required_capability: fork_v6
201201

202202
FROM employees
203203
| EVAL z = 1
@@ -218,7 +218,7 @@ fork3 | null | 100 | 10100 | 10001
218218
;
219219

220220
forkWithUnsupportedAttributes
221-
required_capability: fork_v5
221+
required_capability: fork_v6
222222

223223
FROM heights
224224
| FORK (SORT description DESC | LIMIT 1 | EVAL x = length(description) )
@@ -230,3 +230,45 @@ description:keyword | height_range:unsupported | x:integer | _fork:keyword
230230
Very Tall | null | 9 | fork1
231231
Medium Height | null | null | fork2
232232
;
233+
234+
forkAfterLookupJoin
235+
required_capability: fork_v6
236+
237+
FROM employees
238+
| EVAL language_code = languages
239+
| LOOKUP JOIN languages_lookup ON language_code
240+
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
241+
(WHERE emp_no == 10081 OR emp_no == 10087)
242+
(WHERE emp_no == 10081 | EVAL language_name = "Klingon")
243+
| KEEP _fork, emp_no, language_code, language_name
244+
| SORT _fork, emp_no
245+
;
246+
247+
_fork:keyword | emp_no:integer | language_code:integer | language_name:keyword
248+
fork1 | 10048 | 3 | Spanish
249+
fork1 | 10081 | 2 | French
250+
fork2 | 10081 | 2 | French
251+
fork2 | 10087 | 5 | null
252+
fork3 | 10081 | 2 | Klingon
253+
;
254+
255+
forkBeforeLookupJoin
256+
required_capability: fork_v6
257+
258+
FROM employees
259+
| EVAL language_code = languages
260+
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
261+
(WHERE emp_no == 10081 OR emp_no == 10087)
262+
(WHERE emp_no == 10081 | EVAL language_name = "Klingon")
263+
| LOOKUP JOIN languages_lookup ON language_code
264+
| KEEP _fork, emp_no, language_code, language_name
265+
| SORT _fork, emp_no
266+
;
267+
268+
_fork:keyword | emp_no:integer | language_code:integer | language_name:keyword
269+
fork1 | 10048 | 3 | Spanish
270+
fork1 | 10081 | 2 | French
271+
fork2 | 10081 | 2 | French
272+
fork2 | 10087 | 5 | null
273+
fork3 | 10081 | 2 | French
274+
;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,46 @@ public void testWithFilteringOnConstantColumn() {
633633
}
634634
}
635635

636+
public void testWithLookUpJoinBeforeFork() {
637+
var query = """
638+
FROM test
639+
| LOOKUP JOIN test-lookup ON id
640+
| FORK (WHERE id == 2 OR id == 3)
641+
(WHERE id == 1 OR id == 2)
642+
| SORT _fork, id
643+
""";
644+
try (var resp = run(query)) {
645+
assertColumnNames(resp.columns(), List.of("content", "id", "animal", "_fork"));
646+
Iterable<Iterable<Object>> expectedValues = List.of(
647+
List.of("This is a brown dog", 2, "dog", "fork1"),
648+
List.of("This dog is really brown", 3, "dog", "fork1"),
649+
List.of("This is a brown fox", 1, "fox", "fork2"),
650+
List.of("This is a brown dog", 2, "dog", "fork2")
651+
);
652+
assertValues(resp.values(), expectedValues);
653+
}
654+
}
655+
656+
public void testWithLookUpAfterFork() {
657+
var query = """
658+
FROM test
659+
| FORK (WHERE id == 2 OR id == 3)
660+
(WHERE id == 1 OR id == 2)
661+
| LOOKUP JOIN test-lookup ON id
662+
| SORT _fork, id
663+
""";
664+
try (var resp = run(query)) {
665+
assertColumnNames(resp.columns(), List.of("content", "id", "_fork", "animal"));
666+
Iterable<Iterable<Object>> expectedValues = List.of(
667+
List.of("This is a brown dog", 2, "fork1", "dog"),
668+
List.of("This dog is really brown", 3, "fork1", "dog"),
669+
List.of("This is a brown fox", 1, "fork2", "fox"),
670+
List.of("This is a brown dog", 2, "fork2", "dog")
671+
);
672+
assertValues(resp.values(), expectedValues);
673+
}
674+
}
675+
636676
public void testWithEvalWithConflictingTypes() {
637677
var query = """
638678
FROM test
@@ -763,10 +803,10 @@ public void testProfile() {
763803
private void createAndPopulateIndex() {
764804
var indexName = "test";
765805
var client = client().admin().indices();
766-
var CreateRequest = client.prepareCreate(indexName)
806+
var createRequest = client.prepareCreate(indexName)
767807
.setSettings(Settings.builder().put("index.number_of_shards", 1))
768808
.setMapping("id", "type=integer", "content", "type=text");
769-
assertAcked(CreateRequest);
809+
assertAcked(createRequest);
770810
client().prepareBulk()
771811
.add(new IndexRequest(indexName).id("1").source("id", 1, "content", "This is a brown fox"))
772812
.add(new IndexRequest(indexName).id("2").source("id", 2, "content", "This is a brown dog"))
@@ -777,6 +817,23 @@ private void createAndPopulateIndex() {
777817
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
778818
.get();
779819
ensureYellow(indexName);
820+
821+
var lookupIndex = "test-lookup";
822+
createRequest = client.prepareCreate(lookupIndex)
823+
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup"))
824+
.setMapping("id", "type=integer", "animal", "type=keyword");
825+
assertAcked(createRequest);
826+
827+
client().prepareBulk()
828+
.add(new IndexRequest(lookupIndex).id("1").source("id", 1, "animal", "fox"))
829+
.add(new IndexRequest(lookupIndex).id("2").source("id", 2, "animal", "dog"))
830+
.add(new IndexRequest(lookupIndex).id("3").source("id", 3, "animal", "dog"))
831+
.add(new IndexRequest(lookupIndex).id("4").source("id", 4, "animal", "dog"))
832+
.add(new IndexRequest(lookupIndex).id("5").source("id", 5, "animal", "cat"))
833+
.add(new IndexRequest(lookupIndex).id("6").source("id", 6, "animal", List.of("fox", "dog")))
834+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
835+
.get();
836+
ensureYellow(lookupIndex);
780837
}
781838

782839
static Iterator<Iterator<Object>> valuesFilter(Iterator<Iterator<Object>> values, Predicate<Iterator<Object>> filter) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,7 @@ public enum Cap {
10241024
/**
10251025
* Support streaming of sub plan results
10261026
*/
1027-
FORK_V5(Build.current().isSnapshot()),
1027+
FORK_V6(Build.current().isSnapshot()),
10281028

10291029
/**
10301030
* Support for the {@code leading_zeros} named parameter.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,29 @@ public PhysicalPlan apply(PhysicalPlan plan) {
4444
// and the overall output will not change.
4545
AttributeSet.Builder requiredAttrBuilder = plan.outputSet().asBuilder();
4646

47-
// This will require updating should we choose to have non-unary execution plans in the future.
4847
return plan.transformDown(currentPlanNode -> {
49-
if (currentPlanNode instanceof MergeExec) {
50-
keepTraversing.set(FALSE);
51-
}
52-
5348
if (keepTraversing.get() == false) {
5449
return currentPlanNode;
5550
}
51+
52+
// for non-unary execution plans, we apply the rule for each child
53+
if (currentPlanNode instanceof MergeExec mergeExec) {
54+
keepTraversing.set(FALSE);
55+
List<PhysicalPlan> newChildren = new ArrayList<>();
56+
boolean changed = false;
57+
58+
for (var child : mergeExec.children()) {
59+
var newChild = apply(child);
60+
61+
if (newChild != child) {
62+
changed = true;
63+
}
64+
65+
newChildren.add(newChild);
66+
}
67+
return changed ? new MergeExec(mergeExec.source(), newChildren, mergeExec.output()) : mergeExec;
68+
}
69+
5670
if (currentPlanNode instanceof ExchangeExec exec) {
5771
keepTraversing.set(FALSE);
5872
var child = exec.child();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.xpack.esql.plan.QueryPlan;
3939
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
4040
import org.elasticsearch.xpack.esql.plan.logical.Filter;
41-
import org.elasticsearch.xpack.esql.plan.logical.Project;
4241
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4342
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
4443
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -83,16 +82,12 @@ public class PlannerUtils {
8382
public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
8483
var subplans = new Holder<List<PhysicalPlan>>();
8584
PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
86-
subplans.set(me.children().stream().map(child -> {
87-
// TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
88-
child = child.transformUp(FragmentExec.class, f -> {
89-
var logicalFragment = f.fragment();
90-
logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
91-
return new FragmentExec(logicalFragment);
92-
});
93-
94-
return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
95-
}).toList());
85+
subplans.set(
86+
me.children()
87+
.stream()
88+
.map(child -> (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child))
89+
.toList()
90+
);
9691
return new ExchangeSourceExec(me.source(), me.output(), false);
9792
});
9893

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public final void test() throws Throwable {
310310
);
311311
assumeFalse(
312312
"CSV tests cannot currently handle FORK",
313-
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V5.capabilityName())
313+
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V6.capabilityName())
314314
);
315315
assumeFalse(
316316
"CSV tests cannot currently handle multi_match function that depends on Lucene",

0 commit comments

Comments
 (0)