diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java
new file mode 100644
index 0000000000000..f535bc462fdfc
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
+import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.DocVector;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
+import org.elasticsearch.compute.data.OrdinalBytesRefVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
+import org.elasticsearch.index.mapper.SourceLoader;
+import org.elasticsearch.search.fetch.StoredFieldsSpec;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A variant of {@link ValuesSourceReaderOperator} for extracting fields in time-series indices. The differences are:
+ * 1. Caches readers for all segments of the last shard instead of only the last segment, since time-series data can
+ * come from any segment at any time.
+ * 2. Although docs do not arrive in global order (by shard, then segment, then docId), they are still sorted
+ * within each segment; hence, this reader does not perform the expensive sorting and regrouping.
+ * 3. For dimension fields, values are read only once per tsid.
+ * These changes are made purely for performance reasons. We should look into consolidating this operator with
+ * {@link ValuesSourceReaderOperator} by adding some metadata to the {@link DocVector} and handling it accordingly.
+ */
+public class TimeSeriesExtractFieldOperator extends AbstractPageMappingOperator {
+
+    public record Factory(List<ValuesSourceReaderOperator.FieldInfo> fields, List<? extends ShardContext> shardContexts)
+ implements
+ OperatorFactory {
+ @Override
+ public Operator get(DriverContext driverContext) {
+ return new TimeSeriesExtractFieldOperator(driverContext.blockFactory(), fields, shardContexts);
+ }
+
+ @Override
+ public String describe() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("TimeSeriesExtractFieldOperator[fields = [");
+ if (fields.size() < 10) {
+ boolean first = true;
+ for (var f : fields) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(f.name());
+ }
+ } else {
+ sb.append(fields.size()).append(" fields");
+ }
+ return sb.append("]]").toString();
+ }
+ }
+
+ private final BlockFactory blockFactory;
+    private final List<ValuesSourceReaderOperator.FieldInfo> fields;
+    private final List<? extends ShardContext> shardContexts;
+
+ private ShardLevelFieldsReader fieldsReader;
+
+ public TimeSeriesExtractFieldOperator(
+ BlockFactory blockFactory,
+        List<ValuesSourceReaderOperator.FieldInfo> fields,
+        List<? extends ShardContext> shardContexts
+ ) {
+ this.blockFactory = blockFactory;
+ this.fields = fields;
+ this.shardContexts = shardContexts;
+ }
+
+ private OrdinalBytesRefVector getTsid(Page page, int channel) {
+ BytesRefBlock block = page.getBlock(channel);
+ OrdinalBytesRefBlock ordinals = block.asOrdinals();
+ if (ordinals == null) {
+ throw new IllegalArgumentException("tsid must be an ordinals block, got: " + block.getClass().getName());
+ }
+ OrdinalBytesRefVector vector = ordinals.asVector();
+ if (vector == null) {
+ throw new IllegalArgumentException("tsid must be an ordinals vector, got: " + block.getClass().getName());
+ }
+ return vector;
+ }
+
+ private DocVector getDocVector(Page page, int channel) {
+ DocBlock docBlock = page.getBlock(channel);
+ DocVector docVector = docBlock.asVector();
+ if (docVector == null) {
+ throw new IllegalArgumentException("doc must be a doc vector, got: " + docBlock.getClass().getName());
+ }
+ return docVector;
+ }
+
+ @Override
+ protected Page process(Page page) {
+ try {
+ return processUnchecked(page);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private Page processUnchecked(Page page) throws IOException {
+ DocVector docVector = getDocVector(page, 0);
+ IntVector shards = docVector.shards();
+ if (shards.isConstant() == false) {
+ throw new IllegalArgumentException("shards must be a constant vector, got: " + shards.getClass().getName());
+ }
+ OrdinalBytesRefVector tsidVector = getTsid(page, 1);
+ IntVector tsidOrdinals = tsidVector.getOrdinalsVector();
+ int shardIndex = shards.getInt(0);
+ if (fieldsReader == null || fieldsReader.shardIndex != shardIndex) {
+ Releasables.close(fieldsReader);
+ fieldsReader = new ShardLevelFieldsReader(shardIndex, blockFactory, shardContexts.get(shardIndex), fields);
+ }
+ fieldsReader.prepareForReading(page.getPositionCount());
+ IntVector docs = docVector.docs();
+ IntVector segments = docVector.segments();
+ int lastTsidOrdinal = -1;
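+        // Rows for the same tsid arrive consecutively, so dimension fields are read only for the first row of
+        // each tsid run; the remaining rows of the run skip them (nonDimensionFieldsOnly == true).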
+ for (int p = 0; p < docs.getPositionCount(); p++) {
+ int doc = docs.getInt(p);
+ int segment = segments.getInt(p);
+ int tsidOrd = tsidOrdinals.getInt(p);
+ if (tsidOrd == lastTsidOrdinal) {
+ fieldsReader.readValues(segment, doc, true);
+ } else {
+ fieldsReader.readValues(segment, doc, false);
+ lastTsidOrdinal = tsidOrd;
+ }
+ }
+ Block[] blocks = new Block[fields.size()];
+ Page result = null;
+ try {
+ fieldsReader.buildBlocks(blocks, tsidOrdinals);
+ result = page.appendBlocks(blocks);
+ return result;
+ } finally {
+ if (result == null) {
+ Releasables.close(blocks);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("TimeSeriesExtractFieldOperator[fields = [");
+ if (fields.size() < 10) {
+ boolean first = true;
+ for (var f : fields) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(f.name());
+ }
+ } else {
+ sb.append(fields.size()).append(" fields");
+ }
+ return sb.append("]]").toString();
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(fieldsReader, super::close);
+ }
+
+ static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
+ BlockLoaderFactory(BlockFactory factory) {
+ super(factory);
+ }
+
+ @Override
+ public BlockLoader.Block constantNulls() {
+ throw new UnsupportedOperationException("must not be used by column readers");
+ }
+
+ @Override
+ public BlockLoader.Block constantBytes(BytesRef value) {
+ throw new UnsupportedOperationException("must not be used by column readers");
+ }
+
+ @Override
+ public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
+ throw new UnsupportedOperationException("must not be used by column readers");
+ }
+ }
+
+ static final class ShardLevelFieldsReader implements Releasable {
+ final int shardIndex;
+ private final BlockLoaderFactory blockFactory;
+ private final SegmentLevelFieldsReader[] segments;
+ private final BlockLoader[] loaders;
+ private final boolean[] dimensions;
+ private final Block.Builder[] builders;
+ private final StoredFieldsSpec storedFieldsSpec;
+ private final SourceLoader sourceLoader;
+
+ ShardLevelFieldsReader(
+ int shardIndex,
+ BlockFactory blockFactory,
+ ShardContext shardContext,
+            List<ValuesSourceReaderOperator.FieldInfo> fields
+ ) {
+ this.shardIndex = shardIndex;
+ this.blockFactory = new BlockLoaderFactory(blockFactory);
+ final IndexReader indexReader = shardContext.searcher().getIndexReader();
+ this.segments = new SegmentLevelFieldsReader[indexReader.leaves().size()];
+ this.loaders = new BlockLoader[fields.size()];
+ this.builders = new Block.Builder[loaders.length];
+ StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
+ for (int i = 0; i < fields.size(); i++) {
+ BlockLoader loader = fields.get(i).blockLoader().apply(shardIndex);
+ storedFieldsSpec = storedFieldsSpec.merge(loader.rowStrideStoredFieldSpec());
+ loaders[i] = loader;
+ }
+ for (int i = 0; i < indexReader.leaves().size(); i++) {
+ LeafReaderContext leafReaderContext = indexReader.leaves().get(i);
+ segments[i] = new SegmentLevelFieldsReader(leafReaderContext, loaders);
+ }
+ if (storedFieldsSpec.requiresSource()) {
+ sourceLoader = shardContext.newSourceLoader();
+ storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(false, false, sourceLoader.requiredStoredFields()));
+ } else {
+ sourceLoader = null;
+ }
+ this.storedFieldsSpec = storedFieldsSpec;
+ this.dimensions = new boolean[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ dimensions[i] = shardContext.fieldType(fields.get(i).name()).isDimension();
+ }
+ }
+
+ /**
+ * For dimension fields, skips reading them when {@code nonDimensionFieldsOnly} is true,
+ * since they only need to be read once per tsid.
+ */
+ void readValues(int segment, int docID, boolean nonDimensionFieldsOnly) throws IOException {
+ segments[segment].read(docID, builders, nonDimensionFieldsOnly, dimensions);
+ }
+
+ void prepareForReading(int estimatedSize) throws IOException {
+ if (this.builders.length > 0 && this.builders[0] == null) {
+ for (int f = 0; f < builders.length; f++) {
+ builders[f] = (Block.Builder) loaders[f].builder(blockFactory, estimatedSize);
+ }
+ }
+ for (SegmentLevelFieldsReader segment : segments) {
+ segment.reinitializeIfNeeded(sourceLoader, storedFieldsSpec);
+ }
+ }
+
+ void buildBlocks(Block[] blocks, IntVector tsidOrdinals) {
+ for (int i = 0; i < builders.length; i++) {
+ if (dimensions[i]) {
+ blocks[i] = buildBlockForDimensionField(builders[i], tsidOrdinals);
+ } else {
+ blocks[i] = builders[i].build();
+ }
+ }
+ Arrays.fill(builders, null);
+ }
+
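+        /**
+         * Dimension values are appended only once per tsid (see {@link #readValues}), so the resulting per-tsid
+         * block is expanded back to one value per row: bytes values are paired with the tsid ordinals in an
+         * {@link OrdinalBytesRefVector}, all-null blocks become a constant null block, and other types are copied
+         * position by position using the ordinals as a lookup.
+         */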
+ private Block buildBlockForDimensionField(Block.Builder builder, IntVector tsidOrdinals) {
+ try (var values = builder.build()) {
+ if (values.asVector() instanceof BytesRefVector bytes) {
+ tsidOrdinals.incRef();
+ values.incRef();
+ return new OrdinalBytesRefVector(tsidOrdinals, bytes).asBlock();
+ } else if (values.areAllValuesNull()) {
+ return blockFactory.factory.newConstantNullBlock(tsidOrdinals.getPositionCount());
+ } else {
+ final int positionCount = tsidOrdinals.getPositionCount();
+ try (var newBuilder = values.elementType().newBlockBuilder(positionCount, blockFactory.factory)) {
+ for (int p = 0; p < positionCount; p++) {
+ int pos = tsidOrdinals.getInt(p);
+ newBuilder.copyFrom(values, pos, pos + 1);
+ }
+ return newBuilder.build();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(builders);
+ }
+ }
+
+ static final class SegmentLevelFieldsReader {
+ private final BlockLoader.RowStrideReader[] rowStride;
+ private final BlockLoader[] loaders;
+ private final LeafReaderContext leafContext;
+ private BlockLoaderStoredFieldsFromLeafLoader storedFields;
+ private Thread loadedThread = null;
+
+ SegmentLevelFieldsReader(LeafReaderContext leafContext, BlockLoader[] loaders) {
+ this.leafContext = leafContext;
+ this.loaders = loaders;
+ this.rowStride = new BlockLoader.RowStrideReader[loaders.length];
+ }
+
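+        /**
+         * Re-creates the row-stride readers and stored-field loader whenever the current thread differs from the
+         * one that last initialized them, since these per-segment structures are not safe to reuse across the
+         * driver threads that may resume this operator.
+         */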
+ private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec storedFieldsSpec) throws IOException {
+ final Thread currentThread = Thread.currentThread();
+ if (loadedThread != currentThread) {
+ loadedThread = currentThread;
+ for (int f = 0; f < loaders.length; f++) {
+ rowStride[f] = loaders[f].rowStrideReader(leafContext);
+ }
+ storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
+ StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(leafContext, null),
+ sourceLoader != null ? sourceLoader.leaf(leafContext.reader(), null) : null
+ );
+ }
+ }
+
+ void read(int docId, Block.Builder[] builder, boolean nonDimensionFieldsOnly, boolean[] dimensions) throws IOException {
+ storedFields.advanceTo(docId);
+ if (nonDimensionFieldsOnly) {
+ for (int i = 0; i < rowStride.length; i++) {
+ if (dimensions[i] == false) {
+ rowStride[i].read(docId, storedFields, builder[i]);
+ }
+ }
+ } else {
+ for (int i = 0; i < rowStride.length; i++) {
+ rowStride[i].read(docId, storedFields, builder[i]);
+ }
+ }
+ }
+ }
+}
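For orientation, a minimal usage sketch (not part of the patch) of how the new factory is meant to be wired; the FieldInfo construction mirrors the extractFieldsFactory test helper and the EsPhysicalOperationProviders change further down in this diff, and hostnameBlockLoader, shardContexts, and driverContext are assumed to be in scope:

    // Illustrative wiring only: one FieldInfo per field to extract, plus the planner-provided shard contexts.
    List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = List.of(
        new ValuesSourceReaderOperator.FieldInfo("hostname", ElementType.BYTES_REF, shardIndex -> hostnameBlockLoader)
    );
    Operator extract = new TimeSeriesExtractFieldOperator.Factory(fieldInfos, shardContexts).get(driverContext);
    // Incoming pages must carry the doc vector in channel 0 and the tsid ordinals in channel 1;
    // the operator appends one block per extracted field to each page.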
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
index a0dfadcdbd9db..d0f1a5ee5fcd0 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
@@ -8,7 +8,6 @@
package org.elasticsearch.compute.lucene;
import org.apache.lucene.index.DocValues;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
@@ -33,24 +32,16 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
-import org.elasticsearch.index.mapper.BlockLoader;
-import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
-import org.elasticsearch.index.mapper.SourceLoader;
-import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public final class TimeSeriesSourceOperator extends LuceneOperator {
- private final boolean emitDocIds;
private final int maxPageSize;
private final BlockFactory blockFactory;
private final LuceneSliceQueue sliceQueue;
@@ -61,24 +52,13 @@ public final class TimeSeriesSourceOperator extends LuceneOperator {
private LongVector.Builder timestampsBuilder;
private TsidBuilder tsHashesBuilder;
private SegmentsIterator iterator;
-    private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
- private ShardLevelFieldsReader fieldsReader;
private DocIdCollector docCollector;
private long tsidsLoaded;
- TimeSeriesSourceOperator(
- BlockFactory blockFactory,
- boolean emitDocIds,
-        List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtract,
- LuceneSliceQueue sliceQueue,
- int maxPageSize,
- int limit
- ) {
+ TimeSeriesSourceOperator(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int maxPageSize, int limit) {
super(blockFactory, maxPageSize, sliceQueue);
this.maxPageSize = maxPageSize;
this.blockFactory = blockFactory;
- this.fieldsToExtracts = fieldsToExtract;
- this.emitDocIds = emitDocIds;
this.remainingDocs = limit;
this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
this.tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(limit, maxPageSize));
@@ -107,7 +87,7 @@ public Page getCheckedOutput() throws IOException {
}
Page page = null;
- Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
+ Block[] blocks = new Block[3];
long startInNanos = System.nanoTime();
try {
if (iterator == null) {
@@ -119,36 +99,24 @@ public Page getCheckedOutput() throws IOException {
if (slice.tags().isEmpty() == false) {
throw new UnsupportedOperationException("tags not supported by " + getClass());
}
- Releasables.close(fieldsReader);
- fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);
iterator = new SegmentsIterator(slice);
- if (emitDocIds) {
- docCollector = new DocIdCollector(blockFactory, slice.shardContext());
- }
- }
- if (docCollector != null) {
- docCollector.prepareForCollecting(Math.min(remainingDocs, maxPageSize));
+ docCollector = new DocIdCollector(blockFactory, slice.shardContext());
}
- fieldsReader.prepareForReading(Math.min(remainingDocs, maxPageSize));
iterator.readDocsForNextPage();
if (currentPagePos > 0) {
- int blockIndex = 0;
- if (docCollector != null) {
- blocks[blockIndex++] = docCollector.build().asBlock();
- }
+ blocks[0] = docCollector.build().asBlock();
OrdinalBytesRefVector tsidVector = tsHashesBuilder.build();
- blocks[blockIndex++] = tsidVector.asBlock();
+ blocks[1] = tsidVector.asBlock();
tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize));
- blocks[blockIndex++] = timestampsBuilder.build().asBlock();
+ blocks[2] = timestampsBuilder.build().asBlock();
timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
- System.arraycopy(fieldsReader.buildBlocks(tsidVector.getOrdinalsVector()), 0, blocks, blockIndex, fieldsToExtracts.size());
page = new Page(currentPagePos, blocks);
currentPagePos = 0;
}
if (iterator.completed()) {
processedShards.add(iterator.luceneSlice.shardContext().shardIdentifier());
processedSlices++;
- Releasables.close(docCollector, fieldsReader);
+ Releasables.close(docCollector);
iterator = null;
}
} catch (IOException e) {
@@ -164,7 +132,7 @@ public Page getCheckedOutput() throws IOException {
@Override
public void close() {
- Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector, fieldsReader);
+ Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector);
}
class SegmentsIterator {
@@ -201,6 +169,7 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) {
// TODO: add optimize for one leaf?
void readDocsForNextPage() throws IOException {
+ docCollector.prepareForCollecting(Math.min(remainingDocs, maxPageSize));
Thread executingThread = Thread.currentThread();
for (LeafIterator leaf : mainQueue) {
leaf.reinitializeIfNeeded(executingThread);
@@ -221,18 +190,13 @@ void readDocsForNextPage() throws IOException {
}
private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
- boolean first = true;
do {
LeafIterator top = sub.top();
currentPagePos++;
remainingDocs--;
- if (docCollector != null) {
- docCollector.collect(top.segmentOrd, top.docID);
- }
+ docCollector.collect(top.segmentOrd, top.docID);
tsHashesBuilder.appendOrdinal();
timestampsBuilder.appendLong(top.timestamp);
- fieldsReader.readValues(top.segmentOrd, top.docID, first == false);
- first = false;
if (top.nextDoc()) {
sub.updateTop();
} else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) {
@@ -331,175 +295,6 @@ void reinitializeIfNeeded(Thread executingThread) throws IOException {
}
}
- static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
- BlockLoaderFactory(BlockFactory factory) {
- super(factory);
- }
-
- @Override
- public BlockLoader.Block constantNulls() {
- throw new UnsupportedOperationException("must not be used by column readers");
- }
-
- @Override
- public BlockLoader.Block constantBytes(BytesRef value) {
- throw new UnsupportedOperationException("must not be used by column readers");
- }
-
- @Override
- public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
- throw new UnsupportedOperationException("must not be used by column readers");
- }
- }
-
- static final class ShardLevelFieldsReader implements Releasable {
- private final BlockLoaderFactory blockFactory;
- private final SegmentLevelFieldsReader[] segments;
- private final BlockLoader[] loaders;
- private final boolean[] dimensions;
- private final Block.Builder[] builders;
- private final StoredFieldsSpec storedFieldsSpec;
- private final SourceLoader sourceLoader;
-
-        ShardLevelFieldsReader(BlockFactory blockFactory, ShardContext shardContext, List<ValuesSourceReaderOperator.FieldInfo> fields) {
- this.blockFactory = new BlockLoaderFactory(blockFactory);
- final IndexReader indexReader = shardContext.searcher().getIndexReader();
- this.segments = new SegmentLevelFieldsReader[indexReader.leaves().size()];
- this.loaders = new BlockLoader[fields.size()];
- this.builders = new Block.Builder[loaders.length];
- StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
- for (int i = 0; i < fields.size(); i++) {
- BlockLoader loader = fields.get(i).blockLoader().apply(shardContext.index());
- storedFieldsSpec = storedFieldsSpec.merge(loader.rowStrideStoredFieldSpec());
- loaders[i] = loader;
- }
- for (int i = 0; i < indexReader.leaves().size(); i++) {
- LeafReaderContext leafReaderContext = indexReader.leaves().get(i);
- segments[i] = new SegmentLevelFieldsReader(leafReaderContext, loaders);
- }
- if (storedFieldsSpec.requiresSource()) {
- sourceLoader = shardContext.newSourceLoader();
- storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(false, false, sourceLoader.requiredStoredFields()));
- } else {
- sourceLoader = null;
- }
- this.storedFieldsSpec = storedFieldsSpec;
- this.dimensions = new boolean[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- dimensions[i] = shardContext.fieldType(fields.get(i).name()).isDimension();
- }
- }
-
- /**
- * For dimension fields, skips reading them when {@code nonDimensionFieldsOnly} is true,
- * since they only need to be read once per tsid.
- */
- void readValues(int segment, int docID, boolean nonDimensionFieldsOnly) throws IOException {
- segments[segment].read(docID, builders, nonDimensionFieldsOnly, dimensions);
- }
-
- void prepareForReading(int estimatedSize) throws IOException {
- if (this.builders.length > 0 && this.builders[0] == null) {
- for (int f = 0; f < builders.length; f++) {
- builders[f] = (Block.Builder) loaders[f].builder(blockFactory, estimatedSize);
- }
- }
- for (SegmentLevelFieldsReader segment : segments) {
- if (segment != null) {
- segment.reinitializeIfNeeded(sourceLoader, storedFieldsSpec);
- }
- }
- }
-
- Block[] buildBlocks(IntVector tsidOrdinals) {
- final Block[] blocks = new Block[loaders.length];
- try {
- for (int i = 0; i < builders.length; i++) {
- if (dimensions[i]) {
- blocks[i] = buildBlockForDimensionField(builders[i], tsidOrdinals);
- } else {
- blocks[i] = builders[i].build();
- }
- }
- Arrays.fill(builders, null);
- } finally {
- if (blocks.length > 0 && blocks[blocks.length - 1] == null) {
- Releasables.close(blocks);
- }
- }
- return blocks;
- }
-
- private Block buildBlockForDimensionField(Block.Builder builder, IntVector tsidOrdinals) {
- try (var values = builder.build()) {
- if (values.asVector() instanceof BytesRefVector bytes) {
- tsidOrdinals.incRef();
- values.incRef();
- return new OrdinalBytesRefVector(tsidOrdinals, bytes).asBlock();
- } else if (values.areAllValuesNull()) {
- return blockFactory.factory.newConstantNullBlock(tsidOrdinals.getPositionCount());
- } else {
- final int positionCount = tsidOrdinals.getPositionCount();
- try (var newBuilder = values.elementType().newBlockBuilder(positionCount, blockFactory.factory)) {
- for (int p = 0; p < positionCount; p++) {
- int pos = tsidOrdinals.getInt(p);
- newBuilder.copyFrom(values, pos, pos + 1);
- }
- return newBuilder.build();
- }
- }
- }
- }
-
- @Override
- public void close() {
- Releasables.close(builders);
- }
- }
-
- static final class SegmentLevelFieldsReader {
- private final BlockLoader.RowStrideReader[] rowStride;
- private final BlockLoader[] loaders;
- private final LeafReaderContext leafContext;
- private BlockLoaderStoredFieldsFromLeafLoader storedFields;
- private Thread loadedThread = null;
-
- SegmentLevelFieldsReader(LeafReaderContext leafContext, BlockLoader[] loaders) {
- this.leafContext = leafContext;
- this.loaders = loaders;
- this.rowStride = new BlockLoader.RowStrideReader[loaders.length];
- }
-
- private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec storedFieldsSpec) throws IOException {
- final Thread currentThread = Thread.currentThread();
- if (loadedThread != currentThread) {
- loadedThread = currentThread;
- for (int f = 0; f < loaders.length; f++) {
- rowStride[f] = loaders[f].rowStrideReader(leafContext);
- }
- storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
- StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(leafContext, null),
- sourceLoader != null ? sourceLoader.leaf(leafContext.reader(), null) : null
- );
- }
- }
-
- void read(int docId, Block.Builder[] builder, boolean nonDimensionFieldsOnly, boolean[] dimensions) throws IOException {
- storedFields.advanceTo(docId);
- if (nonDimensionFieldsOnly) {
- for (int i = 0; i < rowStride.length; i++) {
- if (dimensions[i] == false) {
- rowStride[i].read(docId, storedFields, builder[i]);
- }
- }
- } else {
- for (int i = 0; i < rowStride.length; i++) {
- rowStride[i].read(docId, storedFields, builder[i]);
- }
- }
- }
- }
-
/**
* Collect tsids then build a {@link OrdinalBytesRefVector}
*/
@@ -559,25 +354,21 @@ static final class DocIdCollector implements Releasable {
private final ShardContext shardContext;
private IntVector.Builder docsBuilder;
private IntVector.Builder segmentsBuilder;
- private final int[] docPerSegments;
DocIdCollector(BlockFactory blockFactory, ShardContext shardContext) {
this.blockFactory = blockFactory;
this.shardContext = shardContext;
- docPerSegments = new int[shardContext.searcher().getIndexReader().leaves().size()];
}
void prepareForCollecting(int estimatedSize) {
assert docsBuilder == null;
docsBuilder = blockFactory.newIntVectorBuilder(estimatedSize);
segmentsBuilder = blockFactory.newIntVectorBuilder(estimatedSize);
- Arrays.fill(docPerSegments, 0);
}
void collect(int segment, int docId) {
docsBuilder.appendInt(docId);
segmentsBuilder.appendInt(segment);
- docPerSegments[segment]++;
}
DocVector build() {
@@ -591,7 +382,7 @@ DocVector build() {
segments = segmentsBuilder.build();
segmentsBuilder = null;
shards = blockFactory.newConstantIntVector(shardContext.index(), docs.getPositionCount());
- docVector = buildDocVector(shards, segments, docs, docPerSegments);
+ docVector = new DocVector(shards, segments, docs, segments.isConstant());
return docVector;
} finally {
if (docVector == null) {
@@ -600,43 +391,6 @@ DocVector build() {
}
}
- private DocVector buildDocVector(IntVector shards, IntVector segments, IntVector docs, int[] docPerSegments) {
- if (segments.isConstant()) {
- // DocIds are sorted in each segment. Hence, if docIds come from a single segment, we can mark this DocVector
- // as singleSegmentNonDecreasing to enable optimizations in the ValuesSourceReaderOperator.
- return new DocVector(shards, segments, docs, true);
- }
- boolean success = false;
- int positionCount = shards.getPositionCount();
- long estimatedSize = DocVector.sizeOfSegmentDocMap(positionCount);
- blockFactory.adjustBreaker(estimatedSize);
- // Use docPerSegments to build a forward/backward docMap in O(N)
- // instead of O(N*log(N)) in DocVector#buildShardSegmentDocMapIfMissing.
- try {
- final int[] forwards = new int[positionCount];
- final int[] starts = new int[docPerSegments.length];
- for (int i = 1; i < starts.length; i++) {
- starts[i] = starts[i - 1] + docPerSegments[i - 1];
- }
- for (int i = 0; i < segments.getPositionCount(); i++) {
- final int segment = segments.getInt(i);
- assert forwards[starts[segment]] == 0 : "must not set";
- forwards[starts[segment]++] = i;
- }
- final int[] backwards = new int[forwards.length];
- for (int p = 0; p < forwards.length; p++) {
- backwards[forwards[p]] = p;
- }
- final DocVector docVector = new DocVector(shards, segments, docs, forwards, backwards);
- success = true;
- return docVector;
- } finally {
- if (success == false) {
- blockFactory.adjustBreaker(-estimatedSize);
- }
- }
- }
-
@Override
public void close() {
Releasables.close(docsBuilder, segmentsBuilder);
@@ -650,7 +404,7 @@ protected void describe(StringBuilder sb) {
@Override
public Operator.Status status() {
- final long valuesLoaded = rowsEmitted * (1 + fieldsToExtracts.size()); // @timestamp and other fields
+ final long valuesLoaded = rowsEmitted; // @timestamp field
return new Status(this, tsidsLoaded, valuesLoaded);
}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java
index 4a581198a235a..7ee13b3e6e0f5 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java
@@ -29,13 +29,9 @@
public class TimeSeriesSourceOperatorFactory extends LuceneOperator.Factory {
private final int maxPageSize;
- private final boolean emitDocIds;
-    private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact;
private TimeSeriesSourceOperatorFactory(
List<? extends ShardContext> contexts,
-        boolean emitDocIds,
-        List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
int taskConcurrency,
int maxPageSize,
@@ -52,13 +48,11 @@ private TimeSeriesSourceOperatorFactory(
ScoreMode.COMPLETE_NO_SCORES
);
this.maxPageSize = maxPageSize;
- this.emitDocIds = emitDocIds;
- this.fieldsToExact = fieldsToExact;
}
@Override
public SourceOperator get(DriverContext driverContext) {
- return new TimeSeriesSourceOperator(driverContext.blockFactory(), emitDocIds, fieldsToExact, sliceQueue, maxPageSize, limit);
+ return new TimeSeriesSourceOperator(driverContext.blockFactory(), sliceQueue, maxPageSize, limit);
}
@Override
@@ -70,11 +64,9 @@ public static TimeSeriesSourceOperatorFactory create(
int limit,
int maxPageSize,
int taskConcurrency,
- boolean emitDocIds,
List<? extends ShardContext> contexts,
-        List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
) {
- return new TimeSeriesSourceOperatorFactory(contexts, emitDocIds, fieldsToExact, queryFunction, taskConcurrency, maxPageSize, limit);
+ return new TimeSeriesSourceOperatorFactory(contexts, queryFunction, taskConcurrency, maxPageSize, limit);
}
}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
index b66f32adf48dd..15ae1d506a2fe 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
@@ -134,7 +134,7 @@ public void testStatus() {
assertThat(status.tsidLoaded(), equalTo((long) numTimeSeries));
assertThat(status.rowsEmitted(), equalTo((long) numTimeSeries * numSamplesPerTS));
assertThat(status.documentsFound(), equalTo((long) numTimeSeries * numSamplesPerTS));
- assertThat(status.valuesLoaded(), equalTo((long) numTimeSeries * numSamplesPerTS * 3));
+ assertThat(status.valuesLoaded(), equalTo((long) numTimeSeries * numSamplesPerTS));
String expected = String.format(
Locale.ROOT,
@@ -224,8 +224,6 @@ record Doc(int host, long timestamp, long metric) {}
var timeSeriesFactory = createTimeSeriesSourceOperator(
directory,
r -> this.reader = r,
- true,
- List.of(new ExtractField(metricField, ElementType.LONG)),
limit,
maxPageSize,
randomBoolean(),
@@ -243,7 +241,7 @@ record Doc(int host, long timestamp, long metric) {}
TestDriverFactory.create(
driverContext,
timeSeriesFactory.get(driverContext),
- List.of(),
+ List.of(extractFieldsFactory(reader, List.of(new ExtractField(metricField, ElementType.LONG))).get(driverContext)),
new TestResultPageSinkOperator(results::add)
)
);
@@ -307,9 +305,7 @@ public void testMatchNone() throws Exception {
Integer.MAX_VALUE,
randomIntBetween(1, 1024),
1,
- randomBoolean(),
List.of(ctx),
- List.of(),
unused -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of()))
);
var driverContext = driverContext();
@@ -329,7 +325,7 @@ public void testMatchNone() throws Exception {
@Override
protected Operator.OperatorFactory simple(SimpleOptions options) {
- return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> {
+ return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, true, writer -> {
long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
return 1;
@@ -372,11 +368,13 @@ Driver createDriver(
) {
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
+ var extractFields = List.of(
+ new ExtractField(voltageField, ElementType.LONG),
+ new ExtractField(hostnameField, ElementType.BYTES_REF)
+ );
var timeSeriesFactory = createTimeSeriesSourceOperator(
directory,
indexReader -> this.reader = indexReader,
- true,
- List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)),
limit,
maxPageSize,
forceMerge,
@@ -396,7 +394,7 @@ Driver createDriver(
return TestDriverFactory.create(
driverContext,
timeSeriesFactory.get(driverContext),
- List.of(),
+ List.of(extractFieldsFactory(reader, extractFields).get(driverContext)),
new TestResultPageSinkOperator(consumer)
);
}
@@ -408,8 +406,6 @@ public record ExtractField(MappedFieldType ft, ElementType elementType) {
public static TimeSeriesSourceOperatorFactory createTimeSeriesSourceOperator(
Directory directory,
Consumer<IndexReader> readerConsumer,
-        boolean emitDocIds,
-        List<ExtractField> extractFields,
int limit,
int maxPageSize,
boolean forceMerge,
@@ -438,32 +434,11 @@ public static TimeSeriesSourceOperatorFactory createTimeSeriesSourceOperator(
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0) {
- @Override
- public MappedFieldType fieldType(String name) {
- for (ExtractField e : extractFields) {
- if (e.ft.name().equals(name)) {
- return e.ft;
- }
- }
- throw new IllegalArgumentException("Unknown field [" + name + "]");
- }
- };
+ var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> List.of(
new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())
);
-
- var fieldInfos = extractFields.stream()
- .map(
- f -> new ValuesSourceReaderOperator.FieldInfo(
- f.ft.name(),
- f.elementType,
- n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
- )
- )
- .toList();
-
- return TimeSeriesSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
+ return TimeSeriesSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
}
public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
@@ -494,4 +469,28 @@ public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimens
);
iw.addDocument(fields);
}
+
+    TimeSeriesExtractFieldOperator.Factory extractFieldsFactory(IndexReader reader, List<ExtractField> extractFields) {
+ var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0) {
+ @Override
+ public MappedFieldType fieldType(String name) {
+ for (ExtractField e : extractFields) {
+ if (e.ft.name().equals(name)) {
+ return e.ft;
+ }
+ }
+ throw new IllegalArgumentException("Unknown field [" + name + "]");
+ }
+ };
+ var fieldInfos = extractFields.stream()
+ .map(
+ f -> new ValuesSourceReaderOperator.FieldInfo(
+ f.ft.name(),
+ f.elementType,
+ n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
+ )
+ )
+ .toList();
+ return new TimeSeriesExtractFieldOperator.Factory(fieldInfos, List.of(ctx));
+ }
}
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
index 765ba0f2c14c7..09e04bfaa742a 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
@@ -13,6 +13,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -745,15 +746,20 @@ public void testProfile() {
totalTimeSeries++;
assertThat(p.operators(), hasSize(2));
assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));
+ } else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) {
+ assertThat(p.operators(), hasSize(3));
+ assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
+ assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator"));
+ assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator"));
} else {
assertThat(p.operators(), hasSize(4));
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
- assertThat(p.operators().get(1).operator(), containsString("EvalOperator[evaluator=DateTruncDatetimeEvaluator"));
- assertThat(p.operators().get(2).operator(), containsString("TimeSeriesAggregationOperator"));
+ assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator"));
+ assertThat(p.operators().get(2).operator(), containsString("EvalOperator"));
assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));
}
}
- assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 2));
+ assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 3));
}
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
index eb6ca71a2678e..b3fb1aa7098e9 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
@@ -11,7 +11,7 @@
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
-import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushDownFieldExtractionToTimeSeriesSource;
+import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ParallelizeTimeSeriesSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
@@ -81,7 +81,7 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
new InsertFieldExtraction(),
new SpatialDocValuesExtraction(),
new SpatialShapeBoundsExtraction(),
- new PushDownFieldExtractionToTimeSeriesSource()
+ new ParallelizeTimeSeriesSource()
);
return List.of(pushdown, fieldExtraction);
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ParallelizeTimeSeriesSource.java
similarity index 61%
rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java
rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ParallelizeTimeSeriesSource.java
index f7fc10bbf6fae..d26599d7a96c9 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ParallelizeTimeSeriesSource.java
@@ -18,6 +18,8 @@
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesFieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
@@ -27,42 +29,39 @@
import java.util.Set;
/**
- * An optimization rule that pushes down field extractions to occur at the lowest filter, limit, or topN in the time-series source plan.
- * For example:
- * `TS index | WHERE host = 'a' AND cluster = 'b' | STATS max(rate(counter)) BY host, bucket(1minute)`
- * In this query, the extraction of the `host` and `cluster` fields will be pushed down to the time-series source,
- * while the extraction of the `counter` field will occur later. In such cases, the `doc_ids` still need to be returned
- * for the later extraction. However, if the filter (`host = 'a' AND cluster = 'b'`) is pushed down to Lucene, all field extractions
- * (e.g., `host` and `counter`) will be pushed down to the time-series source, and `doc_ids` will not be returned.
+ * An optimization rule that vertically partitions the time-series pipeline into three parts: time-series source,
+ * field extraction, and time-series aggregation, so that they can run in parallel and speed up time-series queries.
+ * For the field-extraction part, it uses a specialized operator for time-series indices.
*/
-public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
- PhysicalPlan,
+public class ParallelizeTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
+ TimeSeriesAggregateExec,
LocalPhysicalOptimizerContext> {
@Override
- public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
+ public PhysicalPlan rule(TimeSeriesAggregateExec plan, LocalPhysicalOptimizerContext context) {
+ if (plan.getMode().isInputPartial()) {
+ return plan;
+ }
if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) {
return plan;
}
final List<FieldExtractExec> pushDownExtracts = new ArrayList<>();
-        final Holder<Boolean> keepDocIds = new Holder<>(Boolean.FALSE);
plan.forEachDown(p -> {
if (p instanceof FieldExtractExec) {
pushDownExtracts.add((FieldExtractExec) p);
} else if (stopPushDownExtract(p)) {
if (pushDownExtracts.isEmpty() == false) {
- keepDocIds.set(Boolean.TRUE);
pushDownExtracts.clear();
}
}
});
final Holder<Boolean> aborted = new Holder<>(Boolean.FALSE);
- return plan.transformUp(PhysicalPlan.class, p -> {
+ PhysicalPlan newChild = plan.child().transformUp(PhysicalPlan.class, p -> {
if (aborted.get()) {
return p;
}
if (p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) {
- return addFieldExtract(context, q, keepDocIds.get(), pushDownExtracts);
+ return addFieldExtract(context, q, pushDownExtracts);
}
if (stopPushDownExtract(p)) {
aborted.set(Boolean.TRUE);
@@ -73,18 +72,14 @@ public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext contex
}
return p;
});
+ return plan.replaceChild(new ParallelExec(plan.source(), newChild));
}
private static boolean stopPushDownExtract(PhysicalPlan p) {
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
}
- private PhysicalPlan addFieldExtract(
- LocalPhysicalOptimizerContext context,
- EsQueryExec query,
- boolean keepDocAttribute,
-        List<FieldExtractExec> extracts
- ) {
+    private PhysicalPlan addFieldExtract(LocalPhysicalOptimizerContext context, EsQueryExec query, List<FieldExtractExec> extracts) {
Set<Attribute> docValuesAttributes = new HashSet<>();
Set<Attribute> boundsAttributes = new HashSet<>();
List<Attribute> attributesToExtract = new ArrayList<>();
@@ -94,22 +89,14 @@ private PhysicalPlan addFieldExtract(
attributesToExtract.addAll(extract.attributesToExtract());
}
List<Attribute> attrs = query.attrs();
- if (keepDocAttribute == false) {
- attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
- }
- var tsSource = new TimeSeriesSourceExec(
+ var tsSource = new TimeSeriesSourceExec(query.source(), attrs, query.query(), query.limit(), query.estimatedRowSize());
+ return new TimeSeriesFieldExtractExec(
query.source(),
- attrs,
- query.query(),
- query.limit(),
+ new ParallelExec(query.source(), tsSource),
+ attributesToExtract,
context.configuration().pragmas().fieldExtractPreference(),
docValuesAttributes,
- boundsAttributes,
- attributesToExtract,
- query.estimatedRowSize()
+ boundsAttributes
);
- // Use a separate driver for the time-series source to split the pipeline to increase parallelism,
- // since the time-series source must be executed with a single driver at the shard level.
- return new ParallelExec(query.source(), tsSource);
}
}
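For reference, the data-node plan shape this rule produces for a query such as
TS k8s | STATS max(rate(network.total_bytes_in)) BY bucket(@timestamp, 1h), as asserted in
testParallelizeTimeSeriesPlan at the end of this diff (roughly, each ParallelExec boundary runs as a separate driver):

    TimeSeriesAggregateExec (partial)        <- aggregation driver
      ParallelExec
        EvalExec                             <- field-extraction driver
          TimeSeriesFieldExtractExec
            ParallelExec
              TimeSeriesSourceExec           <- single-driver source at the shard level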
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FieldExtractExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FieldExtractExec.java
index c79ed9b8c49e6..e0067269b31fd 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FieldExtractExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FieldExtractExec.java
@@ -33,14 +33,14 @@ public class FieldExtractExec extends UnaryExec implements EstimatesRowSize {
FieldExtractExec::new
);
-    private final List<Attribute> attributesToExtract;
-    private final @Nullable Attribute sourceAttribute;
+    protected final List<Attribute> attributesToExtract;
+    protected final @Nullable Attribute sourceAttribute;
/**
* The default for {@link #fieldExtractPreference} if the plan doesn't require
* a preference.
*/
- private final MappedFieldType.FieldExtractPreference defaultPreference;
+ protected final MappedFieldType.FieldExtractPreference defaultPreference;
/**
* Attributes that may be extracted as doc values even if that makes them
@@ -51,7 +51,7 @@ public class FieldExtractExec extends UnaryExec implements EstimatesRowSize {
* This is never serialized between nodes and only used locally.
*
*/
-    private final Set<Attribute> docValuesAttributes;
+    protected final Set<Attribute> docValuesAttributes;
/**
* Attributes of a shape whose extent can be extracted directly from the doc-values encoded geometry.
@@ -59,7 +59,7 @@ public class FieldExtractExec extends UnaryExec implements EstimatesRowSize {
* This is never serialized between nodes and only used locally.
*
*/
-    private final Set<Attribute> boundsAttributes;
+    protected final Set<Attribute> boundsAttributes;
private List<Attribute> lazyOutput;
@@ -72,7 +72,7 @@ public FieldExtractExec(
this(source, child, attributesToExtract, defaultPreference, Set.of(), Set.of());
}
- private FieldExtractExec(
+ protected FieldExtractExec(
Source source,
PhysicalPlan child,
List<Attribute> attributesToExtract,
@@ -128,7 +128,7 @@ protected AttributeSet computeReferences() {
}
@Override
-    protected NodeInfo<FieldExtractExec> info() {
+    protected NodeInfo<? extends FieldExtractExec> info() {
return NodeInfo.create(this, FieldExtractExec::new, child(), attributesToExtract, defaultPreference);
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesFieldExtractExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesFieldExtractExec.java
new file mode 100644
index 0000000000000..ebd8fdc6c48f7
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesFieldExtractExec.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.physical;
+
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+public class TimeSeriesFieldExtractExec extends FieldExtractExec {
+ public TimeSeriesFieldExtractExec(
+ Source source,
+ PhysicalPlan child,
+        List<Attribute> attributesToExtract,
+        MappedFieldType.FieldExtractPreference defaultPreference,
+        Set<Attribute> docValuesAttributes,
+        Set<Attribute> boundsAttributes
+ ) {
+ super(source, child, attributesToExtract, defaultPreference, docValuesAttributes, boundsAttributes);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ throw new UnsupportedOperationException("local plan");
+ }
+
+ @Override
+ public String getWriteableName() {
+ throw new UnsupportedOperationException("local plan");
+ }
+
+ @Override
+    protected NodeInfo<? extends FieldExtractExec> info() {
+ return NodeInfo.create(
+ this,
+ TimeSeriesFieldExtractExec::new,
+ child(),
+ attributesToExtract,
+ defaultPreference,
+ docValuesAttributes,
+ boundsAttributes
+ );
+ }
+
+ @Override
+ public UnaryExec replaceChild(PhysicalPlan newChild) {
+ return new TimeSeriesFieldExtractExec(
+ source(),
+ newChild,
+ attributesToExtract,
+ defaultPreference,
+ docValuesAttributes,
+ boundsAttributes
+ );
+ }
+
+ @Override
+    public FieldExtractExec withDocValuesAttributes(Set<Attribute> docValuesAttributes) {
+ return new TimeSeriesFieldExtractExec(
+ source(),
+ child(),
+ attributesToExtract,
+ defaultPreference,
+ docValuesAttributes,
+ boundsAttributes
+ );
+ }
+
+ @Override
+    public FieldExtractExec withBoundsAttributes(Set<Attribute> boundsAttributes) {
+ return new TimeSeriesFieldExtractExec(
+ source(),
+ child(),
+ attributesToExtract,
+ defaultPreference,
+ docValuesAttributes,
+ boundsAttributes
+ );
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java
index 12d2d9eebeb2e..8776cf583f47d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java
@@ -9,62 +9,33 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
-import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
/**
* Similar to {@link EsQueryExec}, but this is a physical plan specifically for time series indices.
- * This plan is forked from {@link EsQueryExec} to allow field extractions, leveraging caching
- * and avoiding the cost of sorting and rebuilding blocks.
*/
public class TimeSeriesSourceExec extends LeafExec implements EstimatesRowSize {
- private final MappedFieldType.FieldExtractPreference defaultPreference;
-    private final Set<Attribute> docValuesAttributes;
-    private final Set<Attribute> boundsAttributes;
-    private final List<Attribute> attributesToExtract;
-
private final List<Attribute> attrs;
private final QueryBuilder query;
private final Expression limit;
private final Integer estimatedRowSize;
-    private List<Attribute> lazyOutput;
-
- public TimeSeriesSourceExec(
- Source source,
-        List<Attribute> attrs,
- QueryBuilder query,
- Expression limit,
- MappedFieldType.FieldExtractPreference defaultPreference,
-        Set<Attribute> docValuesAttributes,
-        Set<Attribute> boundsAttributes,
-        List<Attribute> attributesToExtract,
- Integer estimatedRowSize
- ) {
+
+    public TimeSeriesSourceExec(Source source, List<Attribute> attrs, QueryBuilder query, Expression limit, Integer estimatedRowSize) {
super(source);
this.attrs = attrs;
this.query = query;
this.limit = limit;
- this.defaultPreference = defaultPreference;
- this.docValuesAttributes = docValuesAttributes;
- this.boundsAttributes = boundsAttributes;
- this.attributesToExtract = attributesToExtract;
this.estimatedRowSize = estimatedRowSize;
- if (this.attributesToExtract.isEmpty()) {
- lazyOutput = attrs;
- }
}
@Override
@@ -79,18 +50,7 @@ public String getWriteableName() {
@Override
protected NodeInfo<TimeSeriesSourceExec> info() {
- return NodeInfo.create(
- this,
- TimeSeriesSourceExec::new,
- attrs,
- query,
- limit,
- defaultPreference,
- docValuesAttributes,
- boundsAttributes,
- attributesToExtract,
- estimatedRowSize
- );
+ return NodeInfo.create(this, TimeSeriesSourceExec::new, attrs, query, limit, estimatedRowSize);
}
public QueryBuilder query() {
@@ -103,17 +63,7 @@ public List<Attribute> attrs() {
@Override
public List<Attribute> output() {
- if (lazyOutput == null) {
- lazyOutput = new ArrayList<>(attrs.size() + attributesToExtract.size());
- lazyOutput.addAll(attrs);
- lazyOutput.addAll(attributesToExtract);
- }
- return lazyOutput;
- }
-
- @Override
- protected AttributeSet computeReferences() {
- return super.computeReferences();
+ return attrs;
}
public Expression limit() {
@@ -124,41 +74,16 @@ public Integer estimatedRowSize() {
return estimatedRowSize;
}
-    public List<Attribute> attributesToExtract() {
- return attributesToExtract;
- }
-
- public MappedFieldType.FieldExtractPreference fieldExtractPreference(Attribute attr) {
- if (boundsAttributes.contains(attr)) {
- return MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS;
- }
- if (docValuesAttributes.contains(attr)) {
- return MappedFieldType.FieldExtractPreference.DOC_VALUES;
- }
- return defaultPreference;
- }
-
@Override
public PhysicalPlan estimateRowSize(State state) {
state.add(false, Integer.BYTES * 2);
state.add(false, 22); // tsid
state.add(false, 8); // timestamp
- state.add(false, attributesToExtract);
int size = state.consumeAllFields(false);
if (Objects.equals(this.estimatedRowSize, size)) {
return this;
} else {
- return new TimeSeriesSourceExec(
- source(),
- attrs,
- query,
- limit,
- defaultPreference,
- docValuesAttributes,
- boundsAttributes,
- attributesToExtract,
- size
- );
+ return new TimeSeriesSourceExec(source(), attrs, query, limit, size);
}
}
@@ -167,10 +92,7 @@ public int hashCode() {
return Objects.hash(
attrs,
query,
- defaultPreference,
- docValuesAttributes,
- boundsAttributes,
- attributesToExtract,
+
limit,
estimatedRowSize
);
@@ -188,10 +110,6 @@ public boolean equals(Object obj) {
TimeSeriesSourceExec other = (TimeSeriesSourceExec) obj;
return Objects.equals(attrs, other.attrs)
- && Objects.equals(defaultPreference, other.defaultPreference)
- && Objects.equals(docValuesAttributes, other.docValuesAttributes)
- && Objects.equals(boundsAttributes, other.boundsAttributes)
- && Objects.equals(attributesToExtract, other.attributesToExtract)
&& Objects.equals(query, other.query)
&& Objects.equals(limit, other.limit)
&& Objects.equals(estimatedRowSize, other.estimatedRowSize);
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index 1c699c60c155d..1c2bbe36db9ea 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -24,6 +24,7 @@
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
+import org.elasticsearch.compute.lucene.TimeSeriesExtractFieldOperator;
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.Operator;
@@ -65,6 +66,7 @@
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesFieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
@@ -140,7 +142,12 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
layout.append(attr);
}
var fields = extractFields(fieldExtractExec.attributesToExtract(), fieldExtractExec::fieldExtractPreference);
- return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build());
+ if (fieldExtractExec instanceof TimeSeriesFieldExtractExec) {
+ // TODO: consolidate with ValuesSourceReaderOperator
+ return source.with(new TimeSeriesExtractFieldOperator.Factory(fields, shardContexts), layout.build());
+ } else {
+ return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build());
+ }
}
private static String getFieldName(Attribute attr) {
@@ -249,16 +256,12 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
@Override
public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
-
final int limit = ts.limit() != null ? (Integer) ts.limit().fold(context.foldCtx()) : NO_LIMIT;
- final boolean emitDocIds = ts.attrs().stream().anyMatch(EsQueryExec::isSourceAttribute);
LuceneOperator.Factory luceneFactory = TimeSeriesSourceOperatorFactory.create(
limit,
context.pageSize(ts.estimatedRowSize()),
context.queryPragmas().taskConcurrency(),
- emitDocIds,
shardContexts,
- extractFields(ts.attributesToExtract(), ts::fieldExtractPreference),
querySupplier(ts.query())
);
Layout.Builder layout = new Layout.Builder();
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java
index 02590ff680b08..401539601e260 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java
@@ -2033,9 +2033,9 @@ public void testMatchFunctionWithPushableDisjunction() {
assertThat(esQuery.query().toString(), equalTo(expected.toString()));
}
- public void testPushDownFieldExtractToTimeSeriesSource() {
+ public void testParallelizeTimeSeriesPlan() {
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
- var query = "TS k8s | STATS max(rate(network.total_bytes_in))";
+ var query = "TS k8s | STATS max(rate(network.total_bytes_in)) BY bucket(@timestamp, 1h)";
var optimizer = new TestPlannerOptimizer(config, timeSeriesAnalyzer);
PhysicalPlan plan = optimizer.plan(query);
var limit = as(plan, LimitExec.class);
@@ -2044,13 +2044,11 @@ public void testPushDownFieldExtractToTimeSeriesSource() {
var timeSeriesFinalAgg = as(partialAgg.child(), TimeSeriesAggregateExec.class);
var exchange = as(timeSeriesFinalAgg.child(), ExchangeExec.class);
var timeSeriesPartialAgg = as(exchange.child(), TimeSeriesAggregateExec.class);
- var parallel = as(timeSeriesPartialAgg.child(), ParallelExec.class);
- var timeSeriesSource = as(parallel.child(), TimeSeriesSourceExec.class);
- assertThat(timeSeriesSource.attributesToExtract(), hasSize(1));
- FieldAttribute field = as(timeSeriesSource.attributesToExtract().getFirst(), FieldAttribute.class);
- assertThat(field.name(), equalTo("network.total_bytes_in"));
- assertThat(timeSeriesSource.attrs(), hasSize(2));
- assertTrue(timeSeriesSource.attrs().stream().noneMatch(EsQueryExec::isSourceAttribute));
+ var parallel1 = as(timeSeriesPartialAgg.child(), ParallelExec.class);
+ var eval = as(parallel1.child(), EvalExec.class);
+ var fieldExtract = as(eval.child(), FieldExtractExec.class);
+ var parallel2 = as(fieldExtract.child(), ParallelExec.class);
+ as(parallel2.child(), TimeSeriesSourceExec.class);
}
/**