Skip to content

Commit 7d081a6

Browse files
committed
Revert Combine small pages in Limit
1 parent 8c25295 commit 7d081a6

File tree

6 files changed

+46
-161
lines changed

6 files changed

+46
-161
lines changed

docs/changelog/128531.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

Lines changed: 37 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,14 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.compute.data.Block;
17-
import org.elasticsearch.compute.data.BlockFactory;
18-
import org.elasticsearch.compute.data.ElementType;
1917
import org.elasticsearch.compute.data.Page;
2018
import org.elasticsearch.core.Releasables;
2119
import org.elasticsearch.xcontent.XContentBuilder;
2220

2321
import java.io.IOException;
24-
import java.util.ArrayList;
25-
import java.util.List;
2622
import java.util.Objects;
2723

2824
public class LimitOperator implements Operator {
29-
private final BlockFactory blockFactory;
30-
private final int pageSize;
3125

3226
/**
3327
* Count of pages that have been processed by this operator.
@@ -44,30 +38,25 @@ public class LimitOperator implements Operator {
4438
*/
4539
private long rowsEmitted;
4640

47-
private final Limiter limiter;
41+
private Page lastInput;
4842

49-
private final List<Page> queue = new ArrayList<>();
50-
private int pendingRows;
43+
private final Limiter limiter;
5144
private boolean finished;
5245

53-
public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
46+
public LimitOperator(Limiter limiter) {
5447
this.limiter = limiter;
55-
this.blockFactory = blockFactory;
56-
this.pageSize = pageSize;
5748
}
5849

5950
public static final class Factory implements OperatorFactory {
6051
private final Limiter limiter;
61-
private final int pageSize;
6252

63-
public Factory(int limit, int pageSize) {
53+
public Factory(int limit) {
6454
this.limiter = new Limiter(limit);
65-
this.pageSize = pageSize;
6655
}
6756

6857
@Override
6958
public LimitOperator get(DriverContext driverContext) {
70-
return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
59+
return new LimitOperator(limiter);
7160
}
7261

7362
@Override
@@ -78,20 +67,22 @@ public String describe() {
7867

7968
@Override
8069
public boolean needsInput() {
81-
return readyToEmit() == false;
70+
return finished == false && lastInput == null && limiter.remaining() > 0;
8271
}
8372

8473
@Override
8574
public void addInput(Page page) {
86-
pagesProcessed++;
87-
rowsReceived += page.getPositionCount();
75+
assert lastInput == null : "has pending input page";
8876
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
8977
if (acceptedRows == 0) {
9078
page.releaseBlocks();
79+
assert isFinished();
80+
} else if (acceptedRows < page.getPositionCount()) {
81+
lastInput = truncatePage(page, acceptedRows);
9182
} else {
92-
queue.add(page);
93-
pendingRows += acceptedRows;
83+
lastInput = page;
9484
}
85+
rowsReceived += acceptedRows;
9586
}
9687

9788
@Override
@@ -101,67 +92,41 @@ public void finish() {
10192

10293
@Override
10394
public boolean isFinished() {
104-
return pendingRows == 0 && (finished || limiter.remaining() == 0);
105-
}
106-
107-
private boolean readyToEmit() {
108-
return finished || pendingRows >= pageSize || limiter.remaining() == 0;
95+
return lastInput == null && (finished || limiter.remaining() == 0);
10996
}
11097

11198
@Override
11299
public Page getOutput() {
113-
if (pendingRows > 0 && readyToEmit()) {
114-
final Page result = combinePages(queue, blockFactory, pendingRows);
115-
pendingRows = 0;
116-
rowsEmitted += result.getPositionCount();
117-
return result;
118-
} else {
100+
if (lastInput == null) {
119101
return null;
120102
}
103+
final Page result = lastInput;
104+
lastInput = null;
105+
pagesProcessed++;
106+
rowsEmitted += result.getPositionCount();
107+
return result;
121108
}
122109

123-
private static ElementType[] elementTypes(int blockCount, List<Page> pages) {
124-
ElementType[] elementTypes = new ElementType[blockCount];
125-
for (Page page : pages) {
126-
for (int b = 0; b < blockCount; b++) {
127-
ElementType newType = page.getBlock(b).elementType();
128-
ElementType currType = elementTypes[b];
129-
if (currType == null || currType == ElementType.NULL) {
130-
elementTypes[b] = newType;
131-
} else {
132-
assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
133-
}
134-
}
135-
}
136-
return elementTypes;
137-
}
138-
139-
private static Page combinePages(List<Page> pages, BlockFactory blockFactory, int upTo) {
140-
assert pages.isEmpty() == false : "no pages to combine";
141-
if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
142-
return pages.removeFirst();
110+
private static Page truncatePage(Page page, int upTo) {
111+
int[] filter = new int[upTo];
112+
for (int i = 0; i < upTo; i++) {
113+
filter[i] = i;
143114
}
144-
int blockCount = pages.getFirst().getBlockCount();
145-
Block.Builder[] builders = new Block.Builder[blockCount];
115+
final Block[] blocks = new Block[page.getBlockCount()];
116+
Page result = null;
146117
try {
147-
ElementType[] elementTypes = elementTypes(blockCount, pages);
148-
for (int b = 0; b < blockCount; b++) {
149-
builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
150-
}
151-
int accumulated = 0;
152-
for (Page page : pages) {
153-
int size = Math.min(page.getPositionCount(), upTo - accumulated);
154-
for (int b = 0; b < blockCount; b++) {
155-
Block block = page.getBlock(b);
156-
builders[b].copyFrom(block, 0, size);
157-
}
158-
accumulated += size;
118+
for (int b = 0; b < blocks.length; b++) {
119+
blocks[b] = page.getBlock(b).filter(filter);
159120
}
160-
Block[] blocks = Block.Builder.buildAll(builders);
161-
return new Page(blocks);
121+
result = new Page(blocks);
162122
} finally {
163-
Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
123+
if (result == null) {
124+
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
125+
} else {
126+
page.releaseBlocks();
127+
}
164128
}
129+
return result;
165130
}
166131

167132
@Override
@@ -171,7 +136,9 @@ public Status status() {
171136

172137
@Override
173138
public void close() {
174-
Releasables.close(queue);
139+
if (lastInput != null) {
140+
lastInput.releaseBlocks();
141+
}
175142
}
176143

177144
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void doClose() {
142142
if (randomBoolean()) {
143143
int limit = between(0, ids.size());
144144
it = ids.subList(0, limit).iterator();
145-
intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
145+
intermediateOperators.add(new LimitOperator(new Limiter(limit)));
146146
} else {
147147
it = ids.iterator();
148148
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99

1010
import org.elasticsearch.compute.data.Block;
1111
import org.elasticsearch.compute.data.BlockFactory;
12-
import org.elasticsearch.compute.data.ElementType;
1312
import org.elasticsearch.compute.data.Page;
1413
import org.elasticsearch.compute.test.OperatorTestCase;
1514
import org.elasticsearch.compute.test.RandomBlock;
1615
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
1716
import org.elasticsearch.core.TimeValue;
1817
import org.hamcrest.Matcher;
19-
import org.junit.Before;
2018

2119
import java.util.ArrayList;
2220
import java.util.List;
@@ -30,7 +28,7 @@
3028
public class LimitOperatorTests extends OperatorTestCase {
3129
@Override
3230
protected LimitOperator.Factory simple(SimpleOptions options) {
33-
return new LimitOperator.Factory(100, 500);
31+
return new LimitOperator.Factory(100);
3432
}
3533

3634
@Override
@@ -48,30 +46,6 @@ protected Matcher<String> expectedToStringOfSimple() {
4846
return equalTo("LimitOperator[limit = 100/100]");
4947
}
5048

51-
private ElementType elementType;
52-
53-
@Before
54-
public void setUpElementTypes() throws Exception {
55-
elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
56-
}
57-
58-
private Page randomPage(BlockFactory blockFactory, int size) {
59-
if (randomBoolean()) {
60-
return new Page(blockFactory.newConstantNullBlock(size));
61-
}
62-
Block block = RandomBlock.randomBlock(
63-
blockFactory,
64-
elementType,
65-
size,
66-
elementType == ElementType.NULL || randomBoolean(),
67-
1,
68-
1,
69-
0,
70-
0
71-
).block();
72-
return new Page(block);
73-
}
74-
7549
@Override
7650
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
7751
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
@@ -91,7 +65,6 @@ public void testStatus() {
9165
Page p = new Page(blockFactory.newConstantNullBlock(10));
9266
try {
9367
op.addInput(p);
94-
op.finish();
9568
assertSame(p, op.getOutput());
9669
} finally {
9770
p.releaseBlocks();
@@ -104,38 +77,24 @@ public void testStatus() {
10477

10578
public void testNeedInput() {
10679
BlockFactory blockFactory = driverContext().blockFactory();
107-
// small page size
108-
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
80+
try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
10981
assertTrue(op.needsInput());
110-
Page p = randomPage(blockFactory, 10);
82+
Page p = new Page(blockFactory.newConstantNullBlock(10));
11183
op.addInput(p);
11284
assertFalse(op.needsInput());
11385
op.getOutput().releaseBlocks();
11486
assertTrue(op.needsInput());
11587
op.finish();
11688
assertFalse(op.needsInput());
11789
}
118-
// small page size
119-
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
120-
for (int i = 0; i < 5; i++) {
121-
assertTrue(op.needsInput());
122-
Page p = randomPage(blockFactory, 10);
123-
op.addInput(p);
124-
}
125-
assertFalse(op.needsInput());
126-
op.getOutput().releaseBlocks();
127-
assertTrue(op.needsInput());
128-
op.finish();
129-
assertFalse(op.needsInput());
130-
}
13190
}
13291

13392
public void testBlockBiggerThanRemaining() {
13493
BlockFactory blockFactory = driverContext().blockFactory();
13594
for (int i = 0; i < 100; i++) {
13695
try (var op = simple().get(driverContext())) {
13796
assertTrue(op.needsInput());
138-
Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view
97+
Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
13998
op.addInput(p);
14099
assertFalse(op.needsInput());
141100
Page result = op.getOutput();
@@ -155,7 +114,7 @@ public void testBlockPreciselyRemaining() {
155114
for (int i = 0; i < 100; i++) {
156115
try (var op = simple().get(driverContext())) {
157116
assertTrue(op.needsInput());
158-
Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
117+
Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
159118
op.addInput(p);
160119
assertFalse(op.needsInput());
161120
Page result = op.getOutput();
@@ -174,7 +133,7 @@ public void testEarlyTermination() {
174133
int numDrivers = between(1, 4);
175134
final List<Driver> drivers = new ArrayList<>();
176135
final int limit = between(1, 10_000);
177-
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
136+
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
178137
final AtomicInteger receivedRows = new AtomicInteger();
179138
for (int i = 0; i < numDrivers; i++) {
180139
DriverContext driverContext = driverContext();
@@ -193,8 +152,7 @@ public boolean isFinished() {
193152

194153
@Override
195154
public Page getOutput() {
196-
return randomPage(blockFactory(), between(1, 100));
197-
155+
return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
198156
}
199157

200158
@Override

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2020
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2121
import org.elasticsearch.common.util.iterable.Iterables;
22-
import org.elasticsearch.compute.operator.DriverProfile;
23-
import org.elasticsearch.compute.operator.OperatorStatus;
2422
import org.elasticsearch.compute.operator.exchange.ExchangeService;
25-
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
2623
import org.elasticsearch.core.TimeValue;
2724
import org.elasticsearch.plugins.Plugin;
2825
import org.elasticsearch.rest.RestStatus;
@@ -49,7 +46,6 @@
4946
import java.util.concurrent.atomic.AtomicReference;
5047

5148
import static org.hamcrest.Matchers.equalTo;
52-
import static org.hamcrest.Matchers.hasSize;
5349
import static org.hamcrest.Matchers.instanceOf;
5450
import static org.hamcrest.Matchers.lessThanOrEqualTo;
5551

@@ -212,34 +208,6 @@ public void sendResponse(Exception exception) {
212208
}
213209
}
214210

215-
public void testLimitCombineSmallerPages() {
216-
QueryPragmas queryPragmas = randomPragmas();
217-
if (canUseQueryPragmas()) {
218-
Settings.Builder settings = Settings.builder().put(queryPragmas.getSettings());
219-
settings.remove(QueryPragmas.NODE_LEVEL_REDUCTION.getKey());
220-
settings.remove(QueryPragmas.PAGE_SIZE.getKey());
221-
queryPragmas = new QueryPragmas(settings.build());
222-
}
223-
var request = new EsqlQueryRequest();
224-
request.query("FROM test-* | KEEP user | LIMIT 100");
225-
request.pragmas(queryPragmas);
226-
request.profile(true);
227-
try (EsqlQueryResponse resp = run(request)) {
228-
List<DriverProfile> nodeReduce = resp.profile().drivers().stream().filter(s -> s.description().equals("node_reduce")).toList();
229-
for (DriverProfile driverProfile : nodeReduce) {
230-
if (driverProfile.operators().size() == 2) {
231-
continue; // when the target node is also the coordinator node
232-
}
233-
assertThat(driverProfile.operators(), hasSize(3));
234-
OperatorStatus exchangeSink = driverProfile.operators().get(2);
235-
assertThat(exchangeSink.status(), instanceOf(ExchangeSinkOperator.Status.class));
236-
ExchangeSinkOperator.Status exchangeStatus = (ExchangeSinkOperator.Status) exchangeSink.status();
237-
assertThat(exchangeStatus.pagesReceived(), lessThanOrEqualTo(1));
238-
}
239-
assertThat(resp.pages(), hasSize(1));
240-
}
241-
}
242-
243211
static class SearchContextCounter {
244212
private final int maxAllowed;
245213
private final AtomicInteger current = new AtomicInteger();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -837,10 +837,7 @@ private PhysicalOperation planFilter(FilterExec filter, LocalExecutionPlannerCon
837837

838838
private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContext context) {
839839
PhysicalOperation source = plan(limit.child(), context);
840-
final Integer rowSize = limit.estimatedRowSize();
841-
assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
842-
int pageSize = context.pageSize(rowSize);
843-
return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx), pageSize), source.layout);
840+
return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx)), source.layout);
844841
}
845842

846843
private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {

0 commit comments

Comments
 (0)