Skip to content

Commit 95abd6e

Browse files
dnhatn authored and valeriy42 committed
Revert "Combine small pages in Limit" (elastic#129107)
This PR reverts elastic#128531. In elastic#128531, the Limit operator was updated to combine smaller pages into a larger page to reduce overhead, such as the number of exchange requests. However, this has a significant drawback: the combined page does not retain the attributes of the blocks from the smaller pages. For example, if the smaller pages have ordinal-based BytesRef blocks, the combined page will not have them. This can cause a significant slowdown when subsequent operators have optimizations for ordinal-based blocks. The Enrich operator has such optimizations, and our benchmarks have shown this performance regression. One possible way to reduce the regression is to set a threshold (e.g., 1000 rows) above which the Limit operator passes the page along without combining. However, even with a threshold of 1000, the performance regression does not go away completely. Alternatively, we could allow exchange requests to return multiple pages (up to the page size limit). To minimize risk, this PR reverts the previous change; we will introduce a revised version of it later.
1 parent eb95689 commit 95abd6e

File tree

6 files changed

+46
-161
lines changed

6 files changed

+46
-161
lines changed

docs/changelog/128531.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

Lines changed: 37 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,14 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.compute.data.Block;
17-
import org.elasticsearch.compute.data.BlockFactory;
18-
import org.elasticsearch.compute.data.ElementType;
1917
import org.elasticsearch.compute.data.Page;
2018
import org.elasticsearch.core.Releasables;
2119
import org.elasticsearch.xcontent.XContentBuilder;
2220

2321
import java.io.IOException;
24-
import java.util.ArrayList;
25-
import java.util.List;
2622
import java.util.Objects;
2723

2824
public class LimitOperator implements Operator {
29-
private final BlockFactory blockFactory;
30-
private final int pageSize;
3125

3226
/**
3327
* Count of pages that have been processed by this operator.
@@ -44,30 +38,25 @@ public class LimitOperator implements Operator {
4438
*/
4539
private long rowsEmitted;
4640

47-
private final Limiter limiter;
41+
private Page lastInput;
4842

49-
private final List<Page> queue = new ArrayList<>();
50-
private int pendingRows;
43+
private final Limiter limiter;
5144
private boolean finished;
5245

53-
public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
46+
public LimitOperator(Limiter limiter) {
5447
this.limiter = limiter;
55-
this.blockFactory = blockFactory;
56-
this.pageSize = pageSize;
5748
}
5849

5950
public static final class Factory implements OperatorFactory {
6051
private final Limiter limiter;
61-
private final int pageSize;
6252

63-
public Factory(int limit, int pageSize) {
53+
public Factory(int limit) {
6454
this.limiter = new Limiter(limit);
65-
this.pageSize = pageSize;
6655
}
6756

6857
@Override
6958
public LimitOperator get(DriverContext driverContext) {
70-
return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
59+
return new LimitOperator(limiter);
7160
}
7261

7362
@Override
@@ -78,20 +67,22 @@ public String describe() {
7867

7968
@Override
8069
public boolean needsInput() {
81-
return readyToEmit() == false;
70+
return finished == false && lastInput == null && limiter.remaining() > 0;
8271
}
8372

8473
@Override
8574
public void addInput(Page page) {
86-
pagesProcessed++;
87-
rowsReceived += page.getPositionCount();
75+
assert lastInput == null : "has pending input page";
8876
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
8977
if (acceptedRows == 0) {
9078
page.releaseBlocks();
79+
assert isFinished();
80+
} else if (acceptedRows < page.getPositionCount()) {
81+
lastInput = truncatePage(page, acceptedRows);
9182
} else {
92-
queue.add(page);
93-
pendingRows += acceptedRows;
83+
lastInput = page;
9484
}
85+
rowsReceived += acceptedRows;
9586
}
9687

9788
@Override
@@ -101,67 +92,41 @@ public void finish() {
10192

10293
@Override
10394
public boolean isFinished() {
104-
return pendingRows == 0 && (finished || limiter.remaining() == 0);
105-
}
106-
107-
private boolean readyToEmit() {
108-
return finished || pendingRows >= pageSize || limiter.remaining() == 0;
95+
return lastInput == null && (finished || limiter.remaining() == 0);
10996
}
11097

11198
@Override
11299
public Page getOutput() {
113-
if (pendingRows > 0 && readyToEmit()) {
114-
final Page result = combinePages(queue, blockFactory, pendingRows);
115-
pendingRows = 0;
116-
rowsEmitted += result.getPositionCount();
117-
return result;
118-
} else {
100+
if (lastInput == null) {
119101
return null;
120102
}
103+
final Page result = lastInput;
104+
lastInput = null;
105+
pagesProcessed++;
106+
rowsEmitted += result.getPositionCount();
107+
return result;
121108
}
122109

123-
private static ElementType[] elementTypes(int blockCount, List<Page> pages) {
124-
ElementType[] elementTypes = new ElementType[blockCount];
125-
for (Page page : pages) {
126-
for (int b = 0; b < blockCount; b++) {
127-
ElementType newType = page.getBlock(b).elementType();
128-
ElementType currType = elementTypes[b];
129-
if (currType == null || currType == ElementType.NULL) {
130-
elementTypes[b] = newType;
131-
} else {
132-
assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
133-
}
134-
}
135-
}
136-
return elementTypes;
137-
}
138-
139-
private static Page combinePages(List<Page> pages, BlockFactory blockFactory, int upTo) {
140-
assert pages.isEmpty() == false : "no pages to combine";
141-
if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
142-
return pages.removeFirst();
110+
private static Page truncatePage(Page page, int upTo) {
111+
int[] filter = new int[upTo];
112+
for (int i = 0; i < upTo; i++) {
113+
filter[i] = i;
143114
}
144-
int blockCount = pages.getFirst().getBlockCount();
145-
Block.Builder[] builders = new Block.Builder[blockCount];
115+
final Block[] blocks = new Block[page.getBlockCount()];
116+
Page result = null;
146117
try {
147-
ElementType[] elementTypes = elementTypes(blockCount, pages);
148-
for (int b = 0; b < blockCount; b++) {
149-
builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
150-
}
151-
int accumulated = 0;
152-
for (Page page : pages) {
153-
int size = Math.min(page.getPositionCount(), upTo - accumulated);
154-
for (int b = 0; b < blockCount; b++) {
155-
Block block = page.getBlock(b);
156-
builders[b].copyFrom(block, 0, size);
157-
}
158-
accumulated += size;
118+
for (int b = 0; b < blocks.length; b++) {
119+
blocks[b] = page.getBlock(b).filter(filter);
159120
}
160-
Block[] blocks = Block.Builder.buildAll(builders);
161-
return new Page(blocks);
121+
result = new Page(blocks);
162122
} finally {
163-
Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
123+
if (result == null) {
124+
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
125+
} else {
126+
page.releaseBlocks();
127+
}
164128
}
129+
return result;
165130
}
166131

167132
@Override
@@ -171,7 +136,9 @@ public Status status() {
171136

172137
@Override
173138
public void close() {
174-
Releasables.close(queue);
139+
if (lastInput != null) {
140+
lastInput.releaseBlocks();
141+
}
175142
}
176143

177144
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void doClose() {
142142
if (randomBoolean()) {
143143
int limit = between(0, ids.size());
144144
it = ids.subList(0, limit).iterator();
145-
intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
145+
intermediateOperators.add(new LimitOperator(new Limiter(limit)));
146146
} else {
147147
it = ids.iterator();
148148
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99

1010
import org.elasticsearch.compute.data.Block;
1111
import org.elasticsearch.compute.data.BlockFactory;
12-
import org.elasticsearch.compute.data.ElementType;
1312
import org.elasticsearch.compute.data.Page;
1413
import org.elasticsearch.compute.test.OperatorTestCase;
1514
import org.elasticsearch.compute.test.RandomBlock;
1615
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
1716
import org.elasticsearch.core.TimeValue;
1817
import org.hamcrest.Matcher;
19-
import org.junit.Before;
2018

2119
import java.util.ArrayList;
2220
import java.util.List;
@@ -30,7 +28,7 @@
3028
public class LimitOperatorTests extends OperatorTestCase {
3129
@Override
3230
protected LimitOperator.Factory simple(SimpleOptions options) {
33-
return new LimitOperator.Factory(100, 500);
31+
return new LimitOperator.Factory(100);
3432
}
3533

3634
@Override
@@ -48,30 +46,6 @@ protected Matcher<String> expectedToStringOfSimple() {
4846
return equalTo("LimitOperator[limit = 100/100]");
4947
}
5048

51-
private ElementType elementType;
52-
53-
@Before
54-
public void setUpElementTypes() throws Exception {
55-
elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
56-
}
57-
58-
private Page randomPage(BlockFactory blockFactory, int size) {
59-
if (randomBoolean()) {
60-
return new Page(blockFactory.newConstantNullBlock(size));
61-
}
62-
Block block = RandomBlock.randomBlock(
63-
blockFactory,
64-
elementType,
65-
size,
66-
elementType == ElementType.NULL || randomBoolean(),
67-
1,
68-
1,
69-
0,
70-
0
71-
).block();
72-
return new Page(block);
73-
}
74-
7549
@Override
7650
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
7751
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
@@ -91,7 +65,6 @@ public void testStatus() {
9165
Page p = new Page(blockFactory.newConstantNullBlock(10));
9266
try {
9367
op.addInput(p);
94-
op.finish();
9568
assertSame(p, op.getOutput());
9669
} finally {
9770
p.releaseBlocks();
@@ -104,38 +77,24 @@ public void testStatus() {
10477

10578
public void testNeedInput() {
10679
BlockFactory blockFactory = driverContext().blockFactory();
107-
// small page size
108-
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
80+
try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
10981
assertTrue(op.needsInput());
110-
Page p = randomPage(blockFactory, 10);
82+
Page p = new Page(blockFactory.newConstantNullBlock(10));
11183
op.addInput(p);
11284
assertFalse(op.needsInput());
11385
op.getOutput().releaseBlocks();
11486
assertTrue(op.needsInput());
11587
op.finish();
11688
assertFalse(op.needsInput());
11789
}
118-
// small page size
119-
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
120-
for (int i = 0; i < 5; i++) {
121-
assertTrue(op.needsInput());
122-
Page p = randomPage(blockFactory, 10);
123-
op.addInput(p);
124-
}
125-
assertFalse(op.needsInput());
126-
op.getOutput().releaseBlocks();
127-
assertTrue(op.needsInput());
128-
op.finish();
129-
assertFalse(op.needsInput());
130-
}
13190
}
13291

13392
public void testBlockBiggerThanRemaining() {
13493
BlockFactory blockFactory = driverContext().blockFactory();
13594
for (int i = 0; i < 100; i++) {
13695
try (var op = simple().get(driverContext())) {
13796
assertTrue(op.needsInput());
138-
Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view
97+
Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
13998
op.addInput(p);
14099
assertFalse(op.needsInput());
141100
Page result = op.getOutput();
@@ -155,7 +114,7 @@ public void testBlockPreciselyRemaining() {
155114
for (int i = 0; i < 100; i++) {
156115
try (var op = simple().get(driverContext())) {
157116
assertTrue(op.needsInput());
158-
Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
117+
Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
159118
op.addInput(p);
160119
assertFalse(op.needsInput());
161120
Page result = op.getOutput();
@@ -174,7 +133,7 @@ public void testEarlyTermination() {
174133
int numDrivers = between(1, 4);
175134
final List<Driver> drivers = new ArrayList<>();
176135
final int limit = between(1, 10_000);
177-
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
136+
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
178137
final AtomicInteger receivedRows = new AtomicInteger();
179138
for (int i = 0; i < numDrivers; i++) {
180139
DriverContext driverContext = driverContext();
@@ -193,8 +152,7 @@ public boolean isFinished() {
193152

194153
@Override
195154
public Page getOutput() {
196-
return randomPage(blockFactory(), between(1, 100));
197-
155+
return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
198156
}
199157

200158
@Override

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2020
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2121
import org.elasticsearch.common.util.iterable.Iterables;
22-
import org.elasticsearch.compute.operator.DriverProfile;
23-
import org.elasticsearch.compute.operator.OperatorStatus;
2422
import org.elasticsearch.compute.operator.exchange.ExchangeService;
25-
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
2623
import org.elasticsearch.core.TimeValue;
2724
import org.elasticsearch.plugins.Plugin;
2825
import org.elasticsearch.rest.RestStatus;
@@ -49,7 +46,6 @@
4946
import java.util.concurrent.atomic.AtomicReference;
5047

5148
import static org.hamcrest.Matchers.equalTo;
52-
import static org.hamcrest.Matchers.hasSize;
5349
import static org.hamcrest.Matchers.instanceOf;
5450
import static org.hamcrest.Matchers.lessThanOrEqualTo;
5551

@@ -212,34 +208,6 @@ public void sendResponse(Exception exception) {
212208
}
213209
}
214210

215-
public void testLimitCombineSmallerPages() {
216-
QueryPragmas queryPragmas = randomPragmas();
217-
if (canUseQueryPragmas()) {
218-
Settings.Builder settings = Settings.builder().put(queryPragmas.getSettings());
219-
settings.remove(QueryPragmas.NODE_LEVEL_REDUCTION.getKey());
220-
settings.remove(QueryPragmas.PAGE_SIZE.getKey());
221-
queryPragmas = new QueryPragmas(settings.build());
222-
}
223-
var request = new EsqlQueryRequest();
224-
request.query("FROM test-* | KEEP user | LIMIT 100");
225-
request.pragmas(queryPragmas);
226-
request.profile(true);
227-
try (EsqlQueryResponse resp = run(request)) {
228-
List<DriverProfile> nodeReduce = resp.profile().drivers().stream().filter(s -> s.description().equals("node_reduce")).toList();
229-
for (DriverProfile driverProfile : nodeReduce) {
230-
if (driverProfile.operators().size() == 2) {
231-
continue; // when the target node is also the coordinator node
232-
}
233-
assertThat(driverProfile.operators(), hasSize(3));
234-
OperatorStatus exchangeSink = driverProfile.operators().get(2);
235-
assertThat(exchangeSink.status(), instanceOf(ExchangeSinkOperator.Status.class));
236-
ExchangeSinkOperator.Status exchangeStatus = (ExchangeSinkOperator.Status) exchangeSink.status();
237-
assertThat(exchangeStatus.pagesReceived(), lessThanOrEqualTo(1));
238-
}
239-
assertThat(resp.pages(), hasSize(1));
240-
}
241-
}
242-
243211
static class SearchContextCounter {
244212
private final int maxAllowed;
245213
private final AtomicInteger current = new AtomicInteger();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -837,10 +837,7 @@ private PhysicalOperation planFilter(FilterExec filter, LocalExecutionPlannerCon
837837

838838
private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContext context) {
839839
PhysicalOperation source = plan(limit.child(), context);
840-
final Integer rowSize = limit.estimatedRowSize();
841-
assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
842-
int pageSize = context.pageSize(rowSize);
843-
return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx), pageSize), source.layout);
840+
return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx)), source.layout);
844841
}
845842

846843
private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {

0 commit comments

Comments
 (0)