Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/changelog/128531.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,14 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class LimitOperator implements Operator {
private final BlockFactory blockFactory;
private final int pageSize;

/**
* Count of pages that have been processed by this operator.
Expand All @@ -44,30 +38,25 @@ public class LimitOperator implements Operator {
*/
private long rowsEmitted;

private final Limiter limiter;
private Page lastInput;

private final List<Page> queue = new ArrayList<>();
private int pendingRows;
private final Limiter limiter;
private boolean finished;

public LimitOperator(Limiter limiter, BlockFactory blockFactory, int pageSize) {
public LimitOperator(Limiter limiter) {
this.limiter = limiter;
this.blockFactory = blockFactory;
this.pageSize = pageSize;
}

public static final class Factory implements OperatorFactory {
private final Limiter limiter;
private final int pageSize;

public Factory(int limit, int pageSize) {
public Factory(int limit) {
this.limiter = new Limiter(limit);
this.pageSize = pageSize;
}

@Override
public LimitOperator get(DriverContext driverContext) {
return new LimitOperator(limiter, driverContext.blockFactory(), pageSize);
return new LimitOperator(limiter);
}

@Override
Expand All @@ -78,20 +67,22 @@ public String describe() {

@Override
public boolean needsInput() {
return readyToEmit() == false;
return finished == false && lastInput == null && limiter.remaining() > 0;
}

@Override
public void addInput(Page page) {
pagesProcessed++;
rowsReceived += page.getPositionCount();
assert lastInput == null : "has pending input page";
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
if (acceptedRows == 0) {
page.releaseBlocks();
assert isFinished();
} else if (acceptedRows < page.getPositionCount()) {
lastInput = truncatePage(page, acceptedRows);
} else {
queue.add(page);
pendingRows += acceptedRows;
lastInput = page;
}
rowsReceived += acceptedRows;
}

@Override
Expand All @@ -101,67 +92,41 @@ public void finish() {

@Override
public boolean isFinished() {
return pendingRows == 0 && (finished || limiter.remaining() == 0);
}

private boolean readyToEmit() {
return finished || pendingRows >= pageSize || limiter.remaining() == 0;
return lastInput == null && (finished || limiter.remaining() == 0);
}

@Override
public Page getOutput() {
if (pendingRows > 0 && readyToEmit()) {
final Page result = combinePages(queue, blockFactory, pendingRows);
pendingRows = 0;
rowsEmitted += result.getPositionCount();
return result;
} else {
if (lastInput == null) {
return null;
}
final Page result = lastInput;
lastInput = null;
pagesProcessed++;
rowsEmitted += result.getPositionCount();
return result;
}

private static ElementType[] elementTypes(int blockCount, List<Page> pages) {
ElementType[] elementTypes = new ElementType[blockCount];
for (Page page : pages) {
for (int b = 0; b < blockCount; b++) {
ElementType newType = page.getBlock(b).elementType();
ElementType currType = elementTypes[b];
if (currType == null || currType == ElementType.NULL) {
elementTypes[b] = newType;
} else {
assert newType == ElementType.NULL || currType == newType : "element type mismatch: " + currType + " != " + newType;
}
}
}
return elementTypes;
}

private static Page combinePages(List<Page> pages, BlockFactory blockFactory, int upTo) {
assert pages.isEmpty() == false : "no pages to combine";
if (pages.size() == 1 && pages.getFirst().getPositionCount() == upTo) {
return pages.removeFirst();
private static Page truncatePage(Page page, int upTo) {
int[] filter = new int[upTo];
for (int i = 0; i < upTo; i++) {
filter[i] = i;
}
int blockCount = pages.getFirst().getBlockCount();
Block.Builder[] builders = new Block.Builder[blockCount];
final Block[] blocks = new Block[page.getBlockCount()];
Page result = null;
try {
ElementType[] elementTypes = elementTypes(blockCount, pages);
for (int b = 0; b < blockCount; b++) {
builders[b] = elementTypes[b].newBlockBuilder(upTo, blockFactory);
}
int accumulated = 0;
for (Page page : pages) {
int size = Math.min(page.getPositionCount(), upTo - accumulated);
for (int b = 0; b < blockCount; b++) {
Block block = page.getBlock(b);
builders[b].copyFrom(block, 0, size);
}
accumulated += size;
for (int b = 0; b < blocks.length; b++) {
blocks[b] = page.getBlock(b).filter(filter);
}
Block[] blocks = Block.Builder.buildAll(builders);
return new Page(blocks);
result = new Page(blocks);
} finally {
Releasables.close(Releasables.wrap(pages), pages::clear, Releasables.wrap(builders));
if (result == null) {
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
} else {
page.releaseBlocks();
}
}
return result;
}

@Override
Expand All @@ -171,7 +136,9 @@ public Status status() {

@Override
public void close() {
Releasables.close(queue);
if (lastInput != null) {
lastInput.releaseBlocks();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void doClose() {
if (randomBoolean()) {
int limit = between(0, ids.size());
it = ids.subList(0, limit).iterator();
intermediateOperators.add(new LimitOperator(new Limiter(limit), blockFactory(), between(1, 1024)));
intermediateOperators.add(new LimitOperator(new Limiter(limit)));
} else {
it = ids.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.compute.test.RandomBlock;
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
import org.elasticsearch.core.TimeValue;
import org.hamcrest.Matcher;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -30,7 +28,7 @@
public class LimitOperatorTests extends OperatorTestCase {
@Override
protected LimitOperator.Factory simple(SimpleOptions options) {
return new LimitOperator.Factory(100, 500);
return new LimitOperator.Factory(100);
}

@Override
Expand All @@ -48,30 +46,6 @@ protected Matcher<String> expectedToStringOfSimple() {
return equalTo("LimitOperator[limit = 100/100]");
}

private ElementType elementType;

@Before
public void setUpElementTypes() throws Exception {
elementType = randomFrom(ElementType.INT, ElementType.NULL, ElementType.BYTES_REF);
}

private Page randomPage(BlockFactory blockFactory, int size) {
if (randomBoolean()) {
return new Page(blockFactory.newConstantNullBlock(size));
}
Block block = RandomBlock.randomBlock(
blockFactory,
elementType,
size,
elementType == ElementType.NULL || randomBoolean(),
1,
1,
0,
0
).block();
return new Page(block);
}

@Override
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
int inputPositionCount = input.stream().mapToInt(p -> p.getPositionCount()).sum();
Expand All @@ -91,7 +65,6 @@ public void testStatus() {
Page p = new Page(blockFactory.newConstantNullBlock(10));
try {
op.addInput(p);
op.finish();
assertSame(p, op.getOutput());
} finally {
p.releaseBlocks();
Expand All @@ -104,38 +77,24 @@ public void testStatus() {

public void testNeedInput() {
BlockFactory blockFactory = driverContext().blockFactory();
// small page size
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 5)) {
try (LimitOperator op = simple(SimpleOptions.DEFAULT).get(driverContext())) {
assertTrue(op.needsInput());
Page p = randomPage(blockFactory, 10);
Page p = new Page(blockFactory.newConstantNullBlock(10));
op.addInput(p);
assertFalse(op.needsInput());
op.getOutput().releaseBlocks();
assertTrue(op.needsInput());
op.finish();
assertFalse(op.needsInput());
}
// small page size
try (LimitOperator op = new LimitOperator(new Limiter(100), blockFactory, 50)) {
for (int i = 0; i < 5; i++) {
assertTrue(op.needsInput());
Page p = randomPage(blockFactory, 10);
op.addInput(p);
}
assertFalse(op.needsInput());
op.getOutput().releaseBlocks();
assertTrue(op.needsInput());
op.finish();
assertFalse(op.needsInput());
}
}

public void testBlockBiggerThanRemaining() {
BlockFactory blockFactory = driverContext().blockFactory();
for (int i = 0; i < 100; i++) {
try (var op = simple().get(driverContext())) {
assertTrue(op.needsInput());
Page p = randomPage(blockFactory, 200); // test doesn't close because operator returns a view
Page p = new Page(randomBlock(blockFactory, 200)); // test doesn't close because operator returns a view
op.addInput(p);
assertFalse(op.needsInput());
Page result = op.getOutput();
Expand All @@ -155,7 +114,7 @@ public void testBlockPreciselyRemaining() {
for (int i = 0; i < 100; i++) {
try (var op = simple().get(driverContext())) {
assertTrue(op.needsInput());
Page p = randomPage(blockFactory, 100); // test doesn't close because operator returns same page
Page p = new Page(randomBlock(blockFactory, 100)); // test doesn't close because operator returns same page
op.addInput(p);
assertFalse(op.needsInput());
Page result = op.getOutput();
Expand All @@ -174,7 +133,7 @@ public void testEarlyTermination() {
int numDrivers = between(1, 4);
final List<Driver> drivers = new ArrayList<>();
final int limit = between(1, 10_000);
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit, between(1024, 2048));
final LimitOperator.Factory limitFactory = new LimitOperator.Factory(limit);
final AtomicInteger receivedRows = new AtomicInteger();
for (int i = 0; i < numDrivers; i++) {
DriverContext driverContext = driverContext();
Expand All @@ -193,8 +152,7 @@ public boolean isFinished() {

@Override
public Page getOutput() {
return randomPage(blockFactory(), between(1, 100));

return new Page(randomBlock(driverContext.blockFactory(), between(1, 100)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -49,7 +46,6 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -212,34 +208,6 @@ public void sendResponse(Exception exception) {
}
}

public void testLimitCombineSmallerPages() {
QueryPragmas queryPragmas = randomPragmas();
if (canUseQueryPragmas()) {
Settings.Builder settings = Settings.builder().put(queryPragmas.getSettings());
settings.remove(QueryPragmas.NODE_LEVEL_REDUCTION.getKey());
settings.remove(QueryPragmas.PAGE_SIZE.getKey());
queryPragmas = new QueryPragmas(settings.build());
}
var request = new EsqlQueryRequest();
request.query("FROM test-* | KEEP user | LIMIT 100");
request.pragmas(queryPragmas);
request.profile(true);
try (EsqlQueryResponse resp = run(request)) {
List<DriverProfile> nodeReduce = resp.profile().drivers().stream().filter(s -> s.description().equals("node_reduce")).toList();
for (DriverProfile driverProfile : nodeReduce) {
if (driverProfile.operators().size() == 2) {
continue; // when the target node is also the coordinator node
}
assertThat(driverProfile.operators(), hasSize(3));
OperatorStatus exchangeSink = driverProfile.operators().get(2);
assertThat(exchangeSink.status(), instanceOf(ExchangeSinkOperator.Status.class));
ExchangeSinkOperator.Status exchangeStatus = (ExchangeSinkOperator.Status) exchangeSink.status();
assertThat(exchangeStatus.pagesReceived(), lessThanOrEqualTo(1));
}
assertThat(resp.pages(), hasSize(1));
}
}

static class SearchContextCounter {
private final int maxAllowed;
private final AtomicInteger current = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,10 +837,7 @@ private PhysicalOperation planFilter(FilterExec filter, LocalExecutionPlannerCon

private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContext context) {
PhysicalOperation source = plan(limit.child(), context);
final Integer rowSize = limit.estimatedRowSize();
assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
int pageSize = context.pageSize(rowSize);
return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx), pageSize), source.layout);
return source.with(new LimitOperator.Factory((Integer) limit.limit().fold(context.foldCtx)), source.layout);
}

private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {
Expand Down
Loading