Skip to content

Commit 01be31c

Browse files
committed
WIP
1 parent a4ae763 commit 01be31c

File tree

6 files changed

+47
-23
lines changed

6 files changed

+47
-23
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OutputOperator.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import java.util.List;
1313
import java.util.function.Consumer;
14-
import java.util.function.Function;
1514

1615
/**
1716
* Sink operator that calls a given listener for each page received. The listener receives both the page as well as schema information,
@@ -21,15 +20,12 @@ public class OutputOperator extends SinkOperator {
2120

2221
private final List<String> columns;
2322
private final Consumer<Page> pageConsumer;
24-
private final Function<Page, Page> mapper;
2523

26-
public record OutputOperatorFactory(List<String> columns, Function<Page, Page> mapper, Consumer<Page> pageConsumer)
27-
implements
28-
SinkOperatorFactory {
24+
public record OutputOperatorFactory(List<String> columns, Consumer<Page> pageConsumer) implements SinkOperatorFactory {
2925

3026
@Override
3127
public SinkOperator get(DriverContext driverContext) {
32-
return new OutputOperator(columns, mapper, pageConsumer);
28+
return new OutputOperator(columns, pageConsumer);
3329
}
3430

3531
@Override
@@ -38,9 +34,8 @@ public String describe() {
3834
}
3935
}
4036

41-
public OutputOperator(List<String> columns, Function<Page, Page> mapper, Consumer<Page> pageConsumer) {
37+
public OutputOperator(List<String> columns, Consumer<Page> pageConsumer) {
4238
this.columns = columns;
43-
this.mapper = mapper;
4439
this.pageConsumer = pageConsumer;
4540
}
4641

@@ -63,7 +58,7 @@ public boolean needsInput() {
6358

6459
@Override
6560
protected void doAddInput(Page page) {
66-
pageConsumer.accept(mapper.apply(page));
61+
pageConsumer.accept(page);
6762
}
6863

6964
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OutputOperatorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
public class OutputOperatorTests extends AnyOperatorTestCase {
1919
@Override
2020
protected Operator.OperatorFactory simple() {
21-
return new OutputOperator.OutputOperatorFactory(List.of("a"), p -> p, p -> {});
21+
return new OutputOperator.OutputOperatorFactory(List.of("a"), p -> {});
2222
}
2323

2424
@Override
@@ -32,7 +32,7 @@ protected Matcher<String> expectedToStringOfSimple() {
3232
}
3333

3434
private Operator.OperatorFactory big() {
35-
return new OutputOperator.OutputOperatorFactory(IntStream.range(0, 20).mapToObj(i -> "a" + i).toList(), p -> p, p -> {});
35+
return new OutputOperator.OutputOperatorFactory(IntStream.range(0, 20).mapToObj(i -> "a" + i).toList(), p -> {});
3636
}
3737

3838
private String expectedDescriptionOfBig() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import java.util.Map;
8181
import java.util.Objects;
8282
import java.util.concurrent.Executor;
83-
import java.util.function.Function;
8483
import java.util.stream.IntStream;
8584

8685
/**
@@ -328,7 +327,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
328327
* because the list will never grow mega large.
329328
*/
330329
List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
331-
OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), collectedPages::add);
330+
OutputOperator outputOperator = new OutputOperator(List.of(), collectedPages::add);
332331
releasables.add(outputOperator);
333332
Driver driver = new Driver(
334333
"enrich-lookup:" + request.sessionId,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans;
3333
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneFilters;
3434
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneLiteralsInOrderBy;
35+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneOverridingEval;
3536
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantOrderBy;
3637
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantSortClauses;
3738
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters;
@@ -148,8 +149,9 @@ protected static Batch<LogicalPlan> substitutions() {
148149
new ReplaceAliasingEvalWithProject(),
149150
new SkipQueryOnEmptyMappings(),
150151
new SubstituteSpatialSurrogates(),
151-
new ReplaceOrderByExpressionWithEval()
152+
new ReplaceOrderByExpressionWithEval(),
152153
// new NormalizeAggregate(), - waits on https://github.com/elastic/elasticsearch/issues/100634
154+
new PruneOverridingEval()
153155
);
154156
}
155157

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
11+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
12+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
13+
import org.elasticsearch.xpack.esql.plan.logical.Project;
14+
15+
import java.util.List;
16+
17+
public final class PruneOverridingEval extends OptimizerRules.OptimizerRule<Eval> {
18+
public PruneOverridingEval() {
19+
super(OptimizerRules.TransformDirection.UP);
20+
}
21+
22+
@Override
23+
protected LogicalPlan rule(Eval eval) {
24+
final List<Attribute> output = eval.output();
25+
final List<Attribute> childOutput = eval.child().output();
26+
if (output.size() < childOutput.size() + eval.fields().size()) {
27+
return new Project(eval.source(), eval, output);
28+
} else {
29+
return eval;
30+
}
31+
}
32+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -343,15 +343,8 @@ private PhysicalOperation planFieldExtractNode(FieldExtractExec fieldExtractExec
343343
private PhysicalOperation planOutput(OutputExec outputExec, LocalExecutionPlannerContext context) {
344344
PhysicalOperation source = plan(outputExec.child(), context);
345345
var output = outputExec.output();
346-
347-
return source.withSink(
348-
new OutputOperatorFactory(
349-
Expressions.names(output),
350-
alignPageToAttributes(output, source.layout),
351-
outputExec.getPageConsumer()
352-
),
353-
source.layout
354-
);
346+
alignPageToAttributes(output, source.layout);
347+
return source.withSink(new OutputOperatorFactory(Expressions.names(output), outputExec.getPageConsumer()), source.layout);
355348
}
356349

357350
private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs, Layout layout) {
@@ -365,6 +358,9 @@ private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs,
365358
mappedPosition[++index] = layout.get(attribute.id()).channel();
366359
transformRequired |= mappedPosition[index] != index;
367360
}
361+
if (transformRequired) {
362+
throw new AssertionError("transform required attributes are not supported");
363+
}
368364
Function<Page, Page> transformer = transformRequired ? p -> {
369365
var blocks = new Block[mappedPosition.length];
370366
for (int i = 0; i < blocks.length; i++) {

0 commit comments

Comments
 (0)