From b1de5f5ce3d51847608ea5c8fcd859967cd0d748 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 7 Jul 2025 20:04:11 -0400 Subject: [PATCH 1/3] ESQL: Refactor value reading so it can split Pages (#130573) This refactors our `ValuesSourceReaderOperator` so it can split pages when it reads large values. It does not *actually* split the pages as that's a bit tricky. But it sets the stage for the next PR that will do so. * Move `ValuesSourceReaderOperator` to its own package * Move many inner classes into their own top-level classes * Extend from `AbstractPageMappingToIteratorOperator` instead of `AbstractPageMappingOperator` * This allows returning more than one `Page` per input `Page` * In this PR we still always return one `Page` per input `Page` * Make new `ReleasableIterator` subclasses to satisfy `AbstractPageMappingToIteratorOperator` * Change `status` of loading fields from `pages_processed` to `pages_received` and `pages_emitted` * Fix a bug in `AbstractPageMappingToIteratorOperator` which can leak circuit breaker allocation if we fail during `receive`. This isn't possible in the existing implementations but is possible in `ValuesSourceReaderOperator`. * Add a test with large text fields. Right now it still comes back in one page because we don't cut the pages. Closes #130727 --- .../operator/ValuesSourceReaderBenchmark.java | 533 ------------ .../org/elasticsearch/TransportVersions.java | 1 + .../compute/src/main/java/module-info.java | 1 + .../lucene/ValuesSourceReaderOperator.java | 778 ------------------ .../read/ComputeBlockLoaderFactory.java | 45 + .../read/DelegatingBlockLoaderFactory.java | 93 +++ .../lucene/read/ValuesFromManyReader.java | 166 ++++ .../lucene/read/ValuesFromSingleReader.java | 194 +++++ .../compute/lucene/read/ValuesReader.java | 58 ++ .../read/ValuesSourceReaderOperator.java | 306 +++++++ .../ValuesSourceReaderOperatorStatus.java | 153 ++++ .../operator/AbstractPageMappingOperator.java | 2 +- ...AbstractPageMappingToIteratorOperator.java | 62 +- .../compute/operator/ColumnLoadOperator.java | 2 +- .../operator/OrdinalsGroupingOperator.java | 2 +- .../elasticsearch/compute/OperatorTests.java | 2 +- .../lucene/LuceneQueryEvaluatorTests.java | 1 + .../lucene/LuceneSourceOperatorTests.java | 1 + .../LuceneTopNSourceOperatorScoringTests.java | 3 +- .../lucene/LuceneTopNSourceOperatorTests.java | 3 +- .../TimeSeriesSortedSourceOperatorTests.java | 1 + .../ValueSourceReaderTypeConversionTests.java | 17 +- ...ValuesSourceReaderOperatorStatusTests.java | 46 +- .../ValuesSourceReaderOperatorTests.java | 153 +++- .../compute/operator/DriverProfileTests.java | 6 +- .../compute/operator/DriverStatusTests.java | 6 +- .../TimeSeriesAggregationOperatorTests.java | 2 +- .../xpack/esql/qa/single_node/RestEsqlIT.java | 9 +- .../xpack/esql/action/EsqlActionTaskIT.java | 7 +- .../xpack/esql/action/LookupFromIndexIT.java | 2 +- .../xpack/esql/action/EsqlCapabilities.java | 3 +- .../esql/enrich/AbstractLookupService.java | 14 +- .../planner/EsPhysicalOperationProviders.java | 2 +- .../xpack/esql/plugin/EsqlPlugin.java | 4 +- 34 files changed, 1287 insertions(+), 1391 deletions(-) delete mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java delete mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java create mode 100644 
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java rename x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/{ => read}/ValueSourceReaderTypeConversionTests.java (99%) rename x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/{ => read}/ValuesSourceReaderOperatorStatusTests.java (60%) rename x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/{ => read}/ValuesSourceReaderOperatorTests.java (91%) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java deleted file mode 100644 index c05c20dc8953f..0000000000000 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ /dev/null @@ -1,533 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.benchmark.compute.operator; - -import org.apache.lucene.document.FieldType; -import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.document.StoredField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.DocValuesType; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.store.ByteBuffersDirectory; -import org.apache.lucene.store.Directory; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.NumericUtils; -import org.elasticsearch.common.breaker.NoopCircuitBreaker; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.BytesRefVector; -import org.elasticsearch.compute.data.DocVector; -import org.elasticsearch.compute.data.DoubleBlock; -import org.elasticsearch.compute.data.DoubleVector; -import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.IntBlock; -import org.elasticsearch.compute.data.IntVector; -import org.elasticsearch.compute.data.LongBlock; -import org.elasticsearch.compute.data.LongVector; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.LuceneSourceOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; -import org.elasticsearch.compute.operator.topn.TopNOperator; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.mapper.BlockLoader; -import org.elasticsearch.index.mapper.FieldNamesFieldMapper; -import org.elasticsearch.index.mapper.KeywordFieldMapper; -import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.index.mapper.NumberFieldMapper; -import org.elasticsearch.search.lookup.SearchLookup; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OperationsPerInvocation; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PrimitiveIterator; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; - -@Warmup(iterations = 5) -@Measurement(iterations = 7) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Thread) -@Fork(1) -public class ValuesSourceReaderBenchmark { - private static final int BLOCK_LENGTH = 16 * 1024; - private static final int INDEX_SIZE = 10 * BLOCK_LENGTH; - private static final int COMMIT_INTERVAL = 500; - private static final 
BigArrays BIG_ARRAYS = BigArrays.NON_RECYCLING_INSTANCE; - private static final BlockFactory blockFactory = BlockFactory.getInstance( - new NoopCircuitBreaker("noop"), - BigArrays.NON_RECYCLING_INSTANCE - ); - - static { - // Smoke test all the expected values and force loading subclasses more like prod - try { - ValuesSourceReaderBenchmark benchmark = new ValuesSourceReaderBenchmark(); - benchmark.setupIndex(); - try { - for (String layout : ValuesSourceReaderBenchmark.class.getField("layout").getAnnotationsByType(Param.class)[0].value()) { - for (String name : ValuesSourceReaderBenchmark.class.getField("name").getAnnotationsByType(Param.class)[0].value()) { - benchmark.layout = layout; - benchmark.name = name; - try { - benchmark.setupPages(); - benchmark.benchmark(); - } catch (Exception e) { - throw new AssertionError("error initializing [" + layout + "/" + name + "]", e); - } - } - } - } finally { - benchmark.teardownIndex(); - } - } catch (IOException | NoSuchFieldException e) { - throw new AssertionError(e); - } - } - - private static List fields(String name) { - return switch (name) { - case "3_stored_keywords" -> List.of( - new ValuesSourceReaderOperator.FieldInfo("keyword_1", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_1")), - new ValuesSourceReaderOperator.FieldInfo("keyword_2", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_2")), - new ValuesSourceReaderOperator.FieldInfo("keyword_3", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_3")) - ); - default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType(name), shardIdx -> blockLoader(name))); - }; - } - - enum Where { - DOC_VALUES, - SOURCE, - STORED; - } - - private static ElementType elementType(String name) { - name = WhereAndBaseName.fromName(name).name; - switch (name) { - case "long": - return ElementType.LONG; - case "int": - return ElementType.INT; - case "double": - return ElementType.DOUBLE; - } - if (name.startsWith("keyword")) { - return ElementType.BYTES_REF; - } - throw new UnsupportedOperationException("no element type for [" + name + "]"); - } - - private static BlockLoader blockLoader(String name) { - WhereAndBaseName w = WhereAndBaseName.fromName(name); - switch (w.name) { - case "long": - return numericBlockLoader(w, NumberFieldMapper.NumberType.LONG); - case "int": - return numericBlockLoader(w, NumberFieldMapper.NumberType.INTEGER); - case "double": - return numericBlockLoader(w, NumberFieldMapper.NumberType.DOUBLE); - case "keyword": - w = new WhereAndBaseName(w.where, "keyword_1"); - } - if (w.name.startsWith("keyword")) { - boolean syntheticSource = false; - FieldType ft = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE); - switch (w.where) { - case DOC_VALUES: - break; - case SOURCE: - ft.setDocValuesType(DocValuesType.NONE); - break; - case STORED: - ft.setStored(true); - ft.setDocValuesType(DocValuesType.NONE); - syntheticSource = true; - break; - } - ft.freeze(); - return new KeywordFieldMapper.KeywordFieldType( - w.name, - ft, - Lucene.KEYWORD_ANALYZER, - Lucene.KEYWORD_ANALYZER, - Lucene.KEYWORD_ANALYZER, - new KeywordFieldMapper.Builder(name, IndexVersion.current()).docValues(ft.docValuesType() != DocValuesType.NONE), - syntheticSource - ).blockLoader(new MappedFieldType.BlockLoaderContext() { - @Override - public String indexName() { - return "benchmark"; - } - - @Override - public IndexSettings indexSettings() { - throw new UnsupportedOperationException(); - } - - @Override - public MappedFieldType.FieldExtractPreference 
fieldExtractPreference() { - return MappedFieldType.FieldExtractPreference.NONE; - } - - @Override - public SearchLookup lookup() { - throw new UnsupportedOperationException(); - } - - @Override - public Set sourcePaths(String name) { - return Set.of(name); - } - - @Override - public String parentField(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() { - return FieldNamesFieldMapper.FieldNamesFieldType.get(true); - } - }); - } - throw new IllegalArgumentException("can't read [" + name + "]"); - } - - private record WhereAndBaseName(Where where, String name) { - static WhereAndBaseName fromName(String name) { - if (name.startsWith("stored_")) { - return new WhereAndBaseName(Where.STORED, name.substring("stored_".length())); - } else if (name.startsWith("source_")) { - return new WhereAndBaseName(Where.SOURCE, name.substring("source_".length())); - } - return new WhereAndBaseName(Where.DOC_VALUES, name); - } - } - - private static BlockLoader numericBlockLoader(WhereAndBaseName w, NumberFieldMapper.NumberType numberType) { - boolean stored = false; - boolean docValues = true; - switch (w.where) { - case DOC_VALUES: - break; - case SOURCE: - stored = true; - docValues = false; - break; - case STORED: - throw new UnsupportedOperationException(); - } - return new NumberFieldMapper.NumberFieldType( - w.name, - numberType, - true, - stored, - docValues, - true, - null, - Map.of(), - null, - false, - null, - null, - false - ).blockLoader(null); - } - - /** - * Layouts for the input blocks. - *
- * <ul> - * <li> - * {@code in_order} is how {@link LuceneSourceOperator} produces them to read in - * the most efficient possible way. We - * </li> - * <li> - * {@code shuffled} is chunked the same size as {@link LuceneSourceOperator} but - * loads in a shuffled order, like a hypothetical {@link TopNOperator} that can - * output large blocks would output. - * </li> - * <li> - * {@code shuffled_singles} is shuffled in the same order as {@code shuffled} but - * each page has a single document rather than {@code BLOCK_SIZE} docs. - * </li> - * </ul>
- */ - @Param({ "in_order", "shuffled", "shuffled_singles" }) - public String layout; - - @Param({ "long", "int", "double", "keyword", "stored_keyword", "3_stored_keywords" }) - public String name; - - private Directory directory; - private IndexReader reader; - private List pages; - - @Benchmark - @OperationsPerInvocation(INDEX_SIZE) - public void benchmark() { - ValuesSourceReaderOperator op = new ValuesSourceReaderOperator( - blockFactory, - fields(name), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { - throw new UnsupportedOperationException("can't load _source here"); - }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), - 0 - ); - long sum = 0; - for (Page page : pages) { - op.addInput(page); - switch (name) { - case "long" -> { - LongVector values = op.getOutput().getBlock(1).asVector(); - for (int p = 0; p < values.getPositionCount(); p++) { - sum += values.getLong(p); - } - } - case "int" -> { - IntVector values = op.getOutput().getBlock(1).asVector(); - for (int p = 0; p < values.getPositionCount(); p++) { - sum += values.getInt(p); - } - } - case "double" -> { - DoubleVector values = op.getOutput().getBlock(1).asVector(); - for (int p = 0; p < values.getPositionCount(); p++) { - sum += (long) values.getDouble(p); - } - } - case "keyword", "stored_keyword" -> { - BytesRef scratch = new BytesRef(); - BytesRefVector values = op.getOutput().getBlock(1).asVector(); - for (int p = 0; p < values.getPositionCount(); p++) { - BytesRef r = values.getBytesRef(p, scratch); - r.offset++; - r.length--; - sum += Integer.parseInt(r.utf8ToString()); - } - } - case "3_stored_keywords" -> { - BytesRef scratch = new BytesRef(); - Page out = op.getOutput(); - for (BytesRefVector values : new BytesRefVector[] { - out.getBlock(1).asVector(), - out.getBlock(2).asVector(), - out.getBlock(3).asVector() }) { - - for (int p = 0; p < values.getPositionCount(); p++) { - BytesRef r = values.getBytesRef(p, scratch); - r.offset++; - r.length--; - sum += Integer.parseInt(r.utf8ToString()); - } - } - } - } - } - long expected = 0; - switch (name) { - case "keyword", "stored_keyword": - for (int i = 0; i < INDEX_SIZE; i++) { - expected += i % 1000; - } - break; - case "3_stored_keywords": - for (int i = 0; i < INDEX_SIZE; i++) { - expected += 3 * (i % 1000); - } - break; - default: - expected = INDEX_SIZE; - expected = expected * (expected - 1) / 2; - } - if (expected != sum) { - throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]"); - } - boolean foundStoredFieldLoader = false; - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status(); - for (Map.Entry e : status.readersBuilt().entrySet()) { - if (e.getKey().indexOf("stored_fields") >= 0) { - foundStoredFieldLoader = true; - } - } - if (name.indexOf("stored") >= 0) { - if (foundStoredFieldLoader == false) { - throw new AssertionError("expected to use a stored field loader but only had: " + status.readersBuilt()); - } - } else { - if (foundStoredFieldLoader) { - throw new AssertionError("expected not to use a stored field loader but only had: " + status.readersBuilt()); - } - } - } - - @Setup - public void setup() throws IOException { - setupIndex(); - setupPages(); - } - - private void setupIndex() throws IOException { - directory = new ByteBuffersDirectory(); - FieldType keywordFieldType = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE); - keywordFieldType.setStored(true); - keywordFieldType.freeze(); - try 
(IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) { - for (int i = 0; i < INDEX_SIZE; i++) { - String c = Character.toString('a' - ((i % 1000) % 26) + 26); - iw.addDocument( - List.of( - new NumericDocValuesField("long", i), - new StoredField("long", i), - new NumericDocValuesField("int", i), - new StoredField("int", i), - new NumericDocValuesField("double", NumericUtils.doubleToSortableLong(i)), - new StoredField("double", (double) i), - new KeywordFieldMapper.KeywordField("keyword_1", new BytesRef(c + i % 1000), keywordFieldType), - new KeywordFieldMapper.KeywordField("keyword_2", new BytesRef(c + i % 1000), keywordFieldType), - new KeywordFieldMapper.KeywordField("keyword_3", new BytesRef(c + i % 1000), keywordFieldType) - ) - ); - if (i % COMMIT_INTERVAL == 0) { - iw.commit(); - } - } - } - reader = DirectoryReader.open(directory); - } - - private void setupPages() { - pages = new ArrayList<>(); - switch (layout) { - case "in_order" -> { - IntVector.Builder docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); - for (LeafReaderContext ctx : reader.leaves()) { - int begin = 0; - while (begin < ctx.reader().maxDoc()) { - int end = Math.min(begin + BLOCK_LENGTH, ctx.reader().maxDoc()); - for (int doc = 0; doc < ctx.reader().maxDoc(); doc++) { - docs.appendInt(doc); - } - pages.add( - new Page( - new DocVector( - blockFactory.newConstantIntBlockWith(0, end - begin).asVector(), - blockFactory.newConstantIntBlockWith(ctx.ord, end - begin).asVector(), - docs.build(), - true - ).asBlock() - ) - ); - docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); - begin = end; - } - } - } - case "shuffled" -> { - record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {} - List docItrs = new ArrayList<>(reader.leaves().size()); - for (LeafReaderContext ctx : reader.leaves()) { - docItrs.add(new ItrAndOrd(IntStream.range(0, ctx.reader().maxDoc()).iterator(), ctx.ord)); - } - IntVector.Builder docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); - IntVector.Builder leafs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); - int size = 0; - while (docItrs.isEmpty() == false) { - Iterator itrItr = docItrs.iterator(); - while (itrItr.hasNext()) { - ItrAndOrd next = itrItr.next(); - if (false == next.itr.hasNext()) { - itrItr.remove(); - continue; - } - docs.appendInt(next.itr.nextInt()); - leafs.appendInt(next.ord); - size++; - if (size >= BLOCK_LENGTH) { - pages.add( - new Page( - new DocVector(blockFactory.newConstantIntVector(0, size), leafs.build(), docs.build(), null).asBlock() - ) - ); - docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); - leafs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); - size = 0; - } - } - } - if (size > 0) { - pages.add( - new Page( - new DocVector( - blockFactory.newConstantIntBlockWith(0, size).asVector(), - leafs.build().asBlock().asVector(), - docs.build(), - null - ).asBlock() - ) - ); - } - } - case "shuffled_singles" -> { - record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {} - List docItrs = new ArrayList<>(reader.leaves().size()); - for (LeafReaderContext ctx : reader.leaves()) { - docItrs.add(new ItrAndOrd(IntStream.range(0, ctx.reader().maxDoc()).iterator(), ctx.ord)); - } - while (docItrs.isEmpty() == false) { - Iterator itrItr = docItrs.iterator(); - while (itrItr.hasNext()) { - ItrAndOrd next = itrItr.next(); - if (false == next.itr.hasNext()) { - itrItr.remove(); - continue; - } - pages.add( - new Page( - new DocVector( - blockFactory.newConstantIntVector(0, 1), - 
blockFactory.newConstantIntVector(next.ord, 1), - blockFactory.newConstantIntVector(next.itr.nextInt(), 1), - true - ).asBlock() - ) - ); - } - } - } - default -> throw new IllegalArgumentException("unsupported layout [" + layout + "]"); - } - } - - @TearDown - public void teardownIndex() throws IOException { - IOUtils.close(reader, directory); - } -} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 8933322250b9a..da26fb61eedd6 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -255,6 +255,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60); public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61); public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62); + public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/module-info.java b/x-pack/plugin/esql/compute/src/main/java/module-info.java index c4a042d692ea1..f21ed72d7eb21 100644 --- a/x-pack/plugin/esql/compute/src/main/java/module-info.java +++ b/x-pack/plugin/esql/compute/src/main/java/module-info.java @@ -36,4 +36,5 @@ exports org.elasticsearch.compute.aggregation.table; exports org.elasticsearch.compute.data.sort; exports org.elasticsearch.compute.querydsl.query; + exports org.elasticsearch.compute.lucene.read; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java deleted file mode 100644 index ca24a86bc3087..0000000000000 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ /dev/null @@ -1,778 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ - -package org.elasticsearch.compute.lucene; - -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.DocBlock; -import org.elasticsearch.compute.data.DocVector; -import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.IntVector; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.data.SingletonOrdinalsBuilder; -import org.elasticsearch.compute.operator.AbstractPageMappingOperator; -import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.compute.operator.Operator; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; -import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; -import org.elasticsearch.index.mapper.BlockLoader; -import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; -import org.elasticsearch.index.mapper.SourceLoader; -import org.elasticsearch.search.fetch.StoredFieldsSpec; -import org.elasticsearch.xcontent.XContentBuilder; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.TreeMap; -import java.util.function.IntFunction; -import java.util.function.Supplier; - -import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; - -/** - * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} - * and outputs them to a new column. - */ -public class ValuesSourceReaderOperator extends AbstractPageMappingOperator { - /** - * Minimum number of documents for which it is more efficient to use a - * sequential stored field reader when reading stored fields. - *
- * <p>
- * The sequential stored field reader decompresses a whole block of docs - * at a time so for very short lists it won't be faster to use it. We use - * {@code 10} documents as the boundary for "very short" because it's what - * search does, not because we've done extensive testing on the number. - *
- * </p>
- */ - static final int SEQUENTIAL_BOUNDARY = 10; - - /** - * Creates a factory for {@link ValuesSourceReaderOperator}. - * @param fields fields to load - * @param shardContexts per-shard loading information - * @param docChannel the channel containing the shard, leaf/segment and doc id - */ - public record Factory(List fields, List shardContexts, int docChannel) implements OperatorFactory { - @Override - public Operator get(DriverContext driverContext) { - return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel); - } - - @Override - public String describe() { - StringBuilder sb = new StringBuilder(); - sb.append("ValuesSourceReaderOperator[fields = ["); - if (fields.size() < 10) { - boolean first = true; - for (FieldInfo f : fields) { - if (first) { - first = false; - } else { - sb.append(", "); - } - sb.append(f.name); - } - } else { - sb.append(fields.size()).append(" fields"); - } - return sb.append("]]").toString(); - } - } - - /** - * Configuration for a field to load. - * - * {@code blockLoader} maps shard index to the {@link BlockLoader}s - * which load the actual blocks. - */ - public record FieldInfo(String name, ElementType type, IntFunction blockLoader) {} - - public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} - - private final FieldWork[] fields; - private final List shardContexts; - private final int docChannel; - private final BlockFactory blockFactory; - - private final Map readersBuilt = new TreeMap<>(); - private long valuesLoaded; - - int lastShard = -1; - int lastSegment = -1; - - /** - * Creates a new extractor - * @param fields fields to load - * @param docChannel the channel containing the shard, leaf/segment and doc id - */ - public ValuesSourceReaderOperator(BlockFactory blockFactory, List fields, List shardContexts, int docChannel) { - this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new); - this.shardContexts = shardContexts; - this.docChannel = docChannel; - this.blockFactory = blockFactory; - } - - @Override - protected Page process(Page page) { - DocVector docVector = page.getBlock(docChannel).asVector(); - - Block[] blocks = new Block[fields.length]; - boolean success = false; - try { - if (docVector.singleSegmentNonDecreasing()) { - IntVector docs = docVector.docs(); - int shard = docVector.shards().getInt(0); - int segment = docVector.segments().getInt(0); - loadFromSingleLeaf(blocks, shard, segment, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } - - @Override - public int get(int i) { - return docs.getInt(i); - } - }); - } else if (docVector.singleSegment()) { - loadFromSingleLeafUnsorted(blocks, docVector); - } else { - try (LoadFromMany many = new LoadFromMany(blocks, docVector)) { - many.run(); - } - } - success = true; - for (Block b : blocks) { - valuesLoaded += b.getTotalValueCount(); - } - return page.appendBlocks(blocks); - } catch (IOException e) { - throw new UncheckedIOException(e); - } finally { - if (success == false) { - Releasables.closeExpectNoException(blocks); - } - } - } - - private void positionFieldWork(int shard, int segment, int firstDoc) { - if (lastShard == shard) { - if (lastSegment == segment) { - for (FieldWork w : fields) { - w.sameSegment(firstDoc); - } - return; - } - lastSegment = segment; - for (FieldWork w : fields) { - w.sameShardNewSegment(); - } - return; - } - lastShard = shard; - lastSegment = segment; - for (FieldWork w : 
fields) { - w.newShard(shard); - } - } - - private boolean positionFieldWorkDocGuarteedAscending(int shard, int segment) { - if (lastShard == shard) { - if (lastSegment == segment) { - return false; - } - lastSegment = segment; - for (FieldWork w : fields) { - w.sameShardNewSegment(); - } - return true; - } - lastShard = shard; - lastSegment = segment; - for (FieldWork w : fields) { - w.newShard(shard); - } - return true; - } - - private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoader.Docs docs) throws IOException { - int firstDoc = docs.get(0); - positionFieldWork(shard, segment, firstDoc); - StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; - List rowStrideReaders = new ArrayList<>(fields.length); - LeafReaderContext ctx = ctx(shard, segment); - try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.count())) { - for (int f = 0; f < fields.length; f++) { - FieldWork field = fields[f]; - BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx); - if (columnAtATime != null) { - blocks[f] = (Block) columnAtATime.read(loaderBlockFactory, docs); - sanityCheckBlock(columnAtATime, docs.count(), blocks[f], f); - } else { - rowStrideReaders.add( - new RowStrideReaderWork( - field.rowStride(ctx), - (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()), - field.loader, - f - ) - ); - storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); - } - } - - SourceLoader sourceLoader = null; - ShardContext shardContext = shardContexts.get(shard); - if (storedFieldsSpec.requiresSource()) { - sourceLoader = shardContext.newSourceLoader.get(); - storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); - } - - if (rowStrideReaders.isEmpty()) { - return; - } - if (storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { - throw new IllegalStateException( - "found row stride readers [" + rowStrideReaders + "] without stored fields [" + storedFieldsSpec + "]" - ); - } - StoredFieldLoader storedFieldLoader; - if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) { - storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec); - trackStoredFields(storedFieldsSpec, true); - } else { - storedFieldLoader = StoredFieldLoader.fromSpec(storedFieldsSpec); - trackStoredFields(storedFieldsSpec, false); - } - BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader( - storedFieldLoader.getLoader(ctx, null), - sourceLoader != null ? 
sourceLoader.leaf(ctx.reader(), null) : null - ); - for (int p = 0; p < docs.count(); p++) { - int doc = docs.get(p); - storedFields.advanceTo(doc); - for (RowStrideReaderWork work : rowStrideReaders) { - work.read(doc, storedFields); - } - } - for (RowStrideReaderWork work : rowStrideReaders) { - blocks[work.offset] = work.build(); - sanityCheckBlock(work.reader, docs.count(), blocks[work.offset], work.offset); - } - } finally { - Releasables.close(rowStrideReaders); - } - } - - private void loadFromSingleLeafUnsorted(Block[] blocks, DocVector docVector) throws IOException { - IntVector docs = docVector.docs(); - int[] forwards = docVector.shardSegmentDocMapForwards(); - int shard = docVector.shards().getInt(0); - int segment = docVector.segments().getInt(0); - loadFromSingleLeaf(blocks, shard, segment, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } - - @Override - public int get(int i) { - return docs.getInt(forwards[i]); - } - }); - final int[] backwards = docVector.shardSegmentDocMapBackwards(); - for (int i = 0; i < blocks.length; i++) { - Block in = blocks[i]; - blocks[i] = in.filter(backwards); - in.close(); - } - } - - private class LoadFromMany implements Releasable { - private final Block[] target; - private final IntVector shards; - private final IntVector segments; - private final IntVector docs; - private final int[] forwards; - private final int[] backwards; - private final Block.Builder[][] builders; - private final BlockLoader[][] converters; - private final Block.Builder[] fieldTypeBuilders; - private final BlockLoader.RowStrideReader[] rowStride; - - BlockLoaderStoredFieldsFromLeafLoader storedFields; - - LoadFromMany(Block[] target, DocVector docVector) { - this.target = target; - shards = docVector.shards(); - segments = docVector.segments(); - docs = docVector.docs(); - forwards = docVector.shardSegmentDocMapForwards(); - backwards = docVector.shardSegmentDocMapBackwards(); - fieldTypeBuilders = new Block.Builder[target.length]; - builders = new Block.Builder[target.length][shardContexts.size()]; - converters = new BlockLoader[target.length][shardContexts.size()]; - rowStride = new BlockLoader.RowStrideReader[target.length]; - } - - void run() throws IOException { - for (int f = 0; f < fields.length; f++) { - /* - * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types). - * We create the final block builders using the desired type, one for each field, but then also use inner builders - * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field - * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type. 
- */ - fieldTypeBuilders[f] = fields[f].info.type.newBlockBuilder(docs.getPositionCount(), blockFactory); - builders[f] = new Block.Builder[shardContexts.size()]; - converters[f] = new BlockLoader[shardContexts.size()]; - } - try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.getPositionCount())) { - int p = forwards[0]; - int shard = shards.getInt(p); - int segment = segments.getInt(p); - int firstDoc = docs.getInt(p); - positionFieldWork(shard, segment, firstDoc); - LeafReaderContext ctx = ctx(shard, segment); - fieldsMoved(ctx, shard); - verifyBuilders(loaderBlockFactory, shard); - read(firstDoc, shard); - for (int i = 1; i < forwards.length; i++) { - p = forwards[i]; - shard = shards.getInt(p); - segment = segments.getInt(p); - boolean changedSegment = positionFieldWorkDocGuarteedAscending(shard, segment); - if (changedSegment) { - ctx = ctx(shard, segment); - fieldsMoved(ctx, shard); - } - verifyBuilders(loaderBlockFactory, shard); - read(docs.getInt(p), shard); - } - } - for (int f = 0; f < target.length; f++) { - for (int s = 0; s < shardContexts.size(); s++) { - if (builders[f][s] != null) { - try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) { - fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount()); - } - } - } - try (Block targetBlock = fieldTypeBuilders[f].build()) { - target[f] = targetBlock.filter(backwards); - } - sanityCheckBlock(rowStride[f], docs.getPositionCount(), target[f], f); - } - } - - private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException { - StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; - for (int f = 0; f < fields.length; f++) { - FieldWork field = fields[f]; - rowStride[f] = field.rowStride(ctx); - storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); - } - SourceLoader sourceLoader = null; - if (storedFieldsSpec.requiresSource()) { - sourceLoader = shardContexts.get(shard).newSourceLoader.get(); - storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); - } - storedFields = new BlockLoaderStoredFieldsFromLeafLoader( - StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null), - sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null - ); - if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { - trackStoredFields(storedFieldsSpec, false); - } - } - - private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) { - for (int f = 0; f < fields.length; f++) { - if (builders[f][shard] == null) { - // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard - builders[f][shard] = (Block.Builder) fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount()); - converters[f][shard] = fields[f].loader; - } - } - } - - private void read(int doc, int shard) throws IOException { - storedFields.advanceTo(doc); - for (int f = 0; f < builders.length; f++) { - rowStride[f].read(doc, storedFields, builders[f][shard]); - } - } - - @Override - public void close() { - Releasables.closeExpectNoException(fieldTypeBuilders); - for (int f = 0; f < fields.length; f++) { - Releasables.closeExpectNoException(builders[f]); - } - } - } - - /** - * Is it more efficient to use a sequential stored field reader - * when reading stored fields for the documents contained in {@code docIds}? 
- */ - private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) { - int count = docs.count(); - if (count < SEQUENTIAL_BOUNDARY) { - return false; - } - int range = docs.get(count - 1) - docs.get(0); - return range * storedFieldsSequentialProportion <= count; - } - - private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { - readersBuilt.merge( - "stored_fields[" - + "requires_source:" - + spec.requiresSource() - + ", fields:" - + spec.requiredStoredFields().size() - + ", sequential: " - + sequential - + "]", - 1, - (prev, one) -> prev + one - ); - } - - private class FieldWork { - final FieldInfo info; - - BlockLoader loader; - BlockLoader.ColumnAtATimeReader columnAtATime; - BlockLoader.RowStrideReader rowStride; - - FieldWork(FieldInfo info) { - this.info = info; - } - - void sameSegment(int firstDoc) { - if (columnAtATime != null && columnAtATime.canReuse(firstDoc) == false) { - columnAtATime = null; - } - if (rowStride != null && rowStride.canReuse(firstDoc) == false) { - rowStride = null; - } - } - - void sameShardNewSegment() { - columnAtATime = null; - rowStride = null; - } - - void newShard(int shard) { - loader = info.blockLoader.apply(shard); - columnAtATime = null; - rowStride = null; - } - - BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException { - if (columnAtATime == null) { - columnAtATime = loader.columnAtATimeReader(ctx); - trackReader("column_at_a_time", this.columnAtATime); - } - return columnAtATime; - } - - BlockLoader.RowStrideReader rowStride(LeafReaderContext ctx) throws IOException { - if (rowStride == null) { - rowStride = loader.rowStrideReader(ctx); - trackReader("row_stride", this.rowStride); - } - return rowStride; - } - - private void trackReader(String type, BlockLoader.Reader reader) { - readersBuilt.merge(info.name + ":" + type + ":" + reader, 1, (prev, one) -> prev + one); - } - } - - private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset) - implements - Releasable { - void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException { - reader.read(doc, storedFields, builder); - } - - Block build() { - return (Block) loader.convert(builder.build()); - } - - @Override - public void close() { - builder.close(); - } - } - - private LeafReaderContext ctx(int shard, int segment) { - return shardContexts.get(shard).reader.leaves().get(segment); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("ValuesSourceReaderOperator[fields = ["); - if (fields.length < 10) { - boolean first = true; - for (FieldWork f : fields) { - if (first) { - first = false; - } else { - sb.append(", "); - } - sb.append(f.info.name); - } - } else { - sb.append(fields.length).append(" fields"); - } - return sb.append("]]").toString(); - } - - @Override - protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { - return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); - } - - /** - * Quick checks for on the loaded block to make sure it looks reasonable. 
- * @param loader the object that did the loading - we use it to make error messages if the block is busted - * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has - * @param block the block to sanity check - * @param field offset into the {@link #fields} array for the block being loaded - */ - private void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { - if (block.getPositionCount() != expectedPositions) { - throw new IllegalStateException( - sanityCheckBlockErrorPrefix(loader, block, field) - + " has [" - + block.getPositionCount() - + "] positions instead of [" - + expectedPositions - + "]" - ); - } - if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { - throw new IllegalStateException( - sanityCheckBlockErrorPrefix(loader, block, field) - + "'s element_type [" - + block.elementType() - + "] NOT IN (NULL, " - + fields[field].info.type - + ")" - ); - } - } - - private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { - return fields[field].info.name + "[" + loader + "]: " + block; - } - - public static class Status extends AbstractPageMappingOperator.Status { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( - Operator.Status.class, - "values_source_reader", - Status::new - ); - - private final Map readersBuilt; - private final long valuesLoaded; - - Status( - Map readersBuilt, - long processNanos, - int pagesProcessed, - long rowsReceived, - long rowsEmitted, - long valuesLoaded - ) { - super(processNanos, pagesProcessed, rowsReceived, rowsEmitted); - this.readersBuilt = readersBuilt; - this.valuesLoaded = valuesLoaded; - } - - Status(StreamInput in) throws IOException { - super(in); - readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); - valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? 
in.readVLong() : 0; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(readersBuilt, StreamOutput::writeVInt); - if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { - out.writeVLong(valuesLoaded); - } - } - - @Override - public String getWriteableName() { - return ENTRY.name; - } - - public Map readersBuilt() { - return readersBuilt; - } - - @Override - public long valuesLoaded() { - return valuesLoaded; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startObject("readers_built"); - for (Map.Entry e : readersBuilt.entrySet()) { - builder.field(e.getKey(), e.getValue()); - } - builder.endObject(); - builder.field("values_loaded", valuesLoaded); - innerToXContent(builder); - return builder.endObject(); - } - - @Override - public boolean equals(Object o) { - if (super.equals(o) == false) return false; - Status status = (Status) o; - return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); - } - - @Override - public String toString() { - return Strings.toString(this); - } - } - - private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable { - private final BlockFactory factory; - private final int pageSize; - private Block nullBlock; - - private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) { - this.factory = factory; - this.pageSize = pageSize; - } - - @Override - public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { - return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.BooleanBuilder booleans(int expectedCount) { - return factory.newBooleanBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) { - return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); - } - - @Override - public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) { - return factory.newBytesRefBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) { - return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.DoubleBuilder doubles(int expectedCount) { - return factory.newDoubleBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) { - return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions); - } - - @Override - public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) { - return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.IntBuilder ints(int expectedCount) { - return factory.newIntBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) { - return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.LongBuilder longs(int expectedCount) { - return factory.newLongBlockBuilder(expectedCount); - } - - @Override - public 
BlockLoader.Builder nulls(int expectedCount) { - return ElementType.NULL.newBlockBuilder(expectedCount, factory); - } - - @Override - public Block constantNulls() { - if (nullBlock == null) { - nullBlock = factory.newConstantNullBlock(pageSize); - } - nullBlock.incRef(); - return nullBlock; - } - - @Override - public void close() { - if (nullBlock != null) { - nullBlock.close(); - } - } - - @Override - public BytesRefBlock constantBytes(BytesRef value) { - return factory.newConstantBytesRefBlockWith(value, pageSize); - } - - @Override - public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { - return new SingletonOrdinalsBuilder(factory, ordinals, count); - } - - @Override - public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) { - return factory.newAggregateMetricDoubleBlockBuilder(count); - } - } -} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java new file mode 100644 index 0000000000000..f7f5f541c747f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.core.Releasable; + +class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable { + private final int pageSize; + private Block nullBlock; + + ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) { + super(factory); + this.pageSize = pageSize; + } + + @Override + public Block constantNulls() { + if (nullBlock == null) { + nullBlock = factory.newConstantNullBlock(pageSize); + } + nullBlock.incRef(); + return nullBlock; + } + + @Override + public void close() { + if (nullBlock != null) { + nullBlock.close(); + } + } + + @Override + public BytesRefBlock constantBytes(BytesRef value) { + return factory.newConstantBytesRefBlockWith(value, pageSize); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java new file mode 100644 index 0000000000000..8dc5b6cc43ecf --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.SortedDocValues; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.SingletonOrdinalsBuilder; +import org.elasticsearch.index.mapper.BlockLoader; + +public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory { + protected final BlockFactory factory; + + protected DelegatingBlockLoaderFactory(BlockFactory factory) { + this.factory = factory; + } + + @Override + public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { + return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.BooleanBuilder booleans(int expectedCount) { + return factory.newBooleanBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) { + return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); + } + + @Override + public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) { + return factory.newBytesRefBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) { + return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.DoubleBuilder doubles(int expectedCount) { + return factory.newDoubleBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) { + return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions); + } + + @Override + public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) { + return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.IntBuilder ints(int expectedCount) { + return factory.newIntBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) { + return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.LongBuilder longs(int expectedCount) { + return factory.newLongBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.Builder nulls(int expectedCount) { + return ElementType.NULL.newBlockBuilder(expectedCount, factory); + } + + @Override + public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { + return new SingletonOrdinalsBuilder(factory, ordinals, count); + } + + @Override + public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) { + return factory.newAggregateMetricDoubleBlockBuilder(count); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java new file mode 100644 index 0000000000000..7ff6e7211b7f2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; + +/** + * Loads values from a many leaves. Much less efficient than {@link ValuesFromSingleReader}. + */ +class ValuesFromManyReader extends ValuesReader { + private final int[] forwards; + private final int[] backwards; + private final BlockLoader.RowStrideReader[] rowStride; + + private BlockLoaderStoredFieldsFromLeafLoader storedFields; + + ValuesFromManyReader(ValuesSourceReaderOperator operator, DocVector docs) { + super(operator, docs); + forwards = docs.shardSegmentDocMapForwards(); + backwards = docs.shardSegmentDocMapBackwards(); + rowStride = new BlockLoader.RowStrideReader[operator.fields.length]; + } + + @Override + protected void load(Block[] target, int offset) throws IOException { + try (Run run = new Run(target)) { + run.run(offset); + } + } + + class Run implements Releasable { + private final Block[] target; + private final Block.Builder[][] builders; + private final BlockLoader[][] converters; + private final Block.Builder[] fieldTypeBuilders; + + Run(Block[] target) { + this.target = target; + fieldTypeBuilders = new Block.Builder[target.length]; + builders = new Block.Builder[target.length][operator.shardContexts.size()]; + converters = new BlockLoader[target.length][operator.shardContexts.size()]; + } + + void run(int offset) throws IOException { + assert offset == 0; // TODO allow non-0 offset to support splitting pages + for (int f = 0; f < operator.fields.length; f++) { + /* + * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types). + * We create the final block builders using the desired type, one for each field, but then also use inner builders + * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field + * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type. 
+ */ + fieldTypeBuilders[f] = operator.fields[f].info.type().newBlockBuilder(docs.getPositionCount(), operator.blockFactory); + builders[f] = new Block.Builder[operator.shardContexts.size()]; + converters[f] = new BlockLoader[operator.shardContexts.size()]; + } + try ( + ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount()) + ) { + int p = forwards[offset]; + int shard = docs.shards().getInt(p); + int segment = docs.segments().getInt(p); + int firstDoc = docs.docs().getInt(p); + operator.positionFieldWork(shard, segment, firstDoc); + LeafReaderContext ctx = operator.ctx(shard, segment); + fieldsMoved(ctx, shard); + verifyBuilders(loaderBlockFactory, shard); + read(firstDoc, shard); + + int i = offset + 1; + while (i < forwards.length) { + p = forwards[i]; + shard = docs.shards().getInt(p); + segment = docs.segments().getInt(p); + boolean changedSegment = operator.positionFieldWorkDocGuaranteedAscending(shard, segment); + if (changedSegment) { + ctx = operator.ctx(shard, segment); + fieldsMoved(ctx, shard); + } + verifyBuilders(loaderBlockFactory, shard); + read(docs.docs().getInt(p), shard); + i++; + } + buildBlocks(); + } + } + + private void buildBlocks() { + for (int f = 0; f < target.length; f++) { + for (int s = 0; s < operator.shardContexts.size(); s++) { + if (builders[f][s] != null) { + try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) { + fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount()); + } + } + } + try (Block targetBlock = fieldTypeBuilders[f].build()) { + target[f] = targetBlock.filter(backwards); + } + operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f); + } + } + + private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) { + for (int f = 0; f < operator.fields.length; f++) { + if (builders[f][shard] == null) { + // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard + builders[f][shard] = (Block.Builder) operator.fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount()); + converters[f][shard] = operator.fields[f].loader; + } + } + } + + private void read(int doc, int shard) throws IOException { + storedFields.advanceTo(doc); + for (int f = 0; f < builders.length; f++) { + rowStride[f].read(doc, storedFields, builders[f][shard]); + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(fieldTypeBuilders); + for (int f = 0; f < operator.fields.length; f++) { + Releasables.closeExpectNoException(builders[f]); + } + } + } + + private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException { + StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; + for (int f = 0; f < operator.fields.length; f++) { + ValuesSourceReaderOperator.FieldWork field = operator.fields[f]; + rowStride[f] = field.rowStride(ctx); + storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); + } + SourceLoader sourceLoader = null; + if (storedFieldsSpec.requiresSource()) { + sourceLoader = operator.shardContexts.get(shard).newSourceLoader().get(); + storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); + } + storedFields = new BlockLoaderStoredFieldsFromLeafLoader( + StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null), + sourceLoader != null ? 
sourceLoader.leaf(ctx.reader(), null) : null + ); + if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { + operator.trackStoredFields(storedFieldsSpec, false); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java new file mode 100644 index 0000000000000..3ac6565d21c33 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Loads values from a single leaf. Much more efficient than {@link ValuesFromManyReader}. + */ +class ValuesFromSingleReader extends ValuesReader { + /** + * Minimum number of documents for which it is more efficient to use a + * sequential stored field reader when reading stored fields. + *
+     * <p>
+ * The sequential stored field reader decompresses a whole block of docs + * at a time so for very short lists it won't be faster to use it. We use + * {@code 10} documents as the boundary for "very short" because it's what + * search does, not because we've done extensive testing on the number. + *
+     * </p>
+ */ + static final int SEQUENTIAL_BOUNDARY = 10; + + private final int shard; + private final int segment; + + ValuesFromSingleReader(ValuesSourceReaderOperator operator, DocVector docs) { + super(operator, docs); + this.shard = docs.shards().getInt(0); + this.segment = docs.segments().getInt(0); + } + + @Override + protected void load(Block[] target, int offset) throws IOException { + assert offset == 0; // TODO allow non-0 offset to support splitting pages + if (docs.singleSegmentNonDecreasing()) { + loadFromSingleLeaf(target, new BlockLoader.Docs() { + @Override + public int count() { + return docs.getPositionCount(); + } + + @Override + public int get(int i) { + return docs.docs().getInt(i); + } + }); + return; + } + int[] forwards = docs.shardSegmentDocMapForwards(); + loadFromSingleLeaf(target, new BlockLoader.Docs() { + @Override + public int count() { + return docs.getPositionCount(); + } + + @Override + public int get(int i) { + return docs.docs().getInt(forwards[i]); + } + }); + final int[] backwards = docs.shardSegmentDocMapBackwards(); + for (int i = 0; i < target.length; i++) { + try (Block in = target[i]) { + target[i] = in.filter(backwards); + } + } + } + + private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IOException { + int firstDoc = docs.get(0); + operator.positionFieldWork(shard, segment, firstDoc); + StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; + List rowStrideReaders = new ArrayList<>(operator.fields.length); + LeafReaderContext ctx = operator.ctx(shard, segment); + try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.count())) { + for (int f = 0; f < operator.fields.length; f++) { + ValuesSourceReaderOperator.FieldWork field = operator.fields[f]; + BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx); + if (columnAtATime != null) { + target[f] = (Block) columnAtATime.read(loaderBlockFactory, docs); + operator.sanityCheckBlock(columnAtATime, docs.count(), target[f], f); + } else { + rowStrideReaders.add( + new RowStrideReaderWork( + field.rowStride(ctx), + (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()), + field.loader, + f + ) + ); + storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); + } + } + + if (rowStrideReaders.isEmpty() == false) { + loadFromRowStrideReaders(target, storedFieldsSpec, rowStrideReaders, ctx, docs); + } + } finally { + Releasables.close(rowStrideReaders); + } + } + + private void loadFromRowStrideReaders( + Block[] target, + StoredFieldsSpec storedFieldsSpec, + List rowStrideReaders, + LeafReaderContext ctx, + BlockLoader.Docs docs + ) throws IOException { + SourceLoader sourceLoader = null; + ValuesSourceReaderOperator.ShardContext shardContext = operator.shardContexts.get(shard); + if (storedFieldsSpec.requiresSource()) { + sourceLoader = shardContext.newSourceLoader().get(); + storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); + } + if (storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { + throw new IllegalStateException( + "found row stride readers [" + rowStrideReaders + "] without stored fields [" + storedFieldsSpec + "]" + ); + } + StoredFieldLoader storedFieldLoader; + if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) { + storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec); + operator.trackStoredFields(storedFieldsSpec, 
true); + } else { + storedFieldLoader = StoredFieldLoader.fromSpec(storedFieldsSpec); + operator.trackStoredFields(storedFieldsSpec, false); + } + BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader( + storedFieldLoader.getLoader(ctx, null), + sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null + ); + int p = 0; + while (p < docs.count()) { + int doc = docs.get(p++); + storedFields.advanceTo(doc); + for (RowStrideReaderWork work : rowStrideReaders) { + work.read(doc, storedFields); + } + } + for (RowStrideReaderWork work : rowStrideReaders) { + target[work.offset] = work.build(); + operator.sanityCheckBlock(work.reader, p, target[work.offset], work.offset); + } + } + + /** + * Is it more efficient to use a sequential stored field reader + * when reading stored fields for the documents contained in {@code docIds}? + */ + private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) { + int count = docs.count(); + if (count < SEQUENTIAL_BOUNDARY) { + return false; + } + int range = docs.get(count - 1) - docs.get(0); + return range * storedFieldsSequentialProportion <= count; + } + + private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset) + implements + Releasable { + void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException { + reader.read(doc, storedFields, builder); + } + + Block build() { + return (Block) loader.convert(builder.build()); + } + + @Override + public void close() { + builder.close(); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java new file mode 100644 index 0000000000000..d3b8b0edcec3d --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.lucene.read; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +import java.io.IOException; +import java.io.UncheckedIOException; + +public abstract class ValuesReader implements ReleasableIterator { + protected final ValuesSourceReaderOperator operator; + protected final DocVector docs; + private int offset; + + ValuesReader(ValuesSourceReaderOperator operator, DocVector docs) { + this.operator = operator; + this.docs = docs; + } + + @Override + public boolean hasNext() { + return offset < docs.getPositionCount(); + } + + @Override + public Block[] next() { + Block[] target = new Block[operator.fields.length]; + boolean success = false; + try { + load(target, offset); + success = true; + for (Block b : target) { + operator.valuesLoaded += b.getTotalValueCount(); + } + offset += target[0].getPositionCount(); + return target; + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (success == false) { + Releasables.closeExpectNoException(target); + } + } + } + + protected abstract void load(Block[] target, int offset) throws IOException; + + @Override + public void close() {} +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java new file mode 100644 index 0000000000000..2fd4784224087 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java @@ -0,0 +1,306 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DocBlock; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.IntFunction; +import java.util.function.Supplier; + +/** + * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} + * and outputs them to a new column. + */ +public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOperator { + /** + * Creates a factory for {@link ValuesSourceReaderOperator}. 
+     * @param fields fields to load
+     * @param shardContexts per-shard loading information
+     * @param docChannel the channel containing the shard, leaf/segment and doc id
+     */
+    public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) implements OperatorFactory {
+        public Factory {
+            if (fields.isEmpty()) {
+                throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields");
+            }
+        }
+
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel);
+        }
+
+        @Override
+        public String describe() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("ValuesSourceReaderOperator[fields = [");
+            if (fields.size() < 10) {
+                boolean first = true;
+                for (FieldInfo f : fields) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        sb.append(", ");
+                    }
+                    sb.append(f.name());
+                }
+            } else {
+                sb.append(fields.size()).append(" fields");
+            }
+            return sb.append("]]").toString();
+        }
+    }
+
+    /**
+     * Configuration for a field to load.
+     *
+     * {@code blockLoader} maps shard index to the {@link BlockLoader}s
+     * which load the actual blocks.
+     */
+    public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
+
+    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
+
+    final FieldWork[] fields;
+    final List<ShardContext> shardContexts;
+    private final int docChannel;
+    final BlockFactory blockFactory;
+
+    private final Map<String, Integer> readersBuilt = new TreeMap<>();
+    long valuesLoaded;
+
+    private int lastShard = -1;
+    private int lastSegment = -1;
+
+    /**
+     * Creates a new extractor
+     * @param fields fields to load
+     * @param docChannel the channel containing the shard, leaf/segment and doc id
+     */
+    public ValuesSourceReaderOperator(BlockFactory blockFactory, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) {
+        if (fields.isEmpty()) {
+            throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields");
+        }
+        this.fields = fields.stream().map(FieldWork::new).toArray(FieldWork[]::new);
+        this.shardContexts = shardContexts;
+        this.docChannel = docChannel;
+        this.blockFactory = blockFactory;
+    }
+
+    @Override
+    protected ReleasableIterator<Page> receive(Page page) {
+        DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
+        return appendBlockArrays(
+            page,
+            docVector.singleSegment() ?
new ValuesFromSingleReader(this, docVector) : new ValuesFromManyReader(this, docVector) + ); + } + + void positionFieldWork(int shard, int segment, int firstDoc) { + if (lastShard == shard) { + if (lastSegment == segment) { + for (FieldWork w : fields) { + w.sameSegment(firstDoc); + } + return; + } + lastSegment = segment; + for (FieldWork w : fields) { + w.sameShardNewSegment(); + } + return; + } + lastShard = shard; + lastSegment = segment; + for (FieldWork w : fields) { + w.newShard(shard); + } + } + + boolean positionFieldWorkDocGuaranteedAscending(int shard, int segment) { + if (lastShard == shard) { + if (lastSegment == segment) { + return false; + } + lastSegment = segment; + for (FieldWork w : fields) { + w.sameShardNewSegment(); + } + return true; + } + lastShard = shard; + lastSegment = segment; + for (FieldWork w : fields) { + w.newShard(shard); + } + return true; + } + + void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { + readersBuilt.merge( + "stored_fields[" + + "requires_source:" + + spec.requiresSource() + + ", fields:" + + spec.requiredStoredFields().size() + + ", sequential: " + + sequential + + "]", + 1, + (prev, one) -> prev + one + ); + } + + protected class FieldWork { + final FieldInfo info; + + BlockLoader loader; + BlockLoader.ColumnAtATimeReader columnAtATime; + BlockLoader.RowStrideReader rowStride; + + FieldWork(FieldInfo info) { + this.info = info; + } + + void sameSegment(int firstDoc) { + if (columnAtATime != null && columnAtATime.canReuse(firstDoc) == false) { + columnAtATime = null; + } + if (rowStride != null && rowStride.canReuse(firstDoc) == false) { + rowStride = null; + } + } + + void sameShardNewSegment() { + columnAtATime = null; + rowStride = null; + } + + void newShard(int shard) { + loader = info.blockLoader.apply(shard); + columnAtATime = null; + rowStride = null; + } + + BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException { + if (columnAtATime == null) { + columnAtATime = loader.columnAtATimeReader(ctx); + trackReader("column_at_a_time", this.columnAtATime); + } + return columnAtATime; + } + + BlockLoader.RowStrideReader rowStride(LeafReaderContext ctx) throws IOException { + if (rowStride == null) { + rowStride = loader.rowStrideReader(ctx); + trackReader("row_stride", this.rowStride); + } + return rowStride; + } + + private void trackReader(String type, BlockLoader.Reader reader) { + readersBuilt.merge(info.name + ":" + type + ":" + reader, 1, (prev, one) -> prev + one); + } + } + + LeafReaderContext ctx(int shard, int segment) { + return shardContexts.get(shard).reader().leaves().get(segment); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("ValuesSourceReaderOperator[fields = ["); + if (fields.length < 10) { + boolean first = true; + for (FieldWork f : fields) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(f.info.name); + } + } else { + sb.append(fields.length).append(" fields"); + } + return sb.append("]]").toString(); + } + + @Override + protected ValuesSourceReaderOperatorStatus status( + long processNanos, + int pagesReceived, + int pagesEmitted, + long rowsReceived, + long rowsEmitted + ) { + return new ValuesSourceReaderOperatorStatus( + new TreeMap<>(readersBuilt), + processNanos, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted, + valuesLoaded + ); + } + + /** + * Quick checks for on the loaded block to make sure it looks reasonable. 
+ * @param loader the object that did the loading - we use it to make error messages if the block is busted + * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has + * @param block the block to sanity check + * @param field offset into the {@link #fields} array for the block being loaded + */ + void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { + if (block.getPositionCount() != expectedPositions) { + throw new IllegalStateException( + sanityCheckBlockErrorPrefix(loader, block, field) + + " has [" + + block.getPositionCount() + + "] positions instead of [" + + expectedPositions + + "]" + ); + } + if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { + throw new IllegalStateException( + sanityCheckBlockErrorPrefix(loader, block, field) + + "'s element_type [" + + block.elementType() + + "] NOT IN (NULL, " + + fields[field].info.type + + ")" + ); + } + } + + private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { + return fields[field].info.name + "[" + loader + "]: " + block; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java new file mode 100644 index 0000000000000..be62487d06307 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.lucene.read; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.operator.AbstractPageMappingOperator; +import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; +import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_8_19; + +public class ValuesSourceReaderOperatorStatus extends AbstractPageMappingToIteratorOperator.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Operator.Status.class, + "values_source_reader", + ValuesSourceReaderOperatorStatus::readFrom + ); + + private final Map readersBuilt; + private final long valuesLoaded; + + public ValuesSourceReaderOperatorStatus( + Map readersBuilt, + long processNanos, + int pagesReceived, + int pagesEmitted, + long rowsReceived, + long rowsEmitted, + long valuesLoaded + ) { + super(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); + this.readersBuilt = readersBuilt; + this.valuesLoaded = valuesLoaded; + } + + static ValuesSourceReaderOperatorStatus readFrom(StreamInput in) throws IOException { + long processNanos; + int pagesReceived; + int pagesEmitted; + long rowsReceived; + long rowsEmitted; + if (supportsSplitOnBigValues(in.getTransportVersion())) { + AbstractPageMappingToIteratorOperator.Status status = new AbstractPageMappingToIteratorOperator.Status(in); + processNanos = status.processNanos(); + pagesReceived = status.pagesReceived(); + pagesEmitted = status.pagesEmitted(); + rowsReceived = status.rowsReceived(); + rowsEmitted = status.rowsEmitted(); + } else { + AbstractPageMappingOperator.Status status = new AbstractPageMappingOperator.Status(in); + processNanos = status.processNanos(); + pagesReceived = status.pagesProcessed(); + pagesEmitted = status.pagesProcessed(); + rowsReceived = status.rowsReceived(); + rowsEmitted = status.rowsEmitted(); + } + Map readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); + long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0; + return new ValuesSourceReaderOperatorStatus( + readersBuilt, + processNanos, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted, + valuesLoaded + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (supportsSplitOnBigValues(out.getTransportVersion())) { + super.writeTo(out); + } else { + /* + * Before we knew how to split pages when reading large values + * our status just contained one int per page - just like AbstractPageMappingOperator.Status. 
+ */ + new AbstractPageMappingOperator.Status(processNanos(), pagesEmitted(), rowsReceived(), rowsEmitted()).writeTo(out); + } + out.writeMap(readersBuilt, StreamOutput::writeVInt); + if (supportsValuesLoaded(out.getTransportVersion())) { + out.writeVLong(valuesLoaded); + } + } + + private static boolean supportsSplitOnBigValues(TransportVersion version) { + return version.onOrAfter(ESQL_SPLIT_ON_BIG_VALUES_8_19); + } + + private static boolean supportsValuesLoaded(TransportVersion version) { + return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + public Map readersBuilt() { + return readersBuilt; + } + + @Override + public long valuesLoaded() { + return valuesLoaded; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("readers_built"); + for (Map.Entry e : readersBuilt.entrySet()) { + builder.field(e.getKey(), e.getValue()); + } + builder.endObject(); + builder.field("values_loaded", valuesLoaded); + innerToXContent(builder); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o) == false) return false; + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) o; + return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java index 09d04d36f8313..bc83284611cb1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java @@ -127,7 +127,7 @@ public Status(long processNanos, int pagesProcessed, long rowsReceived, long row this.rowsEmitted = rowsEmitted; } - protected Status(StreamInput in) throws IOException { + public Status(StreamInput in) throws IOException { processNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0; pagesProcessed = in.readVInt(); if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java index 6a165fdfa055b..055359c6a389a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java @@ -64,13 +64,38 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator */ protected abstract ReleasableIterator receive(Page page); + /** + * Append an {@link Iterator} of arrays of {@link Block}s to a + * {@link Page}, one after the other. It's required that the + * iterator emit as many positions as there were + * in the page. 
+ */ + public static ReleasableIterator appendBlockArrays(Page page, ReleasableIterator toAdd) { + return new AppendBlocksIterator(page, toAdd); + } + /** * Append an {@link Iterator} of {@link Block}s to a {@link Page}, one * after the other. It's required that the iterator emit as many * positions as there were in the page. */ public static ReleasableIterator appendBlocks(Page page, ReleasableIterator toAdd) { - return new AppendBlocksIterator(page, toAdd); + return appendBlockArrays(page, new ReleasableIterator<>() { + @Override + public boolean hasNext() { + return toAdd.hasNext(); + } + + @Override + public Block[] next() { + return new Block[] { toAdd.next() }; + } + + @Override + public void close() { + toAdd.close(); + } + }); } @Override @@ -86,13 +111,24 @@ public final void addInput(Page page) { if (next != null) { assert next.hasNext() == false : "has pending input page"; next.close(); + next = null; } if (page.getPositionCount() == 0) { return; } - next = new RuntimeTrackingIterator(receive(page)); - pagesReceived++; - rowsReceived += page.getPositionCount(); + try { + next = new RuntimeTrackingIterator(receive(page)); + pagesReceived++; + rowsReceived += page.getPositionCount(); + } finally { + if (next == null) { + /* + * The `receive` operation failed, we need to release the incoming page + * because it's no longer owned by anyone. + */ + page.releaseBlocks(); + } + } } @Override @@ -183,7 +219,7 @@ public Status(long processNanos, int pagesProcessed, int pagesEmitted, long rows this.rowsEmitted = rowsEmitted; } - protected Status(StreamInput in) throws IOException { + public Status(StreamInput in) throws IOException { processNanos = in.readVLong(); pagesReceived = in.readVInt(); pagesEmitted = in.readVInt(); @@ -284,11 +320,11 @@ public TransportVersion getMinimalSupportedVersion() { private static class AppendBlocksIterator implements ReleasableIterator { private final Page page; - private final ReleasableIterator next; + private final ReleasableIterator next; private int positionOffset; - protected AppendBlocksIterator(Page page, ReleasableIterator next) { + protected AppendBlocksIterator(Page page, ReleasableIterator next) { this.page = page; this.next = next; } @@ -305,17 +341,17 @@ public final boolean hasNext() { @Override public final Page next() { - Block read = next.next(); + Block[] read = next.next(); int start = positionOffset; - positionOffset += read.getPositionCount(); - if (start == 0 && read.getPositionCount() == page.getPositionCount()) { + positionOffset += read[0].getPositionCount(); + if (start == 0 && read[0].getPositionCount() == page.getPositionCount()) { for (int b = 0; b < page.getBlockCount(); b++) { page.getBlock(b).incRef(); } - return page.appendBlock(read); + return page.appendBlocks(read); } - Block[] newBlocks = new Block[page.getBlockCount() + 1]; - newBlocks[page.getBlockCount()] = read; + Block[] newBlocks = new Block[page.getBlockCount() + read.length]; + System.arraycopy(read, 0, newBlocks, page.getBlockCount(), read.length); try { // TODO a way to filter with a range please. 
int[] positions = IntStream.range(start, positionOffset).toArray(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java index 4e06c1f0f4b69..05f60c1b6834d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java @@ -14,7 +14,7 @@ /** * {@link Block#lookup Looks up} values from a provided {@link Block} and - * mergeds them into each {@link Page}. + * merged them into each {@link Page}. */ public class ColumnLoadOperator extends AbstractPageMappingToIteratorOperator { public record Values(String name, Block block) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 7cf47bc7fed1c..8d8c8c2e92306 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -31,7 +31,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.BlockLoader; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index e04032fc8ad26..9efff55fc51c8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -52,7 +52,7 @@ import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; import org.elasticsearch.compute.lucene.ShardContext; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 0462c146790fd..a4203b60eb50c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.data.Vector; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import 
org.elasticsearch.compute.operator.Operator; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 65aec4aa6ef25..48b9a2e5bc167 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index e23febd5de212..99b7905df86b0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -52,7 +53,7 @@ public class LuceneTopNSourceOperatorScoringTests extends LuceneTopNSourceOperat private IndexReader reader; @After - private void closeIndex() throws IOException { + public void closeScoringIndex() throws IOException { IOUtils.close(reader, directory); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index 670bbda709e0a..273bdd5e67b51 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -57,7 +58,7 @@ public class LuceneTopNSourceOperatorTests extends AnyOperatorTestCase { private IndexReader reader; @After - private void closeIndex() throws IOException { + public void closeIndex() throws IOException { IOUtils.close(reader, directory); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java index 
bf605ecda4edb..b7d63797d7b23 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java similarity index 99% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java index 14a159b92c572..bbdae235f0f6e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoubleDocValuesField; @@ -48,6 +48,12 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; +import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.DriverRunner; @@ -628,7 +634,9 @@ private void loadSimpleAndAssert( } } for (Operator op : operators) { - assertThat(((ValuesSourceReaderOperator) op).status().pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); } assertDriverContext(driverContext); } @@ -715,8 +723,9 @@ private void testLoadAllStatus(boolean allInOnePage) { } drive(operators, input.iterator(), driverContext); for (int i = 0; i < cases.size(); i++) { - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) operators.get(i).status(); - assertThat(status.pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); FieldCase fc = cases.get(i); fc.checkReaders.check(fc.info.name(), allInOnePage, input.size(), totalSize, status.readersBuilt()); } diff --git 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java similarity index 60% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java index af1463b88c62c..f81398eb67695 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; @@ -18,9 +18,9 @@ import static org.hamcrest.Matchers.equalTo; -public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { - public static ValuesSourceReaderOperator.Status simple() { - return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222, 1000); +public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { + public static ValuesSourceReaderOperatorStatus simple() { + return new ValuesSourceReaderOperatorStatus(Map.of("ReaderType", 3), 1022323, 123, 200, 111, 222, 1000); } public static String simpleToJson() { @@ -32,7 +32,8 @@ public static String simpleToJson() { "values_loaded" : 1000, "process_nanos" : 1022323, "process_time" : "1ms", - "pages_processed" : 123, + "pages_received" : 123, + "pages_emitted" : 200, "rows_received" : 111, "rows_emitted" : 222 }"""; @@ -43,16 +44,17 @@ public void testToXContent() { } @Override - protected Writeable.Reader instanceReader() { - return ValuesSourceReaderOperator.Status::new; + protected Writeable.Reader instanceReader() { + return ValuesSourceReaderOperatorStatus::readFrom; } @Override - public ValuesSourceReaderOperator.Status createTestInstance() { - return new ValuesSourceReaderOperator.Status( + public ValuesSourceReaderOperatorStatus createTestInstance() { + return new ValuesSourceReaderOperatorStatus( randomReadersBuilt(), randomNonNegativeLong(), randomNonNegativeInt(), + randomNonNegativeInt(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong() @@ -69,22 +71,32 @@ private Map randomReadersBuilt() { } @Override - protected ValuesSourceReaderOperator.Status mutateInstance(ValuesSourceReaderOperator.Status instance) throws IOException { + protected ValuesSourceReaderOperatorStatus mutateInstance(ValuesSourceReaderOperatorStatus instance) throws IOException { Map readersBuilt = instance.readersBuilt(); long processNanos = instance.processNanos(); - int pagesProcessed = instance.pagesProcessed(); + int pagesReceived = instance.pagesReceived(); + int pagesEmitted = instance.pagesEmitted(); long rowsReceived = instance.rowsReceived(); long rowsEmitted = instance.rowsEmitted(); long valuesLoaded = instance.valuesLoaded(); - switch (between(0, 5)) { + switch (between(0, 6)) { case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt); case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); - case 2 -> pagesProcessed = 
randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); - case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); - case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); - case 5 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); + case 2 -> pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); + case 3 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + case 4 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 5 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 6 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); + return new ValuesSourceReaderOperatorStatus( + readersBuilt, + processNanos, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted, + valuesLoaded + ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java similarity index 91% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 44c3efc9c49d0..945c262ccec8d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoubleDocValuesField; @@ -45,6 +45,12 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; +import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -175,6 +181,10 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv } catch (IOException e) { throw new RuntimeException(e); } + return sourceOperator(context, pageSize); + } + + private SourceOperator sourceOperator(DriverContext context, int pageSize) { var luceneFactory = new LuceneSourceOperator.Factory( List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)), ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), @@ -204,6 +214,7 @@ private void initMapping() throws IOException { simpleField(b, "missing_text", "text"); b.startObject("source_text").field("type", "text").field("store", false).endObject(); b.startObject("mv_source_text").field("type", "text").field("store", false).endObject(); + b.startObject("long_source_text").field("type", "text").field("store", false).endObject(); b.startObject("stored_text").field("type", "text").field("store", true).endObject(); b.startObject("mv_stored_text").field("type", "text").field("store", true).endObject(); @@ -379,6 +390,33 @@ private IndexReader initIndex(Directory directory, int size, int commitEvery) th return DirectoryReader.open(directory); } + private IndexReader initIndexLongField(Directory directory, int size, int commitEvery) throws IOException { + try ( + IndexWriter writer = new IndexWriter( + directory, + newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE).setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH) + ) + ) { + for (int d = 0; d < size; d++) { + XContentBuilder source = JsonXContent.contentBuilder(); + source.startObject(); + source.field("long_source_text", Integer.toString(d).repeat(100 * 1024)); + source.endObject(); + ParsedDocument doc = mapperService.documentParser() + .parseDocument( + new SourceToParse("id" + d, BytesReference.bytes(source), XContentType.JSON), + mapperService.mappingLookup() + ); + writer.addDocuments(doc.docs()); + + if (d % commitEvery == commitEvery - 1) { + writer.commit(); + } + } + } + return DirectoryReader.open(directory); + } + @Override protected Matcher expectedDescriptionOfSimple() { return equalTo("ValuesSourceReaderOperator[fields = [long]]"); @@ -490,16 +528,23 @@ public void testLoadAllInOnePageShuffled() { Page source = CannedSourceOperator.mergePages( CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(100, 5000))) ); - List shuffleList = new ArrayList<>(); - IntStream.range(0, source.getPositionCount()).forEach(i -> shuffleList.add(i)); - Randomness.shuffle(shuffleList); - int[] shuffleArray = shuffleList.stream().mapToInt(Integer::intValue).toArray(); - Block[] shuffledBlocks = new 
Block[source.getBlockCount()]; - for (int b = 0; b < shuffledBlocks.length; b++) { - shuffledBlocks[b] = source.getBlock(b).filter(shuffleArray); - } - source = new Page(shuffledBlocks); - loadSimpleAndAssert(driverContext, List.of(source), Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED); + loadSimpleAndAssert(driverContext, List.of(shuffle(source)), Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED); + } + + private Page shuffle(Page source) { + try { + List shuffleList = new ArrayList<>(); + IntStream.range(0, source.getPositionCount()).forEach(i -> shuffleList.add(i)); + Randomness.shuffle(shuffleList); + int[] shuffleArray = shuffleList.stream().mapToInt(Integer::intValue).toArray(); + Block[] shuffledBlocks = new Block[source.getBlockCount()]; + for (int b = 0; b < shuffledBlocks.length; b++) { + shuffledBlocks[b] = source.getBlock(b).filter(shuffleArray); + } + return new Page(shuffledBlocks); + } finally { + source.releaseBlocks(); + } } private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft, ElementType elementType) { @@ -611,7 +656,9 @@ private void loadSimpleAndAssert( } } for (Operator op : operators) { - assertThat(((ValuesSourceReaderOperator) op).status().pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); } assertDriverContext(driverContext); } @@ -695,8 +742,9 @@ private void testLoadAllStatus(boolean allInOnePage) { } drive(operators, input.iterator(), driverContext); for (int i = 0; i < cases.size(); i++) { - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) operators.get(i).status(); - assertThat(status.pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); FieldCase fc = cases.get(i); fc.checkReaders.check(fc.info.name(), allInOnePage, input.size(), reader.leaves().size(), status.readersBuilt()); } @@ -862,6 +910,73 @@ private List infoAndChecksForEachType( return r; } + public void testLoadLong() throws IOException { + testLoadLong(false, false); + } + + public void testLoadLongManySegments() throws IOException { + testLoadLong(false, true); + } + + public void testLoadLongShuffled() throws IOException { + testLoadLong(true, false); + } + + public void testLoadLongShuffledManySegments() throws IOException { + testLoadLong(true, true); + } + + private void testLoadLong(boolean shuffle, boolean manySegments) throws IOException { + int numDocs = between(10, 500); + initMapping(); + keyToTags.clear(); + reader = initIndexLongField(directory, numDocs, manySegments ? commitEvery(numDocs) : numDocs); + + DriverContext driverContext = driverContext(); + List input = CannedSourceOperator.collectPages(sourceOperator(driverContext, numDocs)); + assertThat(reader.leaves(), hasSize(manySegments ? 
greaterThan(5) : equalTo(1))); + assertThat(input, hasSize(reader.leaves().size())); + if (manySegments) { + input = List.of(CannedSourceOperator.mergePages(input)); + } + if (shuffle) { + input = input.stream().map(this::shuffle).toList(); + } + + Checks checks = new Checks(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); + + List cases = List.of( + new FieldCase( + mapperService.fieldType("long_source_text"), + ElementType.BYTES_REF, + checks::strings, + StatusChecks::longTextFromSource + ) + ); + // Build one operator for each field, so we get a unique map to assert on + List operators = cases.stream() + .map( + i -> new ValuesSourceReaderOperator.Factory( + List.of(i.info), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 + ).get(driverContext) + ) + .toList(); + drive(operators, input.iterator(), driverContext); + for (int i = 0; i < cases.size(); i++) { + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); + } + } + record Checks(Block.MvOrdering booleanAndNumericalDocValuesMvOrdering, Block.MvOrdering bytesRefDocValuesMvOrdering) { void longs(Block block, int position, int key) { LongVector longs = ((LongBlock) block).asVector(); @@ -1075,6 +1190,10 @@ static void textFromSource(boolean forcedRowByRow, int pageCount, int segmentCou source("source_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers); } + static void longTextFromSource(boolean forcedRowByRow, int pageCount, int segmentCount, Map readers) { + source("long_source_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers); + } + static void textFromStored(boolean forcedRowByRow, int pageCount, int segmentCount, Map readers) { stored("stored_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers); } @@ -1485,13 +1604,13 @@ public void testNullsShared() { } public void testSequentialStoredFieldsTooSmall() throws IOException { - testSequentialStoredFields(false, between(1, ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY - 1)); + testSequentialStoredFields(false, between(1, ValuesFromSingleReader.SEQUENTIAL_BOUNDARY - 1)); } public void testSequentialStoredFieldsBigEnough() throws IOException { testSequentialStoredFields( true, - between(ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY, ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY * 2) + between(ValuesFromSingleReader.SEQUENTIAL_BOUNDARY, ValuesFromSingleReader.SEQUENTIAL_BOUNDARY * 2) ); } @@ -1522,7 +1641,7 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws int key = keys.getInt(p); checks.strings(results.get(0).getBlock(2), p, key); } - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status(); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); assertMap( status.readersBuilt(), matchesMap().entry("key:column_at_a_time:BlockDocValuesReader.SingletonInts", 1) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java index 3a0db28562ebe..107487d086070 100644 --- 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java @@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.test.ESTestCase; @@ -143,7 +143,7 @@ protected DriverProfile mutateInstance(DriverProfile instance) throws IOExceptio @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry( - List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, ExchangeSinkOperator.Status.ENTRY) + List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperatorStatus.ENTRY, ExchangeSinkOperator.Status.ENTRY) ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java index f8e4dc8d86abb..f4688f35e3202 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java @@ -14,8 +14,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperatorStatusTests; import org.elasticsearch.test.AbstractWireSerializingTestCase; @@ -198,7 +198,7 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry( - List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, ExchangeSinkOperator.Status.ENTRY) + List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperatorStatus.ENTRY, ExchangeSinkOperator.Status.ENTRY) ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java index 103a6a35651c7..d1193d00a92fe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java +++ 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.test.ComputeTestCase; import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.TestResultPageSinkOperator; diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 69d746e7abbe9..e126d775d81c5 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -700,7 +700,9 @@ private String checkOperatorProfile(Map o) { .entry("processing_nanos", greaterThan(0)) .entry("processed_queries", List.of("*:*")) .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); - case "ValuesSourceReaderOperator" -> basicProfile().entry("values_loaded", greaterThanOrEqualTo(0)) + case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0)) + .entry("pages_emitted", greaterThan(0)) + .entry("values_loaded", greaterThanOrEqualTo(0)) .entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("rows_received", greaterThan(0)) @@ -711,7 +713,7 @@ private String checkOperatorProfile(Map o) { case "ExchangeSourceOperator" -> matchesMap().entry("pages_waiting", 0) .entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)); - case "ProjectOperator", "EvalOperator" -> basicProfile(); + case "ProjectOperator", "EvalOperator" -> basicProfile().entry("pages_processed", greaterThan(0)); case "LimitOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("limit", 1000) .entry("limit_remaining", 999) @@ -747,8 +749,7 @@ private String checkOperatorProfile(Map o) { } private MapMatcher basicProfile() { - return matchesMap().entry("pages_processed", greaterThan(0)) - .entry("process_nanos", greaterThan(0)) + return matchesMap().entry("process_nanos", greaterThan(0)) .entry("rows_received", greaterThan(0)) .entry("rows_emitted", greaterThan(0)); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 75c89c17a657e..bf94a875fe2c4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -19,7 +19,7 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.compute.lucene.LuceneSourceOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import 
org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.OperatorStatus; @@ -129,12 +129,13 @@ public void testTaskContents() throws Exception { } if (o.operator().equals("ValuesSourceReaderOperator[fields = [pause_me]]")) { assertThat(taskDescription, equalTo("data")); - ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status(); + ValuesSourceReaderOperatorStatus oStatus = (ValuesSourceReaderOperatorStatus) o.status(); assertMap( oStatus.readersBuilt(), matchesMap().entry("pause_me:column_at_a_time:ScriptLongs", greaterThanOrEqualTo(1)) ); - assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1)); + assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(1)); + assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(1)); assertThat(oStatus.valuesLoaded(), greaterThanOrEqualTo(1L)); valuesSourceReaders++; continue; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 990cd41b6ed84..4ef7680998fd6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -28,7 +28,7 @@ import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.ShardContext; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.DriverRunner; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index e0513011f6b4f..37b3216b3f26e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -9,6 +9,7 @@ import org.elasticsearch.Build; import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction; import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; @@ -871,7 +872,7 @@ public enum Cap { DOCUMENTS_FOUND_AND_VALUES_LOADED, /** - * When creating constant null blocks in {@link org.elasticsearch.compute.lucene.ValuesSourceReaderOperator}, we also handed off + * When creating constant null blocks in {@link ValuesSourceReaderOperator}, we also handed off * the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases * in some union type queries. C.f. 
https://github.com/elastic/elasticsearch/issues/125850 */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 38ecc87ac3335..93c1114d67f82 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -34,7 +34,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.OrdinalBytesRefBlock; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -342,8 +342,14 @@ private void doLookup(T request, CancellableTask task, ActionListener warnings ); releasables.add(queryOperator); - var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); - releasables.add(extractFieldsOperator); + + List operators = new ArrayList<>(); + if (request.extractFields.isEmpty() == false) { + var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); + releasables.add(extractFieldsOperator); + operators.add(extractFieldsOperator); + } + operators.add(finishPages); /* * Collect all result Pages in a synchronizedList mostly out of paranoia. We'll @@ -363,7 +369,7 @@ private void doLookup(T request, CancellableTask task, ActionListener driverContext, request::toString, queryOperator, - List.of(extractFieldsOperator, finishPages), + operators, outputOperator, Driver.DEFAULT_STATUS_INTERVAL, Releasables.wrap(shardContext.release, localBreaker) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 8557cb63bdef5..b8a9bc1f13e20 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -24,7 +24,7 @@ import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorFactory; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OrdinalsGroupingOperator; import org.elasticsearch.compute.operator.SourceOperator; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index a35a1c8ddfb2c..c1988bca185e5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -25,7 +25,7 @@ import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.lucene.DataPartitioning; 
import org.elasticsearch.compute.lucene.LuceneOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; import org.elasticsearch.compute.operator.AggregationOperator; @@ -302,7 +302,7 @@ public List getNamedWriteables() { entries.add(LuceneOperator.Status.ENTRY); entries.add(TopNOperatorStatus.ENTRY); entries.add(MvExpandOperator.Status.ENTRY); - entries.add(ValuesSourceReaderOperator.Status.ENTRY); + entries.add(ValuesSourceReaderOperatorStatus.ENTRY); entries.add(SingleValueQuery.ENTRY); entries.add(AsyncOperator.Status.ENTRY); entries.add(EnrichLookupOperator.Status.ENTRY); From bf7af79176aa5a4c9ef10a784e503e8452e60316 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 7 Jul 2025 22:17:15 -0400 Subject: [PATCH 2/3] don't remove accident --- .../operator/ValuesSourceReaderBenchmark.java | 534 ++++++++++++++++++ 1 file changed, 534 insertions(+) create mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java new file mode 100644 index 0000000000000..b7fb291c59c34 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -0,0 +1,534 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.benchmark.compute.operator; + +import org.apache.lucene.document.FieldType; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.BytesRefVector; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; +import org.elasticsearch.compute.operator.topn.TopNOperator; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.FieldNamesFieldMapper; +import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PrimitiveIterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@Warmup(iterations = 5) +@Measurement(iterations = 7) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Thread) +@Fork(1) +public class ValuesSourceReaderBenchmark { + private static final int BLOCK_LENGTH = 16 * 1024; + private static final int INDEX_SIZE = 10 * 
BLOCK_LENGTH; + private static final int COMMIT_INTERVAL = 500; + private static final BigArrays BIG_ARRAYS = BigArrays.NON_RECYCLING_INSTANCE; + private static final BlockFactory blockFactory = BlockFactory.getInstance( + new NoopCircuitBreaker("noop"), + BigArrays.NON_RECYCLING_INSTANCE + ); + + static { + // Smoke test all the expected values and force loading subclasses more like prod + try { + ValuesSourceReaderBenchmark benchmark = new ValuesSourceReaderBenchmark(); + benchmark.setupIndex(); + try { + for (String layout : ValuesSourceReaderBenchmark.class.getField("layout").getAnnotationsByType(Param.class)[0].value()) { + for (String name : ValuesSourceReaderBenchmark.class.getField("name").getAnnotationsByType(Param.class)[0].value()) { + benchmark.layout = layout; + benchmark.name = name; + try { + benchmark.setupPages(); + benchmark.benchmark(); + } catch (Exception e) { + throw new AssertionError("error initializing [" + layout + "/" + name + "]", e); + } + } + } + } finally { + benchmark.teardownIndex(); + } + } catch (IOException | NoSuchFieldException e) { + throw new AssertionError(e); + } + } + + private static List fields(String name) { + return switch (name) { + case "3_stored_keywords" -> List.of( + new ValuesSourceReaderOperator.FieldInfo("keyword_1", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_1")), + new ValuesSourceReaderOperator.FieldInfo("keyword_2", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_2")), + new ValuesSourceReaderOperator.FieldInfo("keyword_3", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_3")) + ); + default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType(name), shardIdx -> blockLoader(name))); + }; + } + + enum Where { + DOC_VALUES, + SOURCE, + STORED; + } + + private static ElementType elementType(String name) { + name = WhereAndBaseName.fromName(name).name; + switch (name) { + case "long": + return ElementType.LONG; + case "int": + return ElementType.INT; + case "double": + return ElementType.DOUBLE; + } + if (name.startsWith("keyword")) { + return ElementType.BYTES_REF; + } + throw new UnsupportedOperationException("no element type for [" + name + "]"); + } + + private static BlockLoader blockLoader(String name) { + WhereAndBaseName w = WhereAndBaseName.fromName(name); + switch (w.name) { + case "long": + return numericBlockLoader(w, NumberFieldMapper.NumberType.LONG); + case "int": + return numericBlockLoader(w, NumberFieldMapper.NumberType.INTEGER); + case "double": + return numericBlockLoader(w, NumberFieldMapper.NumberType.DOUBLE); + case "keyword": + w = new WhereAndBaseName(w.where, "keyword_1"); + } + if (w.name.startsWith("keyword")) { + boolean syntheticSource = false; + FieldType ft = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE); + switch (w.where) { + case DOC_VALUES: + break; + case SOURCE: + ft.setDocValuesType(DocValuesType.NONE); + break; + case STORED: + ft.setStored(true); + ft.setDocValuesType(DocValuesType.NONE); + syntheticSource = true; + break; + } + ft.freeze(); + return new KeywordFieldMapper.KeywordFieldType( + w.name, + ft, + Lucene.KEYWORD_ANALYZER, + Lucene.KEYWORD_ANALYZER, + Lucene.KEYWORD_ANALYZER, + new KeywordFieldMapper.Builder(name, IndexVersion.current()).docValues(ft.docValuesType() != DocValuesType.NONE), + syntheticSource + ).blockLoader(new MappedFieldType.BlockLoaderContext() { + @Override + public String indexName() { + return "benchmark"; + } + + @Override + public IndexSettings indexSettings() { + throw new 
UnsupportedOperationException(); + } + + @Override + public MappedFieldType.FieldExtractPreference fieldExtractPreference() { + return MappedFieldType.FieldExtractPreference.NONE; + } + + @Override + public SearchLookup lookup() { + throw new UnsupportedOperationException(); + } + + @Override + public Set sourcePaths(String name) { + return Set.of(name); + } + + @Override + public String parentField(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() { + return FieldNamesFieldMapper.FieldNamesFieldType.get(true); + } + }); + } + throw new IllegalArgumentException("can't read [" + name + "]"); + } + + private record WhereAndBaseName(Where where, String name) { + static WhereAndBaseName fromName(String name) { + if (name.startsWith("stored_")) { + return new WhereAndBaseName(Where.STORED, name.substring("stored_".length())); + } else if (name.startsWith("source_")) { + return new WhereAndBaseName(Where.SOURCE, name.substring("source_".length())); + } + return new WhereAndBaseName(Where.DOC_VALUES, name); + } + } + + private static BlockLoader numericBlockLoader(WhereAndBaseName w, NumberFieldMapper.NumberType numberType) { + boolean stored = false; + boolean docValues = true; + switch (w.where) { + case DOC_VALUES: + break; + case SOURCE: + stored = true; + docValues = false; + break; + case STORED: + throw new UnsupportedOperationException(); + } + return new NumberFieldMapper.NumberFieldType( + w.name, + numberType, + true, + stored, + docValues, + true, + null, + Map.of(), + null, + false, + null, + null, + false + ).blockLoader(null); + } + + /** + * Layouts for the input blocks. + *
    + *
+     * <ul>
+     * <li>{@code in_order} is how {@link LuceneSourceOperator} produces them to read in
+     *     the most efficient possible way.</li>
+     * <li>{@code shuffled} is chunked the same size as {@link LuceneSourceOperator} but
+     *     loads in a shuffled order, like a hypothetical {@link TopNOperator} that can
+     *     output large blocks would output.</li>
+     * <li>{@code shuffled_singles} is shuffled in the same order as {@code shuffled} but
+     *     each page has a single document rather than {@code BLOCK_SIZE} docs.</li>
+     * </ul>
+ */ + @Param({ "in_order", "shuffled", "shuffled_singles" }) + public String layout; + + @Param({ "long", "int", "double", "keyword", "stored_keyword", "3_stored_keywords" }) + public String name; + + private Directory directory; + private IndexReader reader; + private List pages; + + @Benchmark + @OperationsPerInvocation(INDEX_SIZE) + public void benchmark() { + ValuesSourceReaderOperator op = new ValuesSourceReaderOperator( + blockFactory, + fields(name), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { + throw new UnsupportedOperationException("can't load _source here"); + }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), + 0 + ); + long sum = 0; + for (Page page : pages) { + op.addInput(page); + switch (name) { + case "long" -> { + LongVector values = op.getOutput().getBlock(1).asVector(); + for (int p = 0; p < values.getPositionCount(); p++) { + sum += values.getLong(p); + } + } + case "int" -> { + IntVector values = op.getOutput().getBlock(1).asVector(); + for (int p = 0; p < values.getPositionCount(); p++) { + sum += values.getInt(p); + } + } + case "double" -> { + DoubleVector values = op.getOutput().getBlock(1).asVector(); + for (int p = 0; p < values.getPositionCount(); p++) { + sum += (long) values.getDouble(p); + } + } + case "keyword", "stored_keyword" -> { + BytesRef scratch = new BytesRef(); + BytesRefVector values = op.getOutput().getBlock(1).asVector(); + for (int p = 0; p < values.getPositionCount(); p++) { + BytesRef r = values.getBytesRef(p, scratch); + r.offset++; + r.length--; + sum += Integer.parseInt(r.utf8ToString()); + } + } + case "3_stored_keywords" -> { + BytesRef scratch = new BytesRef(); + Page out = op.getOutput(); + for (BytesRefVector values : new BytesRefVector[] { + out.getBlock(1).asVector(), + out.getBlock(2).asVector(), + out.getBlock(3).asVector() }) { + + for (int p = 0; p < values.getPositionCount(); p++) { + BytesRef r = values.getBytesRef(p, scratch); + r.offset++; + r.length--; + sum += Integer.parseInt(r.utf8ToString()); + } + } + } + } + } + long expected = 0; + switch (name) { + case "keyword", "stored_keyword": + for (int i = 0; i < INDEX_SIZE; i++) { + expected += i % 1000; + } + break; + case "3_stored_keywords": + for (int i = 0; i < INDEX_SIZE; i++) { + expected += 3 * (i % 1000); + } + break; + default: + expected = INDEX_SIZE; + expected = expected * (expected - 1) / 2; + } + if (expected != sum) { + throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]"); + } + boolean foundStoredFieldLoader = false; + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); + for (Map.Entry e : status.readersBuilt().entrySet()) { + if (e.getKey().indexOf("stored_fields") >= 0) { + foundStoredFieldLoader = true; + } + } + if (name.indexOf("stored") >= 0) { + if (foundStoredFieldLoader == false) { + throw new AssertionError("expected to use a stored field loader but only had: " + status.readersBuilt()); + } + } else { + if (foundStoredFieldLoader) { + throw new AssertionError("expected not to use a stored field loader but only had: " + status.readersBuilt()); + } + } + } + + @Setup + public void setup() throws IOException { + setupIndex(); + setupPages(); + } + + private void setupIndex() throws IOException { + directory = new ByteBuffersDirectory(); + FieldType keywordFieldType = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE); + keywordFieldType.setStored(true); + keywordFieldType.freeze(); + try (IndexWriter 
iw = new IndexWriter(directory, new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) { + for (int i = 0; i < INDEX_SIZE; i++) { + String c = Character.toString('a' - ((i % 1000) % 26) + 26); + iw.addDocument( + List.of( + new NumericDocValuesField("long", i), + new StoredField("long", i), + new NumericDocValuesField("int", i), + new StoredField("int", i), + new NumericDocValuesField("double", NumericUtils.doubleToSortableLong(i)), + new StoredField("double", (double) i), + new KeywordFieldMapper.KeywordField("keyword_1", new BytesRef(c + i % 1000), keywordFieldType), + new KeywordFieldMapper.KeywordField("keyword_2", new BytesRef(c + i % 1000), keywordFieldType), + new KeywordFieldMapper.KeywordField("keyword_3", new BytesRef(c + i % 1000), keywordFieldType) + ) + ); + if (i % COMMIT_INTERVAL == 0) { + iw.commit(); + } + } + } + reader = DirectoryReader.open(directory); + } + + private void setupPages() { + pages = new ArrayList<>(); + switch (layout) { + case "in_order" -> { + IntVector.Builder docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); + for (LeafReaderContext ctx : reader.leaves()) { + int begin = 0; + while (begin < ctx.reader().maxDoc()) { + int end = Math.min(begin + BLOCK_LENGTH, ctx.reader().maxDoc()); + for (int doc = 0; doc < ctx.reader().maxDoc(); doc++) { + docs.appendInt(doc); + } + pages.add( + new Page( + new DocVector( + blockFactory.newConstantIntBlockWith(0, end - begin).asVector(), + blockFactory.newConstantIntBlockWith(ctx.ord, end - begin).asVector(), + docs.build(), + true + ).asBlock() + ) + ); + docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); + begin = end; + } + } + } + case "shuffled" -> { + record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {} + List docItrs = new ArrayList<>(reader.leaves().size()); + for (LeafReaderContext ctx : reader.leaves()) { + docItrs.add(new ItrAndOrd(IntStream.range(0, ctx.reader().maxDoc()).iterator(), ctx.ord)); + } + IntVector.Builder docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); + IntVector.Builder leafs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); + int size = 0; + while (docItrs.isEmpty() == false) { + Iterator itrItr = docItrs.iterator(); + while (itrItr.hasNext()) { + ItrAndOrd next = itrItr.next(); + if (false == next.itr.hasNext()) { + itrItr.remove(); + continue; + } + docs.appendInt(next.itr.nextInt()); + leafs.appendInt(next.ord); + size++; + if (size >= BLOCK_LENGTH) { + pages.add( + new Page( + new DocVector(blockFactory.newConstantIntVector(0, size), leafs.build(), docs.build(), null).asBlock() + ) + ); + docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); + leafs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH); + size = 0; + } + } + } + if (size > 0) { + pages.add( + new Page( + new DocVector( + blockFactory.newConstantIntBlockWith(0, size).asVector(), + leafs.build().asBlock().asVector(), + docs.build(), + null + ).asBlock() + ) + ); + } + } + case "shuffled_singles" -> { + record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {} + List docItrs = new ArrayList<>(reader.leaves().size()); + for (LeafReaderContext ctx : reader.leaves()) { + docItrs.add(new ItrAndOrd(IntStream.range(0, ctx.reader().maxDoc()).iterator(), ctx.ord)); + } + while (docItrs.isEmpty() == false) { + Iterator itrItr = docItrs.iterator(); + while (itrItr.hasNext()) { + ItrAndOrd next = itrItr.next(); + if (false == next.itr.hasNext()) { + itrItr.remove(); + continue; + } + pages.add( + new Page( + new DocVector( + blockFactory.newConstantIntVector(0, 1), + 
blockFactory.newConstantIntVector(next.ord, 1), + blockFactory.newConstantIntVector(next.itr.nextInt(), 1), + true + ).asBlock() + ) + ); + } + } + } + default -> throw new IllegalArgumentException("unsupported layout [" + layout + "]"); + } + } + + @TearDown + public void teardownIndex() throws IOException { + IOUtils.close(reader, directory); + } +} From a47cc2306f2f800208cdc4e70589ef7acf3fd6a0 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 8 Jul 2025 16:06:17 -0400 Subject: [PATCH 3/3] Double close on shuffle during field loading (#130838) Fixes a bug during field loading where we could double-close blocks if we failed to allocate memory during the un-shuffling portion of field loading from single segments. Unit test incoming in the followup. Closes #130426 Closes #130790 Closes #130791 Closes #130792 Closes #130793 Closes #130270 Closes #130788 Closes #130122 Closes #130827 --- .../lucene/read/ValuesFromSingleReader.java | 33 +++++++++++-------- .../compute/lucene/read/ValuesReader.java | 3 ++ .../read/ValuesSourceReaderOperatorTests.java | 7 ---- .../compute/test/OperatorTestCase.java | 10 ++++-- .../xpack/esql/action/EnrichIT.java | 5 ++- 5 files changed, 32 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java index 3ac6565d21c33..1bee68160e024 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java @@ -65,22 +65,27 @@ public int get(int i) { return; } int[] forwards = docs.shardSegmentDocMapForwards(); - loadFromSingleLeaf(target, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } + Block[] unshuffled = new Block[target.length]; + try { + loadFromSingleLeaf(unshuffled, new BlockLoader.Docs() { + @Override + public int count() { + return docs.getPositionCount(); + } - @Override - public int get(int i) { - return docs.docs().getInt(forwards[i]); - } - }); - final int[] backwards = docs.shardSegmentDocMapBackwards(); - for (int i = 0; i < target.length; i++) { - try (Block in = target[i]) { - target[i] = in.filter(backwards); + @Override + public int get(int i) { + return docs.docs().getInt(forwards[i]); + } + }); + final int[] backwards = docs.shardSegmentDocMapBackwards(); + for (int i = 0; i < unshuffled.length; i++) { + target[i] = unshuffled[i].filter(backwards); + unshuffled[i].close(); + unshuffled[i] = null; } + } finally { + Releasables.closeExpectNoException(unshuffled); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java index d3b8b0edcec3d..ebfac0cb24f7f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java @@ -36,6 +36,9 @@ public Block[] next() { boolean success = false; try { load(target, offset); + if (target[0].getPositionCount() != docs.getPositionCount()) { + throw new IllegalStateException("partial pages not yet supported"); + } success = true; for (Block b : target) { operator.valuesLoaded += b.getTotalValueCount(); diff 
--git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 945c262ccec8d..36f430452f7b8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BooleanBlock; @@ -445,12 +444,6 @@ protected void assertSimpleOutput(List input, List results) { assertThat(sum, equalTo(expectedSum)); } - @Override - protected ByteSizeValue enoughMemoryForSimple() { - assumeFalse("strange exception in the test, fix soon", true); - return ByteSizeValue.ofKb(1); - } - public void testLoadAll() { DriverContext driverContext = driverContext(); loadSimpleAndAssert( diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index 248e8f5ddbfd1..38833379078ef 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -98,10 +98,16 @@ protected ByteSizeValue enoughMemoryForSimple() { * all pages. */ public final void testSimpleCircuitBreaking() { - ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple(); - Operator.OperatorFactory simple = simple(); + /* + * Build the input before building `simple` to handle the rare + * cases where `simple` need some state from the input - mostly + * this is ValuesSourceReaderOperator. 
+ */ DriverContext inputFactoryContext = driverContext(); List input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000))); + + ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple(); + Operator.OperatorFactory simple = simple(); try { ByteSizeValue limit = BreakerTestUtil.findBreakerLimit(memoryLimitForSimple, l -> runWithLimit(simple, input, l)); ByteSizeValue testWithSize = ByteSizeValue.ofBytes(randomLongBetween(0, limit.getBytes())); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index 19157b636dffc..c9f1ed57081ab 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -50,7 +50,6 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.After; import org.junit.Before; @@ -81,8 +80,8 @@ public class EnrichIT extends AbstractEsqlIntegTestCase { @Override protected Collection> nodePlugins() { - List> plugins = new ArrayList<>(super.nodePlugins()); - plugins.add(EsqlPlugin.class); + List> plugins = new ArrayList<>(); + plugins.add(EsqlActionBreakerIT.EsqlTestPluginWithMockBlockFactory.class); plugins.add(InternalExchangePlugin.class); plugins.add(LocalStateEnrich.class); plugins.add(IngestCommonPlugin.class);
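Editor's note: the PATCH 3/3 hunk in ValuesFromSingleReader avoids the double close by loading into a scratch array, handing each block to the caller's array only after the reorder succeeds, nulling the scratch slot once ownership has moved, and closing whatever is still in the scratch array in a finally block. Below is a minimal, self-contained sketch of that ownership pattern, assuming nothing from Elasticsearch: Resource, reorder(), and loadReordered() are hypothetical stand-ins for Block, Block#filter, and the load path, not real APIs from the patch.

// UnshuffleOwnershipSketch.java - illustrative only, not Elasticsearch code.
public class UnshuffleOwnershipSketch {
    /** Hypothetical releasable value; closing it twice is exactly the bug the pattern rules out. */
    static final class Resource implements AutoCloseable {
        private boolean closed;
        final int value;

        Resource(int value) {
            this.value = value;
        }

        /** Simulates the reorder/filter step, which may fail part way through (e.g. a tripped breaker). */
        Resource reorder(boolean fail) {
            if (fail) {
                throw new RuntimeException("simulated allocation failure");
            }
            return new Resource(value);
        }

        @Override
        public void close() {
            if (closed) {
                throw new IllegalStateException("double close");
            }
            closed = true;
        }
    }

    /**
     * Fills {@code target}. On failure, scratch entries that were not yet transferred are
     * closed exactly once; entries already published to {@code target} are the caller's
     * responsibility to release.
     */
    static void loadReordered(Resource[] target, int failAt) {
        Resource[] scratch = new Resource[target.length];
        try {
            for (int i = 0; i < scratch.length; i++) {
                scratch[i] = new Resource(i);
            }
            for (int i = 0; i < scratch.length; i++) {
                target[i] = scratch[i].reorder(i == failAt);
                scratch[i].close();
                scratch[i] = null; // ownership handed to target; keep the finally from closing it again
            }
        } finally {
            for (Resource r : scratch) {
                if (r != null) {
                    r.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        // Happy path: every value is published and nothing is closed twice.
        Resource[] ok = new Resource[3];
        loadReordered(ok, -1);
        for (Resource r : ok) {
            r.close();
        }
        System.out.println("loaded " + ok.length + " values without a double close");

        // Failure path: the reorder of the second value throws; the scratch copies are
        // released once by loadReordered, and the caller releases what was already published.
        Resource[] partial = new Resource[3];
        try {
            loadReordered(partial, 1);
        } catch (RuntimeException e) {
            for (Resource r : partial) {
                if (r != null) {
                    r.close();
                }
            }
            System.out.println("failure cleaned up exactly once: " + e.getMessage());
        }
    }
}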