Skip to content

Commit 86005c0

Browse files
committed
Remove page alignment in ExchangeSink
1 parent 6036a1e commit 86005c0

File tree

3 files changed

+6
-16
lines changed

3 files changed

+6
-16
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.io.IOException;
2525
import java.util.Objects;
26-
import java.util.function.Function;
2726
import java.util.function.Supplier;
2827

2928
/**
@@ -32,17 +31,14 @@
3231
public class ExchangeSinkOperator extends SinkOperator {
3332

3433
private final ExchangeSink sink;
35-
private final Function<Page, Page> transformer;
3634
private int pagesReceived;
3735
private long rowsReceived;
3836

39-
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
40-
implements
41-
SinkOperatorFactory {
37+
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {
4238

4339
@Override
4440
public SinkOperator get(DriverContext driverContext) {
45-
return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
41+
return new ExchangeSinkOperator(exchangeSinks.get());
4642
}
4743

4844
@Override
@@ -51,9 +47,8 @@ public String describe() {
5147
}
5248
}
5349

54-
public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
50+
public ExchangeSinkOperator(ExchangeSink sink) {
5551
this.sink = sink;
56-
this.transformer = transformer;
5752
}
5853

5954
@Override
@@ -84,7 +79,7 @@ public boolean needsInput() {
8479
protected void doAddInput(Page page) {
8580
pagesReceived++;
8681
rowsReceived += page.getPositionCount();
87-
sink.addPage(transformer.apply(page));
82+
sink.addPage(page);
8883
}
8984

9085
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec {
2727
);
2828

2929
private final List<Attribute> output;
30+
// TODO: remove this flag
3031
private final boolean intermediateAgg;
3132

3233
public ExchangeSinkExec(Source source, List<Attribute> output, boolean intermediateAgg, PhysicalPlan child) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -385,14 +385,8 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution
385385
private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
386386
Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided");
387387
var child = exchangeSink.child();
388-
389388
PhysicalOperation source = plan(child, context);
390-
391-
Function<Page, Page> transformer = exchangeSink.isIntermediateAgg()
392-
? Function.identity()
393-
: alignPageToAttributes(exchangeSink.output(), source.layout);
394-
395-
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
389+
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
396390
}
397391

398392
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {

0 commit comments

Comments
 (0)