Skip to content

Commit aa317da

Browse files
committed
Combine small pages in Limit (elastic#128531)
Currently, the Limit operator does not combine small pages into larger ones; it simply passes them along, except for chunking pages larger than the limit. This change integrates EstimatesRowSize into Limit and adjusts it to emit larger pages. As a result, pages up to twice the pageSize may be emitted, which is preferable to emitting undersized pages. This should reduce the number of transport requests and responses between clusters, or between coordinator and data nodes, for queries without TopN or STATS when target shards produce small pages due to their size or highly selective filters.
1 parent 3b91069 commit aa317da

File tree

14 files changed

+237
-66
lines changed

14 files changed

+237
-66
lines changed

docs/changelog/128531.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128531
2+
summary: Combine small pages in Limit
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,8 @@ static TransportVersion def(int id) {
271271
public static final TransportVersion ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION = def(9_082_0_00);
272272
public static final TransportVersion ML_INFERENCE_VERTEXAI_CHATCOMPLETION_ADDED = def(9_083_0_00);
273273
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED = def(9_084_0_00);
274+
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
275+
274276
/*
275277
* STOP! READ THIS FIRST! No, really,
276278
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.core.Releasable;
1415
import org.elasticsearch.core.Releasables;
1516

1617
import java.io.IOException;
@@ -28,7 +29,7 @@
2829
*
2930
* <p> Pages are immutable and can be passed between threads.
3031
*/
31-
public final class Page implements Writeable {
32+
public final class Page implements Writeable, Releasable {
3233

3334
private final Block[] blocks;
3435

@@ -244,6 +245,11 @@ public void releaseBlocks() {
244245
Releasables.closeExpectNoException(blocks);
245246
}
246247

248+
@Override
249+
public void close() {
250+
releaseBlocks();
251+
}
252+
247253
/**
248254
* Before passing a Page to another Driver, it is necessary to switch the owning block factories of its Blocks to their parents,
249255
* which are associated with the global circuit breaker. This ensures that when the new driver releases this Page, it returns

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

Lines changed: 70 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,20 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.compute.data.Block;
17+
import org.elasticsearch.compute.data.BlockFactory;
18+
import org.elasticsearch.compute.data.ElementType;
1719
import org.elasticsearch.compute.data.Page;
1820
import org.elasticsearch.core.Releasables;
1921
import org.elasticsearch.xcontent.XContentBuilder;
2022

2123
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.List;
2226
import java.util.Objects;
2327

2428
public class LimitOperator implements Operator {
29+
private final BlockFactory blockFactory;
30+
private final int pageSize;
2531

2632
/**
2733
* Count of pages that have been processed by this operator.
@@ -38,25 +44,30 @@ public class LimitOperator implements Operator {
3844
*/
3945
private long rowsEmitted;
4046

41-
private Page lastInput;
42-
4347
private final Limiter limiter;
48+
49+
private final List<Page> queue = new ArrayList<>();
50+
private int pendingRows;
4451
private boolean finished;
4552

46-
public LimitOperator(Limiter limiter) {
53+
public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
4754
this.limiter = limiter;
55+
this.blockFactory = blockFactory;
56+
this.pageSize = pageSize;
4857
}
4958

5059
public static final class Factory implements OperatorFactory {
5160
private final Limiter limiter;
61+
private final int pageSize;
5262

53-
public Factory(int limit) {
63+
public Factory(int limit, int pageSize) {
5464
this.limiter = new Limiter(limit);
65+
this.pageSize = pageSize;
5566
}
5667

5768
@Override
5869
public LimitOperator get(DriverContext driverContext) {
59-
return new LimitOperator(limiter);
70+
return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
6071
}
6172

6273
@Override
@@ -67,22 +78,20 @@ public String describe() {
6778

6879
@Override
6980
public boolean needsInput() {
70-
return finished == false && lastInput == null && limiter.remaining() > 0;
81+
return readyToEmit() == false;
7182
}
7283

7384
@Override
7485
public void addInput(Page page) {
75-
assert lastInput == null : "has pending input page";
86+
pagesProcessed++;
87+
rowsReceived += page.getPositionCount();
7688
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
7789
if (acceptedRows == 0) {
7890
page.releaseBlocks();
79-
assert isFinished();
80-
} else if (acceptedRows < page.getPositionCount()) {
81-
lastInput = truncatePage(page, acceptedRows);
8291
} else {
83-
lastInput = page;
92+
queue.add(page);
93+
pendingRows += acceptedRows;
8494
}
85-
rowsReceived += acceptedRows;
8695
}
8796

8897
@Override
@@ -92,41 +101,67 @@ public void finish() {
92101

93102
@Override
94103
public boolean isFinished() {
95-
return lastInput == null && (finished || limiter.remaining() == 0);
104+
return pendingRows == 0 && (finished || limiter.remaining() == 0);
105+
}
106+
107+
private boolean readyToEmit() {
108+
return finished || pendingRows >= pageSize || limiter.remaining() == 0;
96109
}
97110

98111
@Override
99112
public Page getOutput() {
100-
if (lastInput == null) {
113+
if (pendingRows > 0 && readyToEmit()) {
114+
final Page result = combinePages(queue, blockFactory, pendingRows);
115+
pendingRows = 0;
116+
rowsEmitted += result.getPositionCount();
117+
return result;
118+
} else {
101119
return null;
102120
}
103-
final Page result = lastInput;
104-
lastInput = null;
105-
pagesProcessed++;
106-
rowsEmitted += result.getPositionCount();
107-
return result;
108121
}
109122

110-
private static Page truncatePage(Page page, int upTo) {
111-
int[] filter = new int[upTo];
112-
for (int i = 0; i < upTo; i++) {
113-
filter[i] = i;
123+
private static ElementType[] elementTypes(int blockCount, List<Page> pages) {
124+
ElementType[] elementTypes = new ElementType[blockCount];
125+
for (Page page : pages) {
126+
for (int b = 0; b < blockCount; b++) {
127+
ElementType newType = page.getBlock(b).elementType();
128+
ElementType currType = elementTypes[b];
129+
if (currType == null || currType == ElementType.NULL) {
130+
elementTypes[b] = newType;
131+
} else {
132+
assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
133+
}
134+
}
114135
}
115-
final Block[] blocks = new Block[page.getBlockCount()];
116-
Page result = null;
136+
return elementTypes;
137+
}
138+
139+
private static Page combinePages(List<Page> pages, BlockFactory blockFactory, int upTo) {
140+
assert pages.isEmpty() == false : "no pages to combine";
141+
if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
142+
return pages.removeFirst();
143+
}
144+
int blockCount = pages.getFirst().getBlockCount();
145+
Block.Builder[] builders = new Block.Builder[blockCount];
117146
try {
118-
for (int b = 0; b < blocks.length; b++) {
119-
blocks[b] = page.getBlock(b).filter(filter);
147+
ElementType[] elementTypes = elementTypes(blockCount, pages);
148+
for (int b = 0; b < blockCount; b++) {
149+
builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
120150
}
121-
result = new Page(blocks);
122-
} finally {
123-
if (result == null) {
124-
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
125-
} else {
126-
page.releaseBlocks();
151+
int accumulated = 0;
152+
for (Page page : pages) {
153+
int size = Math.min(page.getPositionCount(), upTo - accumulated);
154+
for (int b = 0; b < blockCount; b++) {
155+
Block block = page.getBlock(b);
156+
builders[b].copyFrom(block, 0, size);
157+
}
158+
accumulated += size;
127159
}
160+
Block[] blocks = Block.Builder.buildAll(builders);
161+
return new Page(blocks);
162+
} finally {
163+
Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
128164
}
129-
return result;
130165
}
131166

132167
@Override
@@ -136,9 +171,7 @@ public Status status() {
136171

137172
@Override
138173
public void close() {
139-
if (lastInput != null) {
140-
lastInput.releaseBlocks();
141-
}
174+
Releasables.close(queue);
142175
}
143176

144177
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void doClose() {
142142
if (randomBoolean()) {
143143
int limit = between(0, ids.size());
144144
it = ids.subList(0, limit).iterator();
145-
intermediateOperators.add(new LimitOperator(new Limiter(limit)));
145+
intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
146146
} else {
147147
it = ids.iterator();
148148
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99

1010
import org.elasticsearch.compute.data.Block;
1111
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.ElementType;
1213
import org.elasticsearch.compute.data.Page;
1314
import org.elasticsearch.compute.test.OperatorTestCase;
1415
import org.elasticsearch.compute.test.RandomBlock;
1516
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
1617
import org.elasticsearch.core.TimeValue;
1718
import org.hamcrest.Matcher;
19+
import org.junit.Before;
1820

1921
import java.util.ArrayList;
2022
import java.util.List;
@@ -28,7 +30,7 @@
2830
public class LimitOperatorTests extends OperatorTestCase {
2931
@Override
3032
protected LimitOperator.Factory simple(SimpleOptions options) {
31-
return new LimitOperator.Factory(100);
33+
return new LimitOperator.Factory(100, 500);
3234
}
3335

3436
@Override
@@ -46,6 +48,30 @@ protected Matcher<String> expectedToStringOfSimple() {
4648
return equalTo("LimitOperator[limit = 100/100]");
4749
}
4850

51+
private ElementType elementType;
52+
53+
@Before
54+
public void setUpElementTypes() throws Exception {
55+
elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
56+
}
57+
58+
private Page randomPage(BlockFactory blockFactory, int size) {
59+
if (randomBoolean()) {
60+
return new Page(blockFactory.newConstantNullBlock(size));
61+
}
62+
Block block = RandomBlock.randomBlock(
63+
blockFactory,
64+
elementType,
65+
size,
66+
elementType == ElementType.NULL || randomBoolean(),
67+
1,
68+
1,
69+
0,
70+
0
71+
).block();
72+
return new Page(block);
73+
}
74+
4975
@Override
5076
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
5177
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
@@ -65,6 +91,7 @@ public void testStatus() {
6591
Page p = new Page(blockFactory.newConstantNullBlock(10));
6692
try {
6793
op.addInput(p);
94+
op.finish();
6895
assertSame(p, op.getOutput());
6996
} finally {
7097
p.releaseBlocks();
@@ -77,24 +104,38 @@ public void testStatus() {
77104

78105
public void testNeedInput() {
79106
BlockFactory blockFactory = driverContext().blockFactory();
80-
try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
107+
// small page size
108+
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
81109
assertTrue(op.needsInput());
82-
Page p = new Page(blockFactory.newConstantNullBlock(10));
110+
Page p = randomPage(blockFactory, 10);
83111
op.addInput(p);
84112
assertFalse(op.needsInput());
85113
op.getOutput().releaseBlocks();
86114
assertTrue(op.needsInput());
87115
op.finish();
88116
assertFalse(op.needsInput());
89117
}
118+
// larger page size: several input pages must be queued before output is ready
119+
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
120+
for (int i = 0; i < 5; i++) {
121+
assertTrue(op.needsInput());
122+
Page p = randomPage(blockFactory, 10);
123+
op.addInput(p);
124+
}
125+
assertFalse(op.needsInput());
126+
op.getOutput().releaseBlocks();
127+
assertTrue(op.needsInput());
128+
op.finish();
129+
assertFalse(op.needsInput());
130+
}
90131
}
91132

92133
public void testBlockBiggerThanRemaining() {
93134
BlockFactory blockFactory = driverContext().blockFactory();
94135
for (int i = 0; i < 100; i++) {
95136
try (var op = simple().get(driverContext())) {
96137
assertTrue(op.needsInput());
97-
Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
138+
Page p = randomPage(blockFactory, 200); // test doesn't close because the operator copies the accepted rows and releases the input page
98139
op.addInput(p);
99140
assertFalse(op.needsInput());
100141
Page result = op.getOutput();
@@ -114,7 +155,7 @@ public void testBlockPreciselyRemaining() {
114155
for (int i = 0; i < 100; i++) {
115156
try (var op = simple().get(driverContext())) {
116157
assertTrue(op.needsInput());
117-
Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
158+
Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
118159
op.addInput(p);
119160
assertFalse(op.needsInput());
120161
Page result = op.getOutput();
@@ -133,7 +174,7 @@ public void testEarlyTermination() {
133174
int numDrivers = between(1, 4);
134175
final List<Driver> drivers = new ArrayList<>();
135176
final int limit = between(1, 10_000);
136-
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
177+
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
137178
final AtomicInteger receivedRows = new AtomicInteger();
138179
for (int i = 0; i < numDrivers; i++) {
139180
DriverContext driverContext = driverContext();
@@ -152,7 +193,8 @@ public boolean isFinished() {
152193

153194
@Override
154195
public Page getOutput() {
155-
return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
196+
return randomPage(blockFactory(), between(1, 100));
197+
156198
}
157199

158200
@Override

0 commit comments

Comments
 (0)