diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java index 77c70bc3a10f4..20b5372d30ad7 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java @@ -181,9 +181,11 @@ private static Operator operator(DriverContext driverContext, String grouping, S ); default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]"); }; + int pageSize = 16 * 1024; return new HashAggregationOperator( List.of(supplier(op, dataType, filter).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(groups.size()))), - () -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false), + () -> BlockHash.build(groups, driverContext.blockFactory(), pageSize, false), + pageSize, driverContext ); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index c47b6cebdaddc..e6549bce4d632 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -22,6 +22,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -58,12 +59,14 @@ public Operator get(DriverContext driverContext) { analysisRegistry, maxPageSize ), + Integer.MAX_VALUE, // TODO: doesn't support chunk yet driverContext ); } return new HashAggregationOperator( aggregators, () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false), + maxPageSize, driverContext ); } @@ -78,9 +81,10 @@ public String describe() { } } - private boolean finished; - private Page output; + private final int maxPageSize; + private Emitter emitter; + private boolean blockHashClosed = false; private final BlockHash blockHash; private final List aggregators; @@ -112,8 +116,10 @@ public String describe() { public HashAggregationOperator( List aggregators, Supplier blockHash, + int maxPageSize, DriverContext driverContext ) { + this.maxPageSize = maxPageSize; this.aggregators = new ArrayList<>(aggregators.size()); this.driverContext = driverContext; boolean success = false; @@ -132,7 +138,7 @@ public HashAggregationOperator( @Override public boolean needsInput() { - return finished == false; + return emitter == null; } @Override @@ -201,59 +207,98 @@ public void close() { @Override public Page getOutput() { - Page p = output; - if (p != null) { - rowsEmitted += p.getPositionCount(); + if (emitter == null) { + return null; } - output = null; - return p; + return emitter.nextPage(); } - @Override - public void finish() { - if (finished) { - return; + private class Emitter implements Releasable { + private final int[] aggBlockCounts; + private int position = -1; + private IntVector allSelected = null; + private Block[] allKeys; + + Emitter(int[] aggBlockCounts) { + this.aggBlockCounts = aggBlockCounts; } - finished = true; - Block[] blocks = null; - IntVector selected = null; - boolean success = false; - try { - selected = blockHash.nonEmpty(); - Block[] keys = blockHash.getKeys(); - int[] aggBlockCounts = aggregators.stream().mapToInt(GroupingAggregator::evaluateBlockCount).toArray(); - blocks = new Block[keys.length + Arrays.stream(aggBlockCounts).sum()]; - System.arraycopy(keys, 0, blocks, 0, keys.length); - int offset = keys.length; - for (int i = 0; i < aggregators.size(); i++) { - var aggregator = aggregators.get(i); - aggregator.evaluate(blocks, offset, selected, driverContext); - offset += aggBlockCounts[i]; + + Page nextPage() { + if (position == -1) { + position = 0; + // TODO: chunk selected and keys + allKeys = blockHash.getKeys(); + allSelected = blockHash.nonEmpty(); + blockHashClosed = true; + blockHash.close(); } - output = new Page(blocks); - success = true; - } finally { - // selected should always be closed - if (selected != null) { - selected.close(); + final int endPosition = Math.toIntExact(Math.min(position + (long) maxPageSize, allSelected.getPositionCount())); + if (endPosition == position) { + return null; } - if (success == false && blocks != null) { - Releasables.closeExpectNoException(blocks); + final boolean singlePage = position == 0 && endPosition == allSelected.getPositionCount(); + final Block[] blocks = new Block[allKeys.length + Arrays.stream(aggBlockCounts).sum()]; + IntVector selected = null; + boolean success = false; + try { + if (singlePage) { + this.allSelected.incRef(); + selected = this.allSelected; + for (int i = 0; i < allKeys.length; i++) { + allKeys[i].incRef(); + blocks[i] = allKeys[i]; + } + } else { + final int[] positions = new int[endPosition - position]; + for (int i = 0; i < positions.length; i++) { + positions[i] = position + i; + } + // TODO: allow to filter with IntVector + selected = allSelected.filter(positions); + for (int keyIndex = 0; keyIndex < allKeys.length; keyIndex++) { + blocks[keyIndex] = allKeys[keyIndex].filter(positions); + } + } + int blockOffset = allKeys.length; + for (int i = 0; i < aggregators.size(); i++) { + aggregators.get(i).evaluate(blocks, blockOffset, selected, driverContext); + blockOffset += aggBlockCounts[i]; + } + var output = new Page(blocks); + rowsEmitted += output.getPositionCount(); + success = true; + return output; + } finally { + position = endPosition; + Releasables.close(selected, success ? null : Releasables.wrap(blocks)); } } + + @Override + public void close() { + Releasables.close(allSelected, allKeys != null ? Releasables.wrap(allKeys) : null); + } + + boolean doneEmitting() { + return allSelected != null && position >= allSelected.getPositionCount(); + } + } + + @Override + public void finish() { + if (emitter == null) { + emitter = new Emitter(aggregators.stream().mapToInt(GroupingAggregator::evaluateBlockCount).toArray()); + } } @Override public boolean isFinished() { - return finished && output == null; + return emitter != null && emitter.doneEmitting(); } @Override public void close() { - if (output != null) { - output.releaseBlocks(); - } - Releasables.close(blockHash, () -> Releasables.close(aggregators)); + Releasables.close(emitter, blockHashClosed ? null : blockHash, () -> Releasables.close(aggregators)); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 7cf47bc7fed1c..ad538aecc47b0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -205,16 +205,16 @@ public Page getOutput() { return null; } if (valuesAggregator != null) { - try { - return valuesAggregator.getOutput(); - } finally { - final ValuesAggregator aggregator = this.valuesAggregator; - this.valuesAggregator = null; - Releasables.close(aggregator); + final Page output = valuesAggregator.getOutput(); + if (output == null) { + Releasables.close(valuesAggregator, () -> this.valuesAggregator = null); + } else { + return output; } } if (ordinalAggregators.isEmpty() == false) { try { + // TODO: chunk output pages return mergeOrdinalsSegmentResults(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -510,6 +510,7 @@ private static class ValuesAggregator implements Releasable { maxPageSize, false ), + maxPageSize, driverContext ); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java index 3b011d4a682ff..b2ed43fe61b2a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java @@ -64,6 +64,7 @@ public Operator get(DriverContext driverContext) { return new HashAggregationOperator( aggregators, () -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext), + maxPageSize, driverContext ); } @@ -99,6 +100,7 @@ public Operator get(DriverContext driverContext) { return new HashAggregationOperator( aggregators, () -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, false), + maxPageSize, driverContext ); } @@ -127,6 +129,7 @@ public Operator get(DriverContext driverContext) { return new HashAggregationOperator( aggregators, () -> BlockHash.build(groupings, driverContext.blockFactory(), maxPageSize, false), + maxPageSize, driverContext ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 401fa0d14cd9f..8058b801896d5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -211,6 +211,7 @@ public String toString() { randomPageSize(), false ), + randomPageSize(), driverContext ) ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java index d82a8487b5390..15dfd4805b322 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.SortedSet; import java.util.TreeSet; import java.util.function.Function; @@ -102,13 +103,25 @@ private Operator.OperatorFactory simpleWithMode( if (randomBoolean()) { supplier = chunkGroups(emitChunkSize, supplier); } - return new HashAggregationOperator.HashAggregationOperatorFactory( + final int maxPageSize = randomPageSize(); + final var hashOperatorFactory = new HashAggregationOperator.HashAggregationOperatorFactory( List.of(new BlockHash.GroupSpec(0, ElementType.LONG)), mode, List.of(supplier.groupingAggregatorFactory(mode, channels(mode))), - randomPageSize(), + maxPageSize, null ); + return new Operator.OperatorFactory() { + @Override + public Operator get(DriverContext driverContext) { + return assertingOutputPageSize(hashOperatorFactory.get(driverContext), driverContext.blockFactory(), maxPageSize); + } + + @Override + public String describe() { + return hashOperatorFactory.describe(); + } + }; } @Override @@ -761,4 +774,79 @@ public String describe() { }; } + static Operator assertingOutputPageSize(Operator operator, BlockFactory blockFactory, int maxPageSize) { + return new Operator() { + private final List pages = new ArrayList<>(); + + @Override + public boolean needsInput() { + return operator.needsInput(); + } + + @Override + public void addInput(Page page) { + operator.addInput(page); + } + + @Override + public void finish() { + operator.finish(); + } + + @Override + public boolean isFinished() { + return operator.isFinished(); + } + + @Override + public Page getOutput() { + final Page page = operator.getOutput(); + if (page != null && page.getPositionCount() > maxPageSize) { + page.releaseBlocks(); + throw new AssertionError( + String.format( + Locale.ROOT, + "Operator %s didn't chunk output pages properly; got an output page with %s positions, max_page_size=%s", + operator, + page.getPositionCount(), + maxPageSize + ) + ); + } + if (page != null) { + pages.add(page); + } + if (operator.isFinished()) { + // TODO: Remove this workaround. We need to merge pages since we have many existing assertions expect a single out page. + try { + return BlockTestUtils.mergePages(blockFactory, pages); + } finally { + pages.forEach(Page::releaseBlocks); + pages.clear(); + } + } else { + return null; + } + } + + @Override + public Status status() { + return operator.status(); + } + + @Override + public String toString() { + return operator.toString(); + } + + @Override + public void close() { + for (Page p : pages) { + p.releaseBlocks(); + } + operator.close(); + } + }; + } + } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java index 30579f864abcb..87e0262c55973 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.test.BlockTestUtils; import org.elasticsearch.core.Tuple; import org.hamcrest.Matcher; @@ -28,7 +29,7 @@ import static java.util.stream.IntStream.range; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class HashAggregationOperatorTests extends ForkingOperatorTestCase { @Override @@ -80,20 +81,26 @@ protected Matcher expectedToStringOfSimple() { @Override protected void assertSimpleOutput(List input, List results) { - assertThat(results, hasSize(1)); - assertThat(results.get(0).getBlockCount(), equalTo(3)); - assertThat(results.get(0).getPositionCount(), equalTo(5)); + assertThat(results.size(), greaterThanOrEqualTo(0)); + BlockFactory blockFactory = results.getFirst().getBlock(0).blockFactory(); + Page output = BlockTestUtils.mergePages(blockFactory, results); + try { + assertThat(output.getBlockCount(), equalTo(3)); + assertThat(output.getPositionCount(), equalTo(5)); - SumLongGroupingAggregatorFunctionTests sum = new SumLongGroupingAggregatorFunctionTests(); - MaxLongGroupingAggregatorFunctionTests max = new MaxLongGroupingAggregatorFunctionTests(); + SumLongGroupingAggregatorFunctionTests sum = new SumLongGroupingAggregatorFunctionTests(); + MaxLongGroupingAggregatorFunctionTests max = new MaxLongGroupingAggregatorFunctionTests(); - LongBlock groups = results.get(0).getBlock(0); - Block sums = results.get(0).getBlock(1); - Block maxs = results.get(0).getBlock(2); - for (int i = 0; i < 5; i++) { - long group = groups.getLong(i); - sum.assertSimpleGroup(input, sums, i, group); - max.assertSimpleGroup(input, maxs, i, group); + LongBlock groups = output.getBlock(0); + Block sums = output.getBlock(1); + Block maxs = output.getBlock(2); + for (int i = 0; i < 5; i++) { + long group = groups.getLong(i); + sum.assertSimpleGroup(input, sums, i, group); + max.assertSimpleGroup(input, maxs, i, group); + } + } finally { + output.releaseBlocks(); } } } diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index c4579978b207e..68d14f76a4e1a 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; import org.hamcrest.Matcher; import java.util.ArrayList; @@ -267,4 +268,44 @@ public static List> valuesAtPositions(Block block, int from, int to } return result; } + + /** + * Merges a list of pages to a single page + */ + public static Page mergePages(BlockFactory blockFactory, List pages) { + if (pages.isEmpty()) { + return null; + } + Page first = pages.getFirst(); + if (pages.size() == 1) { + pages.clear(); + return first; + } + boolean success = false; + final Block.Builder[] builders = new Block.Builder[first.getBlockCount()]; + try { + for (int b = 0; b < builders.length; b++) { + ElementType elementType = ElementType.NULL; + int totalPositions = 0; + for (Page p : pages) { + Block block = p.getBlock(b); + ElementType e = block.elementType(); + if (e != ElementType.NULL) { + assert elementType == ElementType.NULL || elementType == e : elementType + " != " + e; + elementType = e; + } + totalPositions += block.getPositionCount(); + } + builders[b] = elementType.newBlockBuilder(totalPositions, blockFactory); + for (Page p : pages) { + builders[b].copyFrom(p.getBlock(b), 0, p.getPositionCount()); + } + } + return new Page(Block.Builder.buildAll(builders)); + } finally { + if (success == false) { + Releasables.close(builders); + } + } + } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 58c82d800954c..959e01315aef7 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -497,33 +497,60 @@ public void testForceSleepsProfile() throws IOException { assertMap(p, commonProfile()); @SuppressWarnings("unchecked") Map sleeps = (Map) p.get("sleeps"); - String operators = p.get("operators").toString(); - MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty") + MapMatcher exchangeEmptyMatcher = matchesMap().entry("reason", "exchange empty") + .entry("sleep_millis", greaterThan(0L)) + .entry("wake_millis", greaterThan(0L)); + MapMatcher exchangeFullMatcher = matchesMap().entry("reason", "exchange full") + .entry("sleep_millis", greaterThan(0L)) + .entry("wake_millis", greaterThan(0L)); + MapMatcher exchangeEmptyOrFullMatcher = matchesMap().entry("reason", "exchange empty OR exchange full") .entry("sleep_millis", greaterThan(0L)) .entry("wake_millis", greaterThan(0L)); String taskDescription = p.get("task_description").toString(); switch (taskDescription) { - case "data" -> assertMap(sleeps, matchesMap().entry("counts", Map.of()).entry("first", List.of()).entry("last", List.of())); + case "data" -> { + if (sleeps.isEmpty() == false) { + assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange full", greaterThan(0))).extraOk()); + @SuppressWarnings("unchecked") + List> first = (List>) sleeps.get("first"); + for (Map s : first) { + assertMap(s, exchangeFullMatcher); + } + @SuppressWarnings("unchecked") + List> last = (List>) sleeps.get("last"); + for (Map s : last) { + assertMap(s, exchangeFullMatcher); + } + } + } case "node_reduce" -> { - assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0))).extraOk()); + assertMap( + sleeps, + matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0)).extraOk()).extraOk() + ); @SuppressWarnings("unchecked") List> first = (List>) sleeps.get("first"); for (Map s : first) { - assertMap(s, sleepMatcher); + assertMap(s, either(exchangeEmptyMatcher).or(exchangeFullMatcher).or(exchangeEmptyOrFullMatcher)); } @SuppressWarnings("unchecked") List> last = (List>) sleeps.get("last"); for (Map s : last) { - assertMap(s, sleepMatcher); + assertMap(s, either(exchangeEmptyMatcher).or(exchangeFullMatcher).or(exchangeEmptyOrFullMatcher)); } } case "final" -> { - assertMap( - sleeps, - matchesMap().entry("counts", matchesMap().entry("exchange empty", 1)) - .entry("first", List.of(sleepMatcher)) - .entry("last", List.of(sleepMatcher)) - ); + assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0))).extraOk()); + @SuppressWarnings("unchecked") + List> first = (List>) sleeps.get("first"); + for (Map s : first) { + assertMap(s, exchangeEmptyMatcher); + } + @SuppressWarnings("unchecked") + List> last = (List>) sleeps.get("last"); + for (Map s : last) { + assertMap(s, exchangeEmptyMatcher); + } } default -> throw new IllegalArgumentException("unknown task: " + taskDescription); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index cf2c5735310ae..0f3f8a42e00c1 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -367,9 +367,10 @@ private class TestHashAggregationOperator extends HashAggregationOperator { List aggregators, Supplier blockHash, Attribute attribute, + int maxPageSize, DriverContext driverContext ) { - super(aggregators, blockHash, driverContext); + super(aggregators, blockHash, maxPageSize, driverContext); this.attribute = attribute; } @@ -417,6 +418,7 @@ public Operator get(DriverContext driverContext) { false ), attribute, + pageSize, driverContext ); }