diff --git a/docs/changelog/128531.yaml b/docs/changelog/128531.yaml deleted file mode 100644 index de4a767136ca7..0000000000000 --- a/docs/changelog/128531.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 128531 -summary: Combine small pages in Limit -area: ES|QL -type: enhancement -issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java index e22bd1ff9ed73..3ef9c420f59ff 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java @@ -14,20 +14,14 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; public class LimitOperator implements Operator { - private final BlockFactory blockFactory; - private final int pageSize; /** * Count of pages that have been processed by this operator. @@ -44,30 +38,25 @@ public class LimitOperator implements Operator { */ private long rowsEmitted; - private final Limiter limiter; + private Page lastInput; - private final List queue = new ArrayList<>(); - private int pendingRows; + private final Limiter limiter; private boolean finished; - public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) { + public LimitOperator(Limiter limiter) { this.limiter = limiter; - this.blockFactory = blockFactory; - this.pageSize = pageSize; } public static final class Factory implements OperatorFactory { private final Limiter limiter; - private final int pageSize; - public Factory(int limit, int pageSize) { + public Factory(int limit) { this.limiter = new Limiter(limit); - this.pageSize = pageSize; } @Override public LimitOperator get(DriverContext driverContext) { - return new LimitOperator(limiter, driverContext.blockFactory(), pageSize); + return new LimitOperator(limiter); } @Override @@ -78,20 +67,22 @@ public String describe() { @Override public boolean needsInput() { - return readyToEmit() == false; + return finished == false && lastInput == null && limiter.remaining() > 0; } @Override public void addInput(Page page) { - pagesProcessed++; - rowsReceived += page.getPositionCount(); + assert lastInput == null : "has pending input page"; final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount()); if (acceptedRows == 0) { page.releaseBlocks(); + assert isFinished(); + } else if (acceptedRows < page.getPositionCount()) { + lastInput = truncatePage(page, acceptedRows); } else { - queue.add(page); - pendingRows += acceptedRows; + lastInput = page; } + rowsReceived += acceptedRows; } @Override @@ -101,67 +92,41 @@ public void finish() { @Override public boolean isFinished() { - return pendingRows == 0 && (finished || limiter.remaining() == 0); - } - - private boolean readyToEmit() { - return finished || pendingRows >= pageSize || limiter.remaining() == 0; + return lastInput == null && (finished || limiter.remaining() == 0); } @Override public Page getOutput() { - if (pendingRows > 0 && readyToEmit()) { - final Page result = combinePages(queue, blockFactory, pendingRows); - pendingRows = 0; - rowsEmitted += result.getPositionCount(); - return result; - } else { + if (lastInput == null) { return null; } + final Page result = lastInput; + lastInput = null; + pagesProcessed++; + rowsEmitted += result.getPositionCount(); + return result; } - private static ElementType[] elementTypes(int blockCount, List pages) { - ElementType[] elementTypes = new ElementType[blockCount]; - for (Page page : pages) { - for (int b = 0; b < blockCount; b++) { - ElementType newType = page.getBlock(b).elementType(); - ElementType currType = elementTypes[b]; - if (currType == null || currType == ElementType.NULL) { - elementTypes[b] = newType; - } else { - assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType; - } - } - } - return elementTypes; - } - - private static Page combinePages(List pages, BlockFactory blockFactory, int upTo) { - assert pages.isEmpty() == false : "no pages to combine"; - if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) { - return pages.removeFirst(); + private static Page truncatePage(Page page, int upTo) { + int[] filter = new int[upTo]; + for (int i = 0; i < upTo; i++) { + filter[i] = i; } - int blockCount = pages.getFirst().getBlockCount(); - Block.Builder[] builders = new Block.Builder[blockCount]; + final Block[] blocks = new Block[page.getBlockCount()]; + Page result = null; try { - ElementType[] elementTypes = elementTypes(blockCount, pages); - for (int b = 0; b < blockCount; b++) { - builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory); - } - int accumulated = 0; - for (Page page : pages) { - int size = Math.min(page.getPositionCount(), upTo - accumulated); - for (int b = 0; b < blockCount; b++) { - Block block = page.getBlock(b); - builders[b].copyFrom(block, 0, size); - } - accumulated += size; + for (int b = 0; b < blocks.length; b++) { + blocks[b] = page.getBlock(b).filter(filter); } - Block[] blocks = Block.Builder.buildAll(builders); - return new Page(blocks); + result = new Page(blocks); } finally { - Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders)); + if (result == null) { + Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks)); + } else { + page.releaseBlocks(); + } } + return result; } @Override @@ -171,7 +136,9 @@ public Status status() { @Override public void close() { - Releasables.close(queue); + if (lastInput != null) { + lastInput.releaseBlocks(); + } } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java index 77f94a21a9f10..4c482ddaf369d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java @@ -142,7 +142,7 @@ public void doClose() { if (randomBoolean()) { int limit = between(0, ids.size()); it = ids.subList(0, limit).iterator(); - intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024))); + intermediateOperators.add(new LimitOperator(new Limiter(limit))); } else { it = ids.iterator(); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java index 9d0db1da6d347..8740ec8135783 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java @@ -9,14 +9,12 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.RandomBlock; import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; import org.elasticsearch.core.TimeValue; import org.hamcrest.Matcher; -import org.junit.Before; import java.util.ArrayList; import java.util.List; @@ -30,7 +28,7 @@ public class LimitOperatorTests extends OperatorTestCase { @Override protected LimitOperator.Factory simple(SimpleOptions options) { - return new LimitOperator.Factory(100, 500); + return new LimitOperator.Factory(100); } @Override @@ -48,30 +46,6 @@ protected Matcher expectedToStringOfSimple() { return equalTo("LimitOperator[limit = 100/100]"); } - private ElementType elementType; - - @Before - public void setUpElementTypes() throws Exception { - elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF); - } - - private Page randomPage(BlockFactory blockFactory, int size) { - if (randomBoolean()) { - return new Page(blockFactory.newConstantNullBlock(size)); - } - Block block = RandomBlock.randomBlock( - blockFactory, - elementType, - size, - elementType == ElementType.NULL || randomBoolean(), - 1, - 1, - 0, - 0 - ).block(); - return new Page(block); - } - @Override protected void assertSimpleOutput(List input, List results) { int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum(); @@ -91,7 +65,6 @@ public void testStatus() { Page p = new Page(blockFactory.newConstantNullBlock(10)); try { op.addInput(p); - op.finish(); assertSame(p, op.getOutput()); } finally { p.releaseBlocks(); @@ -104,10 +77,9 @@ public void testStatus() { public void testNeedInput() { BlockFactory blockFactory = driverContext().blockFactory(); - // small page size - try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) { + try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) { assertTrue(op.needsInput()); - Page p = randomPage(blockFactory, 10); + Page p = new Page(blockFactory.newConstantNullBlock(10)); op.addInput(p); assertFalse(op.needsInput()); op.getOutput().releaseBlocks(); @@ -115,19 +87,6 @@ public void testNeedInput() { op.finish(); assertFalse(op.needsInput()); } - // small page size - try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) { - for (int i = 0; i < 5; i++) { - assertTrue(op.needsInput()); - Page p = randomPage(blockFactory, 10); - op.addInput(p); - } - assertFalse(op.needsInput()); - op.getOutput().releaseBlocks(); - assertTrue(op.needsInput()); - op.finish(); - assertFalse(op.needsInput()); - } } public void testBlockBiggerThanRemaining() { @@ -135,7 +94,7 @@ public void testBlockBiggerThanRemaining() { for (int i = 0; i < 100; i++) { try (var op = simple().get(driverContext())) { assertTrue(op.needsInput()); - Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view + Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view op.addInput(p); assertFalse(op.needsInput()); Page result = op.getOutput(); @@ -155,7 +114,7 @@ public void testBlockPreciselyRemaining() { for (int i = 0; i < 100; i++) { try (var op = simple().get(driverContext())) { assertTrue(op.needsInput()); - Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page + Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page op.addInput(p); assertFalse(op.needsInput()); Page result = op.getOutput(); @@ -174,7 +133,7 @@ public void testEarlyTermination() { int numDrivers = between(1, 4); final List drivers = new ArrayList<>(); final int limit = between(1, 10_000); - final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048)); + final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit); final AtomicInteger receivedRows = new AtomicInteger(); for (int i = 0; i < numDrivers; i++) { DriverContext driverContext = driverContext(); @@ -193,8 +152,7 @@ public boolean isFinished() { @Override public Page getOutput() { - return randomPage(blockFactory(), between(1, 100)); - + return new Page(randomBlock(driverContext.blockFactory(), between(1, 100))); } @Override diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java index c18605165957a..08d59eade600e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java @@ -19,10 +19,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.iterable.Iterables; -import org.elasticsearch.compute.operator.DriverProfile; -import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.exchange.ExchangeService; -import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; @@ -49,7 +46,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -212,34 +208,6 @@ public void sendResponse(Exception exception) { } } - public void testLimitCombineSmallerPages() { - QueryPragmas queryPragmas = randomPragmas(); - if (canUseQueryPragmas()) { - Settings.Builder settings = Settings.builder().put(queryPragmas.getSettings()); - settings.remove(QueryPragmas.NODE_LEVEL_REDUCTION.getKey()); - settings.remove(QueryPragmas.PAGE_SIZE.getKey()); - queryPragmas = new QueryPragmas(settings.build()); - } - var request = new EsqlQueryRequest(); - request.query("FROM test-* | KEEP user | LIMIT 100"); - request.pragmas(queryPragmas); - request.profile(true); - try (EsqlQueryResponse resp = run(request)) { - List nodeReduce = resp.profile().drivers().stream().filter(s -> s.description().equals("node_reduce")).toList(); - for (DriverProfile driverProfile : nodeReduce) { - if (driverProfile.operators().size() == 2) { - continue; // when the target node is also the coordinator node - } - assertThat(driverProfile.operators(), hasSize(3)); - OperatorStatus exchangeSink = driverProfile.operators().get(2); - assertThat(exchangeSink.status(), instanceOf(ExchangeSinkOperator.Status.class)); - ExchangeSinkOperator.Status exchangeStatus = (ExchangeSinkOperator.Status) exchangeSink.status(); - assertThat(exchangeStatus.pagesReceived(), lessThanOrEqualTo(1)); - } - assertThat(resp.pages(), hasSize(1)); - } - } - static class SearchContextCounter { private final int maxAllowed; private final AtomicInteger current = new AtomicInteger(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 412bdbaa6914e..a4a419cd5646a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -837,10 +837,7 @@ private PhysicalOperation planFilter(FilterExec filter, LocalExecutionPlannerCon private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContext context) { PhysicalOperation source = plan(limit.child(), context); - final Integer rowSize = limit.estimatedRowSize(); - assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set"; - int pageSize = context.pageSize(rowSize); - return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx), pageSize), source.layout); + return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx)), source.layout); } private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {