Skip to content

Commit b5ed06c

Browse files
committed
Add assertion for exchanges
1 parent fdfe63e commit b5ed06c

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -312,14 +312,14 @@ private PhysicalOperation planOutput(OutputExec outputExec, LocalExecutionPlanne
312312
return source.withSink(
313313
new OutputOperatorFactory(
314314
Expressions.names(output),
315-
alignPageToAttributes(output, source.layout),
315+
alignPageToAttributes(false, output, source.layout),
316316
outputExec.getPageConsumer()
317317
),
318318
source.layout
319319
);
320320
}
321321

322-
private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs, Layout layout) {
322+
private static Function<Page, Page> alignPageToAttributes(boolean exchange, List<Attribute> attrs, Layout layout) {
323323
// align the page layout with the operator output
324324
// extraction order - the list ordinal is the same as the column one
325325
// while the value represents the position in the original page
@@ -330,17 +330,21 @@ private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs,
330330
mappedPosition[++index] = layout.get(attribute.id()).channel();
331331
transformRequired |= mappedPosition[index] != index;
332332
}
333-
Function<Page, Page> transformer = transformRequired ? p -> {
333+
if (transformRequired == false) {
334+
return Function.identity();
335+
}
336+
if (exchange) {
337+
throw new AssertionError("require alignment attributes [" + attrs + "] layout [" + layout.inverse() + "]");
338+
}
339+
return p -> {
334340
var blocks = new Block[mappedPosition.length];
335341
for (int i = 0; i < blocks.length; i++) {
336342
blocks[i] = p.getBlock(mappedPosition[i]);
337343
blocks[i].incRef();
338344
}
339345
p.releaseBlocks();
340346
return new Page(blocks);
341-
} : Function.identity();
342-
343-
return transformer;
347+
};
344348
}
345349

346350
private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecutionPlannerContext context) {
@@ -355,7 +359,7 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE
355359

356360
Function<Page, Page> transformer = exchangeSink.isIntermediateAgg()
357361
? Function.identity()
358-
: alignPageToAttributes(exchangeSink.output(), source.layout);
362+
: alignPageToAttributes(true, exchangeSink.output(), source.layout);
359363

360364
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
361365
}

0 commit comments

Comments
 (0)