diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java index a4504bedb3644..d592366835c08 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -41,7 +41,8 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.ShardRefCounted; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; import org.elasticsearch.compute.operator.topn.TopNOperator; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -343,7 +344,7 @@ public void benchmark() { ); long sum = 0; for (Page page : pages) { - op.addInput(page); + op.addInput(page.shallowCopy()); switch (name) { case "long" -> { LongVector values = op.getOutput().getBlock(1).asVector(); @@ -411,7 +412,7 @@ public void benchmark() { throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]"); } boolean foundStoredFieldLoader = false; - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status(); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); for (Map.Entry e : status.readersBuilt().entrySet()) { if (e.getKey().indexOf("stored_fields") >= 0) { foundStoredFieldLoader = true; diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 07bf5e3d50267..5fbbe85d1a22f 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -209,6 +209,8 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59); public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60); public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61); + public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62); + public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63); public static final TransportVersion V_9_0_0 = def(9_000_0_09); public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10); public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11); @@ -325,6 +327,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00); public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00); public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00); + public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/module-info.java b/x-pack/plugin/esql/compute/src/main/java/module-info.java index c4a042d692ea1..f21ed72d7eb21 100644 --- a/x-pack/plugin/esql/compute/src/main/java/module-info.java +++ b/x-pack/plugin/esql/compute/src/main/java/module-info.java @@ -36,4 +36,5 @@ exports org.elasticsearch.compute.aggregation.table; exports org.elasticsearch.compute.data.sort; exports org.elasticsearch.compute.querydsl.query; + exports org.elasticsearch.compute.lucene.read; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java deleted file mode 100644 index 500bef6d2a597..0000000000000 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ /dev/null @@ -1,792 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.compute.lucene; - -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.TransportVersion; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.DocBlock; -import org.elasticsearch.compute.data.DocVector; -import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.IntVector; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.data.SingletonOrdinalsBuilder; -import org.elasticsearch.compute.operator.AbstractPageMappingOperator; -import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.compute.operator.Operator; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; -import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; -import org.elasticsearch.index.mapper.BlockLoader; -import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; -import org.elasticsearch.index.mapper.SourceLoader; -import org.elasticsearch.search.fetch.StoredFieldsSpec; -import org.elasticsearch.xcontent.XContentBuilder; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.TreeMap; -import java.util.function.IntFunction; -import java.util.function.Supplier; - -import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; -import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; - -/** - * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} - * and outputs them to a new column. 
- */ -public class ValuesSourceReaderOperator extends AbstractPageMappingOperator { - /** - * Minimum number of documents for which it is more efficient to use a - * sequential stored field reader when reading stored fields. - *
<p>
- * The sequential stored field reader decompresses a whole block of docs - * at a time so for very short lists it won't be faster to use it. We use - * {@code 10} documents as the boundary for "very short" because it's what - * search does, not because we've done extensive testing on the number. - *
</p>
- */ - static final int SEQUENTIAL_BOUNDARY = 10; - - /** - * Creates a factory for {@link ValuesSourceReaderOperator}. - * @param fields fields to load - * @param shardContexts per-shard loading information - * @param docChannel the channel containing the shard, leaf/segment and doc id - */ - public record Factory(List fields, List shardContexts, int docChannel) implements OperatorFactory { - @Override - public Operator get(DriverContext driverContext) { - return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel); - } - - @Override - public String describe() { - StringBuilder sb = new StringBuilder(); - sb.append("ValuesSourceReaderOperator[fields = ["); - if (fields.size() < 10) { - boolean first = true; - for (FieldInfo f : fields) { - if (first) { - first = false; - } else { - sb.append(", "); - } - sb.append(f.name); - } - } else { - sb.append(fields.size()).append(" fields"); - } - return sb.append("]]").toString(); - } - } - - /** - * Configuration for a field to load. - * - * {@code blockLoader} maps shard index to the {@link BlockLoader}s - * which load the actual blocks. - */ - public record FieldInfo(String name, ElementType type, IntFunction blockLoader) {} - - public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} - - private final FieldWork[] fields; - private final List shardContexts; - private final int docChannel; - private final BlockFactory blockFactory; - - private final Map readersBuilt = new TreeMap<>(); - private long valuesLoaded; - - int lastShard = -1; - int lastSegment = -1; - - /** - * Creates a new extractor - * @param fields fields to load - * @param docChannel the channel containing the shard, leaf/segment and doc id - */ - public ValuesSourceReaderOperator(BlockFactory blockFactory, List fields, List shardContexts, int docChannel) { - this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new); - this.shardContexts = shardContexts; - this.docChannel = docChannel; - this.blockFactory = blockFactory; - } - - @Override - protected Page process(Page page) { - DocVector docVector = page.getBlock(docChannel).asVector(); - - Block[] blocks = new Block[fields.length]; - boolean success = false; - try { - if (docVector.singleSegmentNonDecreasing()) { - IntVector docs = docVector.docs(); - int shard = docVector.shards().getInt(0); - int segment = docVector.segments().getInt(0); - loadFromSingleLeaf(blocks, shard, segment, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } - - @Override - public int get(int i) { - return docs.getInt(i); - } - }); - } else if (docVector.singleSegment()) { - loadFromSingleLeafUnsorted(blocks, docVector); - } else { - try (LoadFromMany many = new LoadFromMany(blocks, docVector)) { - many.run(); - } - } - success = true; - for (Block b : blocks) { - valuesLoaded += b.getTotalValueCount(); - } - return page.appendBlocks(blocks); - } catch (IOException e) { - throw new UncheckedIOException(e); - } finally { - if (success == false) { - Releasables.closeExpectNoException(blocks); - } - } - } - - private void positionFieldWork(int shard, int segment, int firstDoc) { - if (lastShard == shard) { - if (lastSegment == segment) { - for (FieldWork w : fields) { - w.sameSegment(firstDoc); - } - return; - } - lastSegment = segment; - for (FieldWork w : fields) { - w.sameShardNewSegment(); - } - return; - } - lastShard = shard; - lastSegment = segment; - for (FieldWork w : 
fields) { - w.newShard(shard); - } - } - - private boolean positionFieldWorkDocGuarteedAscending(int shard, int segment) { - if (lastShard == shard) { - if (lastSegment == segment) { - return false; - } - lastSegment = segment; - for (FieldWork w : fields) { - w.sameShardNewSegment(); - } - return true; - } - lastShard = shard; - lastSegment = segment; - for (FieldWork w : fields) { - w.newShard(shard); - } - return true; - } - - private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoader.Docs docs) throws IOException { - int firstDoc = docs.get(0); - positionFieldWork(shard, segment, firstDoc); - StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; - List rowStrideReaders = new ArrayList<>(fields.length); - LeafReaderContext ctx = ctx(shard, segment); - try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.count())) { - for (int f = 0; f < fields.length; f++) { - FieldWork field = fields[f]; - BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx); - if (columnAtATime != null) { - blocks[f] = (Block) columnAtATime.read(loaderBlockFactory, docs); - sanityCheckBlock(columnAtATime, docs.count(), blocks[f], f); - } else { - rowStrideReaders.add( - new RowStrideReaderWork( - field.rowStride(ctx), - (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()), - field.loader, - f - ) - ); - storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); - } - } - - SourceLoader sourceLoader = null; - ShardContext shardContext = shardContexts.get(shard); - if (storedFieldsSpec.requiresSource()) { - sourceLoader = shardContext.newSourceLoader.get(); - storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); - } - - if (rowStrideReaders.isEmpty()) { - return; - } - if (storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { - throw new IllegalStateException( - "found row stride readers [" + rowStrideReaders + "] without stored fields [" + storedFieldsSpec + "]" - ); - } - StoredFieldLoader storedFieldLoader; - if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) { - storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec); - trackStoredFields(storedFieldsSpec, true); - } else { - storedFieldLoader = StoredFieldLoader.fromSpec(storedFieldsSpec); - trackStoredFields(storedFieldsSpec, false); - } - BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader( - storedFieldLoader.getLoader(ctx, null), - sourceLoader != null ? 
sourceLoader.leaf(ctx.reader(), null) : null - ); - for (int p = 0; p < docs.count(); p++) { - int doc = docs.get(p); - storedFields.advanceTo(doc); - for (RowStrideReaderWork work : rowStrideReaders) { - work.read(doc, storedFields); - } - } - for (RowStrideReaderWork work : rowStrideReaders) { - blocks[work.offset] = work.build(); - sanityCheckBlock(work.reader, docs.count(), blocks[work.offset], work.offset); - } - } finally { - Releasables.close(rowStrideReaders); - } - } - - private void loadFromSingleLeafUnsorted(Block[] blocks, DocVector docVector) throws IOException { - IntVector docs = docVector.docs(); - int[] forwards = docVector.shardSegmentDocMapForwards(); - int shard = docVector.shards().getInt(0); - int segment = docVector.segments().getInt(0); - loadFromSingleLeaf(blocks, shard, segment, new BlockLoader.Docs() { - @Override - public int count() { - return docs.getPositionCount(); - } - - @Override - public int get(int i) { - return docs.getInt(forwards[i]); - } - }); - final int[] backwards = docVector.shardSegmentDocMapBackwards(); - for (int i = 0; i < blocks.length; i++) { - Block in = blocks[i]; - blocks[i] = in.filter(backwards); - in.close(); - } - } - - private class LoadFromMany implements Releasable { - private final Block[] target; - private final IntVector shards; - private final IntVector segments; - private final IntVector docs; - private final int[] forwards; - private final int[] backwards; - private final Block.Builder[][] builders; - private final BlockLoader[][] converters; - private final Block.Builder[] fieldTypeBuilders; - private final BlockLoader.RowStrideReader[] rowStride; - - BlockLoaderStoredFieldsFromLeafLoader storedFields; - - LoadFromMany(Block[] target, DocVector docVector) { - this.target = target; - shards = docVector.shards(); - segments = docVector.segments(); - docs = docVector.docs(); - forwards = docVector.shardSegmentDocMapForwards(); - backwards = docVector.shardSegmentDocMapBackwards(); - fieldTypeBuilders = new Block.Builder[target.length]; - builders = new Block.Builder[target.length][shardContexts.size()]; - converters = new BlockLoader[target.length][shardContexts.size()]; - rowStride = new BlockLoader.RowStrideReader[target.length]; - } - - void run() throws IOException { - for (int f = 0; f < fields.length; f++) { - /* - * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types). - * We create the final block builders using the desired type, one for each field, but then also use inner builders - * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field - * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type. 
- */ - fieldTypeBuilders[f] = fields[f].info.type.newBlockBuilder(docs.getPositionCount(), blockFactory); - builders[f] = new Block.Builder[shardContexts.size()]; - converters[f] = new BlockLoader[shardContexts.size()]; - } - try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.getPositionCount())) { - int p = forwards[0]; - int shard = shards.getInt(p); - int segment = segments.getInt(p); - int firstDoc = docs.getInt(p); - positionFieldWork(shard, segment, firstDoc); - LeafReaderContext ctx = ctx(shard, segment); - fieldsMoved(ctx, shard); - verifyBuilders(loaderBlockFactory, shard); - read(firstDoc, shard); - for (int i = 1; i < forwards.length; i++) { - p = forwards[i]; - shard = shards.getInt(p); - segment = segments.getInt(p); - boolean changedSegment = positionFieldWorkDocGuarteedAscending(shard, segment); - if (changedSegment) { - ctx = ctx(shard, segment); - fieldsMoved(ctx, shard); - } - verifyBuilders(loaderBlockFactory, shard); - read(docs.getInt(p), shard); - } - } - for (int f = 0; f < target.length; f++) { - for (int s = 0; s < shardContexts.size(); s++) { - if (builders[f][s] != null) { - try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) { - fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount()); - } - } - } - try (Block targetBlock = fieldTypeBuilders[f].build()) { - target[f] = targetBlock.filter(backwards); - } - sanityCheckBlock(rowStride[f], docs.getPositionCount(), target[f], f); - } - } - - private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException { - StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; - for (int f = 0; f < fields.length; f++) { - FieldWork field = fields[f]; - rowStride[f] = field.rowStride(ctx); - storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); - } - SourceLoader sourceLoader = null; - if (storedFieldsSpec.requiresSource()) { - sourceLoader = shardContexts.get(shard).newSourceLoader.get(); - storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); - } - storedFields = new BlockLoaderStoredFieldsFromLeafLoader( - StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null), - sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null - ); - if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { - trackStoredFields(storedFieldsSpec, false); - } - } - - private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) { - for (int f = 0; f < fields.length; f++) { - if (builders[f][shard] == null) { - // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard - builders[f][shard] = (Block.Builder) fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount()); - converters[f][shard] = fields[f].loader; - } - } - } - - private void read(int doc, int shard) throws IOException { - storedFields.advanceTo(doc); - for (int f = 0; f < builders.length; f++) { - rowStride[f].read(doc, storedFields, builders[f][shard]); - } - } - - @Override - public void close() { - Releasables.closeExpectNoException(fieldTypeBuilders); - for (int f = 0; f < fields.length; f++) { - Releasables.closeExpectNoException(builders[f]); - } - } - } - - /** - * Is it more efficient to use a sequential stored field reader - * when reading stored fields for the documents contained in {@code docIds}? 
- */ - private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) { - int count = docs.count(); - if (count < SEQUENTIAL_BOUNDARY) { - return false; - } - int range = docs.get(count - 1) - docs.get(0); - return range * storedFieldsSequentialProportion <= count; - } - - private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { - readersBuilt.merge( - "stored_fields[" - + "requires_source:" - + spec.requiresSource() - + ", fields:" - + spec.requiredStoredFields().size() - + ", sequential: " - + sequential - + "]", - 1, - (prev, one) -> prev + one - ); - } - - private class FieldWork { - final FieldInfo info; - - BlockLoader loader; - BlockLoader.ColumnAtATimeReader columnAtATime; - BlockLoader.RowStrideReader rowStride; - - FieldWork(FieldInfo info) { - this.info = info; - } - - void sameSegment(int firstDoc) { - if (columnAtATime != null && columnAtATime.canReuse(firstDoc) == false) { - columnAtATime = null; - } - if (rowStride != null && rowStride.canReuse(firstDoc) == false) { - rowStride = null; - } - } - - void sameShardNewSegment() { - columnAtATime = null; - rowStride = null; - } - - void newShard(int shard) { - loader = info.blockLoader.apply(shard); - columnAtATime = null; - rowStride = null; - } - - BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException { - if (columnAtATime == null) { - columnAtATime = loader.columnAtATimeReader(ctx); - trackReader("column_at_a_time", this.columnAtATime); - } - return columnAtATime; - } - - BlockLoader.RowStrideReader rowStride(LeafReaderContext ctx) throws IOException { - if (rowStride == null) { - rowStride = loader.rowStrideReader(ctx); - trackReader("row_stride", this.rowStride); - } - return rowStride; - } - - private void trackReader(String type, BlockLoader.Reader reader) { - readersBuilt.merge(info.name + ":" + type + ":" + reader, 1, (prev, one) -> prev + one); - } - } - - private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset) - implements - Releasable { - void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException { - reader.read(doc, storedFields, builder); - } - - Block build() { - return (Block) loader.convert(builder.build()); - } - - @Override - public void close() { - builder.close(); - } - } - - private LeafReaderContext ctx(int shard, int segment) { - return shardContexts.get(shard).reader().leaves().get(segment); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("ValuesSourceReaderOperator[fields = ["); - if (fields.length < 10) { - boolean first = true; - for (FieldWork f : fields) { - if (first) { - first = false; - } else { - sb.append(", "); - } - sb.append(f.info.name); - } - } else { - sb.append(fields.length).append(" fields"); - } - return sb.append("]]").toString(); - } - - @Override - protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { - return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); - } - - /** - * Quick checks for on the loaded block to make sure it looks reasonable. 
- * @param loader the object that did the loading - we use it to make error messages if the block is busted - * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has - * @param block the block to sanity check - * @param field offset into the {@link #fields} array for the block being loaded - */ - private void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { - if (block.getPositionCount() != expectedPositions) { - throw new IllegalStateException( - sanityCheckBlockErrorPrefix(loader, block, field) - + " has [" - + block.getPositionCount() - + "] positions instead of [" - + expectedPositions - + "]" - ); - } - if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { - throw new IllegalStateException( - sanityCheckBlockErrorPrefix(loader, block, field) - + "'s element_type [" - + block.elementType() - + "] NOT IN (NULL, " - + fields[field].info.type - + ")" - ); - } - } - - private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { - return fields[field].info.name + "[" + loader + "]: " + block; - } - - public static class Status extends AbstractPageMappingOperator.Status { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( - Operator.Status.class, - "values_source_reader", - Status::new - ); - - private final Map readersBuilt; - private final long valuesLoaded; - - Status( - Map readersBuilt, - long processNanos, - int pagesProcessed, - long rowsReceived, - long rowsEmitted, - long valuesLoaded - ) { - super(processNanos, pagesProcessed, rowsReceived, rowsEmitted); - this.readersBuilt = readersBuilt; - this.valuesLoaded = valuesLoaded; - } - - Status(StreamInput in) throws IOException { - super(in); - readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); - valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? 
in.readVLong() : 0; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(readersBuilt, StreamOutput::writeVInt); - if (supportsValuesLoaded(out.getTransportVersion())) { - out.writeVLong(valuesLoaded); - } - } - - private static boolean supportsValuesLoaded(TransportVersion version) { - return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) - || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); - } - - @Override - public String getWriteableName() { - return ENTRY.name; - } - - public Map readersBuilt() { - return readersBuilt; - } - - @Override - public long valuesLoaded() { - return valuesLoaded; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startObject("readers_built"); - for (Map.Entry e : readersBuilt.entrySet()) { - builder.field(e.getKey(), e.getValue()); - } - builder.endObject(); - builder.field("values_loaded", valuesLoaded); - innerToXContent(builder); - return builder.endObject(); - } - - @Override - public boolean equals(Object o) { - if (super.equals(o) == false) return false; - Status status = (Status) o; - return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); - } - - @Override - public String toString() { - return Strings.toString(this); - } - } - - private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable { - private final int pageSize; - private Block nullBlock; - - private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) { - super(factory); - this.pageSize = pageSize; - } - - @Override - public Block constantNulls() { - if (nullBlock == null) { - nullBlock = factory.newConstantNullBlock(pageSize); - } - nullBlock.incRef(); - return nullBlock; - } - - @Override - public void close() { - if (nullBlock != null) { - nullBlock.close(); - } - } - - @Override - public BytesRefBlock constantBytes(BytesRef value) { - return factory.newConstantBytesRefBlockWith(value, pageSize); - } - } - - public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory { - protected final BlockFactory factory; - - protected DelegatingBlockLoaderFactory(BlockFactory factory) { - this.factory = factory; - } - - @Override - public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { - return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.BooleanBuilder booleans(int expectedCount) { - return factory.newBooleanBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) { - return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); - } - - @Override - public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) { - return factory.newBytesRefBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) { - return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.DoubleBuilder doubles(int expectedCount) { - return factory.newDoubleBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.FloatBuilder 
denseVectors(int expectedVectorsCount, int dimensions) { - return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions); - } - - @Override - public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) { - return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.IntBuilder ints(int expectedCount) { - return factory.newIntBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) { - return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); - } - - @Override - public BlockLoader.LongBuilder longs(int expectedCount) { - return factory.newLongBlockBuilder(expectedCount); - } - - @Override - public BlockLoader.Builder nulls(int expectedCount) { - return ElementType.NULL.newBlockBuilder(expectedCount, factory); - } - - @Override - public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { - return new SingletonOrdinalsBuilder(factory, ordinals, count); - } - - @Override - public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) { - return factory.newAggregateMetricDoubleBlockBuilder(count); - } - } -} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java new file mode 100644 index 0000000000000..f7f5f541c747f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.core.Releasable; + +class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable { + private final int pageSize; + private Block nullBlock; + + ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) { + super(factory); + this.pageSize = pageSize; + } + + @Override + public Block constantNulls() { + if (nullBlock == null) { + nullBlock = factory.newConstantNullBlock(pageSize); + } + nullBlock.incRef(); + return nullBlock; + } + + @Override + public void close() { + if (nullBlock != null) { + nullBlock.close(); + } + } + + @Override + public BytesRefBlock constantBytes(BytesRef value) { + return factory.newConstantBytesRefBlockWith(value, pageSize); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java new file mode 100644 index 0000000000000..8dc5b6cc43ecf --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.SortedDocValues; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.SingletonOrdinalsBuilder; +import org.elasticsearch.index.mapper.BlockLoader; + +public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory { + protected final BlockFactory factory; + + protected DelegatingBlockLoaderFactory(BlockFactory factory) { + this.factory = factory; + } + + @Override + public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { + return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.BooleanBuilder booleans(int expectedCount) { + return factory.newBooleanBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) { + return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); + } + + @Override + public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) { + return factory.newBytesRefBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) { + return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.DoubleBuilder doubles(int expectedCount) { + return factory.newDoubleBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) { + return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions); + } + + @Override + public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) { + return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.IntBuilder ints(int expectedCount) { + return factory.newIntBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) { + return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); + } + + @Override + public BlockLoader.LongBuilder longs(int expectedCount) { + return factory.newLongBlockBuilder(expectedCount); + } + + @Override + public BlockLoader.Builder nulls(int expectedCount) { + return ElementType.NULL.newBlockBuilder(expectedCount, factory); + } + + @Override + public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { + return new SingletonOrdinalsBuilder(factory, ordinals, count); + } + + @Override + public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) { + return factory.newAggregateMetricDoubleBlockBuilder(count); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java similarity index 98% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java rename to 
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java index f535bc462fdfc..9ec5802b43f98 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; @@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.OrdinalBytesRefBlock; import org.elasticsearch.compute.data.OrdinalBytesRefVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -191,7 +192,7 @@ public void close() { Releasables.close(fieldsReader, super::close); } - static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory { + static class BlockLoaderFactory extends DelegatingBlockLoaderFactory { BlockLoaderFactory(BlockFactory factory) { super(factory); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java new file mode 100644 index 0000000000000..7ff6e7211b7f2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; + +/** + * Loads values from a many leaves. Much less efficient than {@link ValuesFromSingleReader}. 
+ */ +class ValuesFromManyReader extends ValuesReader { + private final int[] forwards; + private final int[] backwards; + private final BlockLoader.RowStrideReader[] rowStride; + + private BlockLoaderStoredFieldsFromLeafLoader storedFields; + + ValuesFromManyReader(ValuesSourceReaderOperator operator, DocVector docs) { + super(operator, docs); + forwards = docs.shardSegmentDocMapForwards(); + backwards = docs.shardSegmentDocMapBackwards(); + rowStride = new BlockLoader.RowStrideReader[operator.fields.length]; + } + + @Override + protected void load(Block[] target, int offset) throws IOException { + try (Run run = new Run(target)) { + run.run(offset); + } + } + + class Run implements Releasable { + private final Block[] target; + private final Block.Builder[][] builders; + private final BlockLoader[][] converters; + private final Block.Builder[] fieldTypeBuilders; + + Run(Block[] target) { + this.target = target; + fieldTypeBuilders = new Block.Builder[target.length]; + builders = new Block.Builder[target.length][operator.shardContexts.size()]; + converters = new BlockLoader[target.length][operator.shardContexts.size()]; + } + + void run(int offset) throws IOException { + assert offset == 0; // TODO allow non-0 offset to support splitting pages + for (int f = 0; f < operator.fields.length; f++) { + /* + * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types). + * We create the final block builders using the desired type, one for each field, but then also use inner builders + * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field + * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type. 
+ */ + fieldTypeBuilders[f] = operator.fields[f].info.type().newBlockBuilder(docs.getPositionCount(), operator.blockFactory); + builders[f] = new Block.Builder[operator.shardContexts.size()]; + converters[f] = new BlockLoader[operator.shardContexts.size()]; + } + try ( + ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount()) + ) { + int p = forwards[offset]; + int shard = docs.shards().getInt(p); + int segment = docs.segments().getInt(p); + int firstDoc = docs.docs().getInt(p); + operator.positionFieldWork(shard, segment, firstDoc); + LeafReaderContext ctx = operator.ctx(shard, segment); + fieldsMoved(ctx, shard); + verifyBuilders(loaderBlockFactory, shard); + read(firstDoc, shard); + + int i = offset + 1; + while (i < forwards.length) { + p = forwards[i]; + shard = docs.shards().getInt(p); + segment = docs.segments().getInt(p); + boolean changedSegment = operator.positionFieldWorkDocGuaranteedAscending(shard, segment); + if (changedSegment) { + ctx = operator.ctx(shard, segment); + fieldsMoved(ctx, shard); + } + verifyBuilders(loaderBlockFactory, shard); + read(docs.docs().getInt(p), shard); + i++; + } + buildBlocks(); + } + } + + private void buildBlocks() { + for (int f = 0; f < target.length; f++) { + for (int s = 0; s < operator.shardContexts.size(); s++) { + if (builders[f][s] != null) { + try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) { + fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount()); + } + } + } + try (Block targetBlock = fieldTypeBuilders[f].build()) { + target[f] = targetBlock.filter(backwards); + } + operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f); + } + } + + private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) { + for (int f = 0; f < operator.fields.length; f++) { + if (builders[f][shard] == null) { + // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard + builders[f][shard] = (Block.Builder) operator.fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount()); + converters[f][shard] = operator.fields[f].loader; + } + } + } + + private void read(int doc, int shard) throws IOException { + storedFields.advanceTo(doc); + for (int f = 0; f < builders.length; f++) { + rowStride[f].read(doc, storedFields, builders[f][shard]); + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(fieldTypeBuilders); + for (int f = 0; f < operator.fields.length; f++) { + Releasables.closeExpectNoException(builders[f]); + } + } + } + + private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException { + StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; + for (int f = 0; f < operator.fields.length; f++) { + ValuesSourceReaderOperator.FieldWork field = operator.fields[f]; + rowStride[f] = field.rowStride(ctx); + storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); + } + SourceLoader sourceLoader = null; + if (storedFieldsSpec.requiresSource()) { + sourceLoader = operator.shardContexts.get(shard).newSourceLoader().get(); + storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); + } + storedFields = new BlockLoaderStoredFieldsFromLeafLoader( + StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null), + sourceLoader != null ? 
sourceLoader.leaf(ctx.reader(), null) : null + ); + if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { + operator.trackStoredFields(storedFieldsSpec, false); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java new file mode 100644 index 0000000000000..3ac6565d21c33 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Loads values from a single leaf. Much more efficient than {@link ValuesFromManyReader}. + */ +class ValuesFromSingleReader extends ValuesReader { + /** + * Minimum number of documents for which it is more efficient to use a + * sequential stored field reader when reading stored fields. + *
<p>
+ * The sequential stored field reader decompresses a whole block of docs + * at a time so for very short lists it won't be faster to use it. We use + * {@code 10} documents as the boundary for "very short" because it's what + * search does, not because we've done extensive testing on the number. + *
</p>
+ */ + static final int SEQUENTIAL_BOUNDARY = 10; + + private final int shard; + private final int segment; + + ValuesFromSingleReader(ValuesSourceReaderOperator operator, DocVector docs) { + super(operator, docs); + this.shard = docs.shards().getInt(0); + this.segment = docs.segments().getInt(0); + } + + @Override + protected void load(Block[] target, int offset) throws IOException { + assert offset == 0; // TODO allow non-0 offset to support splitting pages + if (docs.singleSegmentNonDecreasing()) { + loadFromSingleLeaf(target, new BlockLoader.Docs() { + @Override + public int count() { + return docs.getPositionCount(); + } + + @Override + public int get(int i) { + return docs.docs().getInt(i); + } + }); + return; + } + int[] forwards = docs.shardSegmentDocMapForwards(); + loadFromSingleLeaf(target, new BlockLoader.Docs() { + @Override + public int count() { + return docs.getPositionCount(); + } + + @Override + public int get(int i) { + return docs.docs().getInt(forwards[i]); + } + }); + final int[] backwards = docs.shardSegmentDocMapBackwards(); + for (int i = 0; i < target.length; i++) { + try (Block in = target[i]) { + target[i] = in.filter(backwards); + } + } + } + + private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IOException { + int firstDoc = docs.get(0); + operator.positionFieldWork(shard, segment, firstDoc); + StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; + List rowStrideReaders = new ArrayList<>(operator.fields.length); + LeafReaderContext ctx = operator.ctx(shard, segment); + try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.count())) { + for (int f = 0; f < operator.fields.length; f++) { + ValuesSourceReaderOperator.FieldWork field = operator.fields[f]; + BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx); + if (columnAtATime != null) { + target[f] = (Block) columnAtATime.read(loaderBlockFactory, docs); + operator.sanityCheckBlock(columnAtATime, docs.count(), target[f], f); + } else { + rowStrideReaders.add( + new RowStrideReaderWork( + field.rowStride(ctx), + (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()), + field.loader, + f + ) + ); + storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec()); + } + } + + if (rowStrideReaders.isEmpty() == false) { + loadFromRowStrideReaders(target, storedFieldsSpec, rowStrideReaders, ctx, docs); + } + } finally { + Releasables.close(rowStrideReaders); + } + } + + private void loadFromRowStrideReaders( + Block[] target, + StoredFieldsSpec storedFieldsSpec, + List rowStrideReaders, + LeafReaderContext ctx, + BlockLoader.Docs docs + ) throws IOException { + SourceLoader sourceLoader = null; + ValuesSourceReaderOperator.ShardContext shardContext = operator.shardContexts.get(shard); + if (storedFieldsSpec.requiresSource()) { + sourceLoader = shardContext.newSourceLoader().get(); + storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); + } + if (storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) { + throw new IllegalStateException( + "found row stride readers [" + rowStrideReaders + "] without stored fields [" + storedFieldsSpec + "]" + ); + } + StoredFieldLoader storedFieldLoader; + if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) { + storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec); + operator.trackStoredFields(storedFieldsSpec, 
true); + } else { + storedFieldLoader = StoredFieldLoader.fromSpec(storedFieldsSpec); + operator.trackStoredFields(storedFieldsSpec, false); + } + BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader( + storedFieldLoader.getLoader(ctx, null), + sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null + ); + int p = 0; + while (p < docs.count()) { + int doc = docs.get(p++); + storedFields.advanceTo(doc); + for (RowStrideReaderWork work : rowStrideReaders) { + work.read(doc, storedFields); + } + } + for (RowStrideReaderWork work : rowStrideReaders) { + target[work.offset] = work.build(); + operator.sanityCheckBlock(work.reader, p, target[work.offset], work.offset); + } + } + + /** + * Is it more efficient to use a sequential stored field reader + * when reading stored fields for the documents contained in {@code docIds}? + */ + private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) { + int count = docs.count(); + if (count < SEQUENTIAL_BOUNDARY) { + return false; + } + int range = docs.get(count - 1) - docs.get(0); + return range * storedFieldsSequentialProportion <= count; + } + + private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset) + implements + Releasable { + void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException { + reader.read(doc, storedFields, builder); + } + + Block build() { + return (Block) loader.convert(builder.build()); + } + + @Override + public void close() { + builder.close(); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java new file mode 100644 index 0000000000000..d3b8b0edcec3d --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.lucene.read; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +import java.io.IOException; +import java.io.UncheckedIOException; + +public abstract class ValuesReader implements ReleasableIterator { + protected final ValuesSourceReaderOperator operator; + protected final DocVector docs; + private int offset; + + ValuesReader(ValuesSourceReaderOperator operator, DocVector docs) { + this.operator = operator; + this.docs = docs; + } + + @Override + public boolean hasNext() { + return offset < docs.getPositionCount(); + } + + @Override + public Block[] next() { + Block[] target = new Block[operator.fields.length]; + boolean success = false; + try { + load(target, offset); + success = true; + for (Block b : target) { + operator.valuesLoaded += b.getTotalValueCount(); + } + offset += target[0].getPositionCount(); + return target; + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (success == false) { + Releasables.closeExpectNoException(target); + } + } + } + + protected abstract void load(Block[] target, int offset) throws IOException; + + @Override + public void close() {} +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java new file mode 100644 index 0000000000000..2fd4784224087 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java @@ -0,0 +1,306 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene.read; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DocBlock; +import org.elasticsearch.compute.data.DocVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.IntFunction; +import java.util.function.Supplier; + +/** + * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} + * and outputs them to a new column. + */ +public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOperator { + /** + * Creates a factory for {@link ValuesSourceReaderOperator}. 
+ * @param fields fields to load + * @param shardContexts per-shard loading information + * @param docChannel the channel containing the shard, leaf/segment and doc id + */ + public record Factory(List fields, List shardContexts, int docChannel) implements OperatorFactory { + public Factory { + if (fields.isEmpty()) { + throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields"); + } + } + + @Override + public Operator get(DriverContext driverContext) { + return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel); + } + + @Override + public String describe() { + StringBuilder sb = new StringBuilder(); + sb.append("ValuesSourceReaderOperator[fields = ["); + if (fields.size() < 10) { + boolean first = true; + for (FieldInfo f : fields) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(f.name); + } + } else { + sb.append(fields.size()).append(" fields"); + } + return sb.append("]]").toString(); + } + } + + /** + * Configuration for a field to load. + * + * {@code blockLoader} maps shard index to the {@link BlockLoader}s + * which load the actual blocks. + */ + public record FieldInfo(String name, ElementType type, IntFunction blockLoader) {} + + public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} + + final FieldWork[] fields; + final List shardContexts; + private final int docChannel; + final BlockFactory blockFactory; + + private final Map readersBuilt = new TreeMap<>(); + long valuesLoaded; + + private int lastShard = -1; + private int lastSegment = -1; + + /** + * Creates a new extractor + * @param fields fields to load + * @param docChannel the channel containing the shard, leaf/segment and doc id + */ + public ValuesSourceReaderOperator(BlockFactory blockFactory, List fields, List shardContexts, int docChannel) { + if (fields.isEmpty()) { + throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields"); + } + this.fields = fields.stream().map(FieldWork::new).toArray(FieldWork[]::new); + this.shardContexts = shardContexts; + this.docChannel = docChannel; + this.blockFactory = blockFactory; + } + + @Override + protected ReleasableIterator receive(Page page) { + DocVector docVector = page.getBlock(docChannel).asVector(); + return appendBlockArrays( + page, + docVector.singleSegment() ? 
new ValuesFromSingleReader(this, docVector) : new ValuesFromManyReader(this, docVector) + ); + } + + void positionFieldWork(int shard, int segment, int firstDoc) { + if (lastShard == shard) { + if (lastSegment == segment) { + for (FieldWork w : fields) { + w.sameSegment(firstDoc); + } + return; + } + lastSegment = segment; + for (FieldWork w : fields) { + w.sameShardNewSegment(); + } + return; + } + lastShard = shard; + lastSegment = segment; + for (FieldWork w : fields) { + w.newShard(shard); + } + } + + boolean positionFieldWorkDocGuaranteedAscending(int shard, int segment) { + if (lastShard == shard) { + if (lastSegment == segment) { + return false; + } + lastSegment = segment; + for (FieldWork w : fields) { + w.sameShardNewSegment(); + } + return true; + } + lastShard = shard; + lastSegment = segment; + for (FieldWork w : fields) { + w.newShard(shard); + } + return true; + } + + void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { + readersBuilt.merge( + "stored_fields[" + + "requires_source:" + + spec.requiresSource() + + ", fields:" + + spec.requiredStoredFields().size() + + ", sequential: " + + sequential + + "]", + 1, + (prev, one) -> prev + one + ); + } + + protected class FieldWork { + final FieldInfo info; + + BlockLoader loader; + BlockLoader.ColumnAtATimeReader columnAtATime; + BlockLoader.RowStrideReader rowStride; + + FieldWork(FieldInfo info) { + this.info = info; + } + + void sameSegment(int firstDoc) { + if (columnAtATime != null && columnAtATime.canReuse(firstDoc) == false) { + columnAtATime = null; + } + if (rowStride != null && rowStride.canReuse(firstDoc) == false) { + rowStride = null; + } + } + + void sameShardNewSegment() { + columnAtATime = null; + rowStride = null; + } + + void newShard(int shard) { + loader = info.blockLoader.apply(shard); + columnAtATime = null; + rowStride = null; + } + + BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException { + if (columnAtATime == null) { + columnAtATime = loader.columnAtATimeReader(ctx); + trackReader("column_at_a_time", this.columnAtATime); + } + return columnAtATime; + } + + BlockLoader.RowStrideReader rowStride(LeafReaderContext ctx) throws IOException { + if (rowStride == null) { + rowStride = loader.rowStrideReader(ctx); + trackReader("row_stride", this.rowStride); + } + return rowStride; + } + + private void trackReader(String type, BlockLoader.Reader reader) { + readersBuilt.merge(info.name + ":" + type + ":" + reader, 1, (prev, one) -> prev + one); + } + } + + LeafReaderContext ctx(int shard, int segment) { + return shardContexts.get(shard).reader().leaves().get(segment); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("ValuesSourceReaderOperator[fields = ["); + if (fields.length < 10) { + boolean first = true; + for (FieldWork f : fields) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(f.info.name); + } + } else { + sb.append(fields.length).append(" fields"); + } + return sb.append("]]").toString(); + } + + @Override + protected ValuesSourceReaderOperatorStatus status( + long processNanos, + int pagesReceived, + int pagesEmitted, + long rowsReceived, + long rowsEmitted + ) { + return new ValuesSourceReaderOperatorStatus( + new TreeMap<>(readersBuilt), + processNanos, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted, + valuesLoaded + ); + } + + /** + * Quick checks for on the loaded block to make sure it looks reasonable. 
+ * @param loader the object that did the loading - we use it to make error messages if the block is busted + * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has + * @param block the block to sanity check + * @param field offset into the {@link #fields} array for the block being loaded + */ + void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { + if (block.getPositionCount() != expectedPositions) { + throw new IllegalStateException( + sanityCheckBlockErrorPrefix(loader, block, field) + + " has [" + + block.getPositionCount() + + "] positions instead of [" + + expectedPositions + + "]" + ); + } + if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { + throw new IllegalStateException( + sanityCheckBlockErrorPrefix(loader, block, field) + + "'s element_type [" + + block.elementType() + + "] NOT IN (NULL, " + + fields[field].info.type + + ")" + ); + } + } + + private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { + return fields[field].info.name + "[" + loader + "]: " + block; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java new file mode 100644 index 0000000000000..ede86e6d78f12 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.lucene.read; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.operator.AbstractPageMappingOperator; +import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; +import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_8_19; +import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_9_1; + +public class ValuesSourceReaderOperatorStatus extends AbstractPageMappingToIteratorOperator.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Operator.Status.class, + "values_source_reader", + ValuesSourceReaderOperatorStatus::readFrom + ); + + private final Map readersBuilt; + private final long valuesLoaded; + + public ValuesSourceReaderOperatorStatus( + Map readersBuilt, + long processNanos, + int pagesReceived, + int pagesEmitted, + long rowsReceived, + long rowsEmitted, + long valuesLoaded + ) { + super(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); + this.readersBuilt = readersBuilt; + this.valuesLoaded = valuesLoaded; + } + + static ValuesSourceReaderOperatorStatus readFrom(StreamInput in) throws IOException { + long processNanos; + int pagesReceived; + int pagesEmitted; + long rowsReceived; + long rowsEmitted; + if (supportsSplitOnBigValues(in.getTransportVersion())) { + AbstractPageMappingToIteratorOperator.Status status = new AbstractPageMappingToIteratorOperator.Status(in); + processNanos = status.processNanos(); + pagesReceived = status.pagesReceived(); + pagesEmitted = status.pagesEmitted(); + rowsReceived = status.rowsReceived(); + rowsEmitted = status.rowsEmitted(); + } else { + AbstractPageMappingOperator.Status status = new AbstractPageMappingOperator.Status(in); + processNanos = status.processNanos(); + pagesReceived = status.pagesProcessed(); + pagesEmitted = status.pagesProcessed(); + rowsReceived = status.rowsReceived(); + rowsEmitted = status.rowsEmitted(); + } + Map readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); + long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0; + return new ValuesSourceReaderOperatorStatus( + readersBuilt, + processNanos, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted, + valuesLoaded + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (supportsSplitOnBigValues(out.getTransportVersion())) { + super.writeTo(out); + } else { + /* + * Before we knew how to split pages when reading large values + * our status just contained one int per page - just like AbstractPageMappingOperator.Status. 
+ */ + new AbstractPageMappingOperator.Status(processNanos(), pagesEmitted(), rowsReceived(), rowsEmitted()).writeTo(out); + } + out.writeMap(readersBuilt, StreamOutput::writeVInt); + if (supportsValuesLoaded(out.getTransportVersion())) { + out.writeVLong(valuesLoaded); + } + } + + private static boolean supportsSplitOnBigValues(TransportVersion version) { + return version.onOrAfter(ESQL_SPLIT_ON_BIG_VALUES_9_1) || version.isPatchFrom(ESQL_SPLIT_ON_BIG_VALUES_8_19); + } + + private static boolean supportsValuesLoaded(TransportVersion version) { + return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) + || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + public Map readersBuilt() { + return readersBuilt; + } + + @Override + public long valuesLoaded() { + return valuesLoaded; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("readers_built"); + for (Map.Entry e : readersBuilt.entrySet()) { + builder.field(e.getKey(), e.getValue()); + } + builder.endObject(); + builder.field("values_loaded", valuesLoaded); + innerToXContent(builder); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o) == false) return false; + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) o; + return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java index d0b4aaad22a3e..84affa27dc5ec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java @@ -127,7 +127,7 @@ public Status(long processNanos, int pagesProcessed, long rowsReceived, long row this.rowsEmitted = rowsEmitted; } - protected Status(StreamInput in) throws IOException { + public Status(StreamInput in) throws IOException { processNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? 
in.readVLong() : 0; pagesProcessed = in.readVInt(); if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java index 6a165fdfa055b..055359c6a389a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java @@ -64,13 +64,38 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator */ protected abstract ReleasableIterator receive(Page page); + /** + * Append an {@link Iterator} of arrays of {@link Block}s to a + * {@link Page}, one after the other. It's required that the + * iterator emit as many positions as there were + * in the page. + */ + public static ReleasableIterator appendBlockArrays(Page page, ReleasableIterator toAdd) { + return new AppendBlocksIterator(page, toAdd); + } + /** * Append an {@link Iterator} of {@link Block}s to a {@link Page}, one * after the other. It's required that the iterator emit as many * positions as there were in the page. */ public static ReleasableIterator appendBlocks(Page page, ReleasableIterator toAdd) { - return new AppendBlocksIterator(page, toAdd); + return appendBlockArrays(page, new ReleasableIterator<>() { + @Override + public boolean hasNext() { + return toAdd.hasNext(); + } + + @Override + public Block[] next() { + return new Block[] { toAdd.next() }; + } + + @Override + public void close() { + toAdd.close(); + } + }); } @Override @@ -86,13 +111,24 @@ public final void addInput(Page page) { if (next != null) { assert next.hasNext() == false : "has pending input page"; next.close(); + next = null; } if (page.getPositionCount() == 0) { return; } - next = new RuntimeTrackingIterator(receive(page)); - pagesReceived++; - rowsReceived += page.getPositionCount(); + try { + next = new RuntimeTrackingIterator(receive(page)); + pagesReceived++; + rowsReceived += page.getPositionCount(); + } finally { + if (next == null) { + /* + * The `receive` operation failed, we need to release the incoming page + * because it's no longer owned by anyone. 
+ */ + page.releaseBlocks(); + } + } } @Override @@ -183,7 +219,7 @@ public Status(long processNanos, int pagesProcessed, int pagesEmitted, long rows this.rowsEmitted = rowsEmitted; } - protected Status(StreamInput in) throws IOException { + public Status(StreamInput in) throws IOException { processNanos = in.readVLong(); pagesReceived = in.readVInt(); pagesEmitted = in.readVInt(); @@ -284,11 +320,11 @@ public TransportVersion getMinimalSupportedVersion() { private static class AppendBlocksIterator implements ReleasableIterator { private final Page page; - private final ReleasableIterator next; + private final ReleasableIterator next; private int positionOffset; - protected AppendBlocksIterator(Page page, ReleasableIterator next) { + protected AppendBlocksIterator(Page page, ReleasableIterator next) { this.page = page; this.next = next; } @@ -305,17 +341,17 @@ public final boolean hasNext() { @Override public final Page next() { - Block read = next.next(); + Block[] read = next.next(); int start = positionOffset; - positionOffset += read.getPositionCount(); - if (start == 0 && read.getPositionCount() == page.getPositionCount()) { + positionOffset += read[0].getPositionCount(); + if (start == 0 && read[0].getPositionCount() == page.getPositionCount()) { for (int b = 0; b < page.getBlockCount(); b++) { page.getBlock(b).incRef(); } - return page.appendBlock(read); + return page.appendBlocks(read); } - Block[] newBlocks = new Block[page.getBlockCount() + 1]; - newBlocks[page.getBlockCount()] = read; + Block[] newBlocks = new Block[page.getBlockCount() + read.length]; + System.arraycopy(read, 0, newBlocks, page.getBlockCount(), read.length); try { // TODO a way to filter with a range please. int[] positions = IntStream.range(start, positionOffset).toArray(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java index 4e06c1f0f4b69..05f60c1b6834d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java @@ -14,7 +14,7 @@ /** * {@link Block#lookup Looks up} values from a provided {@link Block} and - * mergeds them into each {@link Page}. + * merged them into each {@link Page}. */ public class ColumnLoadOperator extends AbstractPageMappingToIteratorOperator { public record Values(String name, Block block) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java index 7e63fe1681dd3..8f88f9d73b0d4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java @@ -95,8 +95,9 @@ public static DriverCompletionInfo readFrom(StreamInput in) throws IOException { in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom), in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN) - ? in.readCollectionAsImmutableList(PlanProfile::readFrom) - : List.of() + || in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19) + ? 
in.readCollectionAsImmutableList(PlanProfile::readFrom) + : List.of() ); } @@ -105,7 +106,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(documentsFound); out.writeVLong(valuesLoaded); out.writeCollection(driverProfiles); - if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) { + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN) + || out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)) { out.writeCollection(planProfiles); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 9c15b0f3fc7d5..58466cffee78e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -32,7 +32,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 1a88bac8b3953..6d345d510f4b0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -52,7 +52,7 @@ import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; import org.elasticsearch.compute.lucene.ShardContext; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 4828f70e51dcd..655f7b54c61c0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.data.Vector; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 
a8cb202f2be2c..91b8de1a08573 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index db60d3cd19cb3..7ba1f9790ecbe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.test.OperatorTestCase; @@ -52,7 +53,7 @@ public class LuceneTopNSourceOperatorScoringTests extends LuceneTopNSourceOperat private IndexReader reader; @After - private void closeIndex() throws IOException { + public void closeScoringIndex() throws IOException { IOUtils.close(reader, directory); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index 0c9bf676e0547..26540caee0b1f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.test.AnyOperatorTestCase; @@ -57,7 +58,7 @@ public class LuceneTopNSourceOperatorTests extends AnyOperatorTestCase { private IndexReader reader; @After - private void closeIndex() throws IOException { + public void closeIndex() throws IOException { IOUtils.close(reader, directory); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java index 15ae1d506a2fe..3a3fac1afc595 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java @@ 
-34,6 +34,9 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.read.TimeSeriesExtractFieldOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.DriverStatus; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java similarity index 99% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java index 88211a9170034..79ff6642a5ac3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoubleDocValuesField; @@ -48,6 +48,12 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; +import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.DriverRunner; @@ -629,7 +635,9 @@ private void loadSimpleAndAssert( } } for (Operator op : operators) { - assertThat(((ValuesSourceReaderOperator) op).status().pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); } assertDriverContext(driverContext); } @@ -716,8 +724,9 @@ private void testLoadAllStatus(boolean allInOnePage) { } drive(operators, input.iterator(), driverContext); for (int i = 0; i < cases.size(); i++) { - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) operators.get(i).status(); - assertThat(status.pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); FieldCase fc = cases.get(i); fc.checkReaders.check(fc.info.name(), allInOnePage, input.size(), totalSize, status.readersBuilt()); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java similarity index 60% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java index af1463b88c62c..f81398eb67695 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; @@ -18,9 +18,9 @@ import static org.hamcrest.Matchers.equalTo; -public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { - public static ValuesSourceReaderOperator.Status simple() { - return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222, 1000); +public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { + public static ValuesSourceReaderOperatorStatus simple() { + return new ValuesSourceReaderOperatorStatus(Map.of("ReaderType", 3), 1022323, 123, 200, 111, 222, 1000); } public static String simpleToJson() { @@ -32,7 +32,8 @@ public static String simpleToJson() { "values_loaded" : 1000, "process_nanos" : 1022323, "process_time" : "1ms", - "pages_processed" : 123, + "pages_received" : 123, + "pages_emitted" : 200, "rows_received" : 111, "rows_emitted" : 222 }"""; @@ -43,16 +44,17 @@ public void testToXContent() { } @Override - protected Writeable.Reader instanceReader() { - return ValuesSourceReaderOperator.Status::new; + protected Writeable.Reader instanceReader() { + return ValuesSourceReaderOperatorStatus::readFrom; } @Override - public ValuesSourceReaderOperator.Status createTestInstance() { - return new ValuesSourceReaderOperator.Status( + public ValuesSourceReaderOperatorStatus createTestInstance() { + return new ValuesSourceReaderOperatorStatus( randomReadersBuilt(), randomNonNegativeLong(), randomNonNegativeInt(), + randomNonNegativeInt(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong() @@ -69,22 +71,32 @@ private Map randomReadersBuilt() { } @Override - protected ValuesSourceReaderOperator.Status mutateInstance(ValuesSourceReaderOperator.Status instance) throws IOException { + protected ValuesSourceReaderOperatorStatus mutateInstance(ValuesSourceReaderOperatorStatus instance) throws IOException { Map readersBuilt = instance.readersBuilt(); long processNanos = instance.processNanos(); - int pagesProcessed = instance.pagesProcessed(); + int pagesReceived = instance.pagesReceived(); + int pagesEmitted = instance.pagesEmitted(); long rowsReceived = instance.rowsReceived(); long rowsEmitted = instance.rowsEmitted(); long valuesLoaded = instance.valuesLoaded(); - switch (between(0, 5)) { + switch (between(0, 6)) { case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt); case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); - case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); - case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, 
ESTestCase::randomNonNegativeLong); - case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); - case 5 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); + case 2 -> pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); + case 3 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + case 4 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 5 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 6 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); + return new ValuesSourceReaderOperatorStatus( + readersBuilt, + processNanos, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted, + valuesLoaded + ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java similarity index 91% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 1550b6dc013ab..6c1e89925702b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.compute.lucene; +package org.elasticsearch.compute.lucene.read; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoubleDocValuesField; @@ -45,6 +45,12 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; +import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; +import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -176,6 +182,10 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv } catch (IOException e) { throw new RuntimeException(e); } + return sourceOperator(context, pageSize); + } + + private SourceOperator sourceOperator(DriverContext context, int pageSize) { var luceneFactory = new LuceneSourceOperator.Factory( List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)), ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), @@ -205,6 +215,7 @@ private void initMapping() throws IOException { simpleField(b, "missing_text", "text"); b.startObject("source_text").field("type", "text").field("store", false).endObject(); b.startObject("mv_source_text").field("type", "text").field("store", false).endObject(); + b.startObject("long_source_text").field("type", "text").field("store", false).endObject(); b.startObject("stored_text").field("type", "text").field("store", true).endObject(); b.startObject("mv_stored_text").field("type", "text").field("store", true).endObject(); @@ -380,6 +391,33 @@ private IndexReader initIndex(Directory directory, int size, int commitEvery) th return DirectoryReader.open(directory); } + private IndexReader initIndexLongField(Directory directory, int size, int commitEvery) throws IOException { + try ( + IndexWriter writer = new IndexWriter( + directory, + newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE).setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH) + ) + ) { + for (int d = 0; d < size; d++) { + XContentBuilder source = JsonXContent.contentBuilder(); + source.startObject(); + source.field("long_source_text", Integer.toString(d).repeat(100 * 1024)); + source.endObject(); + ParsedDocument doc = mapperService.documentParser() + .parseDocument( + new SourceToParse("id" + d, BytesReference.bytes(source), XContentType.JSON), + mapperService.mappingLookup() + ); + writer.addDocuments(doc.docs()); + + if (d % commitEvery == commitEvery - 1) { + writer.commit(); + } + } + } + return DirectoryReader.open(directory); + } + @Override protected Matcher expectedDescriptionOfSimple() { return equalTo("ValuesSourceReaderOperator[fields = [long]]"); @@ -491,16 +529,23 @@ public void testLoadAllInOnePageShuffled() { Page source = CannedSourceOperator.mergePages( CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(100, 5000))) ); - List shuffleList = new ArrayList<>(); - IntStream.range(0, source.getPositionCount()).forEach(i -> shuffleList.add(i)); - Randomness.shuffle(shuffleList); - int[] shuffleArray = shuffleList.stream().mapToInt(Integer::intValue).toArray(); - Block[] shuffledBlocks = new 
Block[source.getBlockCount()]; - for (int b = 0; b < shuffledBlocks.length; b++) { - shuffledBlocks[b] = source.getBlock(b).filter(shuffleArray); - } - source = new Page(shuffledBlocks); - loadSimpleAndAssert(driverContext, List.of(source), Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED); + loadSimpleAndAssert(driverContext, List.of(shuffle(source)), Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED); + } + + private Page shuffle(Page source) { + try { + List shuffleList = new ArrayList<>(); + IntStream.range(0, source.getPositionCount()).forEach(i -> shuffleList.add(i)); + Randomness.shuffle(shuffleList); + int[] shuffleArray = shuffleList.stream().mapToInt(Integer::intValue).toArray(); + Block[] shuffledBlocks = new Block[source.getBlockCount()]; + for (int b = 0; b < shuffledBlocks.length; b++) { + shuffledBlocks[b] = source.getBlock(b).filter(shuffleArray); + } + return new Page(shuffledBlocks); + } finally { + source.releaseBlocks(); + } } private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft, ElementType elementType) { @@ -612,7 +657,9 @@ private void loadSimpleAndAssert( } } for (Operator op : operators) { - assertThat(((ValuesSourceReaderOperator) op).status().pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); } assertDriverContext(driverContext); } @@ -696,8 +743,9 @@ private void testLoadAllStatus(boolean allInOnePage) { } drive(operators, input.iterator(), driverContext); for (int i = 0; i < cases.size(); i++) { - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) operators.get(i).status(); - assertThat(status.pagesProcessed(), equalTo(input.size())); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); FieldCase fc = cases.get(i); fc.checkReaders.check(fc.info.name(), allInOnePage, input.size(), reader.leaves().size(), status.readersBuilt()); } @@ -863,6 +911,73 @@ private List infoAndChecksForEachType( return r; } + public void testLoadLong() throws IOException { + testLoadLong(false, false); + } + + public void testLoadLongManySegments() throws IOException { + testLoadLong(false, true); + } + + public void testLoadLongShuffled() throws IOException { + testLoadLong(true, false); + } + + public void testLoadLongShuffledManySegments() throws IOException { + testLoadLong(true, true); + } + + private void testLoadLong(boolean shuffle, boolean manySegments) throws IOException { + int numDocs = between(10, 500); + initMapping(); + keyToTags.clear(); + reader = initIndexLongField(directory, numDocs, manySegments ? commitEvery(numDocs) : numDocs); + + DriverContext driverContext = driverContext(); + List input = CannedSourceOperator.collectPages(sourceOperator(driverContext, numDocs)); + assertThat(reader.leaves(), hasSize(manySegments ? 
greaterThan(5) : equalTo(1))); + assertThat(input, hasSize(reader.leaves().size())); + if (manySegments) { + input = List.of(CannedSourceOperator.mergePages(input)); + } + if (shuffle) { + input = input.stream().map(this::shuffle).toList(); + } + + Checks checks = new Checks(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); + + List cases = List.of( + new FieldCase( + mapperService.fieldType("long_source_text"), + ElementType.BYTES_REF, + checks::strings, + StatusChecks::longTextFromSource + ) + ); + // Build one operator for each field, so we get a unique map to assert on + List operators = cases.stream() + .map( + i -> new ValuesSourceReaderOperator.Factory( + List.of(i.info), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 + ).get(driverContext) + ) + .toList(); + drive(operators, input.iterator(), driverContext); + for (int i = 0; i < cases.size(); i++) { + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status(); + assertThat(status.pagesReceived(), equalTo(input.size())); + assertThat(status.pagesEmitted(), equalTo(input.size())); + } + } + record Checks(Block.MvOrdering booleanAndNumericalDocValuesMvOrdering, Block.MvOrdering bytesRefDocValuesMvOrdering) { void longs(Block block, int position, int key) { LongVector longs = ((LongBlock) block).asVector(); @@ -1076,6 +1191,10 @@ static void textFromSource(boolean forcedRowByRow, int pageCount, int segmentCou source("source_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers); } + static void longTextFromSource(boolean forcedRowByRow, int pageCount, int segmentCount, Map readers) { + source("long_source_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers); + } + static void textFromStored(boolean forcedRowByRow, int pageCount, int segmentCount, Map readers) { stored("stored_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers); } @@ -1482,13 +1601,13 @@ public void testNullsShared() { } public void testSequentialStoredFieldsTooSmall() throws IOException { - testSequentialStoredFields(false, between(1, ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY - 1)); + testSequentialStoredFields(false, between(1, ValuesFromSingleReader.SEQUENTIAL_BOUNDARY - 1)); } public void testSequentialStoredFieldsBigEnough() throws IOException { testSequentialStoredFields( true, - between(ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY, ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY * 2) + between(ValuesFromSingleReader.SEQUENTIAL_BOUNDARY, ValuesFromSingleReader.SEQUENTIAL_BOUNDARY * 2) ); } @@ -1519,7 +1638,7 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws int key = keys.getInt(p); checks.strings(results.get(0).getBlock(2), p, key); } - ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status(); + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status(); assertMap( status.readersBuilt(), matchesMap().entry("key:column_at_a_time:BlockDocValuesReader.SingletonInts", 1) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java index c8f8094f69c27..016997369d403 100644 --- 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java @@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.test.ESTestCase; @@ -166,7 +166,7 @@ protected DriverProfile mutateInstance(DriverProfile instance) throws IOExceptio @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry( - List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, ExchangeSinkOperator.Status.ENTRY) + List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperatorStatus.ENTRY, ExchangeSinkOperator.Status.ENTRY) ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java index df3583d0c99bd..c990fb99cd7dd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java @@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperatorStatusTests; import org.elasticsearch.test.AbstractWireSerializingTestCase; @@ -202,7 +202,7 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry( - List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, ExchangeSinkOperator.Status.ENTRY) + List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperatorStatus.ENTRY, ExchangeSinkOperator.Status.ENTRY) ); } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 6dca5cf4efaf5..68c606f2e3fa2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ 
b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -943,7 +943,9 @@ private String checkOperatorProfile(Map o) { .entry("process_nanos", greaterThan(0)) .entry("processed_queries", List.of("*:*")) .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); - case "ValuesSourceReaderOperator" -> basicProfile().entry("values_loaded", greaterThanOrEqualTo(0)) + case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0)) + .entry("pages_emitted", greaterThan(0)) + .entry("values_loaded", greaterThanOrEqualTo(0)) .entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("rows_received", greaterThan(0)) @@ -954,7 +956,7 @@ private String checkOperatorProfile(Map o) { case "ExchangeSourceOperator" -> matchesMap().entry("pages_waiting", 0) .entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)); - case "ProjectOperator", "EvalOperator" -> basicProfile(); + case "ProjectOperator", "EvalOperator" -> basicProfile().entry("pages_processed", greaterThan(0)); case "LimitOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("limit", 1000) .entry("limit_remaining", 999) @@ -990,8 +992,7 @@ private String checkOperatorProfile(Map o) { } private MapMatcher basicProfile() { - return matchesMap().entry("pages_processed", greaterThan(0)) - .entry("process_nanos", greaterThan(0)) + return matchesMap().entry("process_nanos", greaterThan(0)) .entry("rows_received", greaterThan(0)) .entry("rows_emitted", greaterThan(0)); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index e98574926e586..efefde8871546 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -19,7 +19,7 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.compute.lucene.LuceneSourceOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.OperatorStatus; @@ -129,12 +129,13 @@ public void testTaskContents() throws Exception { } if (o.operator().equals("ValuesSourceReaderOperator[fields = [pause_me]]")) { assertThat(description, equalTo("data")); - ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status(); + ValuesSourceReaderOperatorStatus oStatus = (ValuesSourceReaderOperatorStatus) o.status(); assertMap( oStatus.readersBuilt(), matchesMap().entry("pause_me:column_at_a_time:ScriptLongs", greaterThanOrEqualTo(1)) ); - assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1)); + assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(1)); + assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(1)); assertThat(oStatus.valuesLoaded(), greaterThanOrEqualTo(1L)); valuesSourceReaders++; continue; diff --git 
a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 1355ffba796a8..1d63a2bcf5373 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -28,7 +28,7 @@ import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.ShardContext; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.DriverRunner; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 8b1320a9a5b21..9661d571167ec 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -9,6 +9,7 @@ import org.elasticsearch.Build; import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction; import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; @@ -1016,7 +1017,7 @@ public enum Cap { FILTER_IN_CONVERTED_NULL, /** - * When creating constant null blocks in {@link org.elasticsearch.compute.lucene.ValuesSourceReaderOperator}, we also handed off + * When creating constant null blocks in {@link ValuesSourceReaderOperator}, we also handed off * the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases * in some union type queries. C.f. https://github.com/elastic/elasticsearch/issues/125850 */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 93c30470c316c..4afb1418b2585 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -402,15 +402,17 @@ public static Profile readFrom(StreamInput in) throws IOException { return new Profile( in.readCollectionAsImmutableList(DriverProfile::readFrom), in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN) - ? in.readCollectionAsImmutableList(PlanProfile::readFrom) - : List.of() + || in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19) + ? 
in.readCollectionAsImmutableList(PlanProfile::readFrom) + : List.of() ); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(drivers); - if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) { + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN) + || out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)) { out.writeCollection(plans); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 1c21f92053603..661b6d08d1455 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -35,7 +35,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.OrdinalBytesRefBlock; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; @@ -347,8 +347,14 @@ private void doLookup(T request, CancellableTask task, ActionListener warnings ); releasables.add(queryOperator); - var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); - releasables.add(extractFieldsOperator); + + List operators = new ArrayList<>(); + if (request.extractFields.isEmpty() == false) { + var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); + releasables.add(extractFieldsOperator); + operators.add(extractFieldsOperator); + } + operators.add(finishPages); /* * Collect all result Pages in a synchronizedList mostly out of paranoia. 
We'll @@ -370,7 +376,7 @@ private void doLookup(T request, CancellableTask task, ActionListener driverContext, request::toString, queryOperator, - List.of(extractFieldsOperator, finishPages), + operators, outputOperator, Driver.DEFAULT_STATUS_INTERVAL, Releasables.wrap(shardContext.release, localBreaker) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index acf685f3dcd9c..3f403d3e4fcd2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -28,9 +28,9 @@ import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; -import org.elasticsearch.compute.lucene.TimeSeriesExtractFieldOperator; import org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.TimeSeriesExtractFieldOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OrdinalsGroupingOperator; import org.elasticsearch.compute.operator.SourceOperator; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 293a7be6be041..2c851431b59c6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -24,7 +24,7 @@ import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; import org.elasticsearch.compute.operator.AggregationOperator; @@ -320,7 +320,7 @@ public List getNamedWriteables() { entries.add(TimeSeriesSourceOperator.Status.ENTRY); entries.add(TopNOperatorStatus.ENTRY); entries.add(MvExpandOperator.Status.ENTRY); - entries.add(ValuesSourceReaderOperator.Status.ENTRY); + entries.add(ValuesSourceReaderOperatorStatus.ENTRY); entries.add(SingleValueQuery.ENTRY); entries.add(AsyncOperator.Status.ENTRY); entries.add(EnrichLookupOperator.Status.ENTRY); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 9bc2118c0451f..6749f03bedde7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneSourceOperator; 
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; +import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.test.NoOpReleasable; import org.elasticsearch.compute.test.TestBlockFactory;
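The status, profile and query-response changes above all gate the new wire fields on the peer's transport version, with isPatchFrom covering the 8.19 backport IDs. The sketch below is illustrative only and not part of the change: ExampleStatus and its fields are hypothetical, while the version constants and the onOrAfter/isPatchFrom check mirror what ValuesSourceReaderOperatorStatus does.

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_8_19;
import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_9_1;

/**
 * Illustrative sketch of the "gate new wire fields on transport version" pattern
 * used by ValuesSourceReaderOperatorStatus, DriverCompletionInfo and EsqlQueryResponse.Profile.
 * The record and its fields are hypothetical; only the version constants come from this change.
 */
record ExampleStatus(long processNanos, int pagesEmitted) implements Writeable {

    static ExampleStatus readFrom(StreamInput in) throws IOException {
        long processNanos = in.readVLong();
        // Older peers never wrote pagesEmitted; fall back to a default instead of mis-reading the stream.
        int pagesEmitted = supportsSplit(in.getTransportVersion()) ? in.readVInt() : 0;
        return new ExampleStatus(processNanos, pagesEmitted);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(processNanos);
        if (supportsSplit(out.getTransportVersion())) {
            // Only peers that know about the new field receive it on the wire.
            out.writeVInt(pagesEmitted);
        }
    }

    private static boolean supportsSplit(TransportVersion version) {
        // onOrAfter matches 9.1.0 and later; isPatchFrom matches the 8.19 patch line the change was backported to.
        return version.onOrAfter(ESQL_SPLIT_ON_BIG_VALUES_9_1) || version.isPatchFrom(ESQL_SPLIT_ON_BIG_VALUES_8_19);
    }
}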
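appendBlocks is now a thin wrapper over the new appendBlockArrays, adapting an iterator of single blocks into an iterator of one-element block arrays so that readers which load several fields per chunk can share the same plumbing. A minimal stand-alone sketch of that adapter, assuming only the ReleasableIterator contract (the class name SingleBlockAdapter is made up for illustration):

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.core.ReleasableIterator;

/**
 * Illustrative adapter mirroring how appendBlocks delegates to appendBlockArrays:
 * each single Block is re-emitted as a one-element Block[] so both code paths share
 * the same array-based iterator plumbing.
 */
final class SingleBlockAdapter implements ReleasableIterator<Block[]> {
    private final ReleasableIterator<Block> delegate;

    SingleBlockAdapter(ReleasableIterator<Block> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public Block[] next() {
        // Wrap each block in a length-one array; ownership of the block transfers to the caller.
        return new Block[] { delegate.next() };
    }

    @Override
    public void close() {
        delegate.close();
    }
}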
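Both ValuesReader.next and the hardened addInput follow the same ownership rule: if the step that was supposed to take ownership throws, the caller must release what it still holds. A hedged sketch of that pattern (receiveOrRelease is a hypothetical helper, not code from this change):

import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.ReleasableIterator;

import java.util.function.Function;

/**
 * Illustrative only: release the incoming page when the receiving step fails,
 * because at that point nothing else owns it.
 */
final class PageOwnershipSketch {
    private PageOwnershipSketch() {}

    static ReleasableIterator<Page> receiveOrRelease(Page page, Function<Page, ReleasableIterator<Page>> receive) {
        boolean success = false;
        try {
            ReleasableIterator<Page> iterator = receive.apply(page);
            success = true;
            return iterator;
        } finally {
            if (success == false) {
                // receive threw before anyone took ownership, so release the blocks to avoid a leak.
                page.releaseBlocks();
            }
        }
    }
}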