Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/128531.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128531
summary: Combine small pages in Limit
area: ES|QL
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_RERANK_ADDED = def(9_080_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_081_0_00);
public static final TransportVersion ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION = def(9_082_0_00);
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_083_0_00);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand All @@ -28,7 +29,7 @@
*
* <p> Pages are immutable and can be passed between threads.
*/
public final class Page implements Writeable {
public final class Page implements Writeable, Releasable {

private final Block[] blocks;

Expand Down Expand Up @@ -244,6 +245,11 @@ public void releaseBlocks() {
Releasables.closeExpectNoException(blocks);
}

@Override
public void close() {
releaseBlocks();
}

/**
* Before passing a Page to another Driver, it is necessary to switch the owning block factories of its Blocks to their parents,
* which are associated with the global circuit breaker. This ensures that when the new driver releases this Page, it returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class LimitOperator implements Operator {
Expand All @@ -38,25 +42,32 @@ public class LimitOperator implements Operator {
*/
private long rowsEmitted;

private Page lastInput;

private final Limiter limiter;
private boolean finished;

public LimitOperator(Limiter limiter) {
private final int pageSize;
private int pendingRows;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this one below the final ones? I just want to keep the mutable ones not mixed in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I regrouped these in c94c72c

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

private final List<Page> queue = new ArrayList<>();
private final BlockFactory blockFactory;

public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
this.limiter = limiter;
this.blockFactory = blockFactory;
this.pageSize = pageSize;
}

public static final class Factory implements OperatorFactory {
private final Limiter limiter;
private final int pageSize;

public Factory(int limit) {
public Factory(int limit, int pageSize) {
this.limiter = new Limiter(limit);
this.pageSize = pageSize;
}

@Override
public LimitOperator get(DriverContext driverContext) {
return new LimitOperator(limiter);
return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
}

@Override
Expand All @@ -67,22 +78,20 @@ public String describe() {

@Override
public boolean needsInput() {
return finished == false && lastInput == null && limiter.remaining() > 0;
return readyToEmit() == false;
}

@Override
public void addInput(Page page) {
assert lastInput == null : "has pending input page";
pagesProcessed++;
rowsReceived += page.getPositionCount();
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
if (acceptedRows == 0) {
page.releaseBlocks();
assert isFinished();
} else if (acceptedRows < page.getPositionCount()) {
lastInput = truncatePage(page, acceptedRows);
} else {
lastInput = page;
queue.add(page);
pendingRows += acceptedRows;
}
rowsReceived += acceptedRows;
}

@Override
Expand All @@ -92,41 +101,67 @@ public void finish() {

@Override
public boolean isFinished() {
return lastInput == null && (finished || limiter.remaining() == 0);
return pendingRows == 0 && (finished || limiter.remaining() == 0);
}

private boolean readyToEmit() {
return finished || pendingRows >= pageSize || limiter.remaining() == 0;
}

@Override
public Page getOutput() {
if (lastInput == null) {
if (pendingRows > 0 && readyToEmit()) {
final Page result = combinePages(queue, blockFactory, pendingRows);
pendingRows = 0;
rowsEmitted += result.getPositionCount();
return result;
} else {
return null;
}
final Page result = lastInput;
lastInput = null;
pagesProcessed++;
rowsEmitted += result.getPositionCount();
return result;
}

private static Page truncatePage(Page page, int upTo) {
int[] filter = new int[upTo];
for (int i = 0; i < upTo; i++) {
filter[i] = i;
private static ElementType[] elementTypes(int blockCount, List<Page> pages) {
ElementType[] elementTypes = new ElementType[blockCount];
for (Page page : pages) {
for (int b = 0; b < blockCount; b++) {
ElementType newType = page.getBlock(b).elementType();
ElementType currType = elementTypes[b];
if (currType == null || currType == ElementType.NULL) {
elementTypes[b] = newType;
} else {
assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
}
}
}
final Block[] blocks = new Block[page.getBlockCount()];
Page result = null;
return elementTypes;
}

private static Page combinePages(List<Page> pages, BlockFactory blockFactory, int upTo) {
assert pages.isEmpty() == false : "no pages to combine";
if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
return pages.removeFirst();
}
int blockCount = pages.getFirst().getBlockCount();
Block.Builder[] builders = new Block.Builder[blockCount];
try {
for (int b = 0; b < blocks.length; b++) {
blocks[b] = page.getBlock(b).filter(filter);
ElementType[] elementTypes = elementTypes(blockCount, pages);
for (int b = 0; b < blockCount; b++) {
builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
}
result = new Page(blocks);
} finally {
if (result == null) {
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
} else {
page.releaseBlocks();
int accumulated = 0;
for (Page page : pages) {
int size = Math.min(page.getPositionCount(), upTo - accumulated);
for (int b = 0; b < blockCount; b++) {
Block block = page.getBlock(b);
builders[b].copyFrom(block, 0, size);
}
accumulated += size;
}
Block[] blocks = Block.Builder.buildAll(builders);
return new Page(blocks);
} finally {
Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
}
return result;
}

@Override
Expand All @@ -136,9 +171,7 @@ public Status status() {

@Override
public void close() {
if (lastInput != null) {
lastInput.releaseBlocks();
}
Releasables.close(queue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void testLimitOperator() {
var driver = TestDriverFactory.create(
driverContext,
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
List.of((new LimitOperator.Factory(limit)).get(driverContext)),
List.of((new LimitOperator.Factory(limit, between(1024, 2048))).get(driverContext)),
new PageConsumerOperator(page -> {
LongBlock block = page.getBlock(0);
for (int i = 0; i < page.getPositionCount(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void doClose() {
if (randomBoolean()) {
int limit = between(0, ids.size());
it = ids.subList(0, limit).iterator();
intermediateOperators.add(new LimitOperator(new Limiter(limit)));
intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
} else {
it = ids.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.compute.test.RandomBlock;
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
import org.elasticsearch.core.TimeValue;
import org.hamcrest.Matcher;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,7 +30,7 @@
public class LimitOperatorTests extends OperatorTestCase {
@Override
protected LimitOperator.Factory simple(SimpleOptions options) {
return new LimitOperator.Factory(100);
return new LimitOperator.Factory(100, 500);
}

@Override
Expand All @@ -46,6 +48,30 @@ protected Matcher<String> expectedToStringOfSimple() {
return equalTo("LimitOperator[limit = 100/100]");
}

private ElementType elementType;

@Before
public void setUpElementTypes() throws Exception {
elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
}

private Page randomPage(BlockFactory blockFactory, int size) {
if (randomBoolean()) {
return new Page(blockFactory.newConstantNullBlock(size));
}
Block block = RandomBlock.randomBlock(
blockFactory,
elementType,
size,
elementType == ElementType.NULL || randomBoolean(),
1,
1,
0,
0
).block();
return new Page(block);
}

@Override
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
Expand All @@ -65,6 +91,7 @@ public void testStatus() {
Page p = new Page(blockFactory.newConstantNullBlock(10));
try {
op.addInput(p);
op.finish();
assertSame(p, op.getOutput());
} finally {
p.releaseBlocks();
Expand All @@ -77,24 +104,38 @@ public void testStatus() {

public void testNeedInput() {
BlockFactory blockFactory = driverContext().blockFactory();
try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
// small page size
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
assertTrue(op.needsInput());
Page p = new Page(blockFactory.newConstantNullBlock(10));
Page p = randomPage(blockFactory, 10);
op.addInput(p);
assertFalse(op.needsInput());
op.getOutput().releaseBlocks();
assertTrue(op.needsInput());
op.finish();
assertFalse(op.needsInput());
}
// small page size
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
for (int i = 0; i < 5; i++) {
assertTrue(op.needsInput());
Page p = randomPage(blockFactory, 10);
op.addInput(p);
}
assertFalse(op.needsInput());
op.getOutput().releaseBlocks();
assertTrue(op.needsInput());
op.finish();
assertFalse(op.needsInput());
}
}

public void testBlockBiggerThanRemaining() {
BlockFactory blockFactory = driverContext().blockFactory();
for (int i = 0; i < 100; i++) {
try (var op = simple().get(driverContext())) {
assertTrue(op.needsInput());
Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view
op.addInput(p);
assertFalse(op.needsInput());
Page result = op.getOutput();
Expand All @@ -114,7 +155,7 @@ public void testBlockPreciselyRemaining() {
for (int i = 0; i < 100; i++) {
try (var op = simple().get(driverContext())) {
assertTrue(op.needsInput());
Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
op.addInput(p);
assertFalse(op.needsInput());
Page result = op.getOutput();
Expand All @@ -133,7 +174,7 @@ public void testEarlyTermination() {
int numDrivers = between(1, 4);
final List<Driver> drivers = new ArrayList<>();
final int limit = between(1, 10_000);
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
final AtomicInteger receivedRows = new AtomicInteger();
for (int i = 0; i < numDrivers; i++) {
DriverContext driverContext = driverContext();
Expand All @@ -152,7 +193,8 @@ public boolean isFinished() {

@Override
public Page getOutput() {
return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
return randomPage(blockFactory(), between(1, 100));

}

@Override
Expand Down
Loading
Loading