diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java index 18513c32003ab..5c369e8226f09 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java @@ -54,7 +54,7 @@ * * Of course, decoding follows the opposite order with respect to encoding. */ -public class TSDBDocValuesEncoder { +public final class TSDBDocValuesEncoder { private final DocValuesForUtil forUtil; private final int numericBlockSize; diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareNumericDocValues.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareNumericDocValues.java new file mode 100644 index 0000000000000..af5841a800be3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareNumericDocValues.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.index.NumericDocValues; + +public abstract class BlockAwareNumericDocValues extends NumericDocValues { + + public abstract SingletonDocValuesBlockLoader getSingletonBlockLoader(); + +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareSortedDocValues.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareSortedDocValues.java new file mode 100644 index 0000000000000..aa7a86f4f3214 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareSortedDocValues.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.index.SortedDocValues; + +public abstract class BlockAwareSortedDocValues extends SortedDocValues { + + public abstract SingletonDocValuesBlockLoader getSingletonBlockLoader(); + +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java index 31d65bde1be0e..19cdfdd219bef 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java @@ -43,6 +43,7 @@ import org.apache.lucene.util.packed.PackedInts; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder; +import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -368,10 +369,19 @@ public int advance(int target) throws IOException { public long cost() { return ords.cost(); } + + @Override + public SingletonDocValuesBlockLoader getSingletonBlockLoader() { + if (ords instanceof BlockAwareNumericDocValues b) { + return b.getSingletonBlockLoader(); + } else { + return null; + } + } }; } - abstract class BaseSortedDocValues extends SortedDocValues { + abstract class BaseSortedDocValues extends BlockAwareSortedDocValues { final SortedEntry entry; final TermsEnum termsEnum; @@ -1140,7 +1150,7 @@ public long longValue() { final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1; if (entry.docsWithFieldOffset == -1) { // dense - return new NumericDocValues() { + return new BlockAwareNumericDocValues() { private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc; private int doc = -1; @@ -1155,11 +1165,13 @@ public int docID() { @Override public int nextDoc() throws IOException { + assert loader == null; return advance(doc + 1); } @Override public int advance(int target) throws IOException { + assert loader == null; if (target >= maxDoc) { return doc = NO_MORE_DOCS; } @@ -1168,6 +1180,7 @@ public int advance(int target) throws IOException { @Override public boolean advanceExact(int target) { + assert loader == null; doc = target; return true; } @@ -1179,6 +1192,7 @@ public long cost() { @Override public long longValue() throws IOException { + assert loader == null; final int index = doc; final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT; final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK; @@ -1197,6 +1211,235 @@ public long longValue() throws IOException { } return currentBlock[blockInIndex]; } + + private SingletonDocValuesBlockLoader loader; + + @Override + public SingletonDocValuesBlockLoader getSingletonBlockLoader() { + if (loader == null) { + loader = new SingletonDocValuesBlockLoader() { + + @Override + public void loadBlock(BlockLoader.SingletonLongBuilder builder, BlockLoader.Docs docs, int offset) + throws IOException { + assert maxOrd == -1; + doc = docs.get(docs.count() - 1); + boolean isDense = doc - docs.get(0) == docs.count() - 1; + if (isDense) { + // Figure out where we start and whether the previous block needs to be read: + int firstDocId = docs.get(offset); + int firstBlockIndex = firstDocId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT; + int firstBlockInIndex = firstDocId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK; + + int start; + if (currentBlockIndex != firstBlockIndex) { + // different block, so 
seek:
+ valuesData.seek(indexReader.get(firstBlockIndex));
+ if (firstBlockInIndex == 0) {
+ // starts at a full block boundary, defer consuming to the complete blocks loop below.
+ start = offset;
+ } else {
+ // partial block, consume it here
+ currentBlockIndex = firstBlockIndex;
+ decoder.decode(valuesData, currentBlock);
+ int length = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - firstBlockInIndex;
+ if (docs.count() - offset < length) {
+ builder.appendLongs(currentBlock, firstBlockInIndex, docs.count() - offset);
+ return;
+ } else {
+ builder.appendLongs(currentBlock, firstBlockInIndex, length);
+ start = offset + length;
+ }
+ }
+ } else {
+ // consume remaining
+ int docsLength = docs.count() - offset;
+ int blockLength = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - firstBlockInIndex;
+ if (docsLength < blockLength) {
+ builder.appendLongs(currentBlock, firstBlockInIndex, docsLength);
+ return;
+ } else {
+ builder.appendLongs(currentBlock, firstBlockInIndex, blockLength);
+ start = offset + blockLength;
+ }
+ }
+
+ // Figure out how many complete blocks we can read:
+ int completeBlockSize = 0;
+ int[] completeBlocks = new int[(docs.count() / ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) + 1];
+ int docsIndex = start;
+ while (docsIndex < docs.count()) {
+ int docId = docs.get(docsIndex);
+ if (docsIndex + ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE >= docs.count()) {
+ break;
+ }
+
+ int nextIndex = docs.get(docsIndex + ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
+ if (nextIndex - docId == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
+ completeBlocks[completeBlockSize++] = docId;
+ docsIndex += ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE;
+ } else {
+ break;
+ }
+ }
+
+ // Read those complete blocks:
+ for (int i = 0; i < completeBlockSize; i++) {
+ int docId = completeBlocks[i];
+ currentBlockIndex = docId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ decoder.decode(valuesData, currentBlock);
+ int blockInIndex = docId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ int length = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - blockInIndex;
+ assert length == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE : "unexpected length [" + length + "]";
+ builder.appendLongs(currentBlock, blockInIndex, length);
+ }
+
+ // Check for a remainder and, if so, read it:
+ if (docsIndex < docs.count()) {
+ int docId = docs.get(docsIndex);
+ currentBlockIndex = docId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ decoder.decode(valuesData, currentBlock);
+
+ int blockInIndex = docId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ int lastBlockInIndex = doc & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ int length = lastBlockInIndex - blockInIndex + 1;
+
+ builder.appendLongs(currentBlock, blockInIndex, length);
+ }
+ } else {
+ for (int i = offset; i < docs.count(); i++) {
+ int index = docs.get(i);
+ final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ if (blockIndex != currentBlockIndex) {
+ assert blockIndex > currentBlockIndex : blockIndex + " <= " + currentBlockIndex;
+ // no need to seek if the loading block is the next block
+ if (currentBlockIndex + 1 != blockIndex) {
+ valuesData.seek(indexReader.get(blockIndex));
+ }
+ currentBlockIndex = blockIndex;
+ decoder.decode(valuesData, currentBlock);
+ }
+ builder.appendLong(currentBlock[blockInIndex]);
+ }
+ }
+ }
+
+ @Override
+ public void loadBlock(BlockLoader.TSSingletonOrdinalsBuilder builder, BlockLoader.Docs docs, int offset)
+ throws IOException {
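+ // Same dense/sparse strategy as the long variant above, just decoding ordinal blocks (decodeOrdinals) and appending ordinals instead of raw values.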
+ assert maxOrd >= 0;
+ doc = docs.get(docs.count() - 1);
+ boolean isDense = doc - docs.get(0) == docs.count() - 1;
+ if (isDense) {
+ // Figure out where we start and whether the previous block needs to be read:
+ int firstDocId = docs.get(offset);
+ int firstBlockIndex = firstDocId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ int firstBlockInIndex = firstDocId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+
+ int start;
+ if (currentBlockIndex != firstBlockIndex) {
+ // different block, so seek:
+ valuesData.seek(indexReader.get(firstBlockIndex));
+ if (firstBlockInIndex == 0) {
+ // starts at a full block boundary, defer consuming to the complete blocks loop below.
+ start = offset;
+ } else {
+ // partial block, consume it here
+ currentBlockIndex = firstBlockIndex;
+ decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
+ int length = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - firstBlockInIndex;
+ if (docs.count() - offset < length) {
+ builder.appendOrds(currentBlock, firstBlockInIndex, docs.count() - offset);
+ return;
+ } else {
+ builder.appendOrds(currentBlock, firstBlockInIndex, length);
+ start = offset + length;
+ }
+ }
+ } else {
+ // consume remaining
+ int blockLength = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - firstBlockInIndex;
+ int docsLength = docs.count() - offset;
+ if (docsLength < blockLength) {
+ builder.appendOrds(currentBlock, firstBlockInIndex, docsLength);
+ return;
+ } else {
+ builder.appendOrds(currentBlock, firstBlockInIndex, blockLength);
+ start = offset + blockLength;
+ }
+ }
+
+ // Figure out how many complete blocks we can read:
+ int completeBlockSize = 0;
+ int[] completeBlocks = new int[(docs.count() / ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) + 1];
+ int docsIndex = start;
+ while (docsIndex < docs.count()) {
+ int docId = docs.get(docsIndex);
+ if (docsIndex + ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE >= docs.count()) {
+ break;
+ }
+
+ int nextIndex = docs.get(docsIndex + ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
+ if (nextIndex - docId == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
+ completeBlocks[completeBlockSize++] = docId;
+ docsIndex += ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE;
+ } else {
+ break;
+ }
+ }
+
+ // Read those complete blocks:
+ for (int i = 0; i < completeBlockSize; i++) {
+ int docId = completeBlocks[i];
+ currentBlockIndex = docId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
+ int blockInIndex = docId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ int length = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - blockInIndex;
+ assert length == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE : "unexpected length [" + length + "]";
+ builder.appendOrds(currentBlock, blockInIndex, length);
+ }
+
+ // Check for a remainder and, if so, read it:
+ if (docsIndex < docs.count()) {
+ int docId = docs.get(docsIndex);
+ currentBlockIndex = docId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
+
+ int blockInIndex = docId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ int lastBlockInIndex = doc & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ int length = lastBlockInIndex - blockInIndex + 1;
+
+ builder.appendOrds(currentBlock, blockInIndex, length);
+ }
+ } else {
+ for (int i = offset; i < docs.count(); i++) {
+ int index = docs.get(i);
+ final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
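+ // Sparse case: decode per document, re-decoding only when the doc falls outside the currently buffered block.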
+ if (blockIndex != currentBlockIndex) {
+ assert blockIndex > currentBlockIndex : blockIndex + " <= " + currentBlockIndex;
+ // no need to seek if the loading block is the next block
+ if (currentBlockIndex + 1 != blockIndex) {
+ valuesData.seek(indexReader.get(blockIndex));
+ }
+ currentBlockIndex = blockIndex;
+ decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
+ }
+ builder.appendOrd(currentBlock[blockInIndex]);
+ }
+ }
+ }
+
+ @Override
+ public int docID() {
+ return doc;
+ }
+ };
+ }
+ return loader;
+ }
+ };
} else {
final IndexedDISI disi = new IndexedDISI(
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/SingletonDocValuesBlockLoader.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/SingletonDocValuesBlockLoader.java
new file mode 100644
index 0000000000000..e555b1c9de252
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/SingletonDocValuesBlockLoader.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.es819;
+
+import org.elasticsearch.index.mapper.BlockLoader;
+
+import java.io.IOException;
+
+public interface SingletonDocValuesBlockLoader {
+
+ void loadBlock(BlockLoader.SingletonLongBuilder builder, BlockLoader.Docs docs, int offset) throws IOException;
+
+ void loadBlock(BlockLoader.TSSingletonOrdinalsBuilder builder, BlockLoader.Docs docs, int offset) throws IOException;
+
+ int docID();
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
index 809bad5145fe6..f10e23db461cb 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
@@ -116,7 +116,7 @@ public AllReader reader(LeafReaderContext context) throws IOException {
}
}
- private static class SingletonLongs extends BlockDocValuesReader {
+ static class SingletonLongs extends BlockDocValuesReader {
private final NumericDocValues numericDocValues;
SingletonLongs(NumericDocValues numericDocValues) {
@@ -632,7 +632,7 @@ public String toString() {
}
}
- private static class SingletonOrdinals extends BlockDocValuesReader {
+ public static class SingletonOrdinals extends BlockDocValuesReader {
private final SortedDocValues ordinals;
SingletonOrdinals(SortedDocValues ordinals) {
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
index a4a498e4048db..4af5ebfedd304 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
@@ -400,6 +400,8 @@ interface BlockFactory {
*/
LongBuilder longs(int expectedCount);
+ BlockLoader.SingletonLongBuilder singletonLongs(int expectedCount);
+
/**
* Build a builder to load only {@code null}s.
*/ @@ -421,6 +423,8 @@ interface BlockFactory { */ SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count); + TSSingletonOrdinalsBuilder tsSingletonOrdinalsBuilder(boolean isPrimaryIndexSortField, SortedDocValues ordinals, int count); + /** * Build a reader for reading {@link SortedSetDocValues} */ @@ -505,6 +509,13 @@ interface LongBuilder extends Builder { LongBuilder appendLong(long value); } + interface SingletonLongBuilder extends Builder { + + SingletonLongBuilder appendLong(long value); + + SingletonLongBuilder appendLongs(long[] values, int from, int length); + } + interface SingletonOrdinalsBuilder extends Builder { /** * Appends an ordinal to the builder. @@ -512,6 +523,14 @@ interface SingletonOrdinalsBuilder extends Builder { SingletonOrdinalsBuilder appendOrd(int value); } + interface TSSingletonOrdinalsBuilder extends Builder { + + TSSingletonOrdinalsBuilder appendOrd(long value); + + TSSingletonOrdinalsBuilder appendOrds(long[] values, int from, int length); + + } + interface SortedSetOrdinalsBuilder extends Builder { /** * Appends an ordinal to the builder. diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java index 76d6dbb941409..03c2f43e01dc5 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java @@ -1012,8 +1012,13 @@ public Function pointReaderIfPossible() { @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { + var indexMode = blContext.indexSettings().getMode(); if (hasDocValues()) { - return new BlockDocValuesReader.LongsBlockLoader(name()); + if (name().equals("@timestamp") && (indexMode == IndexMode.TIME_SERIES || indexMode == IndexMode.LOGSDB)) { + return new TimestampBlockLoader(); + } else { + return new BlockDocValuesReader.LongsBlockLoader(name()); + } } // Multi fields don't have fallback synthetic source. diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index e26c969f7d495..5800bfe071396 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -794,7 +794,11 @@ NamedAnalyzer normalizer() { @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) { - return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name()); + if (isDimension) { + return new TSDimensionBlockLoader(name()); + } else { + return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name()); + } } if (isStored()) { return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name()); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TSDimensionBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/TSDimensionBlockLoader.java new file mode 100644 index 0000000000000..12f95f61eddbc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/TSDimensionBlockLoader.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.elasticsearch.index.codec.tsdb.es819.BlockAwareSortedDocValues;
+import org.elasticsearch.index.codec.tsdb.es819.SingletonDocValuesBlockLoader;
+import org.elasticsearch.search.fetch.StoredFieldsSpec;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class TSDimensionBlockLoader implements BlockLoader {
+
+ private final String fieldName;
+ private final BlockLoader fallback;
+
+ public TSDimensionBlockLoader(String fieldName) {
+ this.fieldName = fieldName;
+ this.fallback = new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(fieldName);
+ }
+
+ @Override
+ public Builder builder(BlockFactory factory, int expectedCount) {
+ return factory.bytesRefs(expectedCount);
+ }
+
+ @Override
+ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
+ var singleton = DocValues.unwrapSingleton(context.reader().getSortedSetDocValues(fieldName));
+ if (singleton instanceof BlockAwareSortedDocValues b) {
+ var loader = b.getSingletonBlockLoader();
+ if (loader != null) {
+ return new TSDimensions(b, loader);
+ }
+ }
+ return fallback.columnAtATimeReader(context);
+ }
+
+ @Override
+ public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
+ return fallback.rowStrideReader(context);
+ }
+
+ @Override
+ public StoredFieldsSpec rowStrideStoredFieldSpec() {
+ return StoredFieldsSpec.NO_REQUIREMENTS;
+ }
+
+ @Override
+ public boolean supportsOrdinals() {
+ return true;
+ }
+
+ @Override
+ public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
+ return DocValues.getSortedSet(context.reader(), fieldName);
+ }
+
+ @Override
+ public String toString() {
+ return "TSDimensionBlockLoader[" + fieldName + "]";
+ }
+
+ public static final class TSDimensions implements ColumnAtATimeReader {
+ private final Thread creationThread;
+ private final SortedDocValues sorted;
+ private final SingletonDocValuesBlockLoader blockLoader;
+
+ TSDimensions(BlockAwareSortedDocValues sorted, SingletonDocValuesBlockLoader blockLoader) {
+ this.creationThread = Thread.currentThread();
+ this.sorted = Objects.requireNonNull(sorted);
+ this.blockLoader = Objects.requireNonNull(blockLoader);
+ }
+
+ @Override
+ public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+ try (var builder = factory.tsSingletonOrdinalsBuilder(false, sorted, docs.count() - offset)) {
+ blockLoader.loadBlock(builder, docs, offset);
+ return builder.build();
+ }
+ }
+
+ @Override
+ public boolean canReuse(int startingDocID) {
+ return creationThread == Thread.currentThread() && blockLoader.docID() <= startingDocID;
+ }
+
+ @Override
+ public String toString() {
+ return "TSDimensionBlockLoader.TSDimensions";
+ }
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TSIDBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/TSIDBlockLoader.java
new file mode 100644
index 0000000000000..e5972b11db930
--- /dev/null
+++
b/server/src/main/java/org/elasticsearch/index/mapper/TSIDBlockLoader.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper; + +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.elasticsearch.index.codec.tsdb.es819.BlockAwareSortedDocValues; +import org.elasticsearch.index.codec.tsdb.es819.SingletonDocValuesBlockLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; + +public final class TSIDBlockLoader implements BlockLoader { + + private static final String FIELD_NAME = TimeSeriesIdFieldMapper.NAME; + + @Override + public Builder builder(BlockFactory factory, int expectedCount) { + return factory.bytesRefs(expectedCount); + } + + @Override + public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { + var singleton = context.reader().getSortedDocValues(FIELD_NAME); + return new TSIDs((BlockAwareSortedDocValues) singleton); + } + + @Override + public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException { + var singleton = context.reader().getSortedDocValues(FIELD_NAME); + return new BlockDocValuesReader.SingletonOrdinals(singleton); + } + + @Override + public StoredFieldsSpec rowStrideStoredFieldSpec() { + return StoredFieldsSpec.NO_REQUIREMENTS; + } + + @Override + public boolean supportsOrdinals() { + return true; + } + + @Override + public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException { + return DocValues.getSortedSet(context.reader(), FIELD_NAME); + } + + public static final class TSIDs implements ColumnAtATimeReader { + private final Thread creationThread; + private final SortedDocValues sorted; + private final SingletonDocValuesBlockLoader blockLoader; + + TSIDs(BlockAwareSortedDocValues sorted) { + this.creationThread = Thread.currentThread(); + this.sorted = sorted; + this.blockLoader = sorted.getSingletonBlockLoader(); + } + + @Override + public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (TSSingletonOrdinalsBuilder builder = factory.tsSingletonOrdinalsBuilder(true, sorted, docs.count() - offset)) { + blockLoader.loadBlock(builder, docs, offset); + return builder.build(); + } + } + + @Override + public boolean canReuse(int startingDocID) { + return creationThread == Thread.currentThread() && blockLoader.docID() <= startingDocID; + } + + @Override + public String toString() { + return "TSIDBlockLoader.TSIDs"; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java index b94fa64b42428..0f2c0c26aac8c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java @@ -135,7 +135,7 @@ public Query termQuery(Object value, SearchExecutionContext 
context) { @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { - return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name()); + return new TSIDBlockLoader(); } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TimestampBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/TimestampBlockLoader.java new file mode 100644 index 0000000000000..97a410f2daa28 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/TimestampBlockLoader.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper; + +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.elasticsearch.index.codec.tsdb.es819.BlockAwareNumericDocValues; +import org.elasticsearch.index.codec.tsdb.es819.SingletonDocValuesBlockLoader; +import org.elasticsearch.search.fetch.StoredFieldsSpec; + +import java.io.IOException; + +public final class TimestampBlockLoader implements BlockLoader { + + private static final String FIELD_NAME = "@timestamp"; + + @Override + public Builder builder(BlockFactory factory, int expectedCount) { + return factory.longs(expectedCount); + } + + @Override + public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { + var singleton = getNumericDocValues(context); + return new Timestamps((BlockAwareNumericDocValues) singleton); + } + + private static NumericDocValues getNumericDocValues(LeafReaderContext context) throws IOException { + var singleton = context.reader().getNumericDocValues(FIELD_NAME); + if (singleton == null) { + var docValues = context.reader().getSortedNumericDocValues(FIELD_NAME); + singleton = DocValues.unwrapSingleton(docValues); + } + return singleton; + } + + @Override + public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException { + return new BlockDocValuesReader.SingletonLongs(getNumericDocValues(context)); + } + + @Override + public StoredFieldsSpec rowStrideStoredFieldSpec() { + return StoredFieldsSpec.NO_REQUIREMENTS; + } + + @Override + public boolean supportsOrdinals() { + return false; + } + + @Override + public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException { + throw new UnsupportedOperationException(); + } + + public static final class Timestamps implements ColumnAtATimeReader { + private final Thread creationThread; + private final SingletonDocValuesBlockLoader blockLoader; + + Timestamps(BlockAwareNumericDocValues blockAware) { + this.creationThread = Thread.currentThread(); + this.blockLoader = blockAware.getSingletonBlockLoader(); + } + + @Override + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + try (BlockLoader.SingletonLongBuilder builder = factory.singletonLongs(docs.count() - offset)) { + blockLoader.loadBlock(builder, docs, offset); + return builder.build(); + } + } + + @Override + public boolean canReuse(int startingDocID) { + return 
creationThread == Thread.currentThread() && blockLoader.docID() <= startingDocID; + } + + @Override + public String toString() { + return "TimestampBlockLoader.Timestamps"; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/mapper/TSBulkBlockLoadingTests.java b/server/src/test/java/org/elasticsearch/index/mapper/TSBulkBlockLoadingTests.java new file mode 100644 index 0000000000000..a4447a8dd828b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/mapper/TSBulkBlockLoadingTests.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper; + +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; + +import java.io.IOException; +import java.util.Locale; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; + +public class TSBulkBlockLoadingTests extends MapperServiceTestCase { + + public void testManyTSIDs() throws IOException { + doTestManyValues(TimeSeriesIdFieldMapper.NAME, TSIDBlockLoader.TSIDs.class); + } + + public void testManyDimensions() throws IOException { + doTestManyValues("host_name", TSDimensionBlockLoader.TSDimensions.class); + } + + public void doTestManyValues(String fieldName, Class expectedColumnReader) throws IOException { + final String mappings = """ + { + "_doc" : { + "properties": { + "@timestamp": { + "type": "date", + "ignore_malformed": false + }, + "host_name": { + "type": "keyword", + "time_series_dimension": true + } + } + } + } + """; + Settings settings = indexSettings(IndexVersion.current(), 1, 1).put("index.mode", "time_series") + .put("index.routing_path", "host_name") + .build(); + var mapperService = createMapperService(settings, mappings); + try (Directory directory = newDirectory()) { + int from = 0; + int to = 10_000; + int uniqueTsidEvery = 200; + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + if (fieldName.equals(TimeSeriesIdFieldMapper.NAME)) { + iwc.setIndexSort(new Sort(new SortField(fieldName, SortField.Type.STRING, false))); + } + iwc.setCodec(TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat())); + try (IndexWriter iw = new IndexWriter(directory, iwc)) { + for (int i = from; i < to; 
i++) { + LuceneDocument doc = new LuceneDocument(); + int tsid = i / uniqueTsidEvery; + if (fieldName.equals(TimeSeriesIdFieldMapper.NAME)) { + doc.add(new SortedDocValuesField(fieldName, new BytesRef(String.format(Locale.ROOT, "%04d", tsid)))); + } else { + doc.add(new SortedSetDocValuesField(fieldName, new BytesRef(String.format(Locale.ROOT, "%04d", tsid)))); + } + iw.addDocument(doc); + } + iw.forceMerge(1); + } + var mockBlockContext = mock(MappedFieldType.BlockLoaderContext.class); + var blockLoader = mapperService.fieldType(fieldName).blockLoader(mockBlockContext); + try (DirectoryReader reader = DirectoryReader.open(directory)) { + LeafReaderContext context = reader.leaves().get(0); + { + // One big doc block + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(expectedColumnReader)); + var docBlock = TestBlock.docs(IntStream.range(from, to).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(to - from)); + for (int i = 0; i < block.size(); i++) { + String actual = ((BytesRef) block.get(i)).utf8ToString(); + int expectedTsid = i / uniqueTsidEvery; + assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid))); + } + } + { + // Smaller doc blocks + int docBlockSize = 1000; + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(expectedColumnReader)); + for (int i = from; i < to; i += docBlockSize) { + var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(docBlockSize)); + for (int j = 0; j < block.size(); j++) { + String actual = ((BytesRef) block.get(j)).utf8ToString(); + int expectedTsid = (i + j) / uniqueTsidEvery; + assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid))); + } + } + } + { + // One smaller doc block: + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(expectedColumnReader)); + var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(1010)); + for (int i = 0; i < block.size(); i++) { + String actual = ((BytesRef) block.get(i)).utf8ToString(); + int expectedTsid = (1010 + i) / uniqueTsidEvery; + assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid))); + } + } + { + // Read two tiny blocks: + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(expectedColumnReader)); + var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(32)); + for (int i = 0; i < block.size(); i++) { + String actual = ((BytesRef) block.get(i)).utf8ToString(); + int expectedTsid = (32 + i) / uniqueTsidEvery; + assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid))); + } + + docBlock = TestBlock.docs(IntStream.range(64, 96).toArray()); + block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(32)); + for (int i = 0; i < block.size(); i++) { + String actual = ((BytesRef) block.get(i)).utf8ToString(); + int expectedTsid = (64 + i) / uniqueTsidEvery; + assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", 
expectedTsid))); + } + } + } + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/mapper/blockloader/DateBulkBlockLoadingTests.java b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/DateBulkBlockLoadingTests.java new file mode 100644 index 0000000000000..11b4383040a90 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/DateBulkBlockLoadingTests.java @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper.blockloader; + +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.util.TestUtil; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; +import org.elasticsearch.index.mapper.LuceneDocument; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.index.mapper.TestBlock; +import org.elasticsearch.index.mapper.TimestampBlockLoader; + +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DateBulkBlockLoadingTests extends MapperServiceTestCase { + + public void testManyValues() throws Exception { + final String mappings = """ + { + "_doc" : { + "properties": { + "@timestamp": { + "type": "date", + "ignore_malformed": false + } + } + } + } + """; + Settings settings = indexSettings(IndexVersion.current(), 1, 1).put("index.mode", "logsdb").build(); + var mapperService = createMapperService(settings, mappings); + try (Directory directory = newDirectory()) { + int from = 0; + int to = 10_000; + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + iwc.setIndexSort(new Sort(new SortField("@timestamp", SortField.Type.LONG, true))); + iwc.setCodec(TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat())); + try (IndexWriter iw = new IndexWriter(directory, iwc)) { + for (long i = from; i < to; i++) { + LuceneDocument doc = new LuceneDocument(); + doc.add(new NumericDocValuesField("@timestamp", i)); + iw.addDocument(doc); + } + iw.forceMerge(1); + } + var mockBlockContext = mock(MappedFieldType.BlockLoaderContext.class); + IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build(); + IndexSettings 
indexSettings = new IndexSettings(indexMetadata, settings); + when(mockBlockContext.indexSettings()).thenReturn(indexSettings); + var blockLoader = mapperService.fieldType("@timestamp").blockLoader(mockBlockContext); + try (DirectoryReader reader = DirectoryReader.open(directory)) { + LeafReaderContext context = reader.leaves().get(0); + { + // One big doc block + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(TimestampBlockLoader.Timestamps.class)); + var docBlock = TestBlock.docs(IntStream.range(from, to).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(to - from)); + for (int i = 0; i < block.size(); i++) { + assertThat(block.get(i), equalTo(to - i - 1L)); + } + } + { + // Smaller doc blocks + int docBlockSize = 1000; + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(TimestampBlockLoader.Timestamps.class)); + for (int i = from; i < to; i += docBlockSize) { + var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(docBlockSize)); + for (int j = 0; j < block.size(); j++) { + long expected = to - ((long) docBlockSize * (i / docBlockSize)) - j - 1L; + assertThat(block.get(j), equalTo(expected)); + } + } + } + { + // One smaller doc block: + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(TimestampBlockLoader.Timestamps.class)); + var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(1010)); + for (int i = 0; i < block.size(); i++) { + long expected = 8990 - i - 1L; + assertThat(block.get(i), equalTo(expected)); + } + } + { + // Read two tiny blocks: + var columnReader = blockLoader.columnAtATimeReader(context); + assertThat(columnReader, instanceOf(TimestampBlockLoader.Timestamps.class)); + var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray()); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(32)); + for (int i = 0; i < block.size(); i++) { + long expected = 9968 - i - 1L; + assertThat(block.get(i), equalTo(expected)); + } + + docBlock = TestBlock.docs(IntStream.range(64, 96).toArray()); + block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + assertThat(block.size(), equalTo(32)); + for (int i = 0; i < block.size(); i++) { + long expected = 9936 - i - 1L; + assertThat(block.get(i), equalTo(expected)); + } + } + } + } + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java index cb73dc96f69b2..7392fd1368339 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java @@ -18,8 +18,10 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.test.ESTestCase.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -197,6 +199,55 @@ public LongsBuilder appendLong(long value) { return new LongsBuilder(); } + 
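// Test-only SingletonLongBuilder: collects appended longs into a fixed array of expectedCount values. +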
@Override
+ public BlockLoader.SingletonLongBuilder singletonLongs(int expectedCount) {
+ final long[] values = new long[expectedCount];
+ return new BlockLoader.SingletonLongBuilder() {
+
+ private int count;
+
+ @Override
+ public BlockLoader.Block build() {
+ return new TestBlock(Arrays.stream(values, 0, count).boxed().collect(Collectors.toUnmodifiableList()));
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLong(long value) {
+ values[count++] = value;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLongs(long[] newValues, int from, int length) {
+ System.arraycopy(newValues, from, values, count, length);
+ count += length;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.Builder appendNull() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLoader.Builder beginPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLoader.Builder endPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
@Override
+ public BlockLoader.Builder nulls(int expectedCount) {
+ return longs(expectedCount);
@@ -220,6 +271,65 @@ public BlockLoader.Block constantBytes(BytesRef value, int count) {
return builder.build();
}
+ @Override
+ public BlockLoader.TSSingletonOrdinalsBuilder tsSingletonOrdinalsBuilder(
+ boolean isPrimaryIndexSortField,
+ SortedDocValues ordinals,
+ int expectedCount
+ ) {
+ final long[] ords = new long[expectedCount];
+ return new BlockLoader.TSSingletonOrdinalsBuilder() {
+
+ int count;
+
+ @Override
+ public BlockLoader.TSSingletonOrdinalsBuilder appendOrd(long value) {
+ ords[count++] = value;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.TSSingletonOrdinalsBuilder appendOrds(long[] values, int from, int length) {
+ System.arraycopy(values, from, ords, count, length);
+ count += length;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.Block build() {
+ return new TestBlock(Arrays.stream(ords, 0, count).mapToInt(Math::toIntExact).mapToObj(ord -> {
+ try {
+ return BytesRef.deepCopyOf(ordinals.lookupOrd(ord));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).collect(Collectors.toUnmodifiableList()));
+ }
+
+ @Override
+ public BlockLoader.Builder appendNull() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLoader.Builder beginPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLoader.Builder endPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
@Override
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int expectedCount) {
class SingletonOrdsBuilder extends TestBlock.Builder implements BlockLoader.SingletonOrdinalsBuilder {
@@ -230,7 +340,7 @@ private SingletonOrdsBuilder() {
@Override
public SingletonOrdsBuilder appendOrd(int value) {
try {
- add(ordinals.lookupOrd(value));
+ add(BytesRef.deepCopyOf(ordinals.lookupOrd(value)));
return this;
} catch (IOException e) {
throw new UncheckedIOException(e);
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java
index c5e3628b268d4..8b97383a9ab3a
100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java @@ -76,11 +76,25 @@ public BlockLoader.LongBuilder longs(int expectedCount) { return factory.newLongBlockBuilder(expectedCount); } + @Override + public BlockLoader.SingletonLongBuilder singletonLongs(int expectedCount) { + return new SingletonLongsBuilder(expectedCount, factory); + } + @Override public BlockLoader.Builder nulls(int expectedCount) { return ElementType.NULL.newBlockBuilder(expectedCount, factory); } + @Override + public BlockLoader.TSSingletonOrdinalsBuilder tsSingletonOrdinalsBuilder( + boolean isPrimaryIndexSortField, + SortedDocValues ordinals, + int count + ) { + return new TSSingletonOrdinalsBuilder(isPrimaryIndexSortField, factory, ordinals, count); + } + @Override public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { return new SingletonOrdinalsBuilder(factory, ordinals, count); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonLongsBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonLongsBuilder.java new file mode 100644 index 0000000000000..b3ae526b1f596 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonLongsBuilder.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+public class SingletonLongsBuilder implements BlockLoader.SingletonLongBuilder, Releasable, Block.Builder {
+
+ private final long[] values;
+ private final BlockFactory blockFactory;
+
+ private int count;
+
+ public SingletonLongsBuilder(int initialSize, BlockFactory blockFactory) {
+ this.blockFactory = blockFactory;
+ blockFactory.adjustBreaker(valuesSize(initialSize));
+ this.values = new long[initialSize];
+ }
+
+ @Override
+ public Block.Builder appendNull() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder beginPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder endPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long estimatedBytes() {
+ return (long) values.length * Long.BYTES;
+ }
+
+ @Override
+ public Block build() {
+ return blockFactory.newLongArrayVector(values, count, 0L).asBlock();
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLong(long value) {
+ values[count++] = value;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLongs(long[] values, int from, int length) {
+ System.arraycopy(values, from, this.values, count, length);
+ count += length;
+ return this;
+ }
+
+ @Override
+ public void close() {
+ blockFactory.adjustBreaker(-valuesSize(values.length));
+ }
+
+ static long valuesSize(int count) {
+ return (long) count * Long.BYTES;
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TSSingletonOrdinalsBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TSSingletonOrdinalsBuilder.java
new file mode 100644
index 0000000000000..db719a3192049
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TSSingletonOrdinalsBuilder.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * Fork of {@link SingletonOrdinalsBuilder}, specialized mainly for the _tsid field but also used for time series dimension fields.
+ * The _tsid field is dense and used as the primary index sort field, which allows a few more optimizations.
+ * Additionally, this implementation can collect complete value blocks.
+ */
+public final class TSSingletonOrdinalsBuilder implements BlockLoader.TSSingletonOrdinalsBuilder, Releasable, Block.Builder {
+
+ private static final int TSID_SIZE_GUESS = 2 + 16 + 16 + 4 * 8;
+
+ private final boolean isPrimaryIndexSortField;
+ private final BlockFactory blockFactory;
+ private final SortedDocValues docValues;
+ private final long[] ords;
+ private int count;
+
+ public TSSingletonOrdinalsBuilder(
+ boolean isPrimaryIndexSortField,
+ BlockFactory blockFactory,
+ SortedDocValues docValues,
+ int initialSize
+ ) {
+ this.isPrimaryIndexSortField = isPrimaryIndexSortField;
+ this.blockFactory = blockFactory;
+ this.docValues = docValues;
+ blockFactory.adjustBreaker(nativeOrdsSize(initialSize));
+ // The tsdb codec natively uses a long array. Use this array to capture the natively encoded ordinals and convert later in build():
+ this.ords = new long[initialSize];
+ }
+
+ @Override
+ public TSSingletonOrdinalsBuilder appendOrd(long ord) {
+ ords[count++] = ord;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.TSSingletonOrdinalsBuilder appendOrds(long[] values, int from, int length) {
+ System.arraycopy(values, from, ords, count, length);
+ count += length;
+ return this;
+ }
+
+ @Override
+ public long estimatedBytes() {
+ return (long) ords.length * Long.BYTES;
+ }
+
+ @Override
+ public BytesRefBlock build() {
+ assert ords.length == count;
+
+ int minOrd;
+ int maxOrd;
+ boolean isDense = true;
+ if (isPrimaryIndexSortField) {
+ minOrd = Math.toIntExact(ords[0]);
+ maxOrd = Math.toIntExact(ords[count - 1]);
+ // I think ordinals are always dense in this case?
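+ // (Presumably so: with _tsid as the primary index sort, doc order follows ordinal order, so a dense doc range should map to a dense ordinal range.)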
+ } else {
+ long tmpMinOrd = Long.MAX_VALUE;
+ long tmpMaxOrd = Long.MIN_VALUE;
+ long prevOrd = ords[0] - 1;
+ for (int i = 0; i < count; i++) {
+ long ord = ords[i];
+ tmpMinOrd = Math.min(tmpMinOrd, ord);
+ tmpMaxOrd = Math.max(tmpMaxOrd, ord);
+ if (ord - prevOrd != 1) {
+ isDense = false;
+ }
+ prevOrd = ord;
+ }
+ minOrd = Math.toIntExact(tmpMinOrd);
+ maxOrd = Math.toIntExact(tmpMaxOrd);
+ }
+
+ var constantBlock = tryBuildConstantBlock(minOrd, maxOrd);
+ if (constantBlock != null) {
+ return constantBlock;
+ }
+ int valueCount = maxOrd - minOrd + 1;
+ return buildOrdinal(minOrd, maxOrd, valueCount, isDense);
+ }
+
+ BytesRefBlock buildOrdinal(int minOrd, int maxOrd, int valueCount, boolean isDense) {
+ long breakerSize = ordsSize(count);
+ blockFactory.adjustBreaker(breakerSize);
+
+ BytesRefVector bytesVector = null;
+ IntBlock ordinalBlock = null;
+ try {
+ // Convert back to int[] and remap ordinals
+ int[] newOrds = new int[count];
+ for (int i = 0; i < count; i++) {
+ newOrds[i] = Math.toIntExact(ords[i]) - minOrd;
+ }
+ try (BytesRefVector.Builder bytesBuilder = blockFactory.newBytesRefVectorBuilder(valueCount * TSID_SIZE_GUESS)) {
+ if (isDense) {
+ TermsEnum tenum = docValues.termsEnum();
+ tenum.seekExact(minOrd);
+ for (BytesRef term = tenum.term(); term != null && tenum.ord() <= maxOrd; term = tenum.next()) {
+ bytesBuilder.appendBytesRef(term);
+ }
+ } else {
+ for (int ord = minOrd; ord <= maxOrd; ord++) {
+ bytesBuilder.appendBytesRef(docValues.lookupOrd(ord));
+ }
+ }
+ bytesVector = bytesBuilder.build();
+ } catch (IOException e) {
+ throw new UncheckedIOException("error resolving tsid ordinals", e);
+ }
+ ordinalBlock = blockFactory.newIntArrayVector(newOrds, newOrds.length).asBlock();
+ final OrdinalBytesRefBlock result = new OrdinalBytesRefBlock(ordinalBlock, bytesVector);
+ assert ords.length == result.getPositionCount();
+
+ bytesVector = null;
+ ordinalBlock = null;
+ return result;
+ } finally {
+ Releasables.close(() -> blockFactory.adjustBreaker(-breakerSize), ordinalBlock, bytesVector);
+ }
+ }
+
+ private BytesRefBlock tryBuildConstantBlock(int minOrd, int maxOrd) {
+ if (minOrd != maxOrd) {
+ return null;
+ }
+
+ final BytesRef v;
+ try {
+ v = BytesRef.deepCopyOf(docValues.lookupOrd(minOrd));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ BytesRefVector bytes = null;
+ IntVector ordinals = null;
+ boolean success = false;
+ try {
+ bytes = blockFactory.newConstantBytesRefVector(v, 1);
+ ordinals = blockFactory.newConstantIntVector(0, ords.length);
+ // Ideally, we would return a ConstantBytesRefVector, but we return an ordinal constant block instead
+ // to ensure ordinal optimizations are applied when constant optimization is not available.
+            final var result = new OrdinalBytesRefBlock(ordinals.asBlock(), bytes);
+            success = true;
+            return result;
+        } finally {
+            if (success == false) {
+                Releasables.close(bytes, ordinals);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        blockFactory.adjustBreaker(-nativeOrdsSize(ords.length));
+    }
+
+    @Override
+    public TSSingletonOrdinalsBuilder appendNull() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TSSingletonOrdinalsBuilder beginPositionEntry() {
+        throw new UnsupportedOperationException("should only have one value per doc");
+    }
+
+    @Override
+    public TSSingletonOrdinalsBuilder endPositionEntry() {
+        throw new UnsupportedOperationException("should only have one value per doc");
+    }
+
+    @Override
+    public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
+        throw new UnsupportedOperationException();
+    }
+
+    private static long ordsSize(int ordsCount) {
+        return RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) ordsCount * Integer.BYTES;
+    }
+
+    private static long nativeOrdsSize(int ordsCount) {
+        return RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) ordsCount * Long.BYTES;
+    }
+
+}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/TSSingletonOrdinalsBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/TSSingletonOrdinalsBuilderTests.java
new file mode 100644
index 0000000000000..d02ded3ee971b
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/TSSingletonOrdinalsBuilderTests.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.analysis.MockAnalyzer;
+import org.apache.lucene.tests.util.TestUtil;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.test.ComputeTestCase;
+import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.indices.CrankyCircuitBreakerService;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TSSingletonOrdinalsBuilderTests extends ComputeTestCase {
+
+    public void testReader() throws IOException {
+        testRead(blockFactory());
+    }
+
+    public void testReadWithCranky() throws IOException {
+        var factory = crankyBlockFactory();
+        try {
+            testRead(factory);
+            // If we made it this far cranky didn't fail us!
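+            // The cranky breaker fails allocations at random; the catch below verifies the
+            // expected failure, and the final assertion verifies nothing leaked either way.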
+        } catch (CircuitBreakingException e) {
+            logger.info("cranky", e);
+            assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
+        }
+        assertThat(factory.breaker().getUsed(), equalTo(0L));
+    }
+
+    private void testRead(BlockFactory factory) throws IOException {
+        BytesRef[] values = new BytesRef[] { new BytesRef("a"), new BytesRef("b"), new BytesRef("c"), new BytesRef("d") };
+
+        boolean isPrimaryIndexSortField = randomBoolean();
+        int count = 1000;
+        try (Directory directory = newDirectory()) {
+            try (IndexWriter indexWriter = createIndexWriter(directory)) {
+                for (int i = 0; i < count; i++) {
+                    BytesRef v = values[i % values.length];
+                    indexWriter.addDocument(List.of(new SortedDocValuesField("_tsid", v)));
+                }
+            }
+            Map<String, Integer> counts = new HashMap<>();
+            try (IndexReader reader = DirectoryReader.open(directory)) {
+                for (LeafReaderContext ctx : reader.leaves()) {
+                    SortedDocValues docValues = ctx.reader().getSortedDocValues("_tsid");
+                    try (
+                        TSSingletonOrdinalsBuilder builder = new TSSingletonOrdinalsBuilder(
+                            isPrimaryIndexSortField,
+                            factory,
+                            docValues,
+                            ctx.reader().numDocs()
+                        )
+                    ) {
+                        for (int i = 0; i < ctx.reader().maxDoc(); i++) {
+                            assertThat(docValues.advanceExact(i), equalTo(true));
+                            int ord = docValues.ordValue();
+                            builder.appendOrd(ord);
+                        }
+                        try (BytesRefBlock build = builder.build()) {
+                            for (int i = 0; i < build.getPositionCount(); i++) {
+                                String key = build.getBytesRef(i, new BytesRef()).utf8ToString();
+                                counts.merge(key, 1, Integer::sum);
+                            }
+                        }
+                    }
+                }
+            }
+            int expectedCount = count / values.length;
+            assertMap(
+                counts,
+                matchesMap().entry("a", expectedCount).entry("b", expectedCount).entry("c", expectedCount).entry("d", expectedCount)
+            );
+        }
+    }
+
+    public void testHighCardinality() throws IOException {
+        int count = 1_000;
+        boolean isPrimaryIndexSortField = randomBoolean();
+        try (Directory directory = newDirectory()) {
+            try (IndexWriter indexWriter = createIndexWriter(directory)) {
+                for (int i = 0; i < count; i++) {
+                    BytesRef tsid = new BytesRef(String.format(Locale.ROOT, "%04d", i));
+                    indexWriter.addDocument(List.of(new SortedDocValuesField("_tsid", tsid)));
+                }
+                indexWriter.forceMerge(1);
+            }
+            try (IndexReader reader = DirectoryReader.open(directory)) {
+                assertThat(reader.leaves().size(), equalTo(1));
+                LeafReader leafReader = reader.leaves().get(0).reader();
+                SortedDocValues docValues = leafReader.getSortedDocValues("_tsid");
+                int offset = 850;
+                try (
+                    TSSingletonOrdinalsBuilder builder = new TSSingletonOrdinalsBuilder(
+                        isPrimaryIndexSortField,
+                        blockFactory(),
+                        docValues,
+                        count - offset
+                    )
+                ) {
+                    for (int i = offset; i < leafReader.maxDoc(); i++) {
+                        assertThat(docValues.advanceExact(i), equalTo(true));
+                        int ord = docValues.ordValue();
+                        builder.appendOrd(ord);
+                    }
+                    try (BytesRefBlock build = builder.build()) {
+                        assertThat(build.getPositionCount(), equalTo(count - offset));
+                        for (int i = 0; i < build.getPositionCount(); i++) {
+                            String key = build.getBytesRef(i, new BytesRef()).utf8ToString();
+                            assertThat(key, equalTo(String.format(Locale.ROOT, "%04d", offset + i)));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    static IndexWriter createIndexWriter(Directory directory) throws IOException {
+        IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+        iwc.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
+        iwc.setIndexSort(new Sort(new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, false)));
+        iwc.setCodec(TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()));
+        return new IndexWriter(directory, iwc);
+    }
+
+}
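For context, a minimal sketch (not part of the patch) of how a consumer is expected to drive the
builder, mirroring the tests above. The names docValues, blockFactory, and maxDoc are assumed to
be in scope, the field is assumed to have a value for every document (as _tsid does), and
isPrimaryIndexSortField should reflect whether the field is the primary index sort:

    // Illustrative only: append one ordinal per document, then build one BytesRefBlock.
    try (var builder = new TSSingletonOrdinalsBuilder(isPrimaryIndexSortField, blockFactory, docValues, maxDoc)) {
        for (int docId = 0; docId < maxDoc; docId++) {
            boolean hasValue = docValues.advanceExact(docId);
            assert hasValue : "the builder assumes a dense field such as _tsid";
            builder.appendOrd(docValues.ordValue());
        }
        try (BytesRefBlock block = builder.build()) {
            // One position per appended ordinal; each resolves to that doc's _tsid bytes.
            for (int p = 0; p < block.getPositionCount(); p++) {
                BytesRef tsid = block.getBytesRef(p, new BytesRef());
            }
        }
    }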