diff --git a/benchmarks/README.md b/benchmarks/README.md
index af72d16d2ad4b..c5b8f5b9d2321 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -152,11 +152,10 @@ exit
Grab the async profiler from https://github.com/jvm-profiling-tools/async-profiler
and run `prof async` like so:
```
-gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/async-profiler-3.0-29ee888-linux-x64/lib/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"'
+gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/async-profiler-4.0-linux-x64/lib/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"'
```
-Note: As of January 2025 the latest release of async profiler doesn't work
- with our JDK but the nightly is fine.
+Note: As of July 2025 the 4.0 release of the async profiler works well.

If you are on Mac, this'll warn you that you downloaded the shared library from
the internet. You'll need to go to settings and allow it to run.
diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java
index e6f6226111888..94483a136a5d2 100644
--- a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java
+++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java
@@ -24,8 +24,10 @@
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.logging.LogConfigurator;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -85,6 +87,10 @@
 @State(Scope.Thread)
 @Fork(1)
 public class ValuesSourceReaderBenchmark {
+    static {
+        LogConfigurator.configureESLogging();
+    }
+
     private static final String[] SUPPORTED_LAYOUTS = new String[] { "in_order", "shuffled", "shuffled_singles" };
     private static final String[] SUPPORTED_NAMES = new String[] { "long",
@@ -345,6 +351,7 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
     public void benchmark() {
         ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
             blockFactory,
+            ByteSizeValue.ofMb(1).getBytes(),
             fields(name),
             List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
                 throw new UnsupportedOperationException("can't load _source here");
diff --git a/docs/changelog/131053.yaml b/docs/changelog/131053.yaml
new file mode 100644
index 0000000000000..b30a7c8ee8cc5
--- /dev/null
+++ b/docs/changelog/131053.yaml
@@ -0,0 +1,5 @@
+pr: 131053
+summary: Split large pages on load sometimes
+area: ES|QL
+type: bug
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java
index 22b198b10a7ad..f419d87d008fe 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java
@@ -98,11 +98,11 @@ protected void writeExtent(BlockLoader.IntBuilder builder,
Extent extent) { public BlockLoader.AllReader reader(LeafReaderContext context) throws IOException { return new BlockLoader.AllReader() { @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { var binaryDocValues = context.reader().getBinaryDocValues(fieldName); var reader = new GeometryDocValueReader(); - try (var builder = factory.ints(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (var builder = factory.ints(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(binaryDocValues, docs.get(i), reader, builder); } return builder.build(); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java index 410679fa9cfd5..f95e35a5d0845 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java @@ -124,10 +124,10 @@ private static class SingletonLongs extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count())) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) { int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < lastDoc) { throw new IllegalStateException("docs within same block must be in order"); @@ -173,9 +173,9 @@ private static class Longs extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < this.docID) { throw new IllegalStateException("docs within same block must be in order"); @@ -259,10 +259,10 @@ private static class SingletonInts extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count())) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count() - offset)) { int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < lastDoc) { throw new IllegalStateException("docs within same block must be in order"); @@ -308,9 +308,9 @@ private static class Ints extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + public BlockLoader.Block read(BlockFactory 
factory, Docs docs, int offset) throws IOException { + try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < this.docID) { throw new IllegalStateException("docs within same block must be in order"); @@ -408,10 +408,10 @@ private static class SingletonDoubles extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count())) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count() - offset)) { int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < lastDoc) { throw new IllegalStateException("docs within same block must be in order"); @@ -461,9 +461,9 @@ private static class Doubles extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < this.docID) { throw new IllegalStateException("docs within same block must be in order"); @@ -544,10 +544,10 @@ private static class DenseVectorValuesBlockReader extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { // Doubles from doc values ensures that the values are in order - try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count(), dimensions)) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count() - offset, dimensions)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < iterator.docID()) { throw new IllegalStateException("docs within same block must be in order"); @@ -645,19 +645,19 @@ private BlockLoader.Block readSingleDoc(BlockFactory factory, int docId) throws if (ordinals.advanceExact(docId)) { BytesRef v = ordinals.lookupOrd(ordinals.ordValue()); // the returned BytesRef can be reused - return factory.constantBytes(BytesRef.deepCopyOf(v)); + return factory.constantBytes(BytesRef.deepCopyOf(v), 1); } else { - return factory.constantNulls(); + return factory.constantNulls(1); } } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - if (docs.count() == 1) { - return readSingleDoc(factory, docs.get(0)); + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + if (docs.count() - offset == 1) { + return readSingleDoc(factory, docs.get(offset)); } - try (var builder = factory.singletonOrdinalsBuilder(ordinals, docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (var builder = factory.singletonOrdinalsBuilder(ordinals, docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < 
ordinals.docID()) { throw new IllegalStateException("docs within same block must be in order"); @@ -700,12 +700,12 @@ private static class Ordinals extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - if (docs.count() == 1) { - return readSingleDoc(factory, docs.get(0)); + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + if (docs.count() - offset == 1) { + return readSingleDoc(factory, docs.get(offset)); } - try (var builder = factory.sortedSetOrdinalsBuilder(ordinals, docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (var builder = factory.sortedSetOrdinalsBuilder(ordinals, docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < ordinals.docID()) { throw new IllegalStateException("docs within same block must be in order"); @@ -736,12 +736,12 @@ public void read(int docId, BlockLoader.StoredFields storedFields, Builder build private BlockLoader.Block readSingleDoc(BlockFactory factory, int docId) throws IOException { if (ordinals.advanceExact(docId) == false) { - return factory.constantNulls(); + return factory.constantNulls(1); } int count = ordinals.docValueCount(); if (count == 1) { BytesRef v = ordinals.lookupOrd(ordinals.nextOrd()); - return factory.constantBytes(BytesRef.deepCopyOf(v)); + return factory.constantBytes(BytesRef.deepCopyOf(v), 1); } try (var builder = factory.bytesRefsFromDocValues(count)) { builder.beginPositionEntry(); @@ -816,9 +816,9 @@ private static class BytesRefsFromBinary extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < docID) { throw new IllegalStateException("docs within same block must be in order"); @@ -915,9 +915,9 @@ private static class DenseVectorFromBinary extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count(), dimensions)) { - for (int i = 0; i < docs.count(); i++) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count() - offset, dimensions)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < docID) { throw new IllegalStateException("docs within same block must be in order"); @@ -999,10 +999,10 @@ private static class SingletonBooleans extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count())) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count() - offset)) { int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < lastDoc) { throw 
new IllegalStateException("docs within same block must be in order"); @@ -1048,9 +1048,9 @@ private static class Booleans extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException { - try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < this.docID) { throw new IllegalStateException("docs within same block must be in order"); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index fc5d28509b3b6..a4a498e4048db 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -43,7 +43,7 @@ interface ColumnAtATimeReader extends Reader { /** * Reads the values of all documents in {@code docs}. */ - BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException; + BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException; } interface RowStrideReader extends Reader { @@ -149,8 +149,8 @@ public String toString() { */ class ConstantNullsReader implements AllReader { @Override - public Block read(BlockFactory factory, Docs docs) throws IOException { - return factory.constantNulls(); + public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + return factory.constantNulls(docs.count() - offset); } @Override @@ -183,8 +183,8 @@ public Builder builder(BlockFactory factory, int expectedCount) { public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) { return new ColumnAtATimeReader() { @Override - public Block read(BlockFactory factory, Docs docs) { - return factory.constantBytes(value); + public Block read(BlockFactory factory, Docs docs, int offset) { + return factory.constantBytes(value, docs.count() - offset); } @Override @@ -261,8 +261,8 @@ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws } return new ColumnAtATimeReader() { @Override - public Block read(BlockFactory factory, Docs docs) throws IOException { - return reader.read(factory, docs); + public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + return reader.read(factory, docs, offset); } @Override @@ -408,13 +408,13 @@ interface BlockFactory { /** * Build a block that contains only {@code null}. */ - Block constantNulls(); + Block constantNulls(int count); /** * Build a block that contains {@code value} repeated * {@code size} times. 
*/ - Block constantBytes(BytesRef value); + Block constantBytes(BytesRef value, int count); /** * Build a reader for reading {@link SortedDocValues} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java index a3b10ea901395..3a1a805a25b64 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java @@ -49,10 +49,10 @@ public int docId() { } @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { // Note that we don't emit falses before trues so we conform to the doc values contract and can use booleansFromDocValues - try (BlockLoader.BooleanBuilder builder = factory.booleans(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.BooleanBuilder builder = factory.booleans(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(docs.get(i), builder); } return builder.build(); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java index fb97b0f84c50f..0ec899e19a1cd 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java @@ -49,10 +49,10 @@ public int docId() { } @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { // Note that we don't sort the values sort, so we can't use factory.longsFromDocValues - try (BlockLoader.LongBuilder builder = factory.longs(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.LongBuilder builder = factory.longs(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(docs.get(i), builder); } return builder.build(); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java index d762acda9f7e4..f01cc65775e6e 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java @@ -49,10 +49,10 @@ public int docId() { } @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { // Note that we don't sort the values sort, so we can't use factory.doublesFromDocValues - try (BlockLoader.DoubleBuilder builder = factory.doubles(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.DoubleBuilder builder = factory.doubles(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(docs.get(i), builder); } return builder.build(); diff --git 
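The pattern repeated across these reader changes is that `read` now takes an `offset` into `docs`: each implementation sizes its builder for `docs.count() - offset` and starts its loop at `offset`, so a caller can pick up the tail of a page it previously cut short. Below is a minimal standalone sketch of that contract; the `Docs` interface and the list-backed builder are simplified stand-ins, not the real `BlockLoader` types.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for BlockLoader.Docs and a long-valued block builder.
interface Docs {
    int count();
    int get(int i);
}

final class OffsetReadSketch {
    /**
     * Reads values for docs[offset .. count): the builder is sized for the
     * remaining positions only, and iteration starts at {@code offset}.
     */
    static List<Long> read(long[] valuesByDoc, Docs docs, int offset) {
        List<Long> builder = new ArrayList<>(docs.count() - offset);
        for (int i = offset; i < docs.count(); i++) {
            builder.add(valuesByDoc[docs.get(i)]);
        }
        return builder;
    }

    public static void main(String[] args) {
        long[] valuesByDoc = { 10L, 20L, 30L };
        Docs docs = new Docs() {
            public int count() { return 3; }
            public int get(int i) { return i; }
        };
        System.out.println(read(valuesByDoc, docs, 0)); // whole page: [10, 20, 30]
        System.out.println(read(valuesByDoc, docs, 1)); // tail only:  [20, 30]
    }
}
```

The production readers follow the same shape but append into typed block builders and keep the existing "docs within same block must be in order" checks.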
a/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java index 48d78129b8781..b232a8e1fc45a 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java @@ -49,10 +49,10 @@ public int docId() { } @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { // Note that we don't pre-sort our output so we can't use bytesRefsFromDocValues - try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(docs.get(i), builder); } return builder.build(); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java index cfc7045a55513..220bba3d3c079 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java @@ -51,10 +51,10 @@ public int docId() { } @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { // Note that we don't pre-sort our output so we can't use bytesRefsFromDocValues - try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(docs.get(i), builder); } return builder.build(); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java index 0a1a8a86154ab..9c947a17de7b6 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java @@ -49,10 +49,10 @@ public int docId() { } @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException { + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { // Note that we don't pre-sort our output so we can't use longsFromDocValues - try (BlockLoader.LongBuilder builder = factory.longs(docs.count())) { - for (int i = 0; i < docs.count(); i++) { + try (BlockLoader.LongBuilder builder = factory.longs(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { read(docs.get(i), builder); } return builder.build(); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapperTests.java index 73d76ad48c955..6a81a93923abc 100644 --- 
a/server/src/test/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapperTests.java @@ -125,7 +125,7 @@ private static void testBoundsBlockLoaderAux( for (int j : array) { expected.add(visitor.apply(geometries.get(j + currentIndex)).get()); } - try (var block = (TestBlock) loader.reader(leaf).read(TestBlock.factory(leafReader.numDocs()), TestBlock.docs(array))) { + try (var block = (TestBlock) loader.reader(leaf).read(TestBlock.factory(), TestBlock.docs(array), 0)) { for (int i = 0; i < block.size(); i++) { intArrayResults.add(block.get(i)); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/BlockSourceReaderTests.java b/server/src/test/java/org/elasticsearch/index/mapper/BlockSourceReaderTests.java index 357ada3ad656d..1fa9c85a5c738 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/BlockSourceReaderTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/BlockSourceReaderTests.java @@ -59,7 +59,7 @@ private void loadBlock(LeafReaderContext ctx, Consumer test) throws I StoredFieldLoader.fromSpec(loader.rowStrideStoredFieldSpec()).getLoader(ctx, null), loader.rowStrideStoredFieldSpec().requiresSource() ? SourceLoader.FROM_STORED_SOURCE.leaf(ctx.reader(), null) : null ); - BlockLoader.Builder builder = loader.builder(TestBlock.factory(ctx.reader().numDocs()), 1); + BlockLoader.Builder builder = loader.builder(TestBlock.factory(), 1); storedFields.advanceTo(0); reader.read(0, storedFields, builder); TestBlock block = (TestBlock) builder.build(); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java index ce9a9bc0688f3..54656ab1af3ee 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java @@ -446,7 +446,8 @@ public void testBlockLoader() throws IOException { try (DirectoryReader reader = iw.getReader()) { BooleanScriptFieldType fieldType = build("xor_param", Map.of("param", false), OnScriptError.FAIL); List expected = List.of(false, true); - assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(expected)); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(expected)); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(expected.subList(1, 2))); assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(expected)); } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java index 3d8ed5ea60262..1eb0ba07d58e2 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java @@ -493,9 +493,10 @@ public void testBlockLoader() throws IOException { try (DirectoryReader reader = iw.getReader()) { DateScriptFieldType fieldType = build("add_days", Map.of("days", 1), OnScriptError.FAIL); assertThat( - blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), + blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(List.of(1595518581354L, 1595518581355L)) ); + 
assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(1595518581355L))); assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(1595518581354L, 1595518581355L))); } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java index 140137015d98a..b1cda53876993 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java @@ -262,7 +262,8 @@ public void testBlockLoader() throws IOException { ); try (DirectoryReader reader = iw.getReader()) { DoubleScriptFieldType fieldType = build("add_param", Map.of("param", 1), OnScriptError.FAIL); - assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(List.of(2d, 3d))); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(List.of(2d, 3d))); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(3d))); assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(2d, 3d))); } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java index 281d2993fa29c..7e9a236f6cc74 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java @@ -273,7 +273,8 @@ public void testBlockLoader() throws IOException { new BytesRef(InetAddressPoint.encode(InetAddresses.forString("192.168.0.1"))), new BytesRef(InetAddressPoint.encode(InetAddresses.forString("192.168.1.1"))) ); - assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(expected)); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(expected)); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(expected.subList(1, 2))); assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(expected)); } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java index 57d52991a6442..ccc8ccac4deb4 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java @@ -409,9 +409,10 @@ public void testBlockLoader() throws IOException { try (DirectoryReader reader = iw.getReader()) { KeywordScriptFieldType fieldType = build("append_param", Map.of("param", "-Suffix"), OnScriptError.FAIL); assertThat( - blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), + blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(List.of(new BytesRef("1-Suffix"), new BytesRef("2-Suffix"))) ); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(new BytesRef("2-Suffix")))); assertThat( blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(new BytesRef("1-Suffix"), new BytesRef("2-Suffix"))) diff --git a/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java 
b/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java index a8cb4d51c5efa..01f96a1a4b1be 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java @@ -295,7 +295,8 @@ public void testBlockLoader() throws IOException { ); try (DirectoryReader reader = iw.getReader()) { LongScriptFieldType fieldType = build("add_param", Map.of("param", 1), OnScriptError.FAIL); - assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(List.of(2L, 3L))); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(List.of(2L, 3L))); + assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(3L))); assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(2L, 3L))); } } diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/Clusters.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/Clusters.java index 1c237404a78cc..bc5e1f123fe81 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/Clusters.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/Clusters.java @@ -22,6 +22,7 @@ static ElasticsearchCluster buildCluster() { .setting("xpack.security.enabled", "false") .setting("xpack.license.self_generated.type", "trial") .setting("esql.query.allow_partial_results", "false") + .setting("logger.org.elasticsearch.compute.lucene.read", "DEBUG") .jvmArg("-Xmx512m"); String javaVersion = JvmInfo.jvmInfo().version(); if (javaVersion.equals("20") || javaVersion.equals("21")) { diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 3912a63ef1514..893acbd22cc23 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -570,7 +570,7 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE } public void testFetchManyBigFields() throws IOException { - initManyBigFieldsIndex(100); + initManyBigFieldsIndex(100, "keyword"); Map response = fetchManyBigFields(100); ListMatcher columns = matchesList(); for (int f = 0; f < 1000; f++) { @@ -580,7 +580,7 @@ public void testFetchManyBigFields() throws IOException { } public void testFetchTooManyBigFields() throws IOException { - initManyBigFieldsIndex(500); + initManyBigFieldsIndex(500, "keyword"); // 500 docs is plenty to circuit break on most nodes assertCircuitBreaks(attempt -> fetchManyBigFields(attempt * 500)); } @@ -594,6 +594,58 @@ private Map fetchManyBigFields(int docs) throws IOException { return responseAsMap(query(query.toString(), "columns")); } + public void testAggManyBigTextFields() throws IOException { + int docs = 100; + int fields = 100; + initManyBigFieldsIndex(docs, "text"); + Map response = aggManyBigFields(fields); + ListMatcher columns = matchesList().item(matchesMap().entry("name", "sum").entry("type", "long")); + assertMap( + 
response, + matchesMap().entry("columns", columns).entry("values", matchesList().item(matchesList().item(1024 * fields * docs))) + ); + } + + /** + * Aggregates documents containing many fields which are {@code 1kb} each. + */ + private Map aggManyBigFields(int fields) throws IOException { + StringBuilder query = startQuery(); + query.append("FROM manybigfields | STATS sum = SUM("); + query.append("LENGTH(f").append(String.format(Locale.ROOT, "%03d", 0)).append(")"); + for (int f = 1; f < fields; f++) { + query.append(" + LENGTH(f").append(String.format(Locale.ROOT, "%03d", f)).append(")"); + } + query.append(")\"}"); + return responseAsMap(query(query.toString(), "columns,values")); + } + + /** + * Aggregates on the {@code LENGTH} of a giant text field. Without + * splitting pages on load (#131053) this throws a {@link CircuitBreakingException} + * when it tries to load a giant field. With that change it finishes + * after loading many single-row pages. + */ + public void testAggGiantTextField() throws IOException { + int docs = 100; + initGiantTextField(docs); + Map response = aggGiantTextField(); + ListMatcher columns = matchesList().item(matchesMap().entry("name", "sum").entry("type", "long")); + assertMap( + response, + matchesMap().entry("columns", columns).entry("values", matchesList().item(matchesList().item(1024 * 1024 * 5 * docs))) + ); + } + + /** + * Aggregates documents containing a text field that is {@code 1mb} each. + */ + private Map aggGiantTextField() throws IOException { + StringBuilder query = startQuery(); + query.append("FROM bigtext | STATS sum = SUM(LENGTH(f))\"}"); + return responseAsMap(query(query.toString(), "columns,values")); + } + public void testAggMvLongs() throws IOException { int fieldValues = 100; initMvLongsIndex(1, 3, fieldValues); @@ -788,7 +840,7 @@ private void initSingleDocIndex() throws IOException { """); } - private void initManyBigFieldsIndex(int docs) throws IOException { + private void initManyBigFieldsIndex(int docs, String type) throws IOException { logger.info("loading many documents with many big fields"); int docsPerBulk = 5; int fields = 1000; @@ -799,7 +851,7 @@ private void initManyBigFieldsIndex(int docs) throws IOException { config.startObject("settings").field("index.mapping.total_fields.limit", 10000).endObject(); config.startObject("mappings").startObject("properties"); for (int f = 0; f < fields; f++) { - config.startObject("f" + String.format(Locale.ROOT, "%03d", f)).field("type", "keyword").endObject(); + config.startObject("f" + String.format(Locale.ROOT, "%03d", f)).field("type", type).endObject(); } config.endObject().endObject(); request.setJsonEntity(Strings.toString(config.endObject())); @@ -831,6 +883,37 @@ private void initManyBigFieldsIndex(int docs) throws IOException { initIndex("manybigfields", bulk.toString()); } + private void initGiantTextField(int docs) throws IOException { + logger.info("loading many documents with one big text field"); + int docsPerBulk = 3; + int fieldSize = Math.toIntExact(ByteSizeValue.ofMb(5).getBytes()); + + Request request = new Request("PUT", "/bigtext"); + XContentBuilder config = JsonXContent.contentBuilder().startObject(); + config.startObject("mappings").startObject("properties"); + config.startObject("f").field("type", "text").endObject(); + config.endObject().endObject(); + request.setJsonEntity(Strings.toString(config.endObject())); + Response response = client().performRequest(request); + assertThat( + EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), + 
equalTo("{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"bigtext\"}") + ); + + StringBuilder bulk = new StringBuilder(); + for (int d = 0; d < docs; d++) { + bulk.append("{\"create\":{}}\n"); + bulk.append("{\"f\":\""); + bulk.append(Integer.toString(d % 10).repeat(fieldSize)); + bulk.append("\"}\n"); + if (d % docsPerBulk == docsPerBulk - 1 && d != docs - 1) { + bulk("bigtext", bulk.toString()); + bulk.setLength(0); + } + } + initIndex("bigtext", bulk.toString()); + } + private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOException { logger.info("loading documents with many multivalued longs"); int docsPerBulk = 100; diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java index 1c785d58f9804..f099aaac463db 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java @@ -420,13 +420,12 @@ public final void testCacheable() throws IOException { } } - protected final List blockLoaderReadValuesFromColumnAtATimeReader(DirectoryReader reader, MappedFieldType fieldType) + protected final List blockLoaderReadValuesFromColumnAtATimeReader(DirectoryReader reader, MappedFieldType fieldType, int offset) throws IOException { BlockLoader loader = fieldType.blockLoader(blContext()); List all = new ArrayList<>(); for (LeafReaderContext ctx : reader.leaves()) { - TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx) - .read(TestBlock.factory(ctx.reader().numDocs()), TestBlock.docs(ctx)); + TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx).read(TestBlock.factory(), TestBlock.docs(ctx), offset); for (int i = 0; i < block.size(); i++) { all.add(block.get(i)); } @@ -440,7 +439,7 @@ protected final List blockLoaderReadValuesFromRowStrideReader(DirectoryR List all = new ArrayList<>(); for (LeafReaderContext ctx : reader.leaves()) { BlockLoader.RowStrideReader blockReader = loader.rowStrideReader(ctx); - BlockLoader.Builder builder = loader.builder(TestBlock.factory(ctx.reader().numDocs()), ctx.reader().numDocs()); + BlockLoader.Builder builder = loader.builder(TestBlock.factory(), ctx.reader().numDocs()); for (int i = 0; i < ctx.reader().numDocs(); i++) { blockReader.read(i, null, builder); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java index e35a53c0ecca8..eeb1a349d8bbc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java @@ -36,6 +36,8 @@ import static org.apache.lucene.tests.util.LuceneTestCase.newDirectory; import static org.apache.lucene.tests.util.LuceneTestCase.random; import static org.elasticsearch.index.mapper.BlockLoaderTestRunner.PrettyEqual.prettyEqualTo; +import static org.elasticsearch.test.ESTestCase.between; +import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -69,7 +71,11 @@ private Object setupAndInvokeBlockLoader(MapperService mapperService, XContentBu ); LuceneDocument doc = mapperService.documentMapper().parse(source).rootDoc(); - 
iw.addDocument(doc); + /* + * Add three documents with doc id 0, 1, 2. The real document is 1. + * The other two are empty documents. + */ + iw.addDocuments(List.of(List.of(), doc, List.of())); iw.close(); try (DirectoryReader reader = DirectoryReader.open(directory)) { @@ -83,9 +89,32 @@ private Object load(BlockLoader blockLoader, LeafReaderContext context, MapperSe // `columnAtATimeReader` is tried first, we mimic `ValuesSourceReaderOperator` var columnAtATimeReader = blockLoader.columnAtATimeReader(context); if (columnAtATimeReader != null) { - BlockLoader.Docs docs = TestBlock.docs(0); - var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(context.reader().numDocs()), docs); - assertThat(block.size(), equalTo(1)); + int[] docArray; + int offset; + if (randomBoolean()) { + // Half the time we load a single document. Nice and simple. + docArray = new int[] { 1 }; + offset = 0; + } else { + /* + * The other half the time we emulate loading a larger page, + * starting part way through the page. + */ + docArray = new int[between(2, 10)]; + offset = between(0, docArray.length - 1); + for (int i = 0; i < docArray.length; i++) { + if (i < offset) { + docArray[i] = 0; + } else if (i == offset) { + docArray[i] = 1; + } else { + docArray[i] = 2; + } + } + } + BlockLoader.Docs docs = TestBlock.docs(docArray); + var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(), docs, offset); + assertThat(block.size(), equalTo(docArray.length - offset)); return block.get(0); } @@ -102,10 +131,10 @@ private Object load(BlockLoader blockLoader, LeafReaderContext context, MapperSe StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(context, null), leafSourceLoader ); - storedFieldsLoader.advanceTo(0); + storedFieldsLoader.advanceTo(1); - BlockLoader.Builder builder = blockLoader.builder(TestBlock.factory(context.reader().numDocs()), 1); - blockLoader.rowStrideReader(context).read(0, storedFieldsLoader, builder); + BlockLoader.Builder builder = blockLoader.builder(TestBlock.factory(), 1); + blockLoader.rowStrideReader(context).read(1, storedFieldsLoader, builder); var block = (TestBlock) builder.build(); assertThat(block.size(), equalTo(1)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java index 9b5a66765adbe..cb73dc96f69b2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.hamcrest.Matcher; import java.io.IOException; import java.io.UncheckedIOException; @@ -20,11 +21,14 @@ import java.util.HashMap; import java.util.List; +import static org.elasticsearch.test.ESTestCase.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; public class TestBlock implements BlockLoader.Block { - public static BlockLoader.BlockFactory factory(int pageSize) { + public static BlockLoader.BlockFactory factory() { return new BlockLoader.BlockFactory() { @Override public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { @@ -34,6 +38,10 @@ public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { @Override public 
BlockLoader.BooleanBuilder booleans(int expectedCount) { class BooleansBuilder extends TestBlock.Builder implements BlockLoader.BooleanBuilder { + private BooleansBuilder() { + super(expectedCount); + } + @Override public BooleansBuilder appendBoolean(boolean value) { add(value); @@ -45,12 +53,41 @@ public BooleansBuilder appendBoolean(boolean value) { @Override public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) { - return bytesRefs(expectedCount); + class BytesRefsFromDocValuesBuilder extends TestBlock.Builder implements BlockLoader.BytesRefBuilder { + private BytesRefsFromDocValuesBuilder() { + super(1); + } + + @Override + public BytesRefsFromDocValuesBuilder appendBytesRef(BytesRef value) { + add(BytesRef.deepCopyOf(value)); + return this; + } + + @Override + public TestBlock build() { + TestBlock result = super.build(); + List r; + if (result.values.get(0) instanceof List l) { + r = l; + } else { + r = List.of(result.values.get(0)); + } + assertThat(r, hasSize(expectedCount)); + return result; + } + + } + return new BytesRefsFromDocValuesBuilder(); } @Override public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) { class BytesRefsBuilder extends TestBlock.Builder implements BlockLoader.BytesRefBuilder { + private BytesRefsBuilder() { + super(expectedCount); + } + @Override public BytesRefsBuilder appendBytesRef(BytesRef value) { add(BytesRef.deepCopyOf(value)); @@ -68,6 +105,10 @@ public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) { @Override public BlockLoader.DoubleBuilder doubles(int expectedCount) { class DoublesBuilder extends TestBlock.Builder implements BlockLoader.DoubleBuilder { + private DoublesBuilder() { + super(expectedCount); + } + @Override public DoublesBuilder appendDouble(double value) { add(value); @@ -82,6 +123,10 @@ public BlockLoader.FloatBuilder denseVectors(int expectedCount, int dimensions) class FloatsBuilder extends TestBlock.Builder implements BlockLoader.FloatBuilder { int numElements = 0; + private FloatsBuilder() { + super(expectedCount); + } + @Override public BlockLoader.FloatBuilder appendFloat(float value) { add(value); @@ -118,6 +163,10 @@ public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) { @Override public BlockLoader.IntBuilder ints(int expectedCount) { class IntsBuilder extends TestBlock.Builder implements BlockLoader.IntBuilder { + private IntsBuilder() { + super(expectedCount); + } + @Override public IntsBuilder appendInt(int value) { add(value); @@ -135,6 +184,10 @@ public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) { @Override public BlockLoader.LongBuilder longs(int expectedCount) { class LongsBuilder extends TestBlock.Builder implements BlockLoader.LongBuilder { + private LongsBuilder() { + super(expectedCount); + } + @Override public LongsBuilder appendLong(long value) { add(value); @@ -150,26 +203,30 @@ public BlockLoader.Builder nulls(int expectedCount) { } @Override - public BlockLoader.Block constantNulls() { - BlockLoader.LongBuilder builder = longs(pageSize); - for (int i = 0; i < pageSize; i++) { + public BlockLoader.Block constantNulls(int count) { + BlockLoader.LongBuilder builder = longs(count); + for (int i = 0; i < count; i++) { builder.appendNull(); } return builder.build(); } @Override - public BlockLoader.Block constantBytes(BytesRef value) { - BlockLoader.BytesRefBuilder builder = bytesRefs(pageSize); - for (int i = 0; i < pageSize; i++) { + public BlockLoader.Block constantBytes(BytesRef value, int count) { + 
BlockLoader.BytesRefBuilder builder = bytesRefs(count); + for (int i = 0; i < count; i++) { builder.appendBytesRef(value); } return builder.build(); } @Override - public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { + public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int expectedCount) { class SingletonOrdsBuilder extends TestBlock.Builder implements BlockLoader.SingletonOrdinalsBuilder { + private SingletonOrdsBuilder() { + super(expectedCount); + } + @Override public SingletonOrdsBuilder appendOrd(int value) { try { @@ -184,12 +241,16 @@ public SingletonOrdsBuilder appendOrd(int value) { } @Override - public BlockLoader.SortedSetOrdinalsBuilder sortedSetOrdinalsBuilder(SortedSetDocValues ordinals, int count) { + public BlockLoader.SortedSetOrdinalsBuilder sortedSetOrdinalsBuilder(SortedSetDocValues ordinals, int expectedSize) { class SortedSetOrdinalBuilder extends TestBlock.Builder implements BlockLoader.SortedSetOrdinalsBuilder { + private SortedSetOrdinalBuilder() { + super(expectedSize); + } + @Override public SortedSetOrdinalBuilder appendOrd(int value) { try { - add(ordinals.lookupOrd(value)); + add(BytesRef.deepCopyOf(ordinals.lookupOrd(value))); return this; } catch (IOException e) { throw new UncheckedIOException(e); @@ -199,9 +260,8 @@ public SortedSetOrdinalBuilder appendOrd(int value) { return new SortedSetOrdinalBuilder(); } - @Override - public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) { - return new AggregateMetricDoubleBlockBuilder(); + public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int expectedSize) { + return new AggregateMetricDoubleBlockBuilder(expectedSize); } }; } @@ -256,8 +316,14 @@ public void close() { private abstract static class Builder implements BlockLoader.Builder { private final List values = new ArrayList<>(); + private Matcher expectedSize; + private List currentPosition = null; + private Builder(int expectedSize) { + this.expectedSize = equalTo(expectedSize); + } + @Override public Builder appendNull() { assertNull(currentPosition); @@ -286,6 +352,7 @@ protected void add(Object value) { @Override public TestBlock build() { + assertThat(values, hasSize(expectedSize)); return new TestBlock(values); } @@ -300,12 +367,23 @@ public void close() { * The implementation here is fairly close to the production one. 
*/ private static class AggregateMetricDoubleBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder { - private final DoubleBuilder min = new DoubleBuilder(); - private final DoubleBuilder max = new DoubleBuilder(); - private final DoubleBuilder sum = new DoubleBuilder(); - private final IntBuilder count = new IntBuilder(); + private final DoubleBuilder min; + private final DoubleBuilder max; + private final DoubleBuilder sum; + private final IntBuilder count; + + private AggregateMetricDoubleBlockBuilder(int expectedSize) { + min = new DoubleBuilder(expectedSize); + max = new DoubleBuilder(expectedSize); + sum = new DoubleBuilder(expectedSize); + count = new IntBuilder(expectedSize); + } private static class DoubleBuilder extends TestBlock.Builder implements BlockLoader.DoubleBuilder { + private DoubleBuilder(int expectedSize) { + super(expectedSize); + } + @Override public BlockLoader.DoubleBuilder appendDouble(double value) { add(value); @@ -314,6 +392,10 @@ public BlockLoader.DoubleBuilder appendDouble(double value) { } private static class IntBuilder extends TestBlock.Builder implements BlockLoader.IntBuilder { + private IntBuilder(int expectedSize) { + super(expectedSize); + } + @Override public BlockLoader.IntBuilder appendInt(int value) { add(value); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java index 20ca4ed70e3f8..ccd0f82343401 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java @@ -138,7 +138,6 @@ private boolean checkIfSingleSegmentNonDecreasing() { prev = v; } return true; - } /** diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java index f7f5f541c747f..20e7ffc4ca2cb 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java @@ -14,18 +14,16 @@ import org.elasticsearch.core.Releasable; class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable { - private final int pageSize; private Block nullBlock; - ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) { + ComputeBlockLoaderFactory(BlockFactory factory) { super(factory); - this.pageSize = pageSize; } @Override - public Block constantNulls() { + public Block constantNulls(int count) { if (nullBlock == null) { - nullBlock = factory.newConstantNullBlock(pageSize); + nullBlock = factory.newConstantNullBlock(count); } nullBlock.incRef(); return nullBlock; @@ -39,7 +37,7 @@ public void close() { } @Override - public BytesRefBlock constantBytes(BytesRef value) { - return factory.newConstantBytesRefBlockWith(value, pageSize); + public BytesRefBlock constantBytes(BytesRef value, int count) { + return factory.newConstantBytesRefBlockWith(value, count); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java index 9ec5802b43f98..b5820929ba290 100644 --- 
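The `TestBlock` changes above give every builder an expected position count and make the base `Builder` assert at `build()` time that exactly that many positions were appended, which is what would catch a reader that mis-sizes its output when an offset is in play; `constantNulls` and `constantBytes` likewise now take an explicit count instead of a factory-wide page size. A small self-contained sketch of that size-checked builder idea, with illustrative names rather than the real `TestBlock` API:

```java
import java.util.ArrayList;
import java.util.List;

// Size-checked builder: remembers how many positions the caller promised and
// fails at build() if a different number was appended.
class SizeCheckedBuilder<T> {
    private final int expectedCount;
    private final List<T> values = new ArrayList<>();

    SizeCheckedBuilder(int expectedCount) {
        this.expectedCount = expectedCount;
    }

    SizeCheckedBuilder<T> append(T value) {
        values.add(value);
        return this;
    }

    List<T> build() {
        if (values.size() != expectedCount) {
            throw new AssertionError("expected " + expectedCount + " values but got " + values.size());
        }
        return List.copyOf(values);
    }
}
```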
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java @@ -198,12 +198,12 @@ static class BlockLoaderFactory extends DelegatingBlockLoaderFactory { } @Override - public BlockLoader.Block constantNulls() { + public BlockLoader.Block constantNulls(int count) { throw new UnsupportedOperationException("must not be used by column readers"); } @Override - public BlockLoader.Block constantBytes(BytesRef value) { + public BlockLoader.Block constantBytes(BytesRef value, int count) { throw new UnsupportedOperationException("must not be used by column readers"); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java index 7ff6e7211b7f2..6f00e97a1f9f2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java @@ -16,6 +16,8 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.search.fetch.StoredFieldsSpec; import java.io.IOException; @@ -24,6 +26,8 @@ * Loads values from a many leaves. Much less efficient than {@link ValuesFromSingleReader}. */ class ValuesFromManyReader extends ValuesReader { + private static final Logger log = LogManager.getLogger(ValuesFromManyReader.class); + private final int[] forwards; private final int[] backwards; private final BlockLoader.RowStrideReader[] rowStride; @@ -35,6 +39,7 @@ class ValuesFromManyReader extends ValuesReader { forwards = docs.shardSegmentDocMapForwards(); backwards = docs.shardSegmentDocMapBackwards(); rowStride = new BlockLoader.RowStrideReader[operator.fields.length]; + log.debug("initializing {} positions", docs.getPositionCount()); } @Override @@ -70,9 +75,7 @@ void run(int offset) throws IOException { builders[f] = new Block.Builder[operator.shardContexts.size()]; converters[f] = new BlockLoader[operator.shardContexts.size()]; } - try ( - ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount()) - ) { + try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory)) { int p = forwards[offset]; int shard = docs.shards().getInt(p); int segment = docs.segments().getInt(p); @@ -84,7 +87,9 @@ void run(int offset) throws IOException { read(firstDoc, shard); int i = offset + 1; - while (i < forwards.length) { + long estimated = estimatedRamBytesUsed(); + long dangerZoneBytes = Long.MAX_VALUE; // TODO danger_zone if ascending + while (i < forwards.length && estimated < dangerZoneBytes) { p = forwards[i]; shard = docs.shards().getInt(p); segment = docs.segments().getInt(p); @@ -96,8 +101,17 @@ void run(int offset) throws IOException { verifyBuilders(loaderBlockFactory, shard); read(docs.docs().getInt(p), shard); i++; + estimated = estimatedRamBytesUsed(); + log.trace("{}: bytes loaded {}/{}", p, estimated, dangerZoneBytes); } buildBlocks(); + if (log.isDebugEnabled()) { + long actual = 0; + for 
(Block b : target) { + actual += b.ramBytesUsed(); + } + log.debug("loaded {} positions total estimated/actual {}/{} bytes", p, estimated, actual); + } } } @@ -115,6 +129,9 @@ private void buildBlocks() { } operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f); } + if (target[0].getPositionCount() != docs.getPositionCount()) { + throw new IllegalStateException("partial pages not yet supported"); + } } private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) { @@ -141,6 +158,18 @@ public void close() { Releasables.closeExpectNoException(builders[f]); } } + + private long estimatedRamBytesUsed() { + long estimated = 0; + for (Block.Builder[] builders : this.builders) { + for (Block.Builder builder : builders) { + if (builder != null) { + estimated += builder.estimatedBytes(); + } + } + } + return estimated; + } } private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java index 1bee68160e024..d47a015c24578 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java @@ -16,6 +16,8 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.search.fetch.StoredFieldsSpec; import java.io.IOException; @@ -26,6 +28,8 @@ * Loads values from a single leaf. Much more efficient than {@link ValuesFromManyReader}. */ class ValuesFromSingleReader extends ValuesReader { + private static final Logger log = LogManager.getLogger(ValuesFromSingleReader.class); + /** * Minimum number of documents for which it is more efficient to use a * sequential stored field reader when reading stored fields. 
@@ -45,39 +49,27 @@ class ValuesFromSingleReader extends ValuesReader { super(operator, docs); this.shard = docs.shards().getInt(0); this.segment = docs.segments().getInt(0); + log.debug("initialized {} positions", docs.getPositionCount()); } @Override protected void load(Block[] target, int offset) throws IOException { - assert offset == 0; // TODO allow non-0 offset to support splitting pages if (docs.singleSegmentNonDecreasing()) { - loadFromSingleLeaf(target, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } - - @Override - public int get(int i) { - return docs.docs().getInt(i); - } - }); + loadFromSingleLeaf(operator.jumboBytes, target, new ValuesReaderDocs(docs), offset); return; } + if (offset != 0) { + throw new IllegalStateException("can only load partial pages with single-segment non-decreasing pages"); + } int[] forwards = docs.shardSegmentDocMapForwards(); Block[] unshuffled = new Block[target.length]; try { - loadFromSingleLeaf(unshuffled, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } - - @Override - public int get(int i) { - return docs.docs().getInt(forwards[i]); - } - }); + loadFromSingleLeaf( + Long.MAX_VALUE, // Effectively disable splitting pages when we're not loading in order + unshuffled, + new ValuesReaderDocs(docs).mapped(forwards), + 0 + ); final int[] backwards = docs.shardSegmentDocMapBackwards(); for (int i = 0; i < unshuffled.length; i++) { target[i] = unshuffled[i].filter(backwards); @@ -89,24 +81,25 @@ public int get(int i) { } } - private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IOException { - int firstDoc = docs.get(0); + private void loadFromSingleLeaf(long jumboBytes, Block[] target, ValuesReaderDocs docs, int offset) throws IOException { + int firstDoc = docs.get(offset); operator.positionFieldWork(shard, segment, firstDoc); StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; - List rowStrideReaders = new ArrayList<>(operator.fields.length); LeafReaderContext ctx = operator.ctx(shard, segment); - try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.count())) { + + List columnAtATimeReaders = new ArrayList<>(operator.fields.length); + List rowStrideReaders = new ArrayList<>(operator.fields.length); + try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory)) { for (int f = 0; f < operator.fields.length; f++) { ValuesSourceReaderOperator.FieldWork field = operator.fields[f]; BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx); if (columnAtATime != null) { - target[f] = (Block) columnAtATime.read(loaderBlockFactory, docs); - operator.sanityCheckBlock(columnAtATime, docs.count(), target[f], f); + columnAtATimeReaders.add(new ColumnAtATimeWork(columnAtATime, f)); } else { rowStrideReaders.add( new RowStrideReaderWork( field.rowStride(ctx), - (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()), + (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count() - offset), field.loader, f ) @@ -116,7 +109,18 @@ private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IO } if (rowStrideReaders.isEmpty() == false) { - loadFromRowStrideReaders(target, storedFieldsSpec, rowStrideReaders, ctx, docs); + loadFromRowStrideReaders(jumboBytes, target, storedFieldsSpec, rowStrideReaders, ctx, docs, offset); + } + for (ColumnAtATimeWork r : columnAtATimeReaders) 
{ + target[r.idx] = (Block) r.reader.read(loaderBlockFactory, docs, offset); + operator.sanityCheckBlock(r.reader, docs.count() - offset, target[r.idx], r.idx); + } + if (log.isDebugEnabled()) { + long total = 0; + for (Block b : target) { + total += b.ramBytesUsed(); + } + log.debug("loaded {} positions total ({} bytes)", target[0].getPositionCount(), total); } } finally { Releasables.close(rowStrideReaders); @@ -124,11 +128,13 @@ private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IO } private void loadFromRowStrideReaders( + long jumboBytes, Block[] target, StoredFieldsSpec storedFieldsSpec, List rowStrideReaders, LeafReaderContext ctx, - BlockLoader.Docs docs + ValuesReaderDocs docs, + int offset ) throws IOException { SourceLoader sourceLoader = null; ValuesSourceReaderOperator.ShardContext shardContext = operator.shardContexts.get(shard); @@ -153,18 +159,29 @@ private void loadFromRowStrideReaders( storedFieldLoader.getLoader(ctx, null), sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null ); - int p = 0; - while (p < docs.count()) { + int p = offset; + long estimated = 0; + while (p < docs.count() && estimated < jumboBytes) { int doc = docs.get(p++); storedFields.advanceTo(doc); for (RowStrideReaderWork work : rowStrideReaders) { work.read(doc, storedFields); } + estimated = estimatedRamBytesUsed(rowStrideReaders); + log.trace("{}: bytes loaded {}/{}", p, estimated, jumboBytes); } for (RowStrideReaderWork work : rowStrideReaders) { - target[work.offset] = work.build(); - operator.sanityCheckBlock(work.reader, p, target[work.offset], work.offset); + target[work.idx] = work.build(); + operator.sanityCheckBlock(work.reader, p - offset, target[work.idx], work.idx); } + if (log.isDebugEnabled()) { + long actual = 0; + for (RowStrideReaderWork work : rowStrideReaders) { + actual += target[work.idx].ramBytesUsed(); + } + log.debug("loaded {} positions row stride estimated/actual {}/{} bytes", p - offset, estimated, actual); + } + docs.setCount(p); } /** @@ -180,7 +197,21 @@ private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double st return range * storedFieldsSequentialProportion <= count; } - private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset) + /** + * Work for building a column-at-a-time. 
+ * @param reader reads the values + * @param idx destination in array of {@linkplain Block}s we build + */ + private record ColumnAtATimeWork(BlockLoader.ColumnAtATimeReader reader, int idx) {} + + /** + * Work for + * @param reader + * @param builder + * @param loader + * @param idx + */ + private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int idx) implements Releasable { void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException { @@ -196,4 +227,12 @@ public void close() { builder.close(); } } + + private long estimatedRamBytesUsed(List rowStrideReaders) { + long estimated = 0; + for (RowStrideReaderWork r : rowStrideReaders) { + estimated += r.builder.estimatedBytes(); + } + return estimated; + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java index ebfac0cb24f7f..d3b8b0edcec3d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java @@ -36,9 +36,6 @@ public Block[] next() { boolean success = false; try { load(target, offset); - if (target[0].getPositionCount() != docs.getPositionCount()) { - throw new IllegalStateException("partial pages not yet supported"); - } success = true; for (Block b : target) { operator.valuesLoaded += b.getTotalValueCount(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReaderDocs.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReaderDocs.java new file mode 100644 index 0000000000000..2e138dc2d0446 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReaderDocs.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.mapper.BlockLoader; + +/** + * Implementation of {@link BlockLoader.Docs} for ESQL. It's important that + * only this implementation, and the implementation returned by {@link #mapped} + * exist. This allows the jvm to inline the {@code invokevirtual}s to call + * the interface in hot, hot code. + *
<p>
+ * We've investigated moving the {@code offset} parameter from the + * {@link BlockLoader.ColumnAtATimeReader#read} into this. That's more + * readable, but a clock cycle slower. + *
</p>
+ *
<p>
+ * When we tried having a {@link Nullable} map member instead of a subclass + * that was also slower. + *
</p>
+ */ +class ValuesReaderDocs implements BlockLoader.Docs { + private final DocVector docs; + private int count; + + ValuesReaderDocs(DocVector docs) { + this.docs = docs; + this.count = docs.getPositionCount(); + } + + final Mapped mapped(int[] forwards) { + return new Mapped(docs, forwards); + } + + public final void setCount(int count) { + this.count = count; + } + + @Override + public final int count() { + return count; + } + + @Override + public int get(int i) { + return docs.docs().getInt(i); + } + + private class Mapped extends ValuesReaderDocs { + private final int[] forwards; + + private Mapped(DocVector docs, int[] forwards) { + super(docs); + this.forwards = forwards; + } + + @Override + public int get(int i) { + return super.get(forwards[i]); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java index 2fd4784224087..6d0ebb9c312d0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java @@ -9,6 +9,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocBlock; @@ -42,7 +43,9 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOpe * @param shardContexts per-shard loading information * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public record Factory(List fields, List shardContexts, int docChannel) implements OperatorFactory { + public record Factory(ByteSizeValue jumboSize, List fields, List shardContexts, int docChannel) + implements + OperatorFactory { public Factory { if (fields.isEmpty()) { throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields"); @@ -51,7 +54,7 @@ public record Factory(List fields, List shardContexts, @Override public Operator get(DriverContext driverContext) { - return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel); + return new ValuesSourceReaderOperator(driverContext.blockFactory(), jumboSize.getBytes(), fields, shardContexts, docChannel); } @Override @@ -85,10 +88,21 @@ public record FieldInfo(String name, ElementType type, IntFunction public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} + final BlockFactory blockFactory; + /** + * When the loaded fields {@link Block}s' estimated size grows larger than this, + * we finish loading the {@linkplain Page} and return it, even if + * the {@linkplain Page} is shorter than the incoming {@linkplain Page}. + *
<p>
+ * NOTE: This only applies when loading single segment non-decreasing + * row stride bytes. This is the most common way to get giant fields, + * but it isn't all the ways. + *
</p>
+ */ + final long jumboBytes; final FieldWork[] fields; final List shardContexts; private final int docChannel; - final BlockFactory blockFactory; private final Map readersBuilt = new TreeMap<>(); long valuesLoaded; @@ -101,14 +115,21 @@ public record ShardContext(IndexReader reader, Supplier newSourceL * @param fields fields to load * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public ValuesSourceReaderOperator(BlockFactory blockFactory, List fields, List shardContexts, int docChannel) { + public ValuesSourceReaderOperator( + BlockFactory blockFactory, + long jumboBytes, + List fields, + List shardContexts, + int docChannel + ) { if (fields.isEmpty()) { throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields"); } + this.blockFactory = blockFactory; + this.jumboBytes = jumboBytes; this.fields = fields.stream().map(FieldWork::new).toArray(FieldWork[]::new); this.shardContexts = shardContexts; this.docChannel = docChannel; - this.blockFactory = blockFactory; } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index db4febdf8ddca..8185b045029b3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -172,6 +172,7 @@ public void testPushRoundToToQuery() throws IOException { LuceneOperator.NO_LIMIT ); ValuesSourceReaderOperator.Factory load = new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of( new ValuesSourceReaderOperator.FieldInfo("v", ElementType.LONG, f -> new BlockDocValuesReader.LongsBlockLoader("v")) ), @@ -198,7 +199,6 @@ public void testPushRoundToToQuery() throws IOException { boolean sawSecondMax = false; boolean sawThirdMax = false; for (Page page : pages) { - logger.error("ADFA {}", page); LongVector group = page.getBlock(1).asVector(); LongVector value = page.getBlock(2).asVector(); for (int p = 0; p < page.getPositionCount(); p++) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 655f7b54c61c0..2ef64623daa74 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.OperatorTests; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; @@ -201,6 +202,7 @@ private List runQuery(Set values, Query query, boolean shuffleDocs operators.add( new ValuesSourceReaderOperator( blockFactory, + ByteSizeValue.ofGb(1).getBytes(), List.of( new ValuesSourceReaderOperator.FieldInfo( FIELD, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java index 2bd5cc95dd804..5a1f2ee7cc949 
100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.Block; @@ -241,12 +242,17 @@ private static Operator.OperatorFactory factory( ElementType elementType, BlockLoader loader ) { - return new ValuesSourceReaderOperator.Factory(List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> { - if (shardIdx < 0 || shardIdx >= INDICES.size()) { - fail("unexpected shardIdx [" + shardIdx + "]"); - } - return loader; - })), shardContexts, 0); + return new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), + List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> { + if (shardIdx < 0 || shardIdx >= INDICES.size()) { + fail("unexpected shardIdx [" + shardIdx + "]"); + } + return loader; + })), + shardContexts, + 0 + ); } protected SourceOperator simpleInput(DriverContext context, int size) { @@ -493,6 +499,7 @@ public void testManySingleDocPages() { // TODO: Add index2 operators.add( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(testCase.info, fieldInfo(mapperService(indexKey).fieldType("key"), ElementType.INT)), shardContexts, 0 @@ -600,6 +607,7 @@ private void loadSimpleAndAssert( List operators = new ArrayList<>(); operators.add( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of( fieldInfo(mapperService("index1").fieldType("key"), ElementType.INT), fieldInfo(mapperService("index1").fieldType("indexKey"), ElementType.BYTES_REF) @@ -614,7 +622,9 @@ private void loadSimpleAndAssert( cases.removeAll(b); tests.addAll(b); operators.add( - new ValuesSourceReaderOperator.Factory(b.stream().map(i -> i.info).toList(), shardContexts, 0).get(driverContext) + new ValuesSourceReaderOperator.Factory(ByteSizeValue.ofGb(1), b.stream().map(i -> i.info).toList(), shardContexts, 0).get( + driverContext + ) ); } List results = drive(operators, input.iterator(), driverContext); @@ -718,7 +728,7 @@ private void testLoadAllStatus(boolean allInOnePage) { Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING ); List operators = cases.stream() - .map(i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0).get(driverContext)) + .map(i -> new ValuesSourceReaderOperator.Factory(ByteSizeValue.ofGb(1), List.of(i.info), shardContexts, 0).get(driverContext)) .toList(); if (allInOnePage) { input = List.of(CannedSourceOperator.mergePages(input)); @@ -1390,6 +1400,7 @@ public void testNullsShared() { simpleInput(driverContext, 10), List.of( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of( new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) @@ -1424,6 +1435,7 @@ public void testDescriptionOfMany() throws IOException { List cases = infoAndChecksForEachType(ordering, ordering); ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( + 
ByteSizeValue.ofGb(1), cases.stream().map(c -> c.info).toList(), List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), 0 @@ -1469,6 +1481,7 @@ public void testManyShards() throws IOException { // TODO add index2 MappedFieldType ft = mapperService(indexKey).fieldType("key"); var readerFactory = new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, shardIdx -> { seenShards.add(shardIdx); return ft.blockLoader(blContext()); @@ -1676,8 +1689,8 @@ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws } return new ColumnAtATimeReader() { @Override - public Block read(BlockFactory factory, Docs docs) throws IOException { - Block block = reader.read(factory, docs); + public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + Block block = reader.read(factory, docs, offset); Page page = new Page((org.elasticsearch.compute.data.Block) block); return convertEvaluator.eval(page); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 0c227b5411e25..19a645c146242 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BooleanBlock; @@ -37,6 +38,7 @@ import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.DocBlock; +import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.ElementType; @@ -99,6 +101,7 @@ import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -150,12 +153,14 @@ public static Operator.OperatorFactory factory(IndexReader reader, MappedFieldTy } static Operator.OperatorFactory factory(IndexReader reader, String name, ElementType elementType, BlockLoader loader) { - return new ValuesSourceReaderOperator.Factory(List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> { - if (shardIdx != 0) { - fail("unexpected shardIdx [" + shardIdx + "]"); - } - return loader; - })), + return new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), + List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> { + if (shardIdx != 0) { + fail("unexpected shardIdx [" + shardIdx + "]"); + } + return loader; + })), List.of( new ValuesSourceReaderOperator.ShardContext( reader, @@ -401,7 +406,7 @@ private IndexReader 
initIndexLongField(Directory directory, int size, int commit for (int d = 0; d < size; d++) { XContentBuilder source = JsonXContent.contentBuilder(); source.startObject(); - source.field("long_source_text", Integer.toString(d).repeat(100 * 1024)); + source.field("long_source_text", d + "#" + "a".repeat(100 * 1024)); source.endObject(); ParsedDocument doc = mapperService.documentParser() .parseDocument( @@ -489,6 +494,7 @@ public void testManySingleDocPages() { ); operators.add( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)), List.of( new ValuesSourceReaderOperator.ShardContext( @@ -608,6 +614,7 @@ private void loadSimpleAndAssert( List operators = new ArrayList<>(); operators.add( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)), List.of( new ValuesSourceReaderOperator.ShardContext( @@ -626,6 +633,7 @@ private void loadSimpleAndAssert( tests.addAll(b); operators.add( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), b.stream().map(i -> i.info).toList(), List.of( new ValuesSourceReaderOperator.ShardContext( @@ -724,6 +732,7 @@ private void testLoadAllStatus(boolean allInOnePage) { List operators = cases.stream() .map( i -> new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(i.info), List.of( new ValuesSourceReaderOperator.ShardContext( @@ -928,7 +937,6 @@ public void testLoadLongShuffledManySegments() throws IOException { private void testLoadLong(boolean shuffle, boolean manySegments) throws IOException { int numDocs = between(10, 500); initMapping(); - keyToTags.clear(); reader = initIndexLongField(directory, numDocs, manySegments ? commitEvery(numDocs) : numDocs, manySegments == false); DriverContext driverContext = driverContext(); @@ -941,6 +949,7 @@ private void testLoadLong(boolean shuffle, boolean manySegments) throws IOExcept if (shuffle) { input = input.stream().map(this::shuffle).toList(); } + boolean willSplit = loadLongWillSplit(input); Checks checks = new Checks(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); @@ -956,6 +965,7 @@ private void testLoadLong(boolean shuffle, boolean manySegments) throws IOExcept List operators = cases.stream() .map( i -> new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(i.info), List.of( new ValuesSourceReaderOperator.ShardContext( @@ -968,12 +978,55 @@ private void testLoadLong(boolean shuffle, boolean manySegments) throws IOExcept ).get(driverContext) ) .toList(); - drive(operators, input.iterator(), driverContext); + List result = drive(operators, input.iterator(), driverContext); + + boolean[] found = new boolean[numDocs]; + for (Page page : result) { + BytesRefVector bytes = page.getBlock(1).asVector(); + BytesRef scratch = new BytesRef(); + for (int p = 0; p < bytes.getPositionCount(); p++) { + BytesRef v = bytes.getBytesRef(p, scratch); + int d = Integer.valueOf(v.utf8ToString().split("#")[0]); + assertFalse("found a duplicate " + d, found[d]); + found[d] = true; + } + } + List missing = new ArrayList<>(); + for (int d = 0; d < numDocs; d++) { + if (found[d] == false) { + missing.add(d); + } + } + assertThat(missing, hasSize(0)); + assertThat(result, hasSize(willSplit ? 
greaterThanOrEqualTo(input.size()) : equalTo(input.size()))); + for (int i = 0; i < cases.size(); i++) { ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); assertThat(status.pagesReceived(), equalTo(input.size())); - assertThat(status.pagesEmitted(), equalTo(input.size())); + assertThat(status.pagesEmitted(), willSplit ? greaterThanOrEqualTo(input.size()) : equalTo(input.size())); + } + } + + private boolean loadLongWillSplit(List input) { + int nextDoc = -1; + for (Page page : input) { + DocVector doc = page.getBlock(0).asVector(); + for (int p = 0; p < doc.getPositionCount(); p++) { + if (doc.shards().getInt(p) != 0) { + return false; + } + if (doc.segments().getInt(p) != 0) { + return false; + } + if (nextDoc == -1) { + nextDoc = doc.docs().getInt(p); + } else if (doc.docs().getInt(p) != nextDoc) { + return false; + } + nextDoc++; + } } + return true; } record Checks(Block.MvOrdering booleanAndNumericalDocValuesMvOrdering, Block.MvOrdering bytesRefDocValuesMvOrdering) { @@ -1565,6 +1618,7 @@ public void testNullsShared() { simpleInput(driverContext.blockFactory(), 10), List.of( new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of( new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) @@ -1616,6 +1670,7 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws assertThat(source, hasSize(1)); // We want one page for simpler assertions, and we want them all in one segment assertTrue(source.get(0).getBlock(0).asVector().singleSegmentNonDecreasing()); Operator op = new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of( fieldInfo(mapperService.fieldType("key"), ElementType.INT), fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF) @@ -1653,6 +1708,7 @@ public void testDescriptionOfMany() throws IOException { List cases = infoAndChecksForEachType(ordering, ordering); ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), cases.stream().map(c -> c.info).toList(), List.of( new ValuesSourceReaderOperator.ShardContext( @@ -1706,6 +1762,7 @@ public void testManyShards() throws IOException { ); MappedFieldType ft = mapperService.fieldType("key"); var readerFactory = new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, shardIdx -> { seenShards.add(shardIdx); return ft.blockLoader(blContext()); diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java index d01e1c9fb7f56..3484f19afa451 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java @@ -9,13 +9,21 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import org.elasticsearch.client.Request; +import org.elasticsearch.common.Strings; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; +import 
org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; +import org.junit.Before; import org.junit.ClassRule; +import java.io.IOException; + @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class EsqlSpecIT extends EsqlSpecTestCase { @ClassRule @@ -50,4 +58,14 @@ protected boolean enableRoundingDoubleValuesOnAsserting() { protected boolean supportsSourceFieldMapping() { return cluster.getNumNodes() == 1; } + + @Before + public void configureChunks() throws IOException { + boolean smallChunks = randomBoolean(); + Request request = new Request("PUT", "/_cluster/settings"); + XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject("persistent"); + builder.field(PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null); + request.setJsonEntity(Strings.toString(builder.endObject().endObject())); + assertOK(client().performRequest(request)); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 1d63a2bcf5373..e25cb82f29851 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -60,6 +60,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -198,6 +199,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws false // no scoring ); ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory( + PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY), List.of( new ValuesSourceReaderOperator.FieldInfo( "key", diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 6d5630b0e6581..dd305f09c12dc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -450,6 +450,7 @@ private static Operator extractFieldsOperator( } return new ValuesSourceReaderOperator( driverContext.blockFactory(), + Long.MAX_VALUE, fields, List.of( new ValuesSourceReaderOperator.ShardContext( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 7abe84d99e5f2..e0b570267899b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -22,7 +22,6 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneCountOperator; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.LuceneSliceQueue; @@ -140,17 +139,17 @@ public boolean hasReferences() { } private final List shardContexts; - private final DataPartitioning defaultDataPartitioning; + private final PhysicalSettings physicalSettings; public EsPhysicalOperationProviders( FoldContext foldContext, List shardContexts, AnalysisRegistry analysisRegistry, - DataPartitioning defaultDataPartitioning + PhysicalSettings physicalSettings ) { super(foldContext, analysisRegistry); this.shardContexts = shardContexts; - this.defaultDataPartitioning = defaultDataPartitioning; + this.physicalSettings = physicalSettings; } @Override @@ -175,7 +174,10 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi // TODO: consolidate with ValuesSourceReaderOperator return source.with(new TimeSeriesExtractFieldOperator.Factory(fields, shardContexts), layout.build()); } else { - return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build()); + return source.with( + new ValuesSourceReaderOperator.Factory(physicalSettings.valuesLoadingJumboSize(), fields, readers, docChannel), + layout.build() + ); } } @@ -278,7 +280,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(defaultDataPartitioning), + context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -289,7 +291,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(defaultDataPartitioning), + context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -341,7 +343,7 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont return new LuceneCountOperator.Factory( shardContexts, querySupplier(queryBuilder), - context.queryPragmas().dataPartitioning(defaultDataPartitioning), + context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), limit == null ? 
NO_LIMIT : (Integer) limit.fold(context.foldCtx()) ); @@ -530,8 +532,8 @@ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws } return new ColumnAtATimeReader() { @Override - public Block read(BlockFactory factory, Docs docs) throws IOException { - Block block = reader.read(factory, docs); + public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + Block block = reader.read(factory, docs, offset); return typeConverter.convert((org.elasticsearch.compute.data.Block) block); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java new file mode 100644 index 0000000000000..4276eeaf39f9b --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.MemorySizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.monitor.jvm.JvmInfo; + +/** + * Values for cluster level settings used in physical planning. + */ +public class PhysicalSettings { + public static final Setting DEFAULT_DATA_PARTITIONING = Setting.enumSetting( + DataPartitioning.class, + "esql.default_data_partitioning", + DataPartitioning.AUTO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting VALUES_LOADING_JUMBO_SIZE = new Setting<>("esql.values_loading_jumbo_size", settings -> { + long proportional = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 1024; + return ByteSizeValue.ofBytes(Math.max(proportional, ByteSizeValue.ofMb(1).getBytes())).getStringRep(); + }, + s -> MemorySizeValue.parseBytesSizeValueOrHeapRatio(s, "esql.values_loading_jumbo_size"), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private volatile DataPartitioning defaultDataPartitioning; + private volatile ByteSizeValue valuesLoadingJumboSize; + + /** + * Ctor for prod that listens for updates from the {@link ClusterService}. + */ + public PhysicalSettings(ClusterService clusterService) { + clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); + clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); + } + + /** + * Ctor for testing. 
+ */ + public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize) { + this.defaultDataPartitioning = defaultDataPartitioning; + this.valuesLoadingJumboSize = valuesLoadingJumboSize; + } + + public DataPartitioning defaultDataPartitioning() { + return defaultDataPartitioning; + } + + public ByteSizeValue valuesLoadingJumboSize() { + return valuesLoadingJumboSize; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 1ae6a4634644d..d0b6bdfc5668a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.FailureCollector; @@ -61,6 +60,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; @@ -139,8 +139,7 @@ public class ComputeService { private final DataNodeComputeHandler dataNodeComputeHandler; private final ClusterComputeHandler clusterComputeHandler; private final ExchangeService exchangeService; - - private volatile DataPartitioning defaultDataPartitioning; + private final PhysicalSettings physicalSettings; @SuppressWarnings("this-escape") public ComputeService( @@ -179,7 +178,7 @@ public ComputeService( esqlExecutor, dataNodeComputeHandler ); - clusterService.getClusterSettings().initializeAndWatch(EsqlPlugin.DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); + this.physicalSettings = new PhysicalSettings(clusterService); } public void execute( @@ -612,7 +611,7 @@ public SourceProvider createSourceProvider() { context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis(), - defaultDataPartitioning + physicalSettings ); try { LocalExecutionPlanner planner = new LocalExecutionPlanner( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 7cba5eeb56278..f2f5b6b640311 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockFactoryProvider; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; @@ -75,6 +74,7 @@ import 
org.elasticsearch.xpack.esql.io.stream.ExpressionQueryBuilder; import org.elasticsearch.xpack.esql.io.stream.PlanStreamWrapperQueryBuilder; import org.elasticsearch.xpack.esql.plan.PlanWritables; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog; import org.elasticsearch.xpack.esql.session.IndexResolver; @@ -160,14 +160,6 @@ public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin Setting.Property.Dynamic ); - public static final Setting DEFAULT_DATA_PARTITIONING = Setting.enumSetting( - DataPartitioning.class, - "esql.default_data_partitioning", - DataPartitioning.AUTO, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - /** * Tuning parameter for deciding when to use the "merge" stored field loader. * Think of it as "how similar to a sequential block of documents do I have to @@ -263,7 +255,8 @@ public List> getSettings() { ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, ESQL_QUERYLOG_INCLUDE_USER_SETTING, - DEFAULT_DATA_PARTITIONING, + PhysicalSettings.DEFAULT_DATA_PARTITIONING, + PhysicalSettings.VALUES_LOADING_JUMBO_SIZE, STORED_FIELDS_SEQUENTIAL_PROPORTION, EsqlFlags.ESQL_STRING_LIKE_ON_INDEX ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 345bf3b8767ef..bdd0e382c3fd3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import java.io.IOException; import java.util.Locale; @@ -45,7 +46,7 @@ public final class QueryPragmas implements Writeable { * the enum {@link DataPartitioning} which has more documentation. Not an * {@link Setting#enumSetting} because those can't have {@code null} defaults. * {@code null} here means "use the default from the cluster setting - * named {@link EsqlPlugin#DEFAULT_DATA_PARTITIONING}." + * named {@link PhysicalSettings#DEFAULT_DATA_PARTITIONING}." 
*/ public static final Setting DATA_PARTITIONING = Setting.simpleString("data_partitioning"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index a609a1e494e54..6721dc91fc847 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.aggregation.AggregatorMode; @@ -135,6 +136,7 @@ import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -7888,7 +7890,12 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP null, null, null, - new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO), + new EsPhysicalOperationProviders( + FoldContext.small(), + List.of(), + null, + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + ), List.of() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 6749f03bedde7..b56f4a3a4898b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -19,6 +19,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneSourceOperator; @@ -340,7 +341,12 @@ private Configuration config() { } private EsPhysicalOperationProviders esPhysicalOperationProviders(List shardContexts) { - return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO); + return new EsPhysicalOperationProviders( + FoldContext.small(), + shardContexts, + null, + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + ); } private List createShardContexts() throws IOException { diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java index 5eafb858eacbe..3658313642700 100644 --- 
a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java @@ -571,20 +571,24 @@ public String toString() { } @Override - public Block read(BlockFactory factory, Docs docs) throws IOException { - try (var builder = factory.aggregateMetricDoubleBuilder(docs.count())) { - copyDoubleValuesToBuilder(docs, builder.min(), minValues); - copyDoubleValuesToBuilder(docs, builder.max(), maxValues); - copyDoubleValuesToBuilder(docs, builder.sum(), sumValues); - copyIntValuesToBuilder(docs, builder.count(), valueCountValues); + public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (var builder = factory.aggregateMetricDoubleBuilder(docs.count() - offset)) { + copyDoubleValuesToBuilder(docs, offset, builder.min(), minValues); + copyDoubleValuesToBuilder(docs, offset, builder.max(), maxValues); + copyDoubleValuesToBuilder(docs, offset, builder.sum(), sumValues); + copyIntValuesToBuilder(docs, offset, builder.count(), valueCountValues); return builder.build(); } } - private void copyDoubleValuesToBuilder(Docs docs, BlockLoader.DoubleBuilder builder, NumericDocValues values) - throws IOException { + private void copyDoubleValuesToBuilder( + Docs docs, + int offset, + BlockLoader.DoubleBuilder builder, + NumericDocValues values + ) throws IOException { int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < lastDoc) { throw new IllegalStateException("docs within same block must be in order"); @@ -600,10 +604,10 @@ private void copyDoubleValuesToBuilder(Docs docs, BlockLoader.DoubleBuilder buil } } - private void copyIntValuesToBuilder(Docs docs, BlockLoader.IntBuilder builder, NumericDocValues values) + private void copyIntValuesToBuilder(Docs docs, int offset, BlockLoader.IntBuilder builder, NumericDocValues values) throws IOException { int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { + for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); if (doc < lastDoc) { throw new IllegalStateException("docs within same block must be in order"); diff --git a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java index c0c2db53b97e9..1a94ca1b8d40a 100644 --- a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java +++ b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java @@ -276,7 +276,7 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() { iw.close(); try (DirectoryReader reader = DirectoryReader.open(directory)) { TestBlock block = (TestBlock) loader.columnAtATimeReader(reader.leaves().get(0)) - .read(TestBlock.factory(reader.numDocs()), new BlockLoader.Docs() { + .read(TestBlock.factory(), new BlockLoader.Docs() { @Override public int count() { return 1; @@ -286,7 +286,7 @@ public int count() { public int get(int i) { return 0; } - }); + }, 0); assertThat(block.get(0), nullValue()); } }
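The splitting behavior introduced above in ValuesFromSingleReader.loadFromRowStrideReaders and ValuesFromManyReader.run follows one loop shape: load documents until the builders' estimated size crosses the jumbo threshold, emit the (possibly shorter) page, and resume from the returned offset on the next call. Below is a minimal sketch of that shape; ValueBuilder and loadUpToJumbo are hypothetical stand-ins for the real Block.Builder and row-stride readers, not Elasticsearch APIs.

// Sketch only: illustrates the jumbo-size loop shape used by the readers in this patch.
// ValueBuilder and loadUpToJumbo are hypothetical stand-ins, not Elasticsearch APIs.
class JumboSplitSketch {
    interface ValueBuilder {
        void append(int doc);   // load one document's values into the builder

        long estimatedBytes();  // heap the builder estimates it is using so far
    }

    /**
     * Load documents starting at {@code offset} until every doc is loaded or the builders'
     * estimated size reaches {@code jumboBytes}. Returns the index of the first document
     * that was not loaded; that index becomes the offset for the next partial page.
     */
    static int loadUpToJumbo(int[] docs, int offset, long jumboBytes, ValueBuilder[] builders) {
        int p = offset;
        long estimated = 0;
        while (p < docs.length && estimated < jumboBytes) {
            for (ValueBuilder b : builders) {
                b.append(docs[p]);
            }
            p++;
            estimated = 0;
            for (ValueBuilder b : builders) {
                estimated += b.estimatedBytes();
            }
        }
        return p; // the caller emits a page of (p - offset) positions and resumes from p
    }
}

As in the patch, the estimate starts at zero, so at least one document is loaded per call and progress is guaranteed even when a single document's values exceed the threshold.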
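The default for the new esql.values_loading_jumbo_size setting in PhysicalSettings above is proportional to the heap: one 1024th of the maximum heap, floored at 1mb. A small sketch of that arithmetic, reusing only the ByteSizeValue helpers already used in this patch; the 4gb heap passed in is an assumed example value.

import org.elasticsearch.common.unit.ByteSizeValue;

// Sketch only: mirrors the default computation of esql.values_loading_jumbo_size.
class JumboSizeDefaultSketch {
    static ByteSizeValue defaultJumboSize(long heapMaxBytes) {
        long proportional = heapMaxBytes / 1024; // 1/1024th of the heap
        return ByteSizeValue.ofBytes(Math.max(proportional, ByteSizeValue.ofMb(1).getBytes())); // floor at 1mb
    }

    public static void main(String[] args) {
        // With an assumed 4gb heap: 4gb / 1024 = 4mb, which is above the 1mb floor.
        System.out.println(defaultJumboSize(ByteSizeValue.ofGb(4).getBytes()));
    }
}

Callers that must never split, like the lookup path in AbstractLookupService above, pass Long.MAX_VALUE instead of this setting.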