Skip to content

Commit 3f78b48

Browse files
committed
Combine small pages in Limit
1 parent 8484b71 commit 3f78b48

File tree

14 files changed

+233
-68
lines changed

14 files changed

+233
-68
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ static TransportVersion def(int id) {
266266
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_RERANK_ADDED = def(9_080_0_00);
267267
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_081_0_00);
268268
public static final TransportVersion ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION = def(9_082_0_00);
269+
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_083_0_00);
270+
269271
/*
270272
* STOP! READ THIS FIRST! No, really,
271273
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.core.Releasable;
1415
import org.elasticsearch.core.Releasables;
1516

1617
import java.io.IOException;
@@ -28,7 +29,7 @@
2829
*
2930
* <p> Pages are immutable and can be passed between threads.
3031
*/
31-
public final class Page implements Writeable {
32+
public final class Page implements Writeable, Releasable {
3233

3334
private final Block[] blocks;
3435

@@ -244,6 +245,11 @@ public void releaseBlocks() {
244245
Releasables.closeExpectNoException(blocks);
245246
}
246247

248+
@Override
249+
public void close() {
250+
releaseBlocks();
251+
}
252+
247253
/**
248254
* Before passing a Page to another Driver, it is necessary to switch the owning block factories of its Blocks to their parents,
249255
* which are associated with the global circuit breaker. This ensures that when the new driver releases this Page, it returns

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

Lines changed: 71 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.compute.data.Block;
17+
import org.elasticsearch.compute.data.BlockFactory;
18+
import org.elasticsearch.compute.data.ElementType;
1719
import org.elasticsearch.compute.data.Page;
1820
import org.elasticsearch.core.Releasables;
1921
import org.elasticsearch.xcontent.XContentBuilder;
2022

2123
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.List;
2226
import java.util.Objects;
2327

2428
public class LimitOperator implements Operator {
@@ -38,51 +42,56 @@ public class LimitOperator implements Operator {
3842
*/
3943
private long rowsEmitted;
4044

41-
private Page lastInput;
42-
4345
private final Limiter limiter;
4446
private boolean finished;
4547

46-
public LimitOperator(Limiter limiter) {
48+
private final int pageSize;
49+
private int pendingRows;
50+
private final List<Page> queue = new ArrayList<>();
51+
private final BlockFactory blockFactory;
52+
53+
public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
4754
this.limiter = limiter;
55+
this.blockFactory = blockFactory;
56+
this.pageSize = pageSize;
4857
}
4958

5059
public static final class Factory implements OperatorFactory {
5160
private final Limiter limiter;
61+
private final int pageSize;
5262

53-
public Factory(int limit) {
63+
public Factory(int limit, int pageSize) {
5464
this.limiter = new Limiter(limit);
65+
this.pageSize = pageSize;
5566
}
5667

5768
@Override
5869
public LimitOperator get(DriverContext driverContext) {
59-
return new LimitOperator(limiter);
70+
return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
6071
}
6172

6273
@Override
6374
public String describe() {
64-
return "LimitOperator[limit = " + limiter.limit() + "]";
75+
return "LimitOperator[limit = " + limiter.limit() + ", pageSize = " + pageSize + "]";
6576
}
6677
}
6778

6879
@Override
6980
public boolean needsInput() {
70-
return finished == false && lastInput == null && limiter.remaining() > 0;
81+
return readyToEmit() == false;
7182
}
7283

7384
@Override
7485
public void addInput(Page page) {
75-
assert lastInput == null : "has pending input page";
86+
pagesProcessed++;
87+
rowsReceived += page.getPositionCount();
7688
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
7789
if (acceptedRows == 0) {
7890
page.releaseBlocks();
79-
assert isFinished();
80-
} else if (acceptedRows < page.getPositionCount()) {
81-
lastInput = truncatePage(page, acceptedRows);
8291
} else {
83-
lastInput = page;
92+
queue.add(page);
93+
pendingRows += acceptedRows;
8494
}
85-
rowsReceived += acceptedRows;
8695
}
8796

8897
@Override
@@ -92,41 +101,67 @@ public void finish() {
92101

93102
@Override
94103
public boolean isFinished() {
95-
return lastInput == null && (finished || limiter.remaining() == 0);
104+
return pendingRows == 0 && (finished || limiter.remaining() == 0);
105+
}
106+
107+
private boolean readyToEmit() {
108+
return finished || pendingRows >= pageSize || limiter.remaining() == 0;
96109
}
97110

98111
@Override
99112
public Page getOutput() {
100-
if (lastInput == null) {
113+
if (pendingRows > 0 && readyToEmit()) {
114+
final Page result = combinePages(queue, blockFactory, pendingRows);
115+
pendingRows = 0;
116+
rowsEmitted += result.getPositionCount();
117+
return result;
118+
} else {
101119
return null;
102120
}
103-
final Page result = lastInput;
104-
lastInput = null;
105-
pagesProcessed++;
106-
rowsEmitted += result.getPositionCount();
107-
return result;
108121
}
109122

110-
private static Page truncatePage(Page page, int upTo) {
111-
int[] filter = new int[upTo];
112-
for (int i = 0; i < upTo; i++) {
113-
filter[i] = i;
123+
private static ElementType[] elementTypes(int blockCount, List<Page> pages) {
124+
ElementType[] elementTypes = new ElementType[blockCount];
125+
for (Page page : pages) {
126+
for (int b = 0; b < blockCount; b++) {
127+
ElementType newType = page.getBlock(b).elementType();
128+
ElementType currType = elementTypes[b];
129+
if (currType == null || currType == ElementType.NULL) {
130+
elementTypes[b] = newType;
131+
} else {
132+
assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
133+
}
134+
}
114135
}
115-
final Block[] blocks = new Block[page.getBlockCount()];
116-
Page result = null;
136+
return elementTypes;
137+
}
138+
139+
private static Page combinePages(List<Page> pages, BlockFactory blockFactory, int upTo) {
140+
assert pages.isEmpty() == false : "no pages to combine";
141+
if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
142+
return pages.removeFirst();
143+
}
144+
int blockCount = pages.getFirst().getBlockCount();
145+
Block.Builder[] builders = new Block.Builder[blockCount];
117146
try {
118-
for (int b = 0; b < blocks.length; b++) {
119-
blocks[b] = page.getBlock(b).filter(filter);
147+
ElementType[] elementTypes = elementTypes(blockCount, pages);
148+
for (int b = 0; b < blockCount; b++) {
149+
builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
120150
}
121-
result = new Page(blocks);
122-
} finally {
123-
if (result == null) {
124-
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
125-
} else {
126-
page.releaseBlocks();
151+
int accumulated = 0;
152+
for (Page page : pages) {
153+
int size = Math.min(page.getPositionCount(), upTo - accumulated);
154+
for (int b = 0; b < blockCount; b++) {
155+
Block block = page.getBlock(b);
156+
builders[b].copyFrom(block, 0, size);
157+
}
158+
accumulated += size;
127159
}
160+
Block[] blocks = Block.Builder.buildAll(builders);
161+
return new Page(blocks);
162+
} finally {
163+
Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
128164
}
129-
return result;
130165
}
131166

132167
@Override
@@ -136,9 +171,7 @@ public Status status() {
136171

137172
@Override
138173
public void close() {
139-
if (lastInput != null) {
140-
lastInput.releaseBlocks();
141-
}
174+
Releasables.close(queue);
142175
}
143176

144177
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public void testLimitOperator() {
254254
var driver = TestDriverFactory.create(
255255
driverContext,
256256
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
257-
List.of((new LimitOperator.Factory(limit)).get(driverContext)),
257+
List.of((new LimitOperator.Factory(limit, between(1024, 2048))).get(driverContext)),
258258
new PageConsumerOperator(page -> {
259259
LongBlock block = page.getBlock(0);
260260
for (int i = 0; i < page.getPositionCount(); i++) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void doClose() {
142142
if (randomBoolean()) {
143143
int limit = between(0, ids.size());
144144
it = ids.subList(0, limit).iterator();
145-
intermediateOperators.add(new LimitOperator(new Limiter(limit)));
145+
intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
146146
} else {
147147
it = ids.iterator();
148148
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99

1010
import org.elasticsearch.compute.data.Block;
1111
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.ElementType;
1213
import org.elasticsearch.compute.data.Page;
1314
import org.elasticsearch.compute.test.OperatorTestCase;
1415
import org.elasticsearch.compute.test.RandomBlock;
1516
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
1617
import org.elasticsearch.core.TimeValue;
1718
import org.hamcrest.Matcher;
19+
import org.junit.Before;
1820

1921
import java.util.ArrayList;
2022
import java.util.List;
@@ -28,7 +30,11 @@
2830
public class LimitOperatorTests extends OperatorTestCase {
2931
@Override
3032
protected LimitOperator.Factory simple(SimpleOptions options) {
31-
return new LimitOperator.Factory(100);
33+
return new LimitOperator.Factory(100, 500);
34+
}
35+
36+
private LimitOperator.Factory limitOperator(int limit, int pageSize) {
37+
return new LimitOperator.Factory(limit, pageSize);
3238
}
3339

3440
@Override
@@ -38,14 +44,38 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
3844

3945
@Override
4046
protected Matcher<String> expectedDescriptionOfSimple() {
41-
return equalTo("LimitOperator[limit = 100]");
47+
return equalTo("LimitOperator[limit = 100, pageSize = 500]");
4248
}
4349

4450
@Override
4551
protected Matcher<String> expectedToStringOfSimple() {
4652
return equalTo("LimitOperator[limit = 100/100]");
4753
}
4854

55+
private ElementType elementType;
56+
57+
@Before
58+
public void setUpElementTypes() throws Exception {
59+
elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
60+
}
61+
62+
private Page randomPage(BlockFactory blockFactory, int size) {
63+
if (randomBoolean()) {
64+
return new Page(blockFactory.newConstantNullBlock(size));
65+
}
66+
Block block = RandomBlock.randomBlock(
67+
blockFactory,
68+
elementType,
69+
size,
70+
elementType == ElementType.NULL || randomBoolean(),
71+
1,
72+
1,
73+
0,
74+
0
75+
).block();
76+
return new Page(block);
77+
}
78+
4979
@Override
5080
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
5181
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
@@ -65,6 +95,7 @@ public void testStatus() {
6595
Page p = new Page(blockFactory.newConstantNullBlock(10));
6696
try {
6797
op.addInput(p);
98+
op.finish();
6899
assertSame(p, op.getOutput());
69100
} finally {
70101
p.releaseBlocks();
@@ -77,24 +108,38 @@ public void testStatus() {
77108

78109
public void testNeedInput() {
79110
BlockFactory blockFactory = driverContext().blockFactory();
80-
try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
111+
// small page size
112+
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
81113
assertTrue(op.needsInput());
82-
Page p = new Page(blockFactory.newConstantNullBlock(10));
114+
Page p = randomPage(blockFactory, 10);
83115
op.addInput(p);
84116
assertFalse(op.needsInput());
85117
op.getOutput().releaseBlocks();
86118
assertTrue(op.needsInput());
87119
op.finish();
88120
assertFalse(op.needsInput());
89121
}
122+
// larger page size: several small input pages must accumulate before output is ready
123+
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
124+
for (int i = 0; i < 5; i++) {
125+
assertTrue(op.needsInput());
126+
Page p = randomPage(blockFactory, 10);
127+
op.addInput(p);
128+
}
129+
assertFalse(op.needsInput());
130+
op.getOutput().releaseBlocks();
131+
assertTrue(op.needsInput());
132+
op.finish();
133+
assertFalse(op.needsInput());
134+
}
90135
}
91136

92137
public void testBlockBiggerThanRemaining() {
93138
BlockFactory blockFactory = driverContext().blockFactory();
94139
for (int i = 0; i < 100; i++) {
95140
try (var op = simple().get(driverContext())) {
96141
assertTrue(op.needsInput());
97-
Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
142+
Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view
98143
op.addInput(p);
99144
assertFalse(op.needsInput());
100145
Page result = op.getOutput();
@@ -114,7 +159,7 @@ public void testBlockPreciselyRemaining() {
114159
for (int i = 0; i < 100; i++) {
115160
try (var op = simple().get(driverContext())) {
116161
assertTrue(op.needsInput());
117-
Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
162+
Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
118163
op.addInput(p);
119164
assertFalse(op.needsInput());
120165
Page result = op.getOutput();
@@ -133,7 +178,7 @@ public void testEarlyTermination() {
133178
int numDrivers = between(1, 4);
134179
final List<Driver> drivers = new ArrayList<>();
135180
final int limit = between(1, 10_000);
136-
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
181+
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
137182
final AtomicInteger receivedRows = new AtomicInteger();
138183
for (int i = 0; i < numDrivers; i++) {
139184
DriverContext driverContext = driverContext();
@@ -152,7 +197,8 @@ public boolean isFinished() {
152197

153198
@Override
154199
public Page getOutput() {
155-
return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
200+
return randomPage(blockFactory(), between(1, 100));
201+
156202
}
157203

158204
@Override

0 commit comments

Comments
 (0)