diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
similarity index 51%
rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java
rename to x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
index c909f874f359b..420b3cc8a3158 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
@@ -13,8 +13,6 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BytesRef;
@@ -27,7 +25,6 @@
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@@ -41,352 +38,277 @@
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
-import java.util.function.Function;
-
-/**
- * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index.
- *
- * This source operator loads the _tsid and @timestamp fields, which is used for emitting documents in the correct order. These field values
- * are included in the page as seperate blocks and downstream operators can make use of these loaded time series ids and timestamps.
- *
- * The source operator includes all documents of a time serie with the same page. So the same time series never exists in multiple pages.
- * Downstream operators can make use of this implementation detail.
- *
- * This operator currently only supports shard level concurrency. A new concurrency mechanism should be introduced at the time serie level
- * in order to read tsdb indices in parallel.
- */
-public class TimeSeriesSortedSourceOperatorFactory extends LuceneOperator.Factory {
- private final int maxPageSize;
- private final boolean emitDocIds;
- private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact;
+public final class TimeSeriesSourceOperator extends SourceOperator {
- private TimeSeriesSortedSourceOperatorFactory(
- List<? extends ShardContext> contexts,
+ private final boolean emitDocIds;
+ private final int maxPageSize;
+ private final BlockFactory blockFactory;
+ private final LuceneSliceQueue sliceQueue;
+ private int currentPagePos = 0;
+ private int remainingDocs;
+ private boolean doneCollecting;
+
+ private LongVector.Builder timestampsBuilder;
+ private TsidBuilder tsHashesBuilder;
+ private SegmentsIterator iterator;
+ private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
+ private ShardLevelFieldsReader fieldsReader;
+ private DocIdCollector docCollector;
+
+ TimeSeriesSourceOperator(
+ BlockFactory blockFactory,
boolean emitDocIds,
- List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
- Function<ShardContext, Query> queryFunction,
- int taskConcurrency,
+ List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtract,
+ LuceneSliceQueue sliceQueue,
int maxPageSize,
int limit
) {
- super(
- contexts,
- queryFunction,
- DataPartitioning.SHARD,
- query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
- taskConcurrency,
- limit,
- false,
- ScoreMode.COMPLETE_NO_SCORES
- );
this.maxPageSize = maxPageSize;
+ this.blockFactory = blockFactory;
+ this.fieldsToExtracts = fieldsToExtract;
this.emitDocIds = emitDocIds;
- this.fieldsToExact = fieldsToExact;
+ this.remainingDocs = limit;
+ this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
+ this.tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(limit, maxPageSize));
+ this.sliceQueue = sliceQueue;
}
@Override
- public SourceOperator get(DriverContext driverContext) {
- return new Impl(driverContext.blockFactory(), emitDocIds, fieldsToExact, sliceQueue, maxPageSize, limit);
+ public void finish() {
+ this.doneCollecting = true;
}
@Override
- public String describe() {
- return "TimeSeriesSortedSourceOperator[maxPageSize = " + maxPageSize + ", limit = " + limit + "]";
- }
-
- public static TimeSeriesSortedSourceOperatorFactory create(
- int limit,
- int maxPageSize,
- int taskConcurrency,
- boolean emitDocIds,
- List<? extends ShardContext> contexts,
- List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
- Function<ShardContext, Query> queryFunction
- ) {
- return new TimeSeriesSortedSourceOperatorFactory(
- contexts,
- emitDocIds,
- fieldsToExact,
- queryFunction,
- taskConcurrency,
- maxPageSize,
- limit
- );
+ public boolean isFinished() {
+ return doneCollecting;
}
- static final class Impl extends SourceOperator {
-
- private final boolean emitDocIds;
- private final int maxPageSize;
- private final BlockFactory blockFactory;
- private final LuceneSliceQueue sliceQueue;
- private int currentPagePos = 0;
- private int remainingDocs;
- private boolean doneCollecting;
-
- private LongVector.Builder timestampsBuilder;
- private TsidBuilder tsHashesBuilder;
- private SegmentsIterator iterator;
- private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
- private ShardLevelFieldsReader fieldsReader;
- private DocIdCollector docCollector;
-
- Impl(
- BlockFactory blockFactory,
- boolean emitDocIds,
- List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtract,
- LuceneSliceQueue sliceQueue,
- int maxPageSize,
- int limit
- ) {
- this.maxPageSize = maxPageSize;
- this.blockFactory = blockFactory;
- this.fieldsToExtracts = fieldsToExtract;
- this.emitDocIds = emitDocIds;
- this.remainingDocs = limit;
- this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
- this.tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(limit, maxPageSize));
- this.sliceQueue = sliceQueue;
- }
-
- @Override
- public void finish() {
- this.doneCollecting = true;
+ @Override
+ public Page getOutput() {
+ if (isFinished()) {
+ return null;
}
- @Override
- public boolean isFinished() {
- return doneCollecting;
+ if (remainingDocs <= 0) {
+ doneCollecting = true;
+ return null;
}
- @Override
- public Page getOutput() {
- if (isFinished()) {
- return null;
+ Page page = null;
+ Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
+ try {
+ if (iterator == null) {
+ var slice = sliceQueue.nextSlice();
+ if (slice == null) {
+ doneCollecting = true;
+ return null;
+ }
+ Releasables.close(fieldsReader);
+ fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);
+ iterator = new SegmentsIterator(slice);
+ if (emitDocIds) {
+ docCollector = new DocIdCollector(blockFactory, slice.shardContext());
+ }
}
-
- if (remainingDocs <= 0) {
- doneCollecting = true;
- return null;
+ if (docCollector != null) {
+ docCollector.prepareForCollecting(Math.min(remainingDocs, maxPageSize));
}
-
- Page page = null;
- Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
- try {
- if (iterator == null) {
- var slice = sliceQueue.nextSlice();
- if (slice == null) {
- doneCollecting = true;
- return null;
- }
- Releasables.close(fieldsReader);
- fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);
- iterator = new SegmentsIterator(slice);
- if (emitDocIds) {
- docCollector = new DocIdCollector(blockFactory, slice.shardContext());
- }
- }
+ fieldsReader.prepareForReading(Math.min(remainingDocs, maxPageSize));
+ iterator.readDocsForNextPage();
+ if (currentPagePos > 0) {
+ int blockIndex = 0;
if (docCollector != null) {
- docCollector.prepareForCollecting(Math.min(remainingDocs, maxPageSize));
+ blocks[blockIndex++] = docCollector.build().asBlock();
}
- fieldsReader.prepareForReading(Math.min(remainingDocs, maxPageSize));
- iterator.readDocsForNextPage();
- if (currentPagePos > 0) {
- int blockIndex = 0;
- if (docCollector != null) {
- blocks[blockIndex++] = docCollector.build().asBlock();
- }
- blocks[blockIndex++] = tsHashesBuilder.build().asBlock();
- tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize));
- blocks[blockIndex++] = timestampsBuilder.build().asBlock();
- timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
- System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExtracts.size());
- page = new Page(currentPagePos, blocks);
- currentPagePos = 0;
- }
- if (iterator.completed()) {
- Releasables.close(docCollector, fieldsReader);
- iterator = null;
+ blocks[blockIndex++] = tsHashesBuilder.build().asBlock();
+ tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize));
+ blocks[blockIndex++] = timestampsBuilder.build().asBlock();
+ timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
+ System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExtracts.size());
+ page = new Page(currentPagePos, blocks);
+ currentPagePos = 0;
+ }
+ if (iterator.completed()) {
+ Releasables.close(docCollector, fieldsReader);
+ iterator = null;
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } finally {
+ if (page == null) {
+ Releasables.closeExpectNoException(blocks);
+ }
+ }
+ return page;
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector, fieldsReader);
+ }
+
+ class SegmentsIterator {
+ private final PriorityQueue<LeafIterator> mainQueue;
+ private final PriorityQueue<LeafIterator> oneTsidQueue;
+ final LuceneSlice luceneSlice;
+
+ SegmentsIterator(LuceneSlice luceneSlice) throws IOException {
+ this.luceneSlice = luceneSlice;
+ this.mainQueue = new PriorityQueue<>(luceneSlice.numLeaves()) {
+ @Override
+ protected boolean lessThan(LeafIterator a, LeafIterator b) {
+ return a.timeSeriesHash.compareTo(b.timeSeriesHash) < 0;
}
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- } finally {
- if (page == null) {
- Releasables.closeExpectNoException(blocks);
+ };
+ Weight weight = luceneSlice.weight();
+ int maxSegmentOrd = 0;
+ for (var leafReaderContext : luceneSlice.leaves()) {
+ LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext());
+ leafIterator.nextDoc();
+ if (leafIterator.docID != DocIdSetIterator.NO_MORE_DOCS) {
+ mainQueue.add(leafIterator);
+ maxSegmentOrd = Math.max(maxSegmentOrd, leafIterator.segmentOrd);
}
}
- return page;
+ this.oneTsidQueue = new PriorityQueue<>(mainQueue.size()) {
+ @Override
+ protected boolean lessThan(LeafIterator a, LeafIterator b) {
+ return a.timestamp > b.timestamp;
+ }
+ };
}
- @Override
- public void close() {
- Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector, fieldsReader);
+ // TODO: add an optimization for the single-leaf case?
+ void readDocsForNextPage() throws IOException {
+ Thread executingThread = Thread.currentThread();
+ for (LeafIterator leaf : mainQueue) {
+ leaf.reinitializeIfNeeded(executingThread);
+ }
+ for (LeafIterator leaf : oneTsidQueue) {
+ leaf.reinitializeIfNeeded(executingThread);
+ }
+ do {
+ PriorityQueue<LeafIterator> sub = subQueueForNextTsid();
+ if (sub.size() == 0) {
+ break;
+ }
+ tsHashesBuilder.appendNewTsid(sub.top().timeSeriesHash);
+ if (readValuesForOneTsid(sub)) {
+ break;
+ }
+ } while (mainQueue.size() > 0);
}
- class SegmentsIterator {
- private final PriorityQueue<LeafIterator> mainQueue;
- private final PriorityQueue<LeafIterator> oneTsidQueue;
- final LuceneSlice luceneSlice;
-
- SegmentsIterator(LuceneSlice luceneSlice) throws IOException {
- this.luceneSlice = luceneSlice;
- this.mainQueue = new PriorityQueue<>(luceneSlice.numLeaves()) {
- @Override
- protected boolean lessThan(LeafIterator a, LeafIterator b) {
- return a.timeSeriesHash.compareTo(b.timeSeriesHash) < 0;
- }
- };
- Weight weight = luceneSlice.weight();
- int maxSegmentOrd = 0;
- for (var leafReaderContext : luceneSlice.leaves()) {
- LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext());
- leafIterator.nextDoc();
- if (leafIterator.docID != DocIdSetIterator.NO_MORE_DOCS) {
- mainQueue.add(leafIterator);
- maxSegmentOrd = Math.max(maxSegmentOrd, leafIterator.segmentOrd);
- }
+ private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
+ do {
+ LeafIterator top = sub.top();
+ currentPagePos++;
+ remainingDocs--;
+ if (docCollector != null) {
+ docCollector.collect(top.segmentOrd, top.docID);
}
- this.oneTsidQueue = new PriorityQueue<>(mainQueue.size()) {
- @Override
- protected boolean lessThan(LeafIterator a, LeafIterator b) {
- return a.timestamp > b.timestamp;
- }
- };
- }
-
- // TODO: add optimize for one leaf?
- void readDocsForNextPage() throws IOException {
- Thread executingThread = Thread.currentThread();
- for (LeafIterator leaf : mainQueue) {
- leaf.reinitializeIfNeeded(executingThread);
+ tsHashesBuilder.appendOrdinal();
+ timestampsBuilder.appendLong(top.timestamp);
+ fieldsReader.readValues(top.segmentOrd, top.docID);
+ if (top.nextDoc()) {
+ sub.updateTop();
+ } else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) {
+ sub.pop();
+ } else {
+ mainQueue.add(sub.pop());
}
- for (LeafIterator leaf : oneTsidQueue) {
- leaf.reinitializeIfNeeded(executingThread);
+ if (remainingDocs <= 0 || currentPagePos >= maxPageSize) {
+ return true;
}
- do {
- PriorityQueue<LeafIterator> sub = subQueueForNextTsid();
- if (sub.size() == 0) {
- break;
- }
- tsHashesBuilder.appendNewTsid(sub.top().timeSeriesHash);
- if (readValuesForOneTsid(sub)) {
- break;
- }
- } while (mainQueue.size() > 0);
- }
+ } while (sub.size() > 0);
+ return false;
+ }
- private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
- do {
- LeafIterator top = sub.top();
- currentPagePos++;
- remainingDocs--;
- if (docCollector != null) {
- docCollector.collect(top.segmentOrd, top.docID);
- }
- tsHashesBuilder.appendOrdinal();
- timestampsBuilder.appendLong(top.timestamp);
- fieldsReader.readValues(top.segmentOrd, top.docID);
- if (top.nextDoc()) {
- sub.updateTop();
- } else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) {
- sub.pop();
+ private PriorityQueue<LeafIterator> subQueueForNextTsid() {
+ if (oneTsidQueue.size() == 0 && mainQueue.size() > 0) {
+ LeafIterator last = mainQueue.pop();
+ oneTsidQueue.add(last);
+ while (mainQueue.size() > 0) {
+ var top = mainQueue.top();
+ if (top.timeSeriesHash.equals(last.timeSeriesHash)) {
+ oneTsidQueue.add(mainQueue.pop());
} else {
- mainQueue.add(sub.pop());
- }
- if (remainingDocs <= 0 || currentPagePos >= maxPageSize) {
- return true;
- }
- } while (sub.size() > 0);
- return false;
- }
-
- private PriorityQueue<LeafIterator> subQueueForNextTsid() {
- if (oneTsidQueue.size() == 0 && mainQueue.size() > 0) {
- LeafIterator last = mainQueue.pop();
- oneTsidQueue.add(last);
- while (mainQueue.size() > 0) {
- var top = mainQueue.top();
- if (top.timeSeriesHash.equals(last.timeSeriesHash)) {
- oneTsidQueue.add(mainQueue.pop());
- } else {
- break;
- }
+ break;
}
}
- return oneTsidQueue;
}
+ return oneTsidQueue;
+ }
- boolean completed() {
- return mainQueue.size() == 0 && oneTsidQueue.size() == 0;
+ boolean completed() {
+ return mainQueue.size() == 0 && oneTsidQueue.size() == 0;
+ }
+ }
+
+ static class LeafIterator {
+ private final int segmentOrd;
+ private final Weight weight;
+ private final LeafReaderContext leafContext;
+ private SortedDocValues tsids;
+ private NumericDocValues timestamps;
+ private DocIdSetIterator disi;
+ private Thread createdThread;
+
+ private long timestamp;
+ private int lastTsidOrd = -1;
+ private BytesRef timeSeriesHash;
+ private int docID = -1;
+
+ LeafIterator(Weight weight, LeafReaderContext leafContext) throws IOException {
+ this.segmentOrd = leafContext.ord;
+ this.weight = weight;
+ this.leafContext = leafContext;
+ this.createdThread = Thread.currentThread();
+ tsids = leafContext.reader().getSortedDocValues("_tsid");
+ timestamps = DocValues.unwrapSingleton(leafContext.reader().getSortedNumericDocValues("@timestamp"));
+ final Scorer scorer = weight.scorer(leafContext);
+ disi = scorer != null ? scorer.iterator() : DocIdSetIterator.empty();
+ }
+
+ boolean nextDoc() throws IOException {
+ docID = disi.nextDoc();
+ if (docID == DocIdSetIterator.NO_MORE_DOCS) {
+ return false;
+ }
+ boolean advanced = timestamps.advanceExact(docID);
+ assert advanced;
+ timestamp = timestamps.longValue();
+ advanced = tsids.advanceExact(docID);
+ assert advanced;
+
+ int ord = tsids.ordValue();
+ if (ord != lastTsidOrd) {
+ timeSeriesHash = tsids.lookupOrd(ord);
+ lastTsidOrd = ord;
+ return false;
+ } else {
+ return true;
}
}
- static class LeafIterator {
- private final int segmentOrd;
- private final Weight weight;
- private final LeafReaderContext leafContext;
- private SortedDocValues tsids;
- private NumericDocValues timestamps;
- private DocIdSetIterator disi;
- private Thread createdThread;
-
- private long timestamp;
- private int lastTsidOrd = -1;
- private BytesRef timeSeriesHash;
- private int docID = -1;
-
- LeafIterator(Weight weight, LeafReaderContext leafContext) throws IOException {
- this.segmentOrd = leafContext.ord;
- this.weight = weight;
- this.leafContext = leafContext;
- this.createdThread = Thread.currentThread();
+ void reinitializeIfNeeded(Thread executingThread) throws IOException {
+ if (executingThread != createdThread) {
tsids = leafContext.reader().getSortedDocValues("_tsid");
timestamps = DocValues.unwrapSingleton(leafContext.reader().getSortedNumericDocValues("@timestamp"));
final Scorer scorer = weight.scorer(leafContext);
disi = scorer != null ? scorer.iterator() : DocIdSetIterator.empty();
- }
-
- boolean nextDoc() throws IOException {
- docID = disi.nextDoc();
- if (docID == DocIdSetIterator.NO_MORE_DOCS) {
- return false;
- }
- boolean advanced = timestamps.advanceExact(docID);
- assert advanced;
- timestamp = timestamps.longValue();
- advanced = tsids.advanceExact(docID);
- assert advanced;
-
- int ord = tsids.ordValue();
- if (ord != lastTsidOrd) {
- timeSeriesHash = tsids.lookupOrd(ord);
- lastTsidOrd = ord;
- return false;
- } else {
- return true;
- }
- }
-
- void reinitializeIfNeeded(Thread executingThread) throws IOException {
- if (executingThread != createdThread) {
- tsids = leafContext.reader().getSortedDocValues("_tsid");
- timestamps = DocValues.unwrapSingleton(leafContext.reader().getSortedNumericDocValues("@timestamp"));
- final Scorer scorer = weight.scorer(leafContext);
- disi = scorer != null ? scorer.iterator() : DocIdSetIterator.empty();
- if (docID != -1) {
- disi.advance(docID);
- }
- createdThread = executingThread;
+ if (docID != -1) {
+ disi.advance(docID);
}
+ createdThread = executingThread;
}
}
+ }
- @Override
- public String toString() {
- return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
- }
-
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
}
static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java
new file mode 100644
index 0000000000000..a3592a54c56c4
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.SourceOperator;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index.
+ *
+ * This source operator loads the _tsid and @timestamp fields, which are used for emitting documents in the correct order. These field values
+ * are included in the page as separate blocks, and downstream operators can make use of these loaded time series ids and timestamps.
+ *
+ * The source operator emits all documents of a time series in the same page, so the same time series never exists in multiple pages.
+ * Downstream operators can make use of this implementation detail.
+ *
+ * This operator currently only supports shard level concurrency. A new concurrency mechanism should be introduced at the time series level
+ * in order to read tsdb indices in parallel.
+ */
+public class TimeSeriesSourceOperatorFactory extends LuceneOperator.Factory {
+
+ private final int maxPageSize;
+ private final boolean emitDocIds;
+ private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact;
+
+ private TimeSeriesSourceOperatorFactory(
+ List<? extends ShardContext> contexts,
+ boolean emitDocIds,
+ List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
+ Function<ShardContext, Query> queryFunction,
+ int taskConcurrency,
+ int maxPageSize,
+ int limit
+ ) {
+ super(
+ contexts,
+ queryFunction,
+ DataPartitioning.SHARD,
+ query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
+ taskConcurrency,
+ limit,
+ false,
+ ScoreMode.COMPLETE_NO_SCORES
+ );
+ this.maxPageSize = maxPageSize;
+ this.emitDocIds = emitDocIds;
+ this.fieldsToExact = fieldsToExact;
+ }
+
+ @Override
+ public SourceOperator get(DriverContext driverContext) {
+ return new TimeSeriesSourceOperator(driverContext.blockFactory(), emitDocIds, fieldsToExact, sliceQueue, maxPageSize, limit);
+ }
+
+ @Override
+ public String describe() {
+ return "TimeSeriesSourceOperator[maxPageSize = " + maxPageSize + ", limit = " + limit + "]";
+ }
+
+ public static TimeSeriesSourceOperatorFactory create(
+ int limit,
+ int maxPageSize,
+ int taskConcurrency,
+ boolean emitDocIds,
+ List<? extends ShardContext> contexts,
+ List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
+ Function<ShardContext, Query> queryFunction
+ ) {
+ return new TimeSeriesSourceOperatorFactory(contexts, emitDocIds, fieldsToExact, queryFunction, taskConcurrency, maxPageSize, limit);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
similarity index 97%
rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java
rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
index c4128bc88254d..9389c885a8954 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
@@ -67,7 +67,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
-public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
+public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase {
private IndexReader reader;
private final Directory directory = newDirectory();
@@ -239,7 +239,7 @@ public void testMatchNone() throws Exception {
try (var reader = writer.getReader()) {
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
Query query = randomFrom(LongField.newRangeQuery("@timestamp", 0, t0), new MatchNoDocsQuery());
- var timeSeriesFactory = TimeSeriesSortedSourceOperatorFactory.create(
+ var timeSeriesFactory = TimeSeriesSourceOperatorFactory.create(
Integer.MAX_VALUE,
randomIntBetween(1, 1024),
1,
@@ -274,12 +274,12 @@ protected Operator.OperatorFactory simple() {
@Override
protected Matcher<String> expectedDescriptionOfSimple() {
- return equalTo("TimeSeriesSortedSourceOperator[maxPageSize = 1, limit = 1]");
+ return equalTo("TimeSeriesSourceOperator[maxPageSize = 1, limit = 1]");
}
@Override
protected Matcher<String> expectedToStringOfSimple() {
- return equalTo("Impl[maxPageSize=1, remainingDocs=1]");
+ return equalTo("TimeSeriesSourceOperator[maxPageSize=1, remainingDocs=1]");
}
List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
@@ -324,7 +324,7 @@ public record ExtractField(MappedFieldType ft, ElementType elementType) {
}
- public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
+ public static TimeSeriesSourceOperatorFactory createTimeSeriesSourceOperator(
Directory directory,
Consumer<IndexReader> readerConsumer,
boolean emitDocIds,
@@ -370,7 +370,7 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat
)
.toList();
- return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
+ return TimeSeriesSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
}
public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index 70d9cef347c89..0f348e268ef69 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -17,14 +17,13 @@
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.aggregation.GroupingAggregator;
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.LuceneCountOperator;
import org.elasticsearch.compute.lucene.LuceneOperator;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
-import org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorFactory;
+import org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
@@ -249,7 +248,7 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca
final int limit = ts.limit() != null ? (Integer) ts.limit().fold(context.foldCtx()) : NO_LIMIT;
final boolean emitDocIds = ts.attrs().stream().anyMatch(EsQueryExec::isSourceAttribute);
- LuceneOperator.Factory luceneFactory = TimeSeriesSortedSourceOperatorFactory.create(
+ LuceneOperator.Factory luceneFactory = TimeSeriesSourceOperatorFactory.create(
limit,
context.pageSize(ts.estimatedRowSize()),
context.queryPragmas().taskConcurrency(),
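
For orientation, below is a minimal sketch (not part of this diff) of how the renamed factory and operator fit together, mirroring the `create(...)` signature and the test usage shown above. The shard context, field list, page size, and match-all query function are illustrative placeholders, and the `ValuesSourceReaderOperator.FieldInfo` element type is an assumption based on the field-extraction code in this operator family.

```java
// Illustrative sketch only; mirrors TimeSeriesSourceOperatorFactory.create(...) as shown in the diff above.
// shardContext, fieldInfos and driverContext are placeholders supplied by the caller.
import java.util.List;
import java.util.function.Function;

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.compute.lucene.ShardContext;
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;

class TimeSeriesSourceExample {
    SourceOperator timeSeriesSource(
        ShardContext shardContext,                                // placeholder shard context
        List<ValuesSourceReaderOperator.FieldInfo> fieldInfos,    // fields to load alongside _tsid and @timestamp
        DriverContext driverContext
    ) {
        // The query function selects the documents to read per shard; match-all here for illustration.
        Function<ShardContext, Query> queryFunction = ctx -> new MatchAllDocsQuery();
        TimeSeriesSourceOperatorFactory factory = TimeSeriesSourceOperatorFactory.create(
            Integer.MAX_VALUE,   // limit
            1024,                // maxPageSize
            1,                   // taskConcurrency; the factory is locked to SHARD partitioning
            true,                // emitDocIds: include a doc-id block in every page
            List.of(shardContext),
            fieldInfos,
            queryFunction
        );
        // Each emitted page is ordered by _tsid and then @timestamp, and a time series never spans pages.
        return factory.get(driverContext);
    }
}
```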