Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,27 @@ public int get(int i) {
return;
}
int[] forwards = docs.shardSegmentDocMapForwards();
loadFromSingleLeaf(target, new BlockLoader.Docs() {
@Override
public int count() {
return docs.getPositionCount();
}
Block[] unshuffled = new Block[target.length];
try {
loadFromSingleLeaf(unshuffled, new BlockLoader.Docs() {
@Override
public int count() {
return docs.getPositionCount();
}

@Override
public int get(int i) {
return docs.docs().getInt(forwards[i]);
}
});
final int[] backwards = docs.shardSegmentDocMapBackwards();
for (int i = 0; i < target.length; i++) {
try (Block in = target[i]) {
target[i] = in.filter(backwards);
@Override
public int get(int i) {
return docs.docs().getInt(forwards[i]);
}
});
final int[] backwards = docs.shardSegmentDocMapBackwards();
for (int i = 0; i < unshuffled.length; i++) {
target[i] = unshuffled[i].filter(backwards);
unshuffled[i].close();
unshuffled[i] = null;
}
} finally {
Releasables.closeExpectNoException(unshuffled);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public Block[] next() {
boolean success = false;
try {
load(target, offset);
if (target[0].getPositionCount() != docs.getPositionCount()) {
throw new IllegalStateException("partial pages not yet supported");
}
success = true;
for (Block b : target) {
operator.valuesLoaded += b.getTotalValueCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
Expand Down Expand Up @@ -446,12 +445,6 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {
assertThat(sum, equalTo(expectedSum));
}

@Override
protected ByteSizeValue enoughMemoryForSimple() {
assumeFalse("strange exception in the test, fix soon", true);
return ByteSizeValue.ofKb(1);
}

public void testLoadAll() {
DriverContext driverContext = driverContext();
loadSimpleAndAssert(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,16 @@ protected ByteSizeValue enoughMemoryForSimple() {
* all pages.
*/
public final void testSimpleCircuitBreaking() {
ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple();
Operator.OperatorFactory simple = simple(new SimpleOptions(true));
/*
* Build the input before building `simple` to handle the rare
* cases where `simple` need some state from the input - mostly
* this is ValuesSourceReaderOperator.
*/
DriverContext inputFactoryContext = driverContext();
List<Page> input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000)));

ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple();
Operator.OperatorFactory simple = simple(new SimpleOptions(true));
try {
ByteSizeValue limit = BreakerTestUtil.findBreakerLimit(memoryLimitForSimple, l -> runWithLimit(simple, input, l));
ByteSizeValue testWithSize = ByteSizeValue.ofBytes(randomLongBetween(0, limit.getBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -81,8 +80,8 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(EsqlPlugin.class);
List<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(EsqlActionBreakerIT.EsqlTestPluginWithMockBlockFactory.class);
plugins.add(InternalExchangePlugin.class);
plugins.add(LocalStateEnrich.class);
plugins.add(IngestCommonPlugin.class);
Expand Down