diff --git a/docs/changelog/128531.yaml b/docs/changelog/128531.yaml
new file mode 100644
index 0000000000000..de4a767136ca7
--- /dev/null
+++ b/docs/changelog/128531.yaml
@@ -0,0 +1,5 @@
+pr: 128531
+summary: Combine small pages in Limit
+area: ES|QL
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index c30e23aaab10a..bfe32ca591eeb 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -271,6 +271,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION = def(9_082_0_00);
public static final TransportVersion ML_INFERENCE_VERTEXAI_CHATCOMPLETION_ADDED = def(9_083_0_00);
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED = def(9_084_0_00);
+ public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
+
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java
index f08c66f4f9e6c..4a08d9cf5dadd 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java
@@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import java.io.IOException;
@@ -28,7 +29,7 @@
*
*
Pages are immutable and can be passed between threads.
*/
-public final class Page implements Writeable {
+public final class Page implements Writeable, Releasable {
private final Block[] blocks;
@@ -244,6 +245,11 @@ public void releaseBlocks() {
Releasables.closeExpectNoException(blocks);
}
+ @Override
+ public void close() {
+ releaseBlocks();
+ }
+
/**
* Before passing a Page to another Driver, it is necessary to switch the owning block factories of its Blocks to their parents,
* which are associated with the global circuit breaker. This ensures that when the new driver releases this Page, it returns
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java
index 3ef9c420f59ff..e22bd1ff9ed73 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java
@@ -14,14 +14,20 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
public class LimitOperator implements Operator {
+ private final BlockFactory blockFactory;
+ private final int pageSize;
/**
* Count of pages that have been processed by this operator.
@@ -38,25 +44,30 @@ public class LimitOperator implements Operator {
*/
private long rowsEmitted;
- private Page lastInput;
-
private final Limiter limiter;
+
+ private final List queue = new ArrayList<>();
+ private int pendingRows;
private boolean finished;
- public LimitOperator(Limiter limiter) {
+ public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
this.limiter = limiter;
+ this.blockFactory = blockFactory;
+ this.pageSize = pageSize;
}
public static final class Factory implements OperatorFactory {
private final Limiter limiter;
+ private final int pageSize;
- public Factory(int limit) {
+ public Factory(int limit, int pageSize) {
this.limiter = new Limiter(limit);
+ this.pageSize = pageSize;
}
@Override
public LimitOperator get(DriverContext driverContext) {
- return new LimitOperator(limiter);
+ return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
}
@Override
@@ -67,22 +78,20 @@ public String describe() {
@Override
public boolean needsInput() {
- return finished == false && lastInput == null && limiter.remaining() > 0;
+ return readyToEmit() == false;
}
@Override
public void addInput(Page page) {
- assert lastInput == null : "has pending input page";
+ pagesProcessed++;
+ rowsReceived += page.getPositionCount();
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
if (acceptedRows == 0) {
page.releaseBlocks();
- assert isFinished();
- } else if (acceptedRows < page.getPositionCount()) {
- lastInput = truncatePage(page, acceptedRows);
} else {
- lastInput = page;
+ queue.add(page);
+ pendingRows += acceptedRows;
}
- rowsReceived += acceptedRows;
}
@Override
@@ -92,41 +101,67 @@ public void finish() {
@Override
public boolean isFinished() {
- return lastInput == null && (finished || limiter.remaining() == 0);
+ return pendingRows == 0 && (finished || limiter.remaining() == 0);
+ }
+
+ private boolean readyToEmit() {
+ return finished || pendingRows >= pageSize || limiter.remaining() == 0;
}
@Override
public Page getOutput() {
- if (lastInput == null) {
+ if (pendingRows > 0 && readyToEmit()) {
+ final Page result = combinePages(queue, blockFactory, pendingRows);
+ pendingRows = 0;
+ rowsEmitted += result.getPositionCount();
+ return result;
+ } else {
return null;
}
- final Page result = lastInput;
- lastInput = null;
- pagesProcessed++;
- rowsEmitted += result.getPositionCount();
- return result;
}
- private static Page truncatePage(Page page, int upTo) {
- int[] filter = new int[upTo];
- for (int i = 0; i < upTo; i++) {
- filter[i] = i;
+ private static ElementType[] elementTypes(int blockCount, List pages) {
+ ElementType[] elementTypes = new ElementType[blockCount];
+ for (Page page : pages) {
+ for (int b = 0; b < blockCount; b++) {
+ ElementType newType = page.getBlock(b).elementType();
+ ElementType currType = elementTypes[b];
+ if (currType == null || currType == ElementType.NULL) {
+ elementTypes[b] = newType;
+ } else {
+ assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
+ }
+ }
}
- final Block[] blocks = new Block[page.getBlockCount()];
- Page result = null;
+ return elementTypes;
+ }
+
+ private static Page combinePages(List pages, BlockFactory blockFactory, int upTo) {
+ assert pages.isEmpty() == false : "no pages to combine";
+ if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
+ return pages.removeFirst();
+ }
+ int blockCount = pages.getFirst().getBlockCount();
+ Block.Builder[] builders = new Block.Builder[blockCount];
try {
- for (int b = 0; b < blocks.length; b++) {
- blocks[b] = page.getBlock(b).filter(filter);
+ ElementType[] elementTypes = elementTypes(blockCount, pages);
+ for (int b = 0; b < blockCount; b++) {
+ builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
}
- result = new Page(blocks);
- } finally {
- if (result == null) {
- Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
- } else {
- page.releaseBlocks();
+ int accumulated = 0;
+ for (Page page : pages) {
+ int size = Math.min(page.getPositionCount(), upTo - accumulated);
+ for (int b = 0; b < blockCount; b++) {
+ Block block = page.getBlock(b);
+ builders[b].copyFrom(block, 0, size);
+ }
+ accumulated += size;
}
+ Block[] blocks = Block.Builder.buildAll(builders);
+ return new Page(blocks);
+ } finally {
+ Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
}
- return result;
}
@Override
@@ -136,9 +171,7 @@ public Status status() {
@Override
public void close() {
- if (lastInput != null) {
- lastInput.releaseBlocks();
- }
+ Releasables.close(queue);
}
@Override
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java
index 4c482ddaf369d..77f94a21a9f10 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java
@@ -142,7 +142,7 @@ public void doClose() {
if (randomBoolean()) {
int limit = between(0, ids.size());
it = ids.subList(0, limit).iterator();
- intermediateOperators.add(new LimitOperator(new Limiter(limit)));
+ intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
} else {
it = ids.iterator();
}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java
index 8740ec8135783..9d0db1da6d347 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java
@@ -9,12 +9,14 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.compute.test.RandomBlock;
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
import org.elasticsearch.core.TimeValue;
import org.hamcrest.Matcher;
+import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
@@ -28,7 +30,7 @@
public class LimitOperatorTests extends OperatorTestCase {
@Override
protected LimitOperator.Factory simple(SimpleOptions options) {
- return new LimitOperator.Factory(100);
+ return new LimitOperator.Factory(100, 500);
}
@Override
@@ -46,6 +48,30 @@ protected Matcher expectedToStringOfSimple() {
return equalTo("LimitOperator[limit = 100/100]");
}
+ private ElementType elementType;
+
+ @Before
+ public void setUpElementTypes() throws Exception {
+ elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
+ }
+
+ private Page randomPage(BlockFactory blockFactory, int size) {
+ if (randomBoolean()) {
+ return new Page(blockFactory.newConstantNullBlock(size));
+ }
+ Block block = RandomBlock.randomBlock(
+ blockFactory,
+ elementType,
+ size,
+ elementType == ElementType.NULL || randomBoolean(),
+ 1,
+ 1,
+ 0,
+ 0
+ ).block();
+ return new Page(block);
+ }
+
@Override
protected void assertSimpleOutput(List input, List results) {
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
@@ -65,6 +91,7 @@ public void testStatus() {
Page p = new Page(blockFactory.newConstantNullBlock(10));
try {
op.addInput(p);
+ op.finish();
assertSame(p, op.getOutput());
} finally {
p.releaseBlocks();
@@ -77,9 +104,10 @@ public void testStatus() {
public void testNeedInput() {
BlockFactory blockFactory = driverContext().blockFactory();
- try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
+ // small page size
+ try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
assertTrue(op.needsInput());
- Page p = new Page(blockFactory.newConstantNullBlock(10));
+ Page p = randomPage(blockFactory, 10);
op.addInput(p);
assertFalse(op.needsInput());
op.getOutput().releaseBlocks();
@@ -87,6 +115,19 @@ public void testNeedInput() {
op.finish();
assertFalse(op.needsInput());
}
+ // small page size
+ try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
+ for (int i = 0; i < 5; i++) {
+ assertTrue(op.needsInput());
+ Page p = randomPage(blockFactory, 10);
+ op.addInput(p);
+ }
+ assertFalse(op.needsInput());
+ op.getOutput().releaseBlocks();
+ assertTrue(op.needsInput());
+ op.finish();
+ assertFalse(op.needsInput());
+ }
}
public void testBlockBiggerThanRemaining() {
@@ -94,7 +135,7 @@ public void testBlockBiggerThanRemaining() {
for (int i = 0; i < 100; i++) {
try (var op = simple().get(driverContext())) {
assertTrue(op.needsInput());
- Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
+ Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view
op.addInput(p);
assertFalse(op.needsInput());
Page result = op.getOutput();
@@ -114,7 +155,7 @@ public void testBlockPreciselyRemaining() {
for (int i = 0; i < 100; i++) {
try (var op = simple().get(driverContext())) {
assertTrue(op.needsInput());
- Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
+ Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
op.addInput(p);
assertFalse(op.needsInput());
Page result = op.getOutput();
@@ -133,7 +174,7 @@ public void testEarlyTermination() {
int numDrivers = between(1, 4);
final List drivers = new ArrayList<>();
final int limit = between(1, 10_000);
- final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
+ final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
final AtomicInteger receivedRows = new AtomicInteger();
for (int i = 0; i < numDrivers; i++) {
DriverContext driverContext = driverContext();
@@ -152,7 +193,8 @@ public boolean isFinished() {
@Override
public Page getOutput() {
- return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
+ return randomPage(blockFactory(), between(1, 100));
+
}
@Override
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java
index 08d59eade600e..c18605165957a 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java
@@ -19,7 +19,10 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
@@ -46,6 +49,7 @@
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -208,6 +212,34 @@ public void sendResponse(Exception exception) {
}
}
+ public void testLimitCombineSmallerPages() {
+ QueryPragmas queryPragmas = randomPragmas();
+ if (canUseQueryPragmas()) {
+ Settings.Builder settings = Settings.builder().put(queryPragmas.getSettings());
+ settings.remove(QueryPragmas.NODE_LEVEL_REDUCTION.getKey());
+ settings.remove(QueryPragmas.PAGE_SIZE.getKey());
+ queryPragmas = new QueryPragmas(settings.build());
+ }
+ var request = new EsqlQueryRequest();
+ request.query("FROM test-* | KEEP user | LIMIT 100");
+ request.pragmas(queryPragmas);
+ request.profile(true);
+ try (EsqlQueryResponse resp = run(request)) {
+ List nodeReduce = resp.profile().drivers().stream().filter(s -> s.description().equals("node_reduce")).toList();
+ for (DriverProfile driverProfile : nodeReduce) {
+ if (driverProfile.operators().size() == 2) {
+ continue; // when the target node is also the coordinator node
+ }
+ assertThat(driverProfile.operators(), hasSize(3));
+ OperatorStatus exchangeSink = driverProfile.operators().get(2);
+ assertThat(exchangeSink.status(), instanceOf(ExchangeSinkOperator.Status.class));
+ ExchangeSinkOperator.Status exchangeStatus = (ExchangeSinkOperator.Status) exchangeSink.status();
+ assertThat(exchangeStatus.pagesReceived(), lessThanOrEqualTo(1));
+ }
+ assertThat(resp.pages(), hasSize(1));
+ }
+ }
+
static class SearchContextCounter {
private final int maxAllowed;
private final AtomicInteger current = new AtomicInteger();
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LimitExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LimitExec.java
index 8445fea08111c..614c2b0431ca8 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LimitExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LimitExec.java
@@ -7,18 +7,22 @@
package org.elasticsearch.xpack.esql.plan.physical;
+import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import java.io.IOException;
+import java.util.List;
import java.util.Objects;
-public class LimitExec extends UnaryExec {
+public class LimitExec extends UnaryExec implements EstimatesRowSize {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
"LimitExec",
@@ -26,14 +30,21 @@ public class LimitExec extends UnaryExec {
);
private final Expression limit;
+ private final Integer estimatedRowSize;
- public LimitExec(Source source, PhysicalPlan child, Expression limit) {
+ public LimitExec(Source source, PhysicalPlan child, Expression limit, Integer estimatedRowSize) {
super(source, child);
this.limit = limit;
+ this.estimatedRowSize = estimatedRowSize;
}
private LimitExec(StreamInput in) throws IOException {
- this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class), in.readNamedWriteable(Expression.class));
+ this(
+ Source.readFrom((PlanStreamInput) in),
+ in.readNamedWriteable(PhysicalPlan.class),
+ in.readNamedWriteable(Expression.class),
+ in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LIMIT_ROW_SIZE) ? in.readOptionalVInt() : null
+ );
}
@Override
@@ -41,6 +52,9 @@ public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeNamedWriteable(child());
out.writeNamedWriteable(limit());
+ if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LIMIT_ROW_SIZE)) {
+ out.writeOptionalVInt(estimatedRowSize);
+ }
}
@Override
@@ -50,21 +64,36 @@ public String getWriteableName() {
@Override
protected NodeInfo extends LimitExec> info() {
- return NodeInfo.create(this, LimitExec::new, child(), limit);
+ return NodeInfo.create(this, LimitExec::new, child(), limit, estimatedRowSize);
}
@Override
public LimitExec replaceChild(PhysicalPlan newChild) {
- return new LimitExec(source(), newChild, limit);
+ return new LimitExec(source(), newChild, limit, estimatedRowSize);
}
public Expression limit() {
return limit;
}
+ public Integer estimatedRowSize() {
+ return estimatedRowSize;
+ }
+
+ @Override
+ public PhysicalPlan estimateRowSize(State unused) {
+ final List output = output();
+ EstimatesRowSize.State state = new EstimatesRowSize.State();
+ final boolean needsSortedDocIds = output.stream().anyMatch(a -> a.dataType() == DataType.DOC_DATA_TYPE);
+ state.add(needsSortedDocIds, output);
+ int size = state.consumeAllFields(true);
+ size = Math.max(size, 1);
+ return Objects.equals(this.estimatedRowSize, size) ? this : new LimitExec(source(), child(), limit, size);
+ }
+
@Override
public int hashCode() {
- return Objects.hash(limit, child());
+ return Objects.hash(limit, estimatedRowSize, child());
}
@Override
@@ -78,6 +107,9 @@ public boolean equals(Object obj) {
}
LimitExec other = (LimitExec) obj;
- return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child());
+ return Objects.equals(limit, other.limit)
+ && Objects.equals(estimatedRowSize, other.estimatedRowSize)
+ && Objects.equals(child(), other.child());
+
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
index 277b65dd00708..3a91ac0513b2a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
@@ -818,7 +818,10 @@ private PhysicalOperation planFilter(FilterExec filter, LocalExecutionPlannerCon
private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContext context) {
PhysicalOperation source = plan(limit.child(), context);
- return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx)), source.layout);
+ final Integer rowSize = limit.estimatedRowSize();
+ assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
+ int pageSize = context.pageSize(rowSize);
+ return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx), pageSize), source.layout);
}
private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java
index 4eba58edbe762..f0064178a57f2 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java
@@ -78,7 +78,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
}
if (unary instanceof Limit limit) {
- return new LimitExec(limit.source(), mappedChild, limit.limit());
+ return new LimitExec(limit.source(), mappedChild, limit.limit(), null);
}
if (unary instanceof TopN topN) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
index 1af000a7a36bd..5586140d809ea 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
@@ -168,7 +168,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
if (unary instanceof Limit limit) {
mappedChild = addExchangeForFragment(limit, mappedChild);
- return new LimitExec(limit.source(), mappedChild, limit.limit());
+ return new LimitExec(limit.source(), mappedChild, limit.limit(), null);
}
if (unary instanceof TopN topN) {
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
index 62cb19609834a..8ce7124b501f2 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
@@ -2788,7 +2788,7 @@ public void testVerifierOnDuplicateOutputAttributes() {
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan));
assertThat(
e.getMessage(),
- containsString("Plan [LimitExec[1000[INTEGER]]] optimized incorrectly due to duplicate output attribute emp_no{f}#")
+ containsString("Plan [LimitExec[1000[INTEGER],null]] optimized incorrectly due to duplicate output attribute emp_no{f}#")
);
}
@@ -2821,7 +2821,12 @@ public void testProjectAwayColumns() {
Attribute some_field2 = relation.output().get(1);
FragmentExec fragment = new FragmentExec(relation);
ExchangeExec exchange = new ExchangeExec(Source.EMPTY, fragment);
- LimitExec limitThenFragment = new LimitExec(Source.EMPTY, exchange, new Literal(Source.EMPTY, 10000, DataType.INTEGER));
+ LimitExec limitThenFragment = new LimitExec(
+ Source.EMPTY,
+ exchange,
+ new Literal(Source.EMPTY, 10000, DataType.INTEGER),
+ randomIntBetween(0, 1024)
+ );
// All the relation's fields are required.
PhysicalPlan plan = rule.apply(limitThenFragment);
@@ -7795,6 +7800,14 @@ public void testReductionPlanForAggs() {
assertThat(reductionAggs.estimatedRowSize(), equalTo(58)); // double and keyword
}
+ public void testReductionPlanForLimit() {
+ var plan = physicalPlan("FROM test | LIMIT 10");
+ Tuple plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(plan, config);
+ PhysicalPlan reduction = PlannerUtils.reductionPlan(plans.v2());
+ LimitExec limitExec = as(reduction, LimitExec.class);
+ assertThat(limitExec.estimatedRowSize(), equalTo(328));
+ }
+
public void testEqualsPushdownToDelegate() {
var optimized = optimizedPlan(physicalPlan("""
FROM test
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LimitExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LimitExecSerializationTests.java
index ba2724c8ae6ef..133f420fc3fc2 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LimitExecSerializationTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LimitExecSerializationTests.java
@@ -19,7 +19,7 @@ public static LimitExec randomLimitExec(int depth) {
Source source = randomSource();
PhysicalPlan child = randomChild(depth);
Expression limit = randomLimit();
- return new LimitExec(source, child, limit);
+ return new LimitExec(source, child, limit, randomEstimatedRowSize());
}
private static Expression randomLimit() {
@@ -34,13 +34,15 @@ protected LimitExec createTestInstance() {
@Override
protected LimitExec mutateInstance(LimitExec instance) throws IOException {
PhysicalPlan child = instance.child();
- Expression limit = randomLimit();
- if (randomBoolean()) {
- child = randomValueOtherThan(child, () -> randomChild(0));
- } else {
- limit = randomValueOtherThan(limit, LimitExecSerializationTests::randomLimit);
+ Expression limit = instance.limit();
+ Integer estimatedRowSize = instance.estimatedRowSize();
+ switch (between(0, 2)) {
+ case 0 -> child = randomValueOtherThan(child, () -> randomChild(0));
+ case 1 -> limit = randomValueOtherThan(limit, LimitExecSerializationTests::randomLimit);
+ case 2 -> estimatedRowSize = randomValueOtherThan(estimatedRowSize, LimitExecSerializationTests::randomEstimatedRowSize);
+ default -> throw new AssertionError("Unexpected case");
}
- return new LimitExec(instance.source(), child, limit);
+ return new LimitExec(instance.source(), child, limit, estimatedRowSize);
}
@Override
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java
index db1f18a0e6cd5..0fee0c13178d4 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java
@@ -222,7 +222,8 @@ public void testParallel() throws Exception {
var limitExec = new LimitExec(
Source.EMPTY,
new ParallelExec(queryExec.source(), queryExec),
- new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER)
+ new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER),
+ randomEstimatedRowSize(estimatedRowSizeIsHuge)
);
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), limitExec);
assertThat(plan.driverFactories, hasSize(2));