diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java index 3ac6565d21c33..1bee68160e024 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java @@ -65,22 +65,27 @@ public int get(int i) { return; } int[] forwards = docs.shardSegmentDocMapForwards(); - loadFromSingleLeaf(target, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } + Block[] unshuffled = new Block[target.length]; + try { + loadFromSingleLeaf(unshuffled, new BlockLoader.Docs() { + @Override + public int count() { + return docs.getPositionCount(); + } - @Override - public int get(int i) { - return docs.docs().getInt(forwards[i]); - } - }); - final int[] backwards = docs.shardSegmentDocMapBackwards(); - for (int i = 0; i < target.length; i++) { - try (Block in = target[i]) { - target[i] = in.filter(backwards); + @Override + public int get(int i) { + return docs.docs().getInt(forwards[i]); + } + }); + final int[] backwards = docs.shardSegmentDocMapBackwards(); + for (int i = 0; i < unshuffled.length; i++) { + target[i] = unshuffled[i].filter(backwards); + unshuffled[i].close(); + unshuffled[i] = null; } + } finally { + Releasables.closeExpectNoException(unshuffled); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java index d3b8b0edcec3d..ebfac0cb24f7f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java @@ -36,6 +36,9 @@ public Block[] next() { boolean success = false; try { load(target, offset); + if (target[0].getPositionCount() != docs.getPositionCount()) { + throw new IllegalStateException("partial pages not yet supported"); + } success = true; for (Block b : target) { operator.valuesLoaded += b.getTotalValueCount(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 6c1e89925702b..c9b46eb764580 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BooleanBlock; @@ -446,12 +445,6 @@ protected void assertSimpleOutput(List input, List results) { assertThat(sum, equalTo(expectedSum)); } - @Override - protected ByteSizeValue enoughMemoryForSimple() { - assumeFalse("strange exception in the test, fix soon", true); - return ByteSizeValue.ofKb(1); - } - public void testLoadAll() { DriverContext driverContext = driverContext(); loadSimpleAndAssert( diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index 56ae2fb4119a8..f9f9769929f28 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -98,10 +98,16 @@ protected ByteSizeValue enoughMemoryForSimple() { * all pages. */ public final void testSimpleCircuitBreaking() { - ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple(); - Operator.OperatorFactory simple = simple(new SimpleOptions(true)); + /* + * Build the input before building `simple` to handle the rare + * cases where `simple` need some state from the input - mostly + * this is ValuesSourceReaderOperator. + */ DriverContext inputFactoryContext = driverContext(); List input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000))); + + ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple(); + Operator.OperatorFactory simple = simple(new SimpleOptions(true)); try { ByteSizeValue limit = BreakerTestUtil.findBreakerLimit(memoryLimitForSimple, l -> runWithLimit(simple, input, l)); ByteSizeValue testWithSize = ByteSizeValue.ofBytes(randomLongBetween(0, limit.getBytes())); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index 3c37f0ffbcc1d..446c287be64fc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -50,7 +50,6 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.After; import org.junit.Before; @@ -81,8 +80,8 @@ public class EnrichIT extends AbstractEsqlIntegTestCase { @Override protected Collection> nodePlugins() { - List> plugins = new ArrayList<>(super.nodePlugins()); - plugins.add(EsqlPlugin.class); + List> plugins = new ArrayList<>(); + plugins.add(EsqlActionBreakerIT.EsqlTestPluginWithMockBlockFactory.class); plugins.add(InternalExchangePlugin.class); plugins.add(LocalStateEnrich.class); plugins.add(IngestCommonPlugin.class);