From aa331a588e5c64bbba2896ffbd593a023737bac1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 27 Apr 2025 15:56:22 -0700 Subject: [PATCH 1/7] Push down field extraction to time-series source --- .../compute/lucene/ShardContext.java | 14 + ...TimeSeriesSortedSourceOperatorFactory.java | 347 ++++++++++++++---- .../lucene/ValuesSourceReaderOperator.java | 55 +-- .../lucene/LuceneSourceOperatorTests.java | 16 + .../TimeSeriesSortedSourceOperatorTests.java | 48 ++- .../ValuesSourceReaderOperatorTests.java | 2 +- .../optimizer/LocalPhysicalPlanOptimizer.java | 4 +- ...DownFieldExtractionToTimeSeriesSource.java | 105 ++++++ ...PushFieldExtractionToTimeSeriesSource.java | 37 -- .../plan/physical/TimeSeriesSourceExec.java | 116 +++++- .../planner/EsPhysicalOperationProviders.java | 35 +- .../LocalPhysicalPlanOptimizerTests.java | 37 ++ 12 files changed, 631 insertions(+), 185 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java index 5bf6ac8532f48..ea7dc53203bb7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.lucene; import org.apache.lucene.search.IndexSearcher; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; @@ -39,4 +43,14 @@ public interface ShardContext { * {@code _cat/shards}. */ String shardIdentifier(); + + /** + * Build something to load source {@code _source}. + */ + SourceLoader newSourceLoader(); + + /** + * Returns something to load values from this field into a {@link Block}. + */ + BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index d7b018b353aa2..e8d3008686ef8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.lucene; import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedDocValues; @@ -18,6 +19,7 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.DocVector; @@ -29,6 +31,11 @@ import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; import java.io.IOException; import java.io.UncheckedIOException; @@ -51,9 +58,13 @@ public class TimeSeriesSortedSourceOperatorFactory extends LuceneOperator.Factory { private final int maxPageSize; + private final boolean emitDocIds; + private final List fieldsToExact; private TimeSeriesSortedSourceOperatorFactory( List contexts, + boolean emitDocIds, + List fieldsToExact, Function queryFunction, int taskConcurrency, int maxPageSize, @@ -70,11 +81,13 @@ private TimeSeriesSortedSourceOperatorFactory( ScoreMode.COMPLETE_NO_SCORES ); this.maxPageSize = maxPageSize; + this.emitDocIds = emitDocIds; + this.fieldsToExact = fieldsToExact; } @Override public SourceOperator get(DriverContext driverContext) { - return new Impl(driverContext.blockFactory(), sliceQueue, maxPageSize, limit); + return new Impl(driverContext.blockFactory(), emitDocIds, fieldsToExact, sliceQueue, maxPageSize, limit); } @Override @@ -86,32 +99,52 @@ public static TimeSeriesSortedSourceOperatorFactory create( int limit, int maxPageSize, int taskConcurrency, - List searchContexts, + boolean emitDocIds, + List contexts, + List fieldsToExact, Function queryFunction ) { - return new TimeSeriesSortedSourceOperatorFactory(searchContexts, queryFunction, taskConcurrency, maxPageSize, limit); + return new TimeSeriesSortedSourceOperatorFactory( + contexts, + emitDocIds, + fieldsToExact, + queryFunction, + taskConcurrency, + maxPageSize, + limit + ); } static final class Impl extends SourceOperator { + private final boolean emitDocIds; private final int maxPageSize; private final BlockFactory blockFactory; private final LuceneSliceQueue sliceQueue; private int currentPagePos = 0; private int remainingDocs; private boolean doneCollecting; - private IntVector.Builder docsBuilder; - private IntVector.Builder segmentsBuilder; + private LongVector.Builder timestampsBuilder; private TsidBuilder tsHashesBuilder; private SegmentsIterator iterator; - - Impl(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int maxPageSize, int limit) { + private final List fieldsToExact; + private ShardLevelFieldsReader fieldsReader; + private DocIdCollector docCollector; + + Impl( + BlockFactory blockFactory, + boolean emitDocIds, + List fieldsToExtract, + LuceneSliceQueue sliceQueue, + int maxPageSize, + int limit + ) { this.maxPageSize = maxPageSize; this.blockFactory = blockFactory; + this.fieldsToExact = fieldsToExtract; + this.emitDocIds = emitDocIds; this.remainingDocs = limit; - this.docsBuilder = blockFactory.newIntVectorBuilder(Math.min(limit, maxPageSize)); - this.segmentsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize)); this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize)); this.tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(limit, maxPageSize)); this.sliceQueue = sliceQueue; @@ -139,11 +172,7 @@ public Page getOutput() { } Page page = null; - IntVector shards = null; - IntVector segments = null; - IntVector docs = null; - LongVector timestamps = null; - BytesRefVector tsids = null; + Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExact.size()]; try { if (iterator == null) { var slice = sliceQueue.nextSlice(); @@ -151,86 +180,53 @@ public Page getOutput() { doneCollecting = true; return null; } + Releasables.close(fieldsReader); + fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExact); iterator = new SegmentsIterator(slice); + if (emitDocIds) { + docCollector = new DocIdCollector(blockFactory, slice.shardContext()); + } } + if (docCollector != null) { + docCollector.prepareForCollecting(Math.min(remainingDocs, maxPageSize)); + } + fieldsReader.prepareForReading(Math.min(remainingDocs, maxPageSize)); iterator.readDocsForNextPage(); if (currentPagePos > 0) { - shards = blockFactory.newConstantIntVector(iterator.luceneSlice.shardContext().index(), currentPagePos); - docs = docsBuilder.build(); - docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize)); - timestamps = timestampsBuilder.build(); - timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize)); - tsids = tsHashesBuilder.build(); + int blockIndex = 0; + if (emitDocIds) { + blocks[blockIndex++] = docCollector.build().asBlock(); + } + blocks[blockIndex++] = tsHashesBuilder.build().asBlock(); tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize)); - segments = segmentsBuilder.build(); - segmentsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize)); - page = new Page( - currentPagePos, - buildDocVector(shards, segments, docs, iterator.docPerSegments).asBlock(), - tsids.asBlock(), - timestamps.asBlock() - ); + blocks[blockIndex++] = timestampsBuilder.build().asBlock(); + timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize)); + System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExact.size()); + page = new Page(currentPagePos, blocks); currentPagePos = 0; } if (iterator.completed()) { + Releasables.close(docCollector, fieldsReader); iterator = null; } } catch (IOException e) { throw new UncheckedIOException(e); } finally { if (page == null) { - Releasables.closeExpectNoException(shards, segments, docs, timestamps, tsids); + Releasables.closeExpectNoException(blocks); } } return page; } - private DocVector buildDocVector(IntVector shards, IntVector segments, IntVector docs, int[] docPerSegments) { - if (segments.isConstant()) { - // DocIds are sorted in each segment. Hence, if docIds come from a single segment, we can mark this DocVector - // as singleSegmentNonDecreasing to enable optimizations in the ValuesSourceReaderOperator. - return new DocVector(shards, segments, docs, true); - } - boolean success = false; - int positionCount = shards.getPositionCount(); - long estimatedSize = DocVector.sizeOfSegmentDocMap(positionCount); - blockFactory.adjustBreaker(estimatedSize); - // Use docPerSegments to build a forward/backward docMap in O(N) - // instead of O(N*log(N)) in DocVector#buildShardSegmentDocMapIfMissing. - try { - final int[] forwards = new int[positionCount]; - final int[] starts = new int[docPerSegments.length]; - for (int i = 1; i < starts.length; i++) { - starts[i] = starts[i - 1] + docPerSegments[i - 1]; - } - for (int i = 0; i < segments.getPositionCount(); i++) { - final int segment = segments.getInt(i); - assert forwards[starts[segment]] == 0 : "must not set"; - forwards[starts[segment]++] = i; - } - final int[] backwards = new int[forwards.length]; - for (int p = 0; p < forwards.length; p++) { - backwards[forwards[p]] = p; - } - final DocVector docVector = new DocVector(shards, segments, docs, forwards, backwards); - success = true; - return docVector; - } finally { - if (success == false) { - blockFactory.adjustBreaker(-estimatedSize); - } - } - } - @Override public void close() { - Releasables.closeExpectNoException(docsBuilder, segmentsBuilder, timestampsBuilder, tsHashesBuilder); + Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector, fieldsReader); } class SegmentsIterator { private final PriorityQueue mainQueue; private final PriorityQueue oneTsidQueue; - final int[] docPerSegments; final LuceneSlice luceneSlice; SegmentsIterator(LuceneSlice luceneSlice) throws IOException { @@ -257,12 +253,10 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) { return a.timestamp > b.timestamp; } }; - this.docPerSegments = new int[maxSegmentOrd + 1]; } // TODO: add optimize for one leaf? void readDocsForNextPage() throws IOException { - Arrays.fill(docPerSegments, 0); Thread executingThread = Thread.currentThread(); for (LeafIterator leaf : mainQueue) { leaf.reinitializeIfNeeded(executingThread); @@ -287,11 +281,12 @@ private boolean readValuesForOneTsid(PriorityQueue sub) throws IOE LeafIterator top = sub.top(); currentPagePos++; remainingDocs--; - segmentsBuilder.appendInt(top.segmentOrd); - docPerSegments[top.segmentOrd]++; - docsBuilder.appendInt(top.docID); + if (docCollector != null) { + docCollector.collect(top.segmentOrd, top.docID); + } tsHashesBuilder.appendOrdinal(); timestampsBuilder.appendLong(top.timestamp); + fieldsReader.readValues(top.segmentOrd, top.docID); if (top.nextDoc()) { sub.updateTop(); } else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) { @@ -394,6 +389,123 @@ public String toString() { } + static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory { + BlockLoaderFactory(BlockFactory factory) { + super(factory); + } + + @Override + public BlockLoader.Block constantNulls() { + throw new UnsupportedOperationException("must not be used by column readers"); + } + + @Override + public BlockLoader.Block constantBytes(BytesRef value) { + throw new UnsupportedOperationException("must not be used by column readers"); + } + + @Override + public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { + throw new UnsupportedOperationException("must not be used by column readers"); + } + } + + static final class ShardLevelFieldsReader implements Releasable { + private final BlockLoaderFactory blockFactory; + private final SegmentLevelFieldsReader[] segments; + private final BlockLoader[] loaders; + private final Block.Builder[] builders; + private final StoredFieldsSpec storedFieldsSpec; + private final SourceLoader sourceLoader; + + ShardLevelFieldsReader(BlockFactory blockFactory, ShardContext shardContext, List fields) { + this.blockFactory = new BlockLoaderFactory(blockFactory); + final IndexReader indexReader = shardContext.searcher().getIndexReader(); + this.segments = new SegmentLevelFieldsReader[indexReader.leaves().size()]; + this.loaders = new BlockLoader[fields.size()]; + this.builders = new Block.Builder[loaders.length]; + StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS; + for (int i = 0; i < fields.size(); i++) { + BlockLoader loader = fields.get(i).blockLoader().apply(shardContext.index()); + storedFieldsSpec = storedFieldsSpec.merge(loader.rowStrideStoredFieldSpec()); + loaders[i] = loader; + } + for (int i = 0; i < indexReader.leaves().size(); i++) { + LeafReaderContext leafReaderContext = indexReader.leaves().get(i); + segments[i] = new SegmentLevelFieldsReader(leafReaderContext, loaders); + } + if (storedFieldsSpec.requiresSource()) { + sourceLoader = shardContext.newSourceLoader(); + } else { + sourceLoader = null; + } + this.storedFieldsSpec = storedFieldsSpec; + } + + void readValues(int segment, int docID) throws IOException { + segments[segment].read(docID, builders); + } + + void prepareForReading(int estimatedSize) throws IOException { + if (this.builders.length > 0 && this.builders[0] == null) { + for (int f = 0; f < builders.length; f++) { + builders[f] = (Block.Builder) loaders[f].builder(blockFactory, estimatedSize); + } + } + for (SegmentLevelFieldsReader segment : segments) { + if (segment != null) { + segment.reinitializeIfNeeded(sourceLoader, storedFieldsSpec); + } + } + } + + Block[] buildBlocks() { + Block[] blocks = Block.Builder.buildAll(builders); + Arrays.fill(builders, null); + return blocks; + } + + @Override + public void close() { + Releasables.close(builders); + } + } + + static final class SegmentLevelFieldsReader { + private final BlockLoader.RowStrideReader[] rowStride; + private final BlockLoader[] loaders; + private final LeafReaderContext leafContext; + private BlockLoaderStoredFieldsFromLeafLoader storedFields; + private Thread loadedThread = null; + + SegmentLevelFieldsReader(LeafReaderContext leafContext, BlockLoader[] loaders) { + this.leafContext = leafContext; + this.loaders = loaders; + this.rowStride = new BlockLoader.RowStrideReader[loaders.length]; + } + + private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec storedFieldsSpec) throws IOException { + final Thread currentThread = Thread.currentThread(); + if (loadedThread != currentThread) { + loadedThread = currentThread; + for (int f = 0; f < loaders.length; f++) { + rowStride[f] = loaders[f].rowStrideReader(leafContext); + } + storedFields = new BlockLoaderStoredFieldsFromLeafLoader( + StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(leafContext, null), + sourceLoader != null ? sourceLoader.leaf(leafContext.reader(), null) : null + ); + } + } + + void read(int docId, Block.Builder[] builder) throws IOException { + storedFields.advanceTo(docId); + for (int i = 0; i < rowStride.length; i++) { + rowStride[i].read(docId, storedFields, builder[i]); + } + } + } + /** * Collect tsids then build a {@link OrdinalBytesRefVector} */ @@ -447,4 +559,93 @@ BytesRefVector build() throws IOException { return result; } } + + static final class DocIdCollector implements Releasable { + private final BlockFactory blockFactory; + private final ShardContext shardContext; + private IntVector.Builder docsBuilder; + private IntVector.Builder segmentsBuilder; + private final int[] docPerSegments; + + DocIdCollector(BlockFactory blockFactory, ShardContext shardContext) { + this.blockFactory = blockFactory; + this.shardContext = shardContext; + docPerSegments = new int[shardContext.searcher().getIndexReader().leaves().size()]; + } + + void prepareForCollecting(int estimatedSize) { + assert docsBuilder == null; + docsBuilder = blockFactory.newIntVectorBuilder(estimatedSize); + segmentsBuilder = blockFactory.newIntVectorBuilder(estimatedSize); + Arrays.fill(docPerSegments, 0); + } + + void collect(int segment, int docId) { + docsBuilder.appendInt(docId); + segmentsBuilder.appendInt(segment); + docPerSegments[segment]++; + } + + DocVector build() { + IntVector shards = null; + IntVector segments = null; + IntVector docs = null; + DocVector docVector = null; + try { + docs = docsBuilder.build(); + docsBuilder = null; + segments = segmentsBuilder.build(); + segmentsBuilder = null; + shards = blockFactory.newConstantIntVector(shardContext.index(), docs.getPositionCount()); + docVector = buildDocVector(shards, segments, docs, docPerSegments); + return docVector; + } finally { + if (docVector == null) { + Releasables.close(docs, segments, shards); + } + } + } + + private DocVector buildDocVector(IntVector shards, IntVector segments, IntVector docs, int[] docPerSegments) { + if (segments.isConstant()) { + // DocIds are sorted in each segment. Hence, if docIds come from a single segment, we can mark this DocVector + // as singleSegmentNonDecreasing to enable optimizations in the ValuesSourceReaderOperator. + return new DocVector(shards, segments, docs, true); + } + boolean success = false; + int positionCount = shards.getPositionCount(); + long estimatedSize = DocVector.sizeOfSegmentDocMap(positionCount); + blockFactory.adjustBreaker(estimatedSize); + // Use docPerSegments to build a forward/backward docMap in O(N) + // instead of O(N*log(N)) in DocVector#buildShardSegmentDocMapIfMissing. + try { + final int[] forwards = new int[positionCount]; + final int[] starts = new int[docPerSegments.length]; + for (int i = 1; i < starts.length; i++) { + starts[i] = starts[i - 1] + docPerSegments[i - 1]; + } + for (int i = 0; i < segments.getPositionCount(); i++) { + final int segment = segments.getInt(i); + assert forwards[starts[segment]] == 0 : "must not set"; + forwards[starts[segment]++] = i; + } + final int[] backwards = new int[forwards.length]; + for (int p = 0; p < forwards.length; p++) { + backwards[forwards[p]] = p; + } + final DocVector docVector = new DocVector(shards, segments, docs, forwards, backwards); + success = true; + return docVector; + } finally { + if (success == false) { + blockFactory.adjustBreaker(-estimatedSize); + } + } + } + + @Override + public void close() { + Releasables.close(docsBuilder, segmentsBuilder); + } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 8d2465d4664f8..cae85ebd7ba6e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -639,16 +639,44 @@ public String toString() { } } - private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable { - private final BlockFactory factory; + private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable { private final int pageSize; private Block nullBlock; private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) { - this.factory = factory; + super(factory); this.pageSize = pageSize; } + @Override + public Block constantNulls() { + if (nullBlock == null) { + nullBlock = factory.newConstantNullBlock(pageSize); + } + nullBlock.incRef(); + return nullBlock; + } + + @Override + public void close() { + if (nullBlock != null) { + nullBlock.close(); + } + } + + @Override + public BytesRefBlock constantBytes(BytesRef value) { + return factory.newConstantBytesRefBlockWith(value, pageSize); + } + } + + public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory { + protected final BlockFactory factory; + + protected DelegatingBlockLoaderFactory(BlockFactory factory) { + this.factory = factory; + } + @Override public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING); @@ -704,27 +732,6 @@ public BlockLoader.Builder nulls(int expectedCount) { return ElementType.NULL.newBlockBuilder(expectedCount, factory); } - @Override - public Block constantNulls() { - if (nullBlock == null) { - nullBlock = factory.newConstantNullBlock(pageSize); - } - nullBlock.incRef(); - return nullBlock; - } - - @Override - public void close() { - if (nullBlock != null) { - nullBlock.close(); - } - } - - @Override - public BytesRefBlock constantBytes(BytesRef value) { - return factory.newConstantBytesRefBlockWith(value, pageSize); - } - @Override public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { return new SingletonOrdinalsBuilder(factory, ordinals, count); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index a079d0519f110..6aacf4a6c3bf1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -35,8 +35,10 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy; +import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.indices.CrankyCircuitBreakerService; import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.sort.SortAndFormats; @@ -299,6 +301,20 @@ public IndexSearcher searcher() { return searcher; } + @Override + public SourceLoader newSourceLoader() { + return SourceLoader.FROM_STORED_SOURCE; + } + + @Override + public BlockLoader blockLoader( + String name, + boolean asUnsupportedSource, + MappedFieldType.FieldExtractPreference fieldExtractPreference + ) { + throw new UnsupportedOperationException(); + } + @Override public Optional buildSort(List> sorts) { return Optional.empty(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java index ce0f5c1ccf2ce..f6e9481c26af5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.RoutingPathFields; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; @@ -81,7 +82,7 @@ public void testSimple() { int numSamplesPerTS = 10; long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z"); int maxPageSize = between(1, 1024); - List results = runDriver(1024, maxPageSize, randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart); + List results = runDriver(1024, maxPageSize, true, numTimeSeries, numSamplesPerTS, timestampStart); // for now we emit at most one time series each page int offset = 0; for (Page page : results) { @@ -155,9 +156,12 @@ record Doc(int host, long timestamp, long metric) {} } int maxPageSize = between(1, 1024); int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE; + var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG); var timeSeriesFactory = createTimeSeriesSourceOperator( directory, r -> this.reader = r, + true, + List.of(new ExtractField(metricField, ElementType.LONG)), limit, maxPageSize, randomBoolean(), @@ -171,12 +175,11 @@ record Doc(int host, long timestamp, long metric) {} ); DriverContext driverContext = driverContext(); List results = new ArrayList<>(); - var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG); OperatorTestCase.runDriver( TestDriverFactory.create( driverContext, timeSeriesFactory.get(driverContext), - List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)), + List.of(), new TestResultPageSinkOperator(results::add) ) ); @@ -240,7 +243,9 @@ public void testMatchNone() throws Exception { Integer.MAX_VALUE, randomIntBetween(1, 1024), 1, + randomBoolean(), List.of(ctx), + List.of(), unused -> query ); var driverContext = driverContext(); @@ -260,7 +265,7 @@ public void testMatchNone() throws Exception { @Override protected Operator.OperatorFactory simple() { - return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> { + return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> { long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z"); writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 }); return 1; @@ -279,9 +284,13 @@ protected Matcher expectedToStringOfSimple() { List runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) { var ctx = driverContext(); + var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG); + var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname"); var timeSeriesFactory = createTimeSeriesSourceOperator( directory, indexReader -> this.reader = indexReader, + true, + List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)), limit, maxPageSize, forceMerge, @@ -300,18 +309,8 @@ List runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime ); List results = new ArrayList<>(); - var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG); - var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname"); OperatorTestCase.runDriver( - TestDriverFactory.create( - ctx, - timeSeriesFactory.get(ctx), - List.of( - ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx), - ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx) - ), - new TestResultPageSinkOperator(results::add) - ) + TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add)) ); OperatorTestCase.assertDriverContext(ctx); for (Page result : results) { @@ -321,9 +320,15 @@ List runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime return results; } + public record ExtractField(MappedFieldType ft, ElementType elementType) { + + } + public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator( Directory directory, Consumer readerConsumer, + boolean emitDocIds, + List extractFields, int limit, int maxPageSize, boolean forceMerge, @@ -354,7 +359,18 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat } var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0); Function queryFunction = c -> new MatchAllDocsQuery(); - return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction); + + var fieldInfos = extractFields.stream() + .map( + f -> new ValuesSourceReaderOperator.FieldInfo( + f.ft.name(), + f.elementType, + n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext()) + ) + ) + .toList(); + + return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction); } public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index 564d7ccc64854..2b97f4fefe5ea 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -495,7 +495,7 @@ private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft }); } - private static MappedFieldType.BlockLoaderContext blContext() { + public static MappedFieldType.BlockLoaderContext blContext() { return new MappedFieldType.BlockLoaderContext() { @Override public String indexName() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index c837f7d041bba..eb6ca71a2678e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -11,7 +11,7 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction; -import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushDownFieldExtractionToTimeSeriesSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource; @@ -81,7 +81,7 @@ protected static List> rules(boolean optimizeForEsSource) { new InsertFieldExtraction(), new SpatialDocValuesExtraction(), new SpatialShapeBoundsExtraction(), - new PushFieldExtractionToTimeSeriesSource() + new PushDownFieldExtractionToTimeSeriesSource() ); return List.of(pushdown, fieldExtraction); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java new file mode 100644 index 0000000000000..f16eb115b6a19 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.LimitExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * A rule that pushes down field extractions to occur before filter/limit/topN in the time-series source plan. + */ +public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule< + PhysicalPlan, + LocalPhysicalOptimizerContext> { + + @Override + public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) { + if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) { + return plan; + } + final List pushDownExtracts = new ArrayList<>(); + final Holder keepDocIds = new Holder<>(Boolean.FALSE); + plan.forEachDown(p -> { + if (p instanceof FieldExtractExec) { + pushDownExtracts.add((FieldExtractExec) p); + } else if (stopPushDownExtract(p)) { + if (pushDownExtracts.isEmpty() == false) { + keepDocIds.set(Boolean.TRUE); + pushDownExtracts.clear(); + } + } + }); + final Holder aborted = new Holder<>(Boolean.FALSE); + return plan.transformUp(PhysicalPlan.class, p -> { + if (aborted.get()) { + return p; + } + if (p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) { + return addFieldExtract(context, q, keepDocIds.get(), pushDownExtracts); + } + if (stopPushDownExtract(p)) { + aborted.set(Boolean.TRUE); + return p; + } + if (p instanceof FieldExtractExec e) { + return e.child(); + } + return p; + }); + } + + private static boolean stopPushDownExtract(PhysicalPlan p) { + return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec; + } + + private TimeSeriesSourceExec addFieldExtract( + LocalPhysicalOptimizerContext context, + EsQueryExec query, + boolean keepDocAttribute, + List extracts + ) { + Set docValuesAttributes = new HashSet<>(); + Set boundsAttributes = new HashSet<>(); + List attributesToExtract = new ArrayList<>(); + for (FieldExtractExec extract : extracts) { + docValuesAttributes.addAll(extract.docValuesAttributes()); + boundsAttributes.addAll(extract.boundsAttributes()); + attributesToExtract.addAll(extract.attributesToExtract()); + } + List attrs = query.attrs(); + if (keepDocAttribute == false) { + attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList(); + } + return new TimeSeriesSourceExec( + query.source(), + attrs, + query.query(), + query.limit(), + context.configuration().pragmas().fieldExtractPreference(), + docValuesAttributes, + boundsAttributes, + attributesToExtract, + query.estimatedRowSize() + ); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java deleted file mode 100644 index 544c5333e8c29..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; - -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; -import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; -import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules; -import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; -import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; - -/** - * A rule that moves field extractions to occur before the time-series aggregation in the time-series source plan. - */ -public class PushFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule< - EsQueryExec, - LocalPhysicalOptimizerContext> { - - public PushFieldExtractionToTimeSeriesSource() { - super(OptimizerRules.TransformDirection.UP); - } - - @Override - public PhysicalPlan rule(EsQueryExec plan, LocalPhysicalOptimizerContext context) { - if (plan.indexMode() == IndexMode.TIME_SERIES) { - return new TimeSeriesSourceExec(plan.source(), plan.output(), plan.query(), plan.limit(), plan.estimatedRowSize()); - } else { - return plan; - } - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java index b83c3c7ab2ed4..12d2d9eebeb2e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java @@ -9,16 +9,20 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.NodeUtils; import org.elasticsearch.xpack.esql.core.tree.Source; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; /** * Similar to {@link EsQueryExec}, but this is a physical plan specifically for time series indices. @@ -26,22 +30,41 @@ * and avoiding the cost of sorting and rebuilding blocks. */ public class TimeSeriesSourceExec extends LeafExec implements EstimatesRowSize { + + private final MappedFieldType.FieldExtractPreference defaultPreference; + private final Set docValuesAttributes; + private final Set boundsAttributes; + private final List attributesToExtract; + private final List attrs; private final QueryBuilder query; private final Expression limit; - - /** - * Estimate of the number of bytes that'll be loaded per position before - * the stream of pages is consumed. - */ private final Integer estimatedRowSize; - - public TimeSeriesSourceExec(Source source, List attrs, QueryBuilder query, Expression limit, Integer estimatedRowSize) { + private List lazyOutput; + + public TimeSeriesSourceExec( + Source source, + List attrs, + QueryBuilder query, + Expression limit, + MappedFieldType.FieldExtractPreference defaultPreference, + Set docValuesAttributes, + Set boundsAttributes, + List attributesToExtract, + Integer estimatedRowSize + ) { super(source); this.attrs = attrs; this.query = query; this.limit = limit; + this.defaultPreference = defaultPreference; + this.docValuesAttributes = docValuesAttributes; + this.boundsAttributes = boundsAttributes; + this.attributesToExtract = attributesToExtract; this.estimatedRowSize = estimatedRowSize; + if (this.attributesToExtract.isEmpty()) { + lazyOutput = attrs; + } } @Override @@ -56,46 +79,101 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, TimeSeriesSourceExec::new, attrs, query, limit, estimatedRowSize); + return NodeInfo.create( + this, + TimeSeriesSourceExec::new, + attrs, + query, + limit, + defaultPreference, + docValuesAttributes, + boundsAttributes, + attributesToExtract, + estimatedRowSize + ); } public QueryBuilder query() { return query; } + public List attrs() { + return attrs; + } + @Override public List output() { - return attrs; + if (lazyOutput == null) { + lazyOutput = new ArrayList<>(attrs.size() + attributesToExtract.size()); + lazyOutput.addAll(attrs); + lazyOutput.addAll(attributesToExtract); + } + return lazyOutput; } - public List attrs() { - return attrs; + @Override + protected AttributeSet computeReferences() { + return super.computeReferences(); } public Expression limit() { return limit; } - /** - * Estimate of the number of bytes that'll be loaded per position before - * the stream of pages is consumed. - */ public Integer estimatedRowSize() { return estimatedRowSize; } + public List attributesToExtract() { + return attributesToExtract; + } + + public MappedFieldType.FieldExtractPreference fieldExtractPreference(Attribute attr) { + if (boundsAttributes.contains(attr)) { + return MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS; + } + if (docValuesAttributes.contains(attr)) { + return MappedFieldType.FieldExtractPreference.DOC_VALUES; + } + return defaultPreference; + } + @Override public PhysicalPlan estimateRowSize(State state) { state.add(false, Integer.BYTES * 2); state.add(false, 22); // tsid state.add(false, 8); // timestamp + state.add(false, attributesToExtract); int size = state.consumeAllFields(false); - return Objects.equals(this.estimatedRowSize, size) ? this : new TimeSeriesSourceExec(source(), attrs, query, limit, size); + if (Objects.equals(this.estimatedRowSize, size)) { + return this; + } else { + return new TimeSeriesSourceExec( + source(), + attrs, + query, + limit, + defaultPreference, + docValuesAttributes, + boundsAttributes, + attributesToExtract, + size + ); + } } @Override public int hashCode() { - return Objects.hash(attrs, query, limit, estimatedRowSize); + return Objects.hash( + attrs, + query, + defaultPreference, + docValuesAttributes, + boundsAttributes, + attributesToExtract, + limit, + estimatedRowSize + ); } @Override @@ -110,6 +188,10 @@ public boolean equals(Object obj) { TimeSeriesSourceExec other = (TimeSeriesSourceExec) obj; return Objects.equals(attrs, other.attrs) + && Objects.equals(defaultPreference, other.defaultPreference) + && Objects.equals(docValuesAttributes, other.docValuesAttributes) + && Objects.equals(boundsAttributes, other.boundsAttributes) + && Objects.equals(attributesToExtract, other.attributesToExtract) && Objects.equals(query, other.query) && Objects.equals(limit, other.limit) && Objects.equals(estimatedRowSize, other.estimatedRowSize); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index ed5a499519f9e..d60e402153578 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -86,20 +86,11 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi * Context of each shard we're operating against. */ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardContext { - /** - * Build something to load source {@code _source}. - */ - SourceLoader newSourceLoader(); /** * Convert a {@link QueryBuilder} into a real {@link Query lucene query}. */ Query toQuery(QueryBuilder queryBuilder); - - /** - * Returns something to load values from this field into a {@link Block}. - */ - BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference); } private final List shardContexts; @@ -123,16 +114,11 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi List readers = shardContexts.stream() .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader)) .toList(); - List fields = new ArrayList<>(); int docChannel = source.layout.get(sourceAttr.id()).channel(); for (Attribute attr : fieldExtractExec.attributesToExtract()) { layout.append(attr); - DataType dataType = attr.dataType(); - MappedFieldType.FieldExtractPreference fieldExtractPreference = fieldExtractExec.fieldExtractPreference(attr); - ElementType elementType = PlannerUtils.toElementType(dataType, fieldExtractPreference); - IntFunction loader = s -> getBlockLoaderFor(s, attr, fieldExtractPreference); - fields.add(new ValuesSourceReaderOperator.FieldInfo(getFieldName(attr), elementType, loader)); } + var fields = extractFields(fieldExtractExec.attributesToExtract(), fieldExtractExec::fieldExtractPreference); return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build()); } @@ -239,12 +225,16 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, @Override public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) { + final int limit = ts.limit() != null ? (Integer) ts.limit().fold(context.foldCtx()) : NO_LIMIT; + final boolean emitDocIds = ts.attrs().stream().anyMatch(EsQueryExec::isSourceAttribute); LuceneOperator.Factory luceneFactory = TimeSeriesSortedSourceOperatorFactory.create( limit, context.pageSize(ts.estimatedRowSize()), context.queryPragmas().taskConcurrency(), + emitDocIds, shardContexts, + extractFields(ts.attributesToExtract(), ts::fieldExtractPreference), querySupplier(ts.query()) ); Layout.Builder layout = new Layout.Builder(); @@ -254,6 +244,21 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca return PhysicalOperation.fromSource(luceneFactory, layout.build()); } + private List extractFields( + List attributes, + Function preference + ) { + List fieldInfos = new ArrayList<>(attributes.size()); + for (Attribute attr : attributes) { + DataType dataType = attr.dataType(); + var fieldExtractPreference = preference.apply(attr); + ElementType elementType = PlannerUtils.toElementType(dataType, fieldExtractPreference); + IntFunction loader = s -> getBlockLoaderFor(s, attr, fieldExtractPreference); + fieldInfos.add(new ValuesSourceReaderOperator.FieldInfo(getFieldName(attr), elementType, loader)); + } + return fieldInfos; + } + /** * Build a {@link SourceOperator.SourceOperatorFactory} that counts documents in the search index. */ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index 9c54a233e016f..e540649e8c602 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.apache.lucene.search.IndexSearcher; +import org.elasticsearch.Build; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Fuzziness; @@ -39,6 +40,7 @@ import org.elasticsearch.xpack.esql.analysis.Verifier; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -69,6 +71,8 @@ import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.planner.FilterTests; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -100,6 +104,7 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termsQuery; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; @@ -136,6 +141,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase { public static final String MATCH_FUNCTION_QUERY = "from test | where match(%s, %s)"; private TestPlannerOptimizer plannerOptimizer; + private Analyzer timeSeriesAnalyzer; private final Configuration config; private final SearchStats IS_SV_STATS = new TestSearchStats() { @Override @@ -178,6 +184,18 @@ public void init() { ) ); plannerOptimizer = new TestPlannerOptimizer(config, makeAnalyzer("mapping-basic.json", enrichResolution)); + var timeSeriesMapping = loadMapping("k8s-mappings.json"); + var timeSeriesIndex = IndexResolution.valid(new EsIndex("k8s", timeSeriesMapping, Map.of("k8s", IndexMode.TIME_SERIES))); + timeSeriesAnalyzer = new Analyzer( + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + timeSeriesIndex, + enrichResolution, + emptyInferenceResolution() + ), + TEST_VERIFIER + ); } private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichResolution) { @@ -1819,6 +1837,25 @@ public void testMatchFunctionWithPushableDisjunction() { assertThat(esQuery.query().toString(), equalTo(expected.toString())); } + public void testPushDownFieldExtractToTimeSeriesSource() { + assumeTrue("requires snapshot builds", Build.current().isSnapshot()); + var query = "TS k8s | STATS max(rate(network.total_bytes_in))"; + var optimizer = new TestPlannerOptimizer(config, timeSeriesAnalyzer); + PhysicalPlan plan = optimizer.plan(query); + var limit = as(plan, LimitExec.class); + var finalAgg = as(limit.child(), AggregateExec.class); + var partialAgg = as(finalAgg.child(), AggregateExec.class); + var timeSeriesFinalAgg = as(partialAgg.child(), TimeSeriesAggregateExec.class); + var exchange = as(timeSeriesFinalAgg.child(), ExchangeExec.class); + var timeSeriesPartialAgg = as(exchange.child(), TimeSeriesAggregateExec.class); + var timeSeriesSource = as(timeSeriesPartialAgg.child(), TimeSeriesSourceExec.class); + assertThat(timeSeriesSource.attributesToExtract(), hasSize(1)); + FieldAttribute field = as(timeSeriesSource.attributesToExtract().getFirst(), FieldAttribute.class); + assertThat(field.name(), equalTo("network.total_bytes_in")); + assertThat(timeSeriesSource.attrs(), hasSize(2)); + assertTrue(timeSeriesSource.attrs().stream().noneMatch(EsQueryExec::isSourceAttribute)); + } + private QueryBuilder wrapWithSingleQuery(String query, QueryBuilder inner, String fieldName, Source source) { return FilterTests.singleValueQuery(query, inner, fieldName, source); } From 7e510aed2db346c8a5642607509f071dff481f1f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Apr 2025 09:50:56 -0700 Subject: [PATCH 2/7] naming --- .../lucene/TimeSeriesSortedSourceOperatorFactory.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index e8d3008686ef8..f8ebc673a44a8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -128,7 +128,7 @@ static final class Impl extends SourceOperator { private LongVector.Builder timestampsBuilder; private TsidBuilder tsHashesBuilder; private SegmentsIterator iterator; - private final List fieldsToExact; + private final List fieldsToExtracts; private ShardLevelFieldsReader fieldsReader; private DocIdCollector docCollector; @@ -142,7 +142,7 @@ static final class Impl extends SourceOperator { ) { this.maxPageSize = maxPageSize; this.blockFactory = blockFactory; - this.fieldsToExact = fieldsToExtract; + this.fieldsToExtracts = fieldsToExtract; this.emitDocIds = emitDocIds; this.remainingDocs = limit; this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize)); @@ -172,7 +172,7 @@ public Page getOutput() { } Page page = null; - Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExact.size()]; + Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()]; try { if (iterator == null) { var slice = sliceQueue.nextSlice(); @@ -181,7 +181,7 @@ public Page getOutput() { return null; } Releasables.close(fieldsReader); - fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExact); + fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts); iterator = new SegmentsIterator(slice); if (emitDocIds) { docCollector = new DocIdCollector(blockFactory, slice.shardContext()); @@ -201,7 +201,7 @@ public Page getOutput() { tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize)); blocks[blockIndex++] = timestampsBuilder.build().asBlock(); timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize)); - System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExact.size()); + System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExtracts.size()); page = new Page(currentPagePos, blocks); currentPagePos = 0; } From 845c2d656cb1d075e022401c89140b3c84803efe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Apr 2025 10:46:21 -0700 Subject: [PATCH 3/7] consistency --- .../compute/lucene/TimeSeriesSortedSourceOperatorFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index f8ebc673a44a8..96f832946fa72 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -194,7 +194,7 @@ public Page getOutput() { iterator.readDocsForNextPage(); if (currentPagePos > 0) { int blockIndex = 0; - if (emitDocIds) { + if (docCollector != null) { blocks[blockIndex++] = docCollector.build().asBlock(); } blocks[blockIndex++] = tsHashesBuilder.build().asBlock(); From 77af75215b9315500c0b397cd30d090f9c7f2d23 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Apr 2025 10:48:23 -0700 Subject: [PATCH 4/7] merge with stored-fields from loaders --- .../compute/lucene/TimeSeriesSortedSourceOperatorFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index 96f832946fa72..dd68a2ffc3306 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -436,10 +436,12 @@ static final class ShardLevelFieldsReader implements Releasable { } if (storedFieldsSpec.requiresSource()) { sourceLoader = shardContext.newSourceLoader(); + storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(false, false, sourceLoader.requiredStoredFields())); } else { sourceLoader = null; } this.storedFieldsSpec = storedFieldsSpec; + ; } void readValues(int segment, int docID) throws IOException { From c034b2f8f5189104c135b28ef00d50050e12ecac Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Apr 2025 10:48:49 -0700 Subject: [PATCH 5/7] leftover --- .../compute/lucene/TimeSeriesSortedSourceOperatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java index f6e9481c26af5..c4128bc88254d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java @@ -82,7 +82,7 @@ public void testSimple() { int numSamplesPerTS = 10; long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z"); int maxPageSize = between(1, 1024); - List results = runDriver(1024, maxPageSize, true, numTimeSeries, numSamplesPerTS, timestampStart); + List results = runDriver(1024, maxPageSize, randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart); // for now we emit at most one time series each page int offset = 0; for (Page page : results) { From 144f13d85af1ae50491b2ccb139183cee4e8b5d8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Apr 2025 11:02:49 -0700 Subject: [PATCH 6/7] oops --- .../compute/lucene/TimeSeriesSortedSourceOperatorFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index dd68a2ffc3306..c909f874f359b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -441,7 +441,6 @@ static final class ShardLevelFieldsReader implements Releasable { sourceLoader = null; } this.storedFieldsSpec = storedFieldsSpec; - ; } void readValues(int segment, int docID) throws IOException { From 164d7889c4ac1b43e85d49a23ad6f4c493dfade9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Apr 2025 12:44:18 -0700 Subject: [PATCH 7/7] javadoc --- .../local/PushDownFieldExtractionToTimeSeriesSource.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java index f16eb115b6a19..ced194dc6a1b0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java @@ -26,7 +26,13 @@ import java.util.Set; /** - * A rule that pushes down field extractions to occur before filter/limit/topN in the time-series source plan. + * An optimization rule that pushes down field extractions to occur at the lowest filter, limit, or topN in the time-series source plan. + * For example: + * `TS index | WHERE host = 'a' AND cluster = 'b' | STATS max(rate(counter)) BY host, bucket(1minute)` + * In this query, the extraction of the `host` and `cluster` fields will be pushed down to the time-series source, + * while the extraction of the `counter` field will occur later. In such cases, the `doc_ids` still need to be returned + * for the later extraction. However, if the filter (`host = 'a' AND cluster = 'b'`) is pushed down to Lucene, all field extractions + * (e.g., `host` and `counter`) will be pushed down to the time-series source, and `doc_ids` will not be returned. */ public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule< PhysicalPlan,