Skip to content

Commit 49254b0

Browse files
authored
Remove page alignment in exchange sink (#124610)
I see that planning the ExchangeSinkExec takes a few milliseconds when benchmarking simple queries with 10K fields. It spends time checking if we need to realign the incoming pages. However, the exchange has the exact same attributes as its child, so the incoming layout should match its attributes perfectly. This change removes the realignment.
1 parent cb3c357 commit 49254b0

File tree

7 files changed

+15
-23
lines changed

7 files changed

+15
-23
lines changed

docs/changelog/124610.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124610
2+
summary: Remove page alignment in exchange sink
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.io.IOException;
2525
import java.util.Objects;
26-
import java.util.function.Function;
2726
import java.util.function.Supplier;
2827

2928
/**
@@ -32,17 +31,14 @@
3231
public class ExchangeSinkOperator extends SinkOperator {
3332

3433
private final ExchangeSink sink;
35-
private final Function<Page, Page> transformer;
3634
private int pagesReceived;
3735
private long rowsReceived;
3836

39-
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
40-
implements
41-
SinkOperatorFactory {
37+
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {
4238

4339
@Override
4440
public SinkOperator get(DriverContext driverContext) {
45-
return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
41+
return new ExchangeSinkOperator(exchangeSinks.get());
4642
}
4743

4844
@Override
@@ -51,9 +47,8 @@ public String describe() {
5147
}
5248
}
5349

54-
public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
50+
public ExchangeSinkOperator(ExchangeSink sink) {
5551
this.sink = sink;
56-
this.transformer = transformer;
5752
}
5853

5954
@Override
@@ -84,7 +79,7 @@ public boolean needsInput() {
8479
protected void doAddInput(Page page) {
8580
pagesReceived++;
8681
rowsReceived += page.getPositionCount();
87-
sink.addPage(transformer.apply(page));
82+
sink.addPage(page);
8883
}
8984

9085
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.concurrent.CyclicBarrier;
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicInteger;
48-
import java.util.function.Function;
4948
import java.util.function.LongSupplier;
5049

5150
import static org.hamcrest.Matchers.either;
@@ -328,7 +327,7 @@ public void testEarlyTermination() {
328327
final int maxAllowedRows = between(1, 100);
329328
final AtomicInteger processedRows = new AtomicInteger(0);
330329
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
331-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
330+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
332331
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
333332
@Override
334333
public Block eval(Page page) {
@@ -365,7 +364,7 @@ public void testResumeOnEarlyFinish() throws Exception {
365364
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
366365
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
367366
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
368-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
367+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
369368
Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator);
370369
PlainActionFuture<Void> future = new PlainActionFuture<>();
371370
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.Iterator;
4040
import java.util.List;
4141
import java.util.Set;
42-
import java.util.function.Function;
4342
import java.util.stream.Collectors;
4443
import java.util.stream.IntStream;
4544
import java.util.stream.Stream;
@@ -242,7 +241,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
242241
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
243242
intermediateOperatorItr.next()
244243
),
245-
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity())
244+
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}))
246245
)
247246
);
248247
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import java.util.concurrent.TimeUnit;
6666
import java.util.concurrent.atomic.AtomicBoolean;
6767
import java.util.concurrent.atomic.AtomicInteger;
68-
import java.util.function.Function;
6968
import java.util.function.Supplier;
7069
import java.util.stream.Collectors;
7170
import java.util.stream.IntStream;
@@ -305,7 +304,7 @@ Set<Integer> runConcurrentTest(
305304
"sink-" + i,
306305
dc,
307306
seqNoGenerator.get(dc),
308-
new ExchangeSinkOperator(exchangeSink.get(), Function.identity())
307+
new ExchangeSinkOperator(exchangeSink.get())
309308
);
310309
drivers.add(d);
311310
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec {
2727
);
2828

2929
private final List<Attribute> output;
30+
// TODO: remove this flag
3031
private final boolean intermediateAgg;
3132

3233
public ExchangeSinkExec(Source source, List<Attribute> output, boolean intermediateAgg, PhysicalPlan child) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -385,14 +385,8 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution
385385
private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
386386
Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided");
387387
var child = exchangeSink.child();
388-
389388
PhysicalOperation source = plan(child, context);
390-
391-
Function<Page, Page> transformer = exchangeSink.isIntermediateAgg()
392-
? Function.identity()
393-
: alignPageToAttributes(exchangeSink.output(), source.layout);
394-
395-
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
389+
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
396390
}
397391

398392
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {

0 commit comments

Comments
 (0)