diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java
index ea7dc53203bb7..8d1656899617f 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java
@@ -53,4 +53,10 @@ public interface ShardContext {
      * Returns something to load values from this field into a {@link Block}.
      */
     BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
+
+    /**
+     * Returns the {@link MappedFieldType} for the given field name.
+     * By default, this delegates to {@link org.elasticsearch.index.query.SearchExecutionContext#getFieldType(String)}.
+     */
+    MappedFieldType fieldType(String name);
 }
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
index 4e00822fa5258..acae71f5668f0 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java
@@ -133,11 +133,12 @@ public Page getCheckedOutput() throws IOException {
                 if (docCollector != null) {
                     blocks[blockIndex++] = docCollector.build().asBlock();
                 }
-                blocks[blockIndex++] = tsHashesBuilder.build().asBlock();
+                OrdinalBytesRefVector tsidVector = tsHashesBuilder.build();
+                blocks[blockIndex++] = tsidVector.asBlock();
                 tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize));
                 blocks[blockIndex++] = timestampsBuilder.build().asBlock();
                 timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExtracts.size());
+                System.arraycopy(fieldsReader.buildBlocks(tsidVector.getOrdinalsVector()), 0, blocks, blockIndex, fieldsToExtracts.size());
                 page = new Page(currentPagePos, blocks);
                 currentPagePos = 0;
             }
@@ -217,6 +218,7 @@ void readDocsForNextPage() throws IOException {
         }
 
         private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
+            boolean first = true;
             do {
                 LeafIterator top = sub.top();
                 currentPagePos++;
@@ -226,7 +228,8 @@ private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOE
                 }
                 tsHashesBuilder.appendOrdinal();
                 timestampsBuilder.appendLong(top.timestamp);
-                fieldsReader.readValues(top.segmentOrd, top.docID);
+                fieldsReader.readValues(top.segmentOrd, top.docID, first == false);
+                first = false;
                 if (top.nextDoc()) {
                     sub.updateTop();
                 } else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) {
@@ -350,6 +353,7 @@ static final class ShardLevelFieldsReader implements Releasable {
         private final BlockLoaderFactory blockFactory;
         private final SegmentLevelFieldsReader[] segments;
         private final BlockLoader[] loaders;
+        private final boolean[] dimensions;
         private final Block.Builder[] builders;
         private final StoredFieldsSpec storedFieldsSpec;
         private final SourceLoader sourceLoader;
@@ -377,10 +381,18 @@ static final class ShardLevelFieldsReader implements Releasable {
                 sourceLoader = null;
             }
             this.storedFieldsSpec = storedFieldsSpec;
+            this.dimensions = new boolean[fields.size()];
+            for (int i = 0; i < fields.size(); i++) {
+                dimensions[i] = shardContext.fieldType(fields.get(i).name()).isDimension();
+            }
         }
 
-        void readValues(int segment, int docID) throws IOException {
-            segments[segment].read(docID, builders);
+        /**
+         * For dimension fields, skips reading them when {@code nonDimensionFieldsOnly} is true,
+         * since they only need to be read once per tsid.
+         */
+        void readValues(int segment, int docID, boolean nonDimensionFieldsOnly) throws IOException {
+            segments[segment].read(docID, builders, nonDimensionFieldsOnly, dimensions);
         }
 
         void prepareForReading(int estimatedSize) throws IOException {
@@ -396,12 +408,46 @@ void prepareForReading(int estimatedSize) throws IOException {
             }
         }
 
-        Block[] buildBlocks() {
-            Block[] blocks = Block.Builder.buildAll(builders);
-            Arrays.fill(builders, null);
+        Block[] buildBlocks(IntVector tsidOrdinals) {
+            final Block[] blocks = new Block[loaders.length];
+            try {
+                for (int i = 0; i < builders.length; i++) {
+                    if (dimensions[i]) {
+                        blocks[i] = buildBlockForDimensionField(builders[i], tsidOrdinals);
+                    } else {
+                        blocks[i] = builders[i].build();
+                    }
+                }
+                Arrays.fill(builders, null);
+            } finally {
+                if (blocks.length > 0 && blocks[blocks.length - 1] == null) {
+                    Releasables.close(blocks);
+                }
+            }
             return blocks;
         }
 
+        private Block buildBlockForDimensionField(Block.Builder builder, IntVector tsidOrdinals) {
+            try (var values = builder.build()) {
+                if (values.asVector() instanceof BytesRefVector bytes) {
+                    tsidOrdinals.incRef();
+                    values.incRef();
+                    return new OrdinalBytesRefVector(tsidOrdinals, bytes).asBlock();
+                } else if (values.areAllValuesNull()) {
+                    return blockFactory.factory.newConstantNullBlock(tsidOrdinals.getPositionCount());
+                } else {
+                    final int positionCount = tsidOrdinals.getPositionCount();
+                    try (var newBuilder = values.elementType().newBlockBuilder(positionCount, blockFactory.factory)) {
+                        for (int p = 0; p < positionCount; p++) {
+                            int pos = tsidOrdinals.getInt(p);
+                            newBuilder.copyFrom(values, pos, pos + 1);
+                        }
+                        return newBuilder.build();
+                    }
+                }
+            }
+        }
+
         @Override
         public void close() {
             Releasables.close(builders);
@@ -435,10 +481,18 @@ private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec st
             }
         }
 
-        void read(int docId, Block.Builder[] builder) throws IOException {
-            storedFields.advanceTo(docId);
-            for (int i = 0; i < rowStride.length; i++) {
-                rowStride[i].read(docId, storedFields, builder[i]);
+        void read(int docId, Block.Builder[] builder, boolean nonDimensionFieldsOnly, boolean[] dimensions) throws IOException {
+            storedFields.advanceTo(docId);
+            if (nonDimensionFieldsOnly) {
+                for (int i = 0; i < rowStride.length; i++) {
+                    if (dimensions[i] == false) {
+                        rowStride[i].read(docId, storedFields, builder[i]);
+                    }
+                }
+            } else {
+                for (int i = 0; i < rowStride.length; i++) {
+                    rowStride[i].read(docId, storedFields, builder[i]);
+                }
             }
         }
     }
@@ -480,9 +534,9 @@ public void close() {
             Releasables.close(dictBuilder, ordinalsBuilder);
         }
 
-        BytesRefVector build() throws IOException {
+        OrdinalBytesRefVector build() throws IOException {
             BytesRefVector dict = null;
-            BytesRefVector result = null;
+            OrdinalBytesRefVector result = null;
             IntVector ordinals = null;
             try {
                 dict = dictBuilder.build();
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
index a7de1ddf74ec2..5e0fb3fa87b29 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
@@ -324,5 +324,10 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
         public String shardIdentifier() {
             return "test";
         }
+
+        @Override
+        public MappedFieldType fieldType(String name) {
+            throw new UnsupportedOperationException();
+        }
     }
 }
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
index 4cb6dac14c151..d6b9c30614c9f 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
@@ -438,7 +438,17 @@ public static TimeSeriesSourceOperatorFactory createTimeSeriesSourceOperator(
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
-        var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
+        var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0) {
+            @Override
+            public MappedFieldType fieldType(String name) {
+                for (ExtractField e : extractFields) {
+                    if (e.ft.name().equals(name)) {
+                        return e.ft;
+                    }
+                }
+                throw new IllegalArgumentException("Unknown field [" + name + "]");
+            }
+        };
         Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
 
         var fieldInfos = extractFields.stream()
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index ef0b9aa931267..638d1197006fd 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -177,7 +177,7 @@ private static class DefaultShardContextForUnmappedField extends DefaultShardCon
         }
 
         @Override
-        protected @Nullable MappedFieldType fieldType(String name) {
+        public @Nullable MappedFieldType fieldType(String name) {
             var superResult = super.fieldType(name);
             return superResult == null && name.equals(unmappedEsField.getName())
                 ? new KeywordFieldMapper.KeywordFieldType(name, false /* isIndexed */, false /* hasDocValues */, Map.of() /* meta */)
@@ -459,7 +459,8 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
         return loader;
     }
 
-    protected @Nullable MappedFieldType fieldType(String name) {
+    @Override
+    public @Nullable MappedFieldType fieldType(String name) {
         return ctx.getFieldType(name);
     }
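
The change above hinges on one observation: within a single time series (one tsid) a dimension field holds one constant value, and the time-series source operator emits rows grouped by tsid. A dimension value therefore only needs to be read for the first row of each tsid (readValues with nonDimensionFieldsOnly == false), and the full per-row column can be rebuilt afterwards from the tsid ordinals, which is what buildBlockForDimensionField does via OrdinalBytesRefVector. The following is a minimal, self-contained sketch of that dictionary-plus-ordinals encoding outside of the Elasticsearch APIs; OrdinalDimensionSketch, OrdinalColumn, and buildDimensionColumn are illustrative names invented for this example, not part of the codebase.

import java.util.ArrayList;
import java.util.List;

public class OrdinalDimensionSketch {

    /** Dictionary-encoded column: the value at {@code row} is {@code dictionary.get(ordinals[row])}. */
    record OrdinalColumn(List<String> dictionary, int[] ordinals) {
        String valueAt(int row) {
            return dictionary.get(ordinals[row]);
        }
    }

    /**
     * Builds a dimension column for rows that arrive grouped by tsid, reading the
     * dimension value only on the first row of each tsid and reusing its ordinal
     * for the remaining rows of that tsid.
     */
    static OrdinalColumn buildDimensionColumn(String[] tsids, String[] dimensionValues) {
        List<String> dictionary = new ArrayList<>();
        int[] ordinals = new int[tsids.length];
        String currentTsid = null;
        for (int row = 0; row < tsids.length; row++) {
            if (currentTsid == null || currentTsid.equals(tsids[row]) == false) {
                currentTsid = tsids[row];
                dictionary.add(dimensionValues[row]); // "read" the dimension once per tsid
            }
            ordinals[row] = dictionary.size() - 1; // later rows of the same tsid reuse the entry
        }
        return new OrdinalColumn(dictionary, ordinals);
    }

    public static void main(String[] args) {
        String[] tsids = { "tsid-1", "tsid-1", "tsid-1", "tsid-2", "tsid-2" };
        String[] host = { "a", "a", "a", "b", "b" }; // dimension field: constant per tsid
        OrdinalColumn column = buildDimensionColumn(tsids, host);
        // Five rows backed by a two-entry dictionary.
        for (int row = 0; row < tsids.length; row++) {
            System.out.println("row " + row + " -> " + column.valueAt(row));
        }
    }
}

For a page of N rows spanning K time series this stores K copies of each dimension value instead of N, which is also why read() skips dimension fields on every row after the first of a tsid.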