diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java index a3b2fd3633adf..9c94dcde21bb2 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.logging.LogConfigurator; import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec; +import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -257,7 +258,13 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE ); config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); config.setMergePolicy(new LogByteSizeMergePolicy()); - var docValuesFormat = new ES819TSDBDocValuesFormat(4096, 512, optimizedMergeEnabled); + var docValuesFormat = new ES819TSDBDocValuesFormat( + 4096, + 512, + optimizedMergeEnabled, + BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1, + true + ); config.setCodec(new Elasticsearch92Lucene103Codec() { @Override public DocValuesFormat getDocValuesFormatForField(String field) { diff --git a/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java b/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java index 43eb47e55324c..ff3186caa4d1a 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java +++ b/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java @@ -41,6 +41,7 @@ public class PerFieldFormatSupplier { private static final Set INCLUDE_META_FIELDS; + private static final Set EXCLUDE_MAPPER_TYPES; static { // TODO: should we just allow all fields to use tsdb doc values codec? @@ -53,6 +54,7 @@ public class PerFieldFormatSupplier { // Don't the include _recovery_source_size and _recovery_source fields, since their values can be trimmed away in // RecoverySourcePruneMergePolicy, which leads to inconsistencies between merge stats and actual values. 
INCLUDE_META_FIELDS = Collections.unmodifiableSet(includeMetaField); + EXCLUDE_MAPPER_TYPES = Set.of("geo_shape"); } private static final DocValuesFormat docValuesFormat = new Lucene90DocValuesFormat(); @@ -145,6 +147,10 @@ boolean useTSDBDocValuesFormat(final String field) { return false; } + if (excludeMapperTypes(field)) { + return false; + } + return mapperService != null && mapperService.getIndexSettings().useTimeSeriesDocValuesFormat() && mapperService.getIndexSettings().isES87TSDBCodecEnabled(); @@ -154,4 +160,29 @@ private boolean excludeFields(String fieldName) { return fieldName.startsWith("_") && INCLUDE_META_FIELDS.contains(fieldName) == false; } + private boolean excludeMapperTypes(String fieldName) { + var typeName = getMapperType(fieldName); + if (typeName == null) { + return false; + } + return EXCLUDE_MAPPER_TYPES.contains(getMapperType(fieldName)); + } + + private boolean isTimeSeriesModeIndex() { + return mapperService != null && IndexMode.TIME_SERIES == mapperService.getIndexSettings().getMode(); + } + + private boolean isLogsModeIndex() { + return mapperService != null && IndexMode.LOGSDB == mapperService.getIndexSettings().getMode(); + } + + String getMapperType(final String field) { + if (mapperService != null) { + Mapper mapper = mapperService.mappingLookup().getMapper(field); + if (mapper != null) { + return mapper.typeName(); + } + } + return null; + } } diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/BinaryDVCompressionMode.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/BinaryDVCompressionMode.java new file mode 100644 index 0000000000000..ad5f3b9ef1f98 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/BinaryDVCompressionMode.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.codec.tsdb; + +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.elasticsearch.index.codec.zstd.ZstdCompressionMode; + +public enum BinaryDVCompressionMode { + + NO_COMPRESS((byte) 0, null), + COMPRESSED_ZSTD_LEVEL_1((byte) 1, new ZstdCompressionMode(1)); + + public final byte code; + private final CompressionMode compressionMode; + + private static final BinaryDVCompressionMode[] values = new BinaryDVCompressionMode[values().length]; + static { + for (BinaryDVCompressionMode mode : values()) { + values[mode.code] = mode; + } + } + + BinaryDVCompressionMode(byte code, CompressionMode compressionMode) { + this.code = code; + this.compressionMode = compressionMode; + } + + public static BinaryDVCompressionMode fromMode(byte code) { + if (code < 0 || code >= values.length) { + throw new IllegalStateException("unknown compression mode [" + code + "]"); + } + return values[code]; + } + + public CompressionMode compressionMode() { + if (compressionMode == null) { + throw new UnsupportedOperationException("BinaryDVCompressionMode [" + code + "] does not support compression"); + } + return compressionMode; + } + + public record BlockHeader(boolean isCompressed) { + static final byte IS_COMPRESSED = 0x1; + + public static BlockHeader fromByte(byte header) { + boolean isCompressed = (header & IS_COMPRESSED) != 0; + return new BlockHeader(isCompressed); + } + + public byte toByte() { + byte header = 0; + if (isCompressed) { + header |= IS_COMPRESSED; + } + return header; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockMetadataAccumulator.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockMetadataAccumulator.java new file mode 100644 index 0000000000000..3f9a5a5574864 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockMetadataAccumulator.java @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.packed.DirectMonotonicWriter; +import org.elasticsearch.core.IOUtils; + +import java.io.Closeable; +import java.io.IOException; + +final class BlockMetadataAccumulator implements Closeable { + + private final DelayedOffsetAccumulator blockAddressAcc; + private final DelayedOffsetAccumulator blockDocRangeAcc; + + BlockMetadataAccumulator(Directory dir, IOContext context, IndexOutput data, long addressesStart) throws IOException { + boolean success = false; + try { + blockDocRangeAcc = new DelayedOffsetAccumulator(dir, context, data, "block-doc-ranges", 0); + blockAddressAcc = new DelayedOffsetAccumulator(dir, context, data, "block-addresses", addressesStart); + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't + } + } + } + + public void addDoc(long numDocsInBlock, long blockLenInBytes) throws IOException { + blockDocRangeAcc.addDoc(numDocsInBlock); + blockAddressAcc.addDoc(blockLenInBytes); + } + + public void build(IndexOutput meta, IndexOutput data) throws IOException { + long dataAddressesStart = data.getFilePointer(); + blockAddressAcc.build(meta, data); + long dataDocRangeStart = data.getFilePointer(); + long addressesLength = dataDocRangeStart - dataAddressesStart; + meta.writeLong(addressesLength); + + meta.writeLong(dataDocRangeStart); + blockDocRangeAcc.build(meta, data); + long docRangesLen = data.getFilePointer() - dataDocRangeStart; + meta.writeLong(docRangesLen); + } + + @Override + public void close() throws IOException { + IOUtils.closeWhileHandlingException(blockAddressAcc, blockDocRangeAcc); + } + + /** + * Like OffsetsAccumulator builds offsets and stores in a DirectMonotonicWriter. But write to temp file + * rather than directly to a DirectMonotonicWriter because the number of values is unknown. 
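// Illustrative sketch, not part of this patch: the two-phase pattern behind DelayedOffsetAccumulator.
// Length deltas are appended while the total value count is still unknown, then replayed as a running
// sum to produce the monotonically increasing offsets a DirectMonotonicWriter expects. An in-memory
// ByteBuffersDataOutput stands in here for the temp file that the real implementation creates.
static long[] replayOffsets(long startOffset, long[] blockLengths) throws java.io.IOException {
    var scratch = new org.apache.lucene.store.ByteBuffersDataOutput();
    for (long length : blockLengths) {
        scratch.writeVLong(length); // phase 1: append one delta per value, count unknown up front
    }
    org.apache.lucene.store.DataInput replay = scratch.toDataInput();
    long[] offsets = new long[blockLengths.length + 1];
    offsets[0] = startOffset;
    for (int i = 0; i < blockLengths.length; i++) {
        offsets[i + 1] = offsets[i] + replay.readVLong(); // phase 2: prefix sum, mirrors writer.add(offset)
    }
    return offsets;
}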
+ */ + static final class DelayedOffsetAccumulator implements Closeable { + + private final Directory dir; + private final long startOffset; + + private int numValues = 0; + private final IndexOutput tempOutput; + private final String suffix; + + DelayedOffsetAccumulator(Directory dir, IOContext context, IndexOutput data, String suffix, long startOffset) throws IOException { + this.dir = dir; + this.startOffset = startOffset; + this.suffix = suffix; + + boolean success = false; + try { + tempOutput = dir.createTempOutput(data.getName(), suffix, context); + CodecUtil.writeHeader(tempOutput, ES819TSDBDocValuesFormat.META_CODEC + suffix, ES819TSDBDocValuesFormat.VERSION_CURRENT); + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't + } + } + } + + void addDoc(long delta) throws IOException { + tempOutput.writeVLong(delta); + numValues++; + } + + void build(IndexOutput meta, IndexOutput data) throws IOException { + CodecUtil.writeFooter(tempOutput); + IOUtils.close(tempOutput); + + // write the offsets info to the meta file by reading from temp file + try (ChecksumIndexInput tempInput = dir.openChecksumInput(tempOutput.getName());) { + CodecUtil.checkHeader( + tempInput, + ES819TSDBDocValuesFormat.META_CODEC + suffix, + ES819TSDBDocValuesFormat.VERSION_CURRENT, + ES819TSDBDocValuesFormat.VERSION_CURRENT + ); + Throwable priorE = null; + try { + final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance( + meta, + data, + numValues + 1, + ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT + ); + + long offset = startOffset; + writer.add(offset); + for (int i = 0; i < numValues; ++i) { + offset += tempInput.readVLong(); + writer.add(offset); + } + writer.finish(); + } catch (Throwable e) { + priorE = e; + } finally { + CodecUtil.checkFooter(tempInput, priorE); + } + } + } + + @Override + public void close() throws IOException { + if (tempOutput != null) { + IOUtils.close(tempOutput, () -> dir.deleteFile(tempOutput.getName())); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java index 968e50eaf32be..c75fd6a470add 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java @@ -11,6 +11,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.compressing.Compressor; import org.apache.lucene.codecs.lucene90.IndexedDISI; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DocValues; @@ -27,8 +28,10 @@ import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.SortedSetSelector; import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.store.ByteBuffersDataInput; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexOutput; @@ -41,14 +44,19 @@ import org.apache.lucene.util.packed.DirectMonotonicWriter; import org.apache.lucene.util.packed.PackedInts; import org.elasticsearch.core.IOUtils; +import 
org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode; import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder; +import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import static org.elasticsearch.index.codec.tsdb.es819.DocValuesConsumerUtil.compatibleWithOptimizedMerge; +import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.BLOCK_BYTES_THRESHOLD; +import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.BLOCK_COUNT_THRESHOLD; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_LEVEL_SHIFT; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_MAX_LEVEL; @@ -65,8 +73,13 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer { private final int minDocsPerOrdinalForOrdinalRangeEncoding; final boolean enableOptimizedMerge; private final int primarySortFieldNumber; + final SegmentWriteState state; + final BinaryDVCompressionMode binaryDVCompressionMode; + private final boolean enablePerBlockCompression; // only false for testing ES819TSDBDocValuesConsumer( + BinaryDVCompressionMode binaryDVCompressionMode, + final boolean enablePerBlockCompression, SegmentWriteState state, int skipIndexIntervalSize, int minDocsPerOrdinalForOrdinalRangeEncoding, @@ -76,6 +89,9 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer { String metaCodec, String metaExtension ) throws IOException { + this.binaryDVCompressionMode = binaryDVCompressionMode; + this.enablePerBlockCompression = enablePerBlockCompression; + this.state = state; this.termsDictBuffer = new byte[1 << 14]; this.dir = state.directory; this.minDocsPerOrdinalForOrdinalRangeEncoding = minDocsPerOrdinalForOrdinalRangeEncoding; @@ -315,6 +331,7 @@ public void mergeBinaryField(FieldInfo mergeFieldInfo, MergeState mergeState) th public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(ES819TSDBDocValuesFormat.BINARY); + meta.writeByte(binaryDVCompressionMode.code); if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer.mergeStats.supported()) { final int numDocsWithField = tsdbValuesProducer.mergeStats.sumNumDocsWithField(); @@ -327,28 +344,29 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th long start = data.getFilePointer(); meta.writeLong(start); // dataOffset - OffsetsAccumulator offsetsAccumulator = null; DISIAccumulator disiAccumulator = null; + BinaryWriter binaryWriter = null; try { if (numDocsWithField > 0 && numDocsWithField < maxDoc) { disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER); } assert maxLength >= minLength; - if (maxLength > minLength) { - offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField); + if (binaryDVCompressionMode == BinaryDVCompressionMode.NO_COMPRESS) { + var offsetsAccumulator = maxLength > minLength ? 
new OffsetsAccumulator(dir, context, data, numDocsWithField) : null; + binaryWriter = new DirectBinaryWriter(offsetsAccumulator, null); + } else { + binaryWriter = new CompressedBinaryBlockWriter(binaryDVCompressionMode); } for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { BytesRef v = values.binaryValue(); - data.writeBytes(v.bytes, v.offset, v.length); + binaryWriter.addDoc(v); if (disiAccumulator != null) { disiAccumulator.addDocId(doc); } - if (offsetsAccumulator != null) { - offsetsAccumulator.addDoc(v.length); - } } + binaryWriter.flushData(); meta.writeLong(data.getFilePointer() - start); // dataLength if (numDocsWithField == 0) { @@ -373,75 +391,238 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th meta.writeInt(numDocsWithField); meta.writeInt(minLength); meta.writeInt(maxLength); - if (offsetsAccumulator != null) { - offsetsAccumulator.build(meta, data); - } + + binaryWriter.writeAddressMetadata(minLength, maxLength, numDocsWithField); } finally { - IOUtils.close(disiAccumulator, offsetsAccumulator); + IOUtils.close(disiAccumulator, binaryWriter); } } else { - BinaryDocValues values = valuesProducer.getBinary(field); - long start = data.getFilePointer(); - meta.writeLong(start); // dataOffset - int numDocsWithField = 0; - int minLength = Integer.MAX_VALUE; - int maxLength = 0; - for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { - numDocsWithField++; - BytesRef v = values.binaryValue(); - int length = v.length; - data.writeBytes(v.bytes, v.offset, v.length); - minLength = Math.min(length, minLength); - maxLength = Math.max(length, maxLength); + BinaryWriter binaryWriter = null; + try { + if (binaryDVCompressionMode == BinaryDVCompressionMode.NO_COMPRESS) { + binaryWriter = new DirectBinaryWriter(null, valuesProducer.getBinary(field)); + } else { + binaryWriter = new CompressedBinaryBlockWriter(binaryDVCompressionMode); + } + + BinaryDocValues values = valuesProducer.getBinary(field); + long start = data.getFilePointer(); + meta.writeLong(start); // dataOffset + int numDocsWithField = 0; + int minLength = Integer.MAX_VALUE; + int maxLength = 0; + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + numDocsWithField++; + BytesRef v = values.binaryValue(); + int length = v.length; + binaryWriter.addDoc(v); + minLength = Math.min(length, minLength); + maxLength = Math.max(length, maxLength); + } + binaryWriter.flushData(); + + assert numDocsWithField <= maxDoc; + meta.writeLong(data.getFilePointer() - start); // dataLength + + if (numDocsWithField == 0) { + meta.writeLong(-2); // docsWithFieldOffset + meta.writeLong(0L); // docsWithFieldLength + meta.writeShort((short) -1); // jumpTableEntryCount + meta.writeByte((byte) -1); // denseRankPower + } else if (numDocsWithField == maxDoc) { + meta.writeLong(-1); // docsWithFieldOffset + meta.writeLong(0L); // docsWithFieldLength + meta.writeShort((short) -1); // jumpTableEntryCount + meta.writeByte((byte) -1); // denseRankPower + } else { + long offset = data.getFilePointer(); + meta.writeLong(offset); // docsWithFieldOffset + values = valuesProducer.getBinary(field); + final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER); + meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength + meta.writeShort(jumpTableEntryCount); + meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER); + } + + 
meta.writeInt(numDocsWithField); + meta.writeInt(minLength); + meta.writeInt(maxLength); + + binaryWriter.writeAddressMetadata(minLength, maxLength, numDocsWithField); + } finally { + IOUtils.close(binaryWriter); } - assert numDocsWithField <= maxDoc; - meta.writeLong(data.getFilePointer() - start); // dataLength + } + } - if (numDocsWithField == 0) { - meta.writeLong(-2); // docsWithFieldOffset - meta.writeLong(0L); // docsWithFieldLength - meta.writeShort((short) -1); // jumpTableEntryCount - meta.writeByte((byte) -1); // denseRankPower - } else if (numDocsWithField == maxDoc) { - meta.writeLong(-1); // docsWithFieldOffset - meta.writeLong(0L); // docsWithFieldLength - meta.writeShort((short) -1); // jumpTableEntryCount - meta.writeByte((byte) -1); // denseRankPower - } else { - long offset = data.getFilePointer(); - meta.writeLong(offset); // docsWithFieldOffset - values = valuesProducer.getBinary(field); - final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER); - meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength - meta.writeShort(jumpTableEntryCount); - meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER); + private sealed interface BinaryWriter extends Closeable { + void addDoc(BytesRef v) throws IOException; + + default void flushData() throws IOException {} + + default void writeAddressMetadata(int minLength, int maxLength, int numDocsWithField) throws IOException {} + + @Override + default void close() throws IOException {} + } + + private final class DirectBinaryWriter implements BinaryWriter { + final OffsetsAccumulator offsetsAccumulator; + final BinaryDocValues values; + + private DirectBinaryWriter(OffsetsAccumulator offsetsAccumulator, BinaryDocValues values) { + this.offsetsAccumulator = offsetsAccumulator; + this.values = values; + } + + @Override + public void addDoc(BytesRef v) throws IOException { + data.writeBytes(v.bytes, v.offset, v.length); + if (offsetsAccumulator != null) { + offsetsAccumulator.addDoc(v.length); } + } - meta.writeInt(numDocsWithField); - meta.writeInt(minLength); - meta.writeInt(maxLength); - if (maxLength > minLength) { - start = data.getFilePointer(); - meta.writeLong(start); - meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT); + @Override + public void writeAddressMetadata(int minLength, int maxLength, int numDocsWithField) throws IOException { + if (offsetsAccumulator != null) { + // If optimized merging and minLength > maxLength + offsetsAccumulator.build(meta, data); + } else if (values != null) { + if (maxLength > minLength) { + // If optimized merging and minLength > maxLength + long addressStart = data.getFilePointer(); + meta.writeLong(addressStart); + meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT); - final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance( - meta, - data, - numDocsWithField + 1, - ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT - ); - long addr = 0; - writer.add(addr); - values = valuesProducer.getBinary(field); - for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { - addr += values.binaryValue().length; + final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance( + meta, + data, + numDocsWithField + 1, + ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT + ); + long addr = 0; writer.add(addr); + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + addr += 
values.binaryValue().length; + writer.add(addr); + } + writer.finish(); + meta.writeLong(data.getFilePointer() - addressStart); } - writer.finish(); - meta.writeLong(data.getFilePointer() - start); } } + + @Override + public void close() throws IOException { + IOUtils.close(offsetsAccumulator); + } + } + + private final class CompressedBinaryBlockWriter implements BinaryWriter { + final Compressor compressor; + + final int[] docOffsets = new int[BLOCK_COUNT_THRESHOLD + 1]; // start for each doc plus start of doc that would be after last + + int uncompressedBlockLength = 0; + int maxUncompressedBlockLength = 0; + int numDocsInCurrentBlock = 0; + + byte[] block = BytesRef.EMPTY_BYTES; + int totalChunks = 0; + int maxNumDocsInAnyBlock = 0; + + final BlockMetadataAccumulator blockMetaAcc; + + CompressedBinaryBlockWriter(BinaryDVCompressionMode compressionMode) throws IOException { + this.compressor = compressionMode.compressionMode().newCompressor(); + long blockAddressesStart = data.getFilePointer(); + this.blockMetaAcc = new BlockMetadataAccumulator(state.directory, state.context, data, blockAddressesStart); + } + + @Override + public void addDoc(BytesRef v) throws IOException { + block = ArrayUtil.grow(block, uncompressedBlockLength + v.length); + System.arraycopy(v.bytes, v.offset, block, uncompressedBlockLength, v.length); + uncompressedBlockLength += v.length; + + numDocsInCurrentBlock++; + docOffsets[numDocsInCurrentBlock] = uncompressedBlockLength; + + if (uncompressedBlockLength >= BLOCK_BYTES_THRESHOLD || numDocsInCurrentBlock >= BLOCK_COUNT_THRESHOLD) { + flushData(); + } + } + + @Override + public void flushData() throws IOException { + if (numDocsInCurrentBlock == 0) { + return; + } + + totalChunks++; + long thisBlockStartPointer = data.getFilePointer(); + + // Data can be compressed or uncompressed on a per-block granularity, though is currently always compressed. + // In the future will leave data uncompressed if compression does not reduce storage. 
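// Illustrative sketch, not part of this patch: each flushed block stores a header byte, the uncompressed
// length, the delta-encoded per-document offsets (written with writeGroupVInts), and finally the payload.
// The offsets round trip performed by compressOffsets (write side) and deltaDecode (read side) is shown
// below; offsets are value start positions within the uncompressed block, plus one trailing entry that
// marks the end of the last value. Example: value lengths 3, 5, 2 give offsets {0, 3, 8, 10}, which are
// encoded as the deltas {0, 3, 5, 2} and restored by a prefix sum.
static void deltaEncodeOffsets(int[] docOffsets, int numDocsInBlock) {
    for (int i = numDocsInBlock; i > 0; i--) {
        docOffsets[i] -= docOffsets[i - 1]; // store per-value lengths instead of absolute positions
    }
}

static void deltaDecodeOffsets(int[] docOffsets, int numDocsInBlock) {
    int sum = 0;
    for (int i = 0; i <= numDocsInBlock; i++) {
        sum += docOffsets[i];
        docOffsets[i] = sum; // back to absolute positions; docOffsets[numDocsInBlock] is the block length
    }
}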
+ final boolean shouldCompress = enablePerBlockCompression; + var header = new BinaryDVCompressionMode.BlockHeader(shouldCompress); + data.writeByte(header.toByte()); + + // write length of string data + data.writeVInt(uncompressedBlockLength); + + maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength); + maxNumDocsInAnyBlock = Math.max(maxNumDocsInAnyBlock, numDocsInCurrentBlock); + + compressOffsets(data, numDocsInCurrentBlock); + + if (shouldCompress) { + compress(block, uncompressedBlockLength, data); + } else { + data.writeBytes(block, 0, uncompressedBlockLength); + } + + long blockLenBytes = data.getFilePointer() - thisBlockStartPointer; + blockMetaAcc.addDoc(numDocsInCurrentBlock, blockLenBytes); + numDocsInCurrentBlock = uncompressedBlockLength = 0; + } + + void compressOffsets(DataOutput output, int numDocsInCurrentBlock) throws IOException { + int numOffsets = numDocsInCurrentBlock + 1; + // delta encode + for (int i = numOffsets - 1; i > 0; i--) { + docOffsets[i] -= docOffsets[i - 1]; + } + output.writeGroupVInts(docOffsets, numOffsets); + } + + void compress(byte[] data, int uncompressedLength, DataOutput output) throws IOException { + ByteBuffer inputBuffer = ByteBuffer.wrap(data, 0, uncompressedLength); + ByteBuffersDataInput input = new ByteBuffersDataInput(List.of(inputBuffer)); + compressor.compress(input, output); + } + + @Override + public void writeAddressMetadata(int minLength, int maxLength, int numDocsWithField) throws IOException { + if (totalChunks == 0) { + return; + } + + long dataAddressesStart = data.getFilePointer(); + meta.writeLong(dataAddressesStart); + meta.writeVInt(totalChunks); + meta.writeVInt(maxUncompressedBlockLength); + meta.writeVInt(maxNumDocsInAnyBlock); + meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT); + + blockMetaAcc.build(meta, data); + } + + @Override + public void close() throws IOException { + IOUtils.close(blockMetaAcc); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java index fb43ae176dedc..dbfbed346bc07 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java @@ -13,7 +13,9 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; +import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode; import java.io.IOException; @@ -27,10 +29,16 @@ * view of all values. If index sorting is active merging a doc value field requires a merge sort which can be very cpu intensive. * The previous format always has to merge sort a doc values field multiple times, so doing the merge sort just once saves on * cpu resources. + *
+ * <li>Version 1 adds block-wise compression to binary doc values. Each block contains a variable number of values so that each + * block is approximately the same size. To map a given value's index to the block containing the value, there are two parallel + * arrays. These contain the starting address for each block, and the starting value index for each block. Additional compression + * types may be added by creating a new mode in {@link org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode}.
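// Illustrative sketch, not part of this patch: how a value's index is mapped to the block that holds it
// using the two parallel arrays described above. blockFirstValue[i] is the index of the first value in
// block i (ascending, with blockFirstValue[0] == 0), and blockAddress[i] would be that block's start
// offset in the data file; the reader performs the same search against DirectMonotonicReader instances.
static int blockForValue(long[] blockFirstValue, int numBlocks, long valueIndex) {
    int block = java.util.Arrays.binarySearch(blockFirstValue, 0, numBlocks, valueIndex);
    if (block < 0) {
        block = -block - 2; // insertion point minus one = block whose range contains valueIndex
    }
    return block; // caller then seeks to blockAddress[block] and decompresses that one block
}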
  • * */ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesFormat { + public static final boolean BINARY_DV_COMPRESSION_FEATURE_FLAG = new FeatureFlag("binary_dv_compression").isEnabled(); + static final int NUMERIC_BLOCK_SHIFT = 7; public static final int NUMERIC_BLOCK_SIZE = 1 << NUMERIC_BLOCK_SHIFT; static final int NUMERIC_BLOCK_MASK = NUMERIC_BLOCK_SIZE - 1; @@ -47,7 +55,8 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues static final byte SORTED_NUMERIC = 4; static final int VERSION_START = 0; - static final int VERSION_CURRENT = VERSION_START; + static final int VERSION_BINARY_DV_COMPRESSION = 1; + static final int VERSION_CURRENT = VERSION_BINARY_DV_COMPRESSION; static final int TERMS_DICT_BLOCK_LZ4_SHIFT = 6; static final int TERMS_DICT_BLOCK_LZ4_SIZE = 1 << TERMS_DICT_BLOCK_LZ4_SHIFT; @@ -57,6 +66,14 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues static final int TERMS_DICT_REVERSE_INDEX_SIZE = 1 << TERMS_DICT_REVERSE_INDEX_SHIFT; static final int TERMS_DICT_REVERSE_INDEX_MASK = TERMS_DICT_REVERSE_INDEX_SIZE - 1; + /** + * These thresholds determine the size of a compressed binary block. We build a new block if the uncompressed data in the block + * is 128k, or if the number of values is 1024. These values are a tradeoff between the high compression ratio and decompression + * speed of large blocks, and the ability to avoid decompressing unneeded values provided by small blocks. + */ + public static final int BLOCK_BYTES_THRESHOLD = 128 * 1024; + public static final int BLOCK_COUNT_THRESHOLD = 1024; + // number of documents in an interval private static final int DEFAULT_SKIP_INDEX_INTERVAL_SIZE = 4096; // bytes on an interval: @@ -119,14 +136,48 @@ private static boolean getOptimizedMergeEnabledDefault() { final int skipIndexIntervalSize; final int minDocsPerOrdinalForRangeEncoding; final boolean enableOptimizedMerge; + final BinaryDVCompressionMode binaryDVCompressionMode; + final boolean enablePerBlockCompression; /** Default constructor. */ public ES819TSDBDocValuesFormat() { - this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE, ORDINAL_RANGE_ENCODING_MIN_DOC_PER_ORDINAL, OPTIMIZED_MERGE_ENABLE_DEFAULT); + this( + DEFAULT_SKIP_INDEX_INTERVAL_SIZE, + ORDINAL_RANGE_ENCODING_MIN_DOC_PER_ORDINAL, + OPTIMIZED_MERGE_ENABLE_DEFAULT, + BINARY_DV_COMPRESSION_FEATURE_FLAG ? BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1 : BinaryDVCompressionMode.NO_COMPRESS, + true + ); + } + + public ES819TSDBDocValuesFormat(BinaryDVCompressionMode binaryDVCompressionMode) { + this( + DEFAULT_SKIP_INDEX_INTERVAL_SIZE, + ORDINAL_RANGE_ENCODING_MIN_DOC_PER_ORDINAL, + OPTIMIZED_MERGE_ENABLE_DEFAULT, + binaryDVCompressionMode, + true + ); + } + + public ES819TSDBDocValuesFormat(BinaryDVCompressionMode binaryDVCompressionMode, boolean enablePerBlockCompression) { + this( + DEFAULT_SKIP_INDEX_INTERVAL_SIZE, + ORDINAL_RANGE_ENCODING_MIN_DOC_PER_ORDINAL, + OPTIMIZED_MERGE_ENABLE_DEFAULT, + binaryDVCompressionMode, + enablePerBlockCompression + ); } /** Doc values fields format with specified skipIndexIntervalSize. 
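// Illustrative sketch, not part of this patch: which of the two block thresholds above wins for a given
// average value size. Small values hit BLOCK_COUNT_THRESHOLD first (roughly 25-byte values give 1024
// values and ~25 KiB blocks), while large values hit BLOCK_BYTES_THRESHOLD first (roughly 1 KiB values
// give ~128 values per 128 KiB block).
static int expectedDocsPerBlock(int averageValueSizeInBytes) {
    int boundedByBytes = Math.max(1, ES819TSDBDocValuesFormat.BLOCK_BYTES_THRESHOLD / Math.max(1, averageValueSizeInBytes));
    return Math.min(ES819TSDBDocValuesFormat.BLOCK_COUNT_THRESHOLD, boundedByBytes);
}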
*/ - public ES819TSDBDocValuesFormat(int skipIndexIntervalSize, int minDocsPerOrdinalForRangeEncoding, boolean enableOptimizedMerge) { + public ES819TSDBDocValuesFormat( + int skipIndexIntervalSize, + int minDocsPerOrdinalForRangeEncoding, + boolean enableOptimizedMerge, + BinaryDVCompressionMode binaryDVCompressionMode, + final boolean enablePerBlockCompression + ) { super(CODEC_NAME); if (skipIndexIntervalSize < 2) { throw new IllegalArgumentException("skipIndexIntervalSize must be > 1, got [" + skipIndexIntervalSize + "]"); @@ -134,11 +185,15 @@ public ES819TSDBDocValuesFormat(int skipIndexIntervalSize, int minDocsPerOrdinal this.skipIndexIntervalSize = skipIndexIntervalSize; this.minDocsPerOrdinalForRangeEncoding = minDocsPerOrdinalForRangeEncoding; this.enableOptimizedMerge = enableOptimizedMerge; + this.binaryDVCompressionMode = binaryDVCompressionMode; + this.enablePerBlockCompression = enablePerBlockCompression; } @Override public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { return new ES819TSDBDocValuesConsumer( + binaryDVCompressionMode, + enablePerBlockCompression, state, skipIndexIntervalSize, minDocsPerOrdinalForRangeEncoding, diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java index 8b9d6cae97846..227653e21d550 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java @@ -11,6 +11,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.compressing.Decompressor; import org.apache.lucene.codecs.lucene90.IndexedDISI; import org.apache.lucene.index.BaseTermsEnum; import org.apache.lucene.index.BinaryDocValues; @@ -39,6 +40,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RandomAccessInput; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.GroupVIntUtil; import org.apache.lucene.util.LongValues; import org.apache.lucene.util.compress.LZ4; import org.apache.lucene.util.packed.DirectMonotonicReader; @@ -46,6 +48,7 @@ import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode; import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader; @@ -98,7 +101,7 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer { state.segmentSuffix ); - readFields(in, state.fieldInfos); + readFields(in, state.fieldInfos, version); } catch (Throwable exception) { priorE = exception; @@ -194,6 +197,13 @@ public BinaryDocValues getBinary(FieldInfo field) throws IOException { return DocValues.emptyBinary(); } + return switch (entry.compression) { + case NO_COMPRESS -> getUncompressedBinary(entry); + default -> getCompressedBinary(entry); + }; + } + + public BinaryDocValues getUncompressedBinary(BinaryEntry entry) throws IOException { final RandomAccessInput bytesSlice = data.randomAccessSlice(entry.dataOffset, entry.dataLength); if (entry.docsWithFieldOffset == -1) { @@ -348,6 +358,174 @@ public BytesRef binaryValue() throws IOException { } } + private BinaryDocValues 
getCompressedBinary(BinaryEntry entry) throws IOException { + if (entry.docsWithFieldOffset == -1) { + // dense + final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength); + final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData); + + final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength); + final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData); + return new DenseBinaryDocValues(maxDoc) { + final BinaryDecoder decoder = new BinaryDecoder( + entry.compression.compressionMode().newDecompressor(), + addresses, + docRanges, + data.clone(), + entry.maxUncompressedChunkSize, + entry.maxNumDocsInAnyBlock + ); + + @Override + public BytesRef binaryValue() throws IOException { + return decoder.decode(doc, entry.numCompressedBlocks); + } + }; + } else { + // sparse + final IndexedDISI disi = new IndexedDISI( + data, + entry.docsWithFieldOffset, + entry.docsWithFieldLength, + entry.jumpTableEntryCount, + entry.denseRankPower, + entry.numDocsWithField + ); + final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength); + final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData); + + final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength); + final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData); + return new SparseBinaryDocValues(disi) { + final BinaryDecoder decoder = new BinaryDecoder( + entry.compression.compressionMode().newDecompressor(), + addresses, + docRanges, + data.clone(), + entry.maxUncompressedChunkSize, + entry.maxNumDocsInAnyBlock + ); + + @Override + public BytesRef binaryValue() throws IOException { + return decoder.decode(disi.index(), entry.numCompressedBlocks); + } + }; + } + + } + + // Decompresses blocks of binary values to retrieve content + static final class BinaryDecoder { + + private final LongValues addresses; + private final DirectMonotonicReader docRanges; + private final IndexInput compressedData; + // Cache of last uncompressed block + private long lastBlockId = -1; + private final int[] uncompressedDocStarts; + private final byte[] uncompressedBlock; + private final BytesRef uncompressedBytesRef; + private long startDocNumForBlock = -1; + private long limitDocNumForBlock = -1; + private final Decompressor decompressor; + + BinaryDecoder( + Decompressor decompressor, + LongValues addresses, + DirectMonotonicReader docRanges, + IndexInput compressedData, + int biggestUncompressedBlockSize, + int maxNumDocsInAnyBlock + ) { + this.decompressor = decompressor; + this.addresses = addresses; + this.docRanges = docRanges; + this.compressedData = compressedData; + // pre-allocate a byte array large enough for the biggest uncompressed block needed. 
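// Illustrative usage sketch, not part of this patch: the compressed binary doc values are still consumed
// through the standard BinaryDocValues API; reader and field names here are hypothetical. Because the
// decoder caches the last uncompressed block, walking doc IDs in increasing order decompresses each
// block at most once.
static void readBinaryField(org.apache.lucene.index.LeafReader leafReader, String field) throws java.io.IOException {
    org.apache.lucene.index.BinaryDocValues dv = leafReader.getBinaryDocValues(field);
    if (dv == null) {
        return; // no binary doc values for this field in this segment
    }
    for (int doc = dv.nextDoc(); doc != org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; doc = dv.nextDoc()) {
        org.apache.lucene.util.BytesRef value = dv.binaryValue(); // only valid until the next advance
        // consume value.bytes[value.offset .. value.offset + value.length)
    }
}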
+ this.uncompressedBlock = new byte[biggestUncompressedBlockSize]; + uncompressedBytesRef = new BytesRef(uncompressedBlock); + uncompressedDocStarts = new int[maxNumDocsInAnyBlock + 1]; + } + + // unconditionally decompress blockId into uncompressedDocStarts and uncompressedBlock + private void decompressBlock(int blockId, int numDocsInBlock) throws IOException { + long blockStartOffset = addresses.get(blockId); + compressedData.seek(blockStartOffset); + + var header = BinaryDVCompressionMode.BlockHeader.fromByte(compressedData.readByte()); + int uncompressedBlockLength = compressedData.readVInt(); + + if (uncompressedBlockLength == 0) { + return; + } + + decompressDocOffsets(numDocsInBlock, compressedData); + + assert uncompressedBlockLength <= uncompressedBlock.length; + uncompressedBytesRef.offset = 0; + uncompressedBytesRef.length = uncompressedBlock.length; + + if (header.isCompressed()) { + decompressor.decompress(compressedData, uncompressedBlockLength, 0, uncompressedBlockLength, uncompressedBytesRef); + } else { + compressedData.readBytes(uncompressedBlock, 0, uncompressedBlockLength); + } + } + + void decompressDocOffsets(int numDocsInBlock, DataInput input) throws IOException { + int numOffsets = numDocsInBlock + 1; + GroupVIntUtil.readGroupVInts(input, uncompressedDocStarts, numOffsets); + deltaDecode(uncompressedDocStarts, numOffsets); + } + + // Borrowed from to TSDBDocValuesEncoder.decodeDelta + // The `sum` variable helps compiler optimize method, should not be removed. + void deltaDecode(int[] arr, int length) { + int sum = 0; + for (int i = 0; i < length; ++i) { + sum += arr[i]; + arr[i] = sum; + } + } + + long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int docNumber, int numBlocks) { + long index = docRanges.binarySearch(lastBlockId + 1, numBlocks, docNumber); + // If index is found, index is inclusive lower bound of docNum range, so docNum is in blockId == index + if (index < 0) { + // If index was not found, insertion point (-index - 1) will be upper bound of docNum range. + // Subtract additional 1 so that index points to lower bound of doc range, which is the blockId + index = -2 - index; + } + assert index < numBlocks : "invalid range " + index + " for doc " + docNumber + " in numBlocks " + numBlocks; + + startDocNumForBlock = docRanges.get(index); + limitDocNumForBlock = docRanges.get(index + 1); + return index; + } + + BytesRef decode(int docNumber, int numBlocks) throws IOException { + // docNumber, rather than docId, because these are dense and could be indices from a DISI + long blockId = docNumber < limitDocNumForBlock ? 
lastBlockId : findAndUpdateBlock(docRanges, lastBlockId, docNumber, numBlocks); + + int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock); + int idxInBlock = (int) (docNumber - startDocNumForBlock); + assert idxInBlock < numDocsInBlock; + + if (blockId != lastBlockId) { + decompressBlock((int) blockId, numDocsInBlock); + // uncompressedBytesRef and uncompressedDocStarts now populated + lastBlockId = blockId; + } + + int start = uncompressedDocStarts[idxInBlock]; + int end = uncompressedDocStarts[idxInBlock + 1]; + uncompressedBytesRef.offset = start; + uncompressedBytesRef.length = end - start; + return uncompressedBytesRef; + } + } + abstract static class DenseBinaryDocValues extends BinaryDocValues implements BlockLoader.OptionalColumnAtATimeReader { final int maxDoc; @@ -390,6 +568,19 @@ public boolean advanceExact(int target) throws IOException { public int docIDRunEnd() throws IOException { return maxDoc; } + + @Override + @Nullable + public BlockLoader.Block tryRead( + BlockLoader.BlockFactory factory, + BlockLoader.Docs docs, + int offset, + boolean nullsFiltered, + BlockDocValuesReader.ToDouble toDouble, + boolean toInt + ) throws IOException { + return null; + } } private abstract static class SparseBinaryDocValues extends BinaryDocValues { @@ -1194,7 +1385,7 @@ static int primarySortFieldNumber(SegmentInfo segmentInfo, FieldInfos fieldInfos return -1; } - private void readFields(IndexInput meta, FieldInfos infos) throws IOException { + private void readFields(IndexInput meta, FieldInfos infos, int version) throws IOException { for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) { FieldInfo info = infos.fieldInfo(fieldNumber); if (info == null) { @@ -1207,7 +1398,7 @@ private void readFields(IndexInput meta, FieldInfos infos) throws IOException { if (type == ES819TSDBDocValuesFormat.NUMERIC) { numerics.put(info.number, readNumeric(meta)); } else if (type == ES819TSDBDocValuesFormat.BINARY) { - binaries.put(info.number, readBinary(meta)); + binaries.put(info.number, readBinary(meta, version)); } else if (type == ES819TSDBDocValuesFormat.SORTED) { sorted.put(info.number, readSorted(meta)); } else if (type == ES819TSDBDocValuesFormat.SORTED_SET) { @@ -1269,8 +1460,15 @@ private static void readNumeric(IndexInput meta, NumericEntry entry) throws IOEx entry.denseRankPower = meta.readByte(); } - private BinaryEntry readBinary(IndexInput meta) throws IOException { - final BinaryEntry entry = new BinaryEntry(); + private BinaryEntry readBinary(IndexInput meta, int version) throws IOException { + final BinaryDVCompressionMode compression; + if (version >= ES819TSDBDocValuesFormat.VERSION_BINARY_DV_COMPRESSION) { + compression = BinaryDVCompressionMode.fromMode(meta.readByte()); + } else { + compression = BinaryDVCompressionMode.NO_COMPRESS; + } + final BinaryEntry entry = new BinaryEntry(compression); + entry.dataOffset = meta.readLong(); entry.dataLength = meta.readLong(); entry.docsWithFieldOffset = meta.readLong(); @@ -1280,15 +1478,34 @@ private BinaryEntry readBinary(IndexInput meta) throws IOException { entry.numDocsWithField = meta.readInt(); entry.minLength = meta.readInt(); entry.maxLength = meta.readInt(); - if (entry.minLength < entry.maxLength) { - entry.addressesOffset = meta.readLong(); - - // Old count of uncompressed addresses - long numAddresses = entry.numDocsWithField + 1L; - final int blockShift = meta.readVInt(); - entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numAddresses, blockShift); - 
entry.addressesLength = meta.readLong(); + if (compression == BinaryDVCompressionMode.NO_COMPRESS) { + if (entry.minLength < entry.maxLength) { + entry.addressesOffset = meta.readLong(); + // Old count of uncompressed addresses + long numAddresses = entry.numDocsWithField + 1L; + final int blockShift = meta.readVInt(); + entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numAddresses, blockShift); + entry.addressesLength = meta.readLong(); + } + } else { + if (entry.numDocsWithField > 0) { + entry.addressesOffset = meta.readLong(); + // New count of compressed addresses - the number of compressed blocks + int numCompressedChunks = meta.readVInt(); + entry.maxUncompressedChunkSize = meta.readVInt(); + entry.maxNumDocsInAnyBlock = meta.readVInt(); + final int blockShift = meta.readVInt(); + + entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift); + entry.addressesLength = meta.readLong(); + + entry.docRangeOffset = meta.readLong(); + entry.docRangeMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift); + entry.docRangeLength = meta.readLong(); + + entry.numCompressedBlocks = numCompressedChunks; + } } return entry; } @@ -1967,6 +2184,8 @@ static class NumericEntry { } static class BinaryEntry { + final BinaryDVCompressionMode compression; + long dataOffset; long dataLength; long docsWithFieldOffset; @@ -1978,7 +2197,18 @@ static class BinaryEntry { int maxLength; long addressesOffset; long addressesLength; + long docRangeOffset; + long docRangeLength; + // compression mode + int maxUncompressedChunkSize; + int maxNumDocsInAnyBlock; + int numCompressedBlocks; DirectMonotonicReader.Meta addressesMeta; + DirectMonotonicReader.Meta docRangeMeta; + + BinaryEntry(BinaryDVCompressionMode compression) { + this.compression = compression; + } } static class SortedNumericEntry extends NumericEntry { diff --git a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java index c235516d6c363..1a9670382dc15 100644 --- a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java +++ b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java @@ -290,18 +290,19 @@ static boolean useDelegate(String name, IOContext ioContext) { * mmap-ing that should still be ok even is memory is scarce. * The fdt file is large and tends to cause more page faults when memory is scarce. * - * For disi and address-data files, in es819 tsdb doc values codec, docids and offsets are first written to a tmp file and - * read and written into new segment. + * For disi, address-data, block-addresses, and block-doc-ranges files, in es819 tsdb doc values codec, + * docids and offsets are first written to a tmp file and read and written into new segment. * * @param name The name of the file in Lucene index * @param extension The extension of the in Lucene index * @return whether to avoid using delegate if the file is a tmp fdt file. 
*/ static boolean avoidDelegateForFdtTempFiles(String name, LuceneFilesExtensions extension) { - return extension == LuceneFilesExtensions.TMP - && (name.contains("fdt") || name.contains("disi") || name.contains("address-data")); + return extension == LuceneFilesExtensions.TMP && NO_MMAP_FILE_SUFFIXES.stream().anyMatch(name::contains); } + static final Set NO_MMAP_FILE_SUFFIXES = Set.of("fdt", "disi", "address-data", "block-addresses", "block-doc-ranges"); + MMapDirectory getDelegate() { return delegate; } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java index ee9351ed51b97..ab7e65959d686 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; +import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormatTests; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -61,6 +62,8 @@ public void testDuel() throws IOException { ? new ES819TSDBDocValuesFormat( ESTestCase.randomIntBetween(1, 4096), ESTestCase.randomIntBetween(1, 512), + random().nextBoolean(), + ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode(), random().nextBoolean() ) : new TestES87TSDBDocValuesFormat(); diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java index 0bee87b560a72..f9dc86e181638 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; @@ -42,6 +43,7 @@ import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; +import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormatTests; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -59,14 +61,32 @@ public class TsdbDocValueBwcTests extends ESTestCase { public void testMixedIndex() throws Exception { var oldCodec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); - var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + var compressionMode = ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode(); + var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat(compressionMode)); testMixedIndex(oldCodec, newCodec); } - // TODO update Current to Version1 once version is incremented - public void testMixedIndexDocValueVersion0ToCurrent() throws Exception { + public void testMixedIndexDocValueVersion0ToVersion1() throws Exception { var 
oldCodec = TestUtil.alwaysDocValuesFormat(new TestES819TSDBDocValuesFormatVersion0());
-        var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat());
+        var compressionMode = ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode();
+        var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat(compressionMode));
+        testMixedIndex(oldCodec, newCodec, this::assertVersion819, this::assertVersion819);
+    }
+
+    public void testMixedIndexDocValueBinaryCompressionFeatureDisabledOldCodec() throws Exception {
+        // Mimic the behavior of BINARY_DV_COMPRESSION_FEATURE_FLAG being disabled in the oldCodec, but enabled in the newCodec.
+        var oldCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat(BinaryDVCompressionMode.NO_COMPRESS));
+        var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat(BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1));
+        testMixedIndex(oldCodec, newCodec, this::assertVersion819, this::assertVersion819);
+    }
+
+    public void testMixedIndexDocValueBinaryPerBlockCompression() throws Exception {
+        var oldCodec = TestUtil.alwaysDocValuesFormat(
+            new ES819TSDBDocValuesFormat(BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1, randomBoolean())
+        );
+        var newCodec = TestUtil.alwaysDocValuesFormat(
+            new ES819TSDBDocValuesFormat(BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1, randomBoolean())
+        );
         testMixedIndex(oldCodec, newCodec, this::assertVersion819, this::assertVersion819);
     }
@@ -81,8 +101,9 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
             }
         };
         var newCodec = new Elasticsearch92Lucene103Codec() {
-
-            final DocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat();
+            final DocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(
+                ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode()
+            );
 
             @Override
             public DocValuesFormat getDocValuesFormatForField(String field) {
@@ -148,8 +169,9 @@ void testMixedIndex(Codec oldCodec, Codec newCodec, VersionAssert assertOldVersi
             d.add(new SortedNumericDocValuesField(timestampField, timestamp++));
 
             if (r % 10 < 8) {
-                // Most of the time store counter:
+                // Most of the time store counter and binary value:
                 d.add(new NumericDocValuesField("counter_1", counter1++));
+                d.add(new BinaryDocValuesField("binary_tag", new BytesRef(tags[j % tags.length])));
             }
 
             if (r % 10 == 5) {
@@ -191,6 +213,10 @@ void testMixedIndex(Codec oldCodec, Codec newCodec, VersionAssert assertOldVersi
            if (tagsDV == null) {
                tagsDV = DocValues.emptySortedSet();
            }
+            var binaryDV = MultiDocValues.getBinaryValues(reader, "binary_tag");
+            if (binaryDV == null) {
+                binaryDV = DocValues.emptyBinary();
+            }
            for (int i = 0; i < numDocs; i++) {
                assertEquals(i, hostNameDV.nextDoc());
                String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString();
@@ -221,6 +247,10 @@ void testMixedIndex(Codec oldCodec, Codec newCodec, VersionAssert assertOldVersi
                        assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0);
                    }
                }
+                if (binaryDV.advanceExact(i)) {
+                    String actualBinary = binaryDV.binaryValue().utf8ToString();
+                    assertTrue("unexpected binary [" + actualBinary + "]", Arrays.binarySearch(tags, actualBinary) >= 0);
+                }
            }
        }
@@ -251,6 +281,10 @@ void testMixedIndex(Codec oldCodec, Codec newCodec, VersionAssert assertOldVersi
            if (tagsDV == null) {
                tagsDV = DocValues.emptySortedSet();
            }
+            var binaryDV = MultiDocValues.getBinaryValues(reader, "binary_tag");
+            if (binaryDV == null) {
+                binaryDV = DocValues.emptyBinary();
+            }
            for (int i = 0; i < numDocs; i++) {
                assertEquals(i, hostNameDV.nextDoc());
                String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString();
@@ -281,6 +315,10 @@ void testMixedIndex(Codec oldCodec, Codec newCodec, VersionAssert assertOldVersi
                        assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0);
                    }
                }
+                if (binaryDV.advanceExact(i)) {
+                    String actualBinary = binaryDV.binaryValue().utf8ToString();
+                    assertTrue("unexpected binary [" + actualBinary + "]", Arrays.binarySearch(tags, actualBinary) >= 0);
+                }
            }
        }
    }
@@ -310,7 +348,9 @@ public void testEncodeOrdinalRange() throws IOException {
                new ES819TSDBDocValuesFormat(
                    random().nextInt(16, 128),
                    nextOrdinalRangeThreshold.getAsInt(),
-                    random().nextBoolean()
+                    random().nextBoolean(),
+                    ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode(),
+                    randomBoolean()
                )
            )
        );
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumerVersion0.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumerVersion0.java
index df669f007db2b..9115842533453 100644
--- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumerVersion0.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumerVersion0.java
@@ -88,7 +88,7 @@ final class ES819TSDBDocValuesConsumerVersion0 extends XDocValuesConsumer {
            CodecUtil.writeIndexHeader(
                data,
                dataCodec,
-                ES819TSDBDocValuesFormat.VERSION_CURRENT,
+                ES819TSDBDocValuesFormat.VERSION_START, // Test with version 0 rather than current
                state.segmentInfo.getId(),
                state.segmentSuffix
            );
@@ -97,7 +97,7 @@ final class ES819TSDBDocValuesConsumerVersion0 extends XDocValuesConsumer {
            CodecUtil.writeIndexHeader(
                meta,
                metaCodec,
-                ES819TSDBDocValuesFormat.VERSION_CURRENT,
+                ES819TSDBDocValuesFormat.VERSION_START, // Test with version 0 rather than current
                state.segmentInfo.getId(),
                state.segmentSuffix
            );
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
index ae4594e0568d7..7f65cb9af3ad5 100644
--- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
@@ -44,6 +44,7 @@
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
 import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
+import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
 import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
 import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer.BaseDenseNumericValues;
 import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer.BaseSortedDocValues;
@@ -65,8 +66,11 @@
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
+import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.BLOCK_BYTES_THRESHOLD;
+import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.BLOCK_COUNT_THRESHOLD;
 import static org.elasticsearch.test.ESTestCase.between;
 import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
+import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween;
 import static org.elasticsearch.test.ESTestCase.randomBoolean;
 import static org.elasticsearch.test.ESTestCase.randomFrom;
 import static org.elasticsearch.test.ESTestCase.randomIntBetween;
@@ -78,9 +82,11 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
     private final Codec codec = new Elasticsearch92Lucene103Codec() {
 
         final ES819TSDBDocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(
-            randomIntBetween(2, 4096),
-            randomIntBetween(1, 512),
-            random().nextBoolean()
+            ESTestCase.randomIntBetween(2, 4096),
+            ESTestCase.randomIntBetween(1, 512),
+            random().nextBoolean(),
+            randomBinaryCompressionMode(),
+            true
         );
 
         @Override
@@ -115,6 +121,123 @@ protected Codec getCodec() {
         return codec;
     }
 
+    public void testBinaryCompressionFeatureFlag() {
+        ES819TSDBDocValuesFormat docValueFormat = new ES819TSDBDocValuesFormat();
+        if (ES819TSDBDocValuesFormat.BINARY_DV_COMPRESSION_FEATURE_FLAG) {
+            assertThat(docValueFormat.binaryDVCompressionMode, equalTo(BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1));
+        } else {
+            assertThat(docValueFormat.binaryDVCompressionMode, equalTo(BinaryDVCompressionMode.NO_COMPRESS));
+        }
+    }
+
+    public void testBlockWiseBinary() throws Exception {
+        boolean sparse = randomBoolean();
+        int numBlocksBound = 10;
+        // The average value size (~25 bytes) hits the block count threshold before the size threshold,
+        // so use the count threshold to compute how many docs are needed.
+        int numNonNullValues = randomIntBetween(0, numBlocksBound * BLOCK_COUNT_THRESHOLD);
+
+        List<String> binaryValues = new ArrayList<>();
+        int numNonNull = 0;
+        while (numNonNull < numNonNullValues) {
+            if (sparse && randomBoolean()) {
+                binaryValues.add(null);
+            } else {
+                // Average value length is ~25 bytes
+                final String value = randomAlphaOfLengthBetween(0, 50);
+                binaryValues.add(value);
+                numNonNull++;
+            }
+        }
+
+        assertBinaryValues(binaryValues);
+    }
+
+    public void testBlockWiseBinarySmallValues() throws Exception {
+        boolean sparse = randomBoolean();
+        int numBlocksBound = 5;
+        int numNonNullValues = randomIntBetween(0, numBlocksBound * BLOCK_COUNT_THRESHOLD);
+
+        List<String> binaryValues = new ArrayList<>();
+        int numNonNull = 0;
+        while (numNonNull < numNonNullValues) {
+            if (sparse && randomBoolean()) {
+                binaryValues.add(null);
+            } else {
+                final String value = randomAlphaOfLengthBetween(0, 2);
+                binaryValues.add(value);
+                numNonNull++;
+            }
+        }
+
+        assertBinaryValues(binaryValues);
+    }
+
+    public void testBlockWiseBinaryLargeValues() throws Exception {
+        boolean sparse = randomBoolean();
+        int numBlocksBound = 5;
+        int binaryDataSize = randomIntBetween(0, numBlocksBound * BLOCK_BYTES_THRESHOLD);
+        List<String> binaryValues = new ArrayList<>();
+        int totalSize = 0;
+        while (totalSize < binaryDataSize) {
+            if (sparse && randomBoolean()) {
+                binaryValues.add(null);
+            } else {
+                final String value = randomAlphaOfLengthBetween(BLOCK_BYTES_THRESHOLD / 2, 2 * BLOCK_BYTES_THRESHOLD);
+                binaryValues.add(value);
+                totalSize += value.length();
+            }
+        }
+
+        assertBinaryValues(binaryValues);
+    }
+
+    public void assertBinaryValues(List<String> binaryValues) throws Exception {
+        String timestampField = "@timestamp";
+        String hostnameField = "host.name";
+        long baseTimestamp = 1704067200000L;
+        String binaryField = "binary_field";
+        var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
+        try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
+
+            int numDocs = binaryValues.size();
+            for (int i = 0; i < numDocs; i++) {
+                var d = new Document();
+                long timestamp = baseTimestamp + (1000L * i);
+                d.add(new SortedDocValuesField(hostnameField, new BytesRef("host-1")));
+                d.add(new SortedNumericDocValuesField(timestampField, timestamp));
+
+                String binaryValue = binaryValues.get(i);
+                if (binaryValue != null) {
+                    d.add(new BinaryDocValuesField(binaryField, new BytesRef(binaryValue)));
+                }
+
+                iw.addDocument(d);
+                if (i % 100 == 0) {
+                    iw.commit();
+                }
+            }
+            iw.commit();
+            iw.forceMerge(1);
+
+            try (var reader = DirectoryReader.open(iw)) {
+                assertEquals(1, reader.leaves().size());
+                assertEquals(numDocs, reader.maxDoc());
+                var leaf = reader.leaves().get(0).reader();
+                var binaryDV = leaf.getBinaryDocValues(binaryField);
+                assertNotNull(binaryDV);
+                for (int i = 0; i < numDocs; i++) {
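+                    // Note: the index sort from getTimeSeriesIndexWriterConfig orders docs by timestamp descending,
+                    // reversing insertion order, so expected values are read from the end of the list.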
+                    String expected = binaryValues.removeLast();
+                    if (expected == null) {
+                        assertFalse(binaryDV.advanceExact(i));
+                    } else {
+                        assertTrue(binaryDV.advanceExact(i));
+                        assertEquals(expected, binaryDV.binaryValue().utf8ToString());
+                    }
+                }
+            }
+        }
+    }
+
     public void testForceMergeDenseCase() throws Exception {
         String timestampField = "@timestamp";
         String hostnameField = "host.name";
@@ -860,9 +983,12 @@ public void testOptionalColumnAtATimeReader() throws Exception {
                    }
                }
 
-                {
+                // TODO add bulk loading to compressed values so this is not necessary
+                var block = (TestBlock) binaryFixedDV.tryRead(factory, docs, 0, random().nextBoolean(), null, false);
+                if (isCompressed(config, binaryFixedField)) {
+                    assertNull(block);
+                } else {
                    // bulk loading binary fixed length field:
-                    var block = (TestBlock) binaryFixedDV.tryRead(factory, docs, 0, random().nextBoolean(), null, false);
                    assertNotNull(block);
                    assertEquals(size, block.size());
                    for (int j = 0; j < block.size(); j++) {
@@ -872,9 +998,12 @@
                    }
                }
 
-                {
+                // TODO add bulk loading to compressed values so this is not necessary
+                block = (TestBlock) binaryVariableDV.tryRead(factory, docs, 0, random().nextBoolean(), null, false);
+                if (isCompressed(config, binaryVariableField)) {
+                    assertNull(block);
+                } else {
                    // bulk loading binary variable length field:
-                    var block = (TestBlock) binaryVariableDV.tryRead(factory, docs, 0, random().nextBoolean(), null, false);
                    assertNotNull(block);
                    assertEquals(size, block.size());
                    for (int j = 0; j < block.size(); j++) {
@@ -1242,25 +1371,32 @@ public void testOptionalColumnAtATimeReaderWithSparseDocs() throws Exception {
            {
                // Bulk binary loader can only handle sparse queries over dense documents
                List<Integer> testDocs = IntStream.range(0, numDocs - 1).filter(i -> randomBoolean()).boxed().toList();
+                docs = TestBlock.docs(testDocs.stream().mapToInt(n -> n).toArray());
                if (testDocs.isEmpty() == false) {
                    {
-                        // fixed length
-                        docs = TestBlock.docs(testDocs.stream().mapToInt(n -> n).toArray());
                        var dv = getDenseBinaryValues(leafReader, binaryFixedField);
                        var block = (TestBlock) dv.tryRead(factory, docs, 0, random().nextBoolean(), null, false);
-                        assertNotNull(block);
-                        for (int i = 0; i < testDocs.size(); i++) {
-                            assertThat(block.get(i), equalTo(binaryFixed[testDocs.get(i)]));
+                        // TODO add bulk loading to compressed values so this is not necessary
+                        if (isCompressed(config, binaryFixedField)) {
+                            assertNull(block);
+                        } else {
+                            assertNotNull(block);
+                            for (int i = 0; i < testDocs.size(); i++) {
+                                assertThat(block.get(i), equalTo(binaryFixed[testDocs.get(i)]));
+                            }
                        }
                    }
                    {
-                        // variable length
-                        docs = TestBlock.docs(testDocs.stream().mapToInt(n -> n).toArray());
                        var dv = getDenseBinaryValues(leafReader, binaryVariableField);
                        var block = (TestBlock) dv.tryRead(factory, docs, 0, random().nextBoolean(), null, false);
-                        assertNotNull(block);
-                        for (int i = 0; i < testDocs.size(); i++) {
-                            assertThat(block.get(i), equalTo(binaryVariable[testDocs.get(i)]));
+                        // TODO add bulk loading to compressed values so this is not necessary
+                        if (isCompressed(config, binaryVariableField)) {
+                            assertNull(block);
+                        } else {
+                            assertNotNull(block);
+                            for (int i = 0; i < testDocs.size(); i++) {
+                                assertThat(block.get(i), equalTo(binaryVariable[testDocs.get(i)]));
+                            }
                        }
                    }
                }
@@ -1281,7 +1417,9 @@ public void testLoadKeywordFieldWithIndexSorts() throws IOException {
        final ES819TSDBDocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(
            randomIntBetween(2, 4096),
            1, // always enable range-encode
-            random().nextBoolean()
+            random().nextBoolean(),
+            randomBinaryCompressionMode(),
+            randomBoolean()
        );
 
        @Override
@@ -1655,4 +1793,18 @@ private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, b
 
        return config;
    }
+
+    public static BinaryDVCompressionMode randomBinaryCompressionMode() {
+        BinaryDVCompressionMode[] modes = BinaryDVCompressionMode.values();
+        return modes[random().nextInt(modes.length)];
+    }
+
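+    /** Whether the doc values format configured for the given field compresses binary doc values (mode != NO_COMPRESS). */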
+    private boolean isCompressed(IndexWriterConfig config, String field) {
+        if (config.getCodec() instanceof Elasticsearch92Lucene103Codec codec) {
+            if (codec.getDocValuesFormatForField(field) instanceof ES819TSDBDocValuesFormat format) {
+                return format.binaryDVCompressionMode != BinaryDVCompressionMode.NO_COMPRESS;
+            }
+        }
+        return false;
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java
index 247b75f2977b5..d43e253729a64 100644
--- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java
@@ -19,14 +19,26 @@ public class ES819TSDBDocValuesFormatVariableSkipIntervalTests extends ES87TSDBD
     protected Codec getCodec() {
         // small interval size to test with many intervals
         return TestUtil.alwaysDocValuesFormat(
-            new ES819TSDBDocValuesFormat(random().nextInt(4, 16), random().nextInt(1, 32), random().nextBoolean())
+            new ES819TSDBDocValuesFormat(
+                random().nextInt(4, 16),
+                random().nextInt(1, 32),
+                random().nextBoolean(),
+                ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode(),
+                random().nextBoolean()
+            )
         );
     }
 
     public void testSkipIndexIntervalSize() {
         IllegalArgumentException ex = expectThrows(
             IllegalArgumentException.class,
-            () -> new ES819TSDBDocValuesFormat(random().nextInt(Integer.MIN_VALUE, 2), random().nextInt(1, 32), random().nextBoolean())
+            () -> new ES819TSDBDocValuesFormat(
+                random().nextInt(Integer.MIN_VALUE, 2),
+                random().nextInt(1, 32),
+                random().nextBoolean(),
+                ES819TSDBDocValuesFormatTests.randomBinaryCompressionMode(),
+                random().nextBoolean()
+            )
         );
         assertTrue(ex.getMessage().contains("skipIndexIntervalSize must be > 1"));
     }
diff --git a/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java b/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java
index 527d961197e8a..21684da097e66 100644
--- a/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java
+++ b/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java
@@ -28,7 +28,8 @@ public enum FeatureFlag {
     ),
     RANDOM_SAMPLING("es.random_sampling_feature_flag_enabled=true", Version.fromString("9.2.0"), null),
     INFERENCE_API_CCM("es.inference_api_ccm_feature_flag_enabled=true", Version.fromString("9.3.0"), null),
-    GENERIC_VECTOR_FORMAT("es.generic_vector_format_feature_flag_enabled=true", Version.fromString("9.3.0"), null);
+    GENERIC_VECTOR_FORMAT("es.generic_vector_format_feature_flag_enabled=true", Version.fromString("9.3.0"), null),
+    BINARY_DOC_VALUE_COMPRESSION("es.binary_dv_compression_feature_flag_enabled=true", Version.fromString("9.3.0"), null);
 
     public final String systemProperty;
     public final Version from;
diff --git a/x-pack/plugin/logsdb/src/yamlRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbTestSuiteIT.java b/x-pack/plugin/logsdb/src/yamlRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbTestSuiteIT.java
index 5a1ccb4e08140..bfa059d428bc0 100644
--- a/x-pack/plugin/logsdb/src/yamlRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbTestSuiteIT.java
+++ b/x-pack/plugin/logsdb/src/yamlRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbTestSuiteIT.java
@@ -37,6 +37,7 @@ public class LogsdbTestSuiteIT extends ESClientYamlSuiteTestCase {
         .setting("xpack.security.autoconfiguration.enabled", "false")
         .setting("xpack.license.self_generated.type", "trial")
         .feature(FeatureFlag.DOC_VALUES_SKIPPER)
+        .feature(FeatureFlag.BINARY_DOC_VALUE_COMPRESSION)
         .build();
 
     public LogsdbTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/GeoShapeDocValueFormatTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/GeoShapeDocValueFormatTests.java
new file mode 100644
index 0000000000000..5498105425c4a
--- /dev/null
+++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/GeoShapeDocValueFormatTests.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.spatial.index.mapper;
+
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.spatial.LocalStateSpatialPlugin;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.startsWith;
+
+public class GeoShapeDocValueFormatTests extends ESSingleNodeTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return List.of(LocalStateSpatialPlugin.class);
+    }
+
+    /**
+     * geo_shape fields use binary doc values internally. Since geo_shape does not work well with binary doc value
+     * compression, in logsdb mode geo_shape should use the Lucene doc values format rather than ES819TSDBDocValuesFormat.
+     */
+    public void testGeoShapeDocValueUseLuceneFormat() throws IOException {
+        String indexName = "test";
+        String fieldName = "field_name";
+        Settings settings = Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.getName()).build();
+        IndexService indexService = createIndex(indexName, settings, "doc", "@timestamp", "type=date", fieldName, "type=geo_shape");
+
+        var indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE).source("""
+            {
+                "@timestamp": "2025-10-01T12:34:56.789",
+                "%field": {
+                    "type" : "Point",
+                    "coordinates" : [-77.03653, 38.897676]
+                }
+            }
+            """.replace("%field", fieldName), XContentType.JSON);
+        var response = client().bulk(new BulkRequest().add(indexRequest)).actionGet();
+        assertFalse(response.hasFailures());
+        safeGet(indicesAdmin().refresh(new RefreshRequest(indexName).indicesOptions(IndicesOptions.lenientExpandOpenHidden())));
+
+        try (var searcher = indexService.getShard(0).acquireSearcher(indexName)) {
+            try (var indexReader = searcher.getIndexReader()) {
+                var leaves = indexReader.leaves();
+                assertThat(leaves.size(), equalTo(1));
+                FieldInfos fieldInfos = leaves.getFirst().reader().getFieldInfos();
+                FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldName);
+                assertNotNull(fieldInfo);
+                Map<String, String> attributes = fieldInfo.attributes();
+                assertThat(attributes, hasKey("PerFieldDocValuesFormat.format"));
+                assertThat(attributes.get("PerFieldDocValuesFormat.format"), startsWith("Lucene"));
+            }
+        }
+    }
+}
diff --git a/x-pack/plugin/wildcard/src/test/java/org/elasticsearch/xpack/wildcard/mapper/WildcardDocValueFormatTests.java b/x-pack/plugin/wildcard/src/test/java/org/elasticsearch/xpack/wildcard/mapper/WildcardDocValueFormatTests.java
new file mode 100644
index 0000000000000..01ea60dcf2abf
--- /dev/null
+++ b/x-pack/plugin/wildcard/src/test/java/org/elasticsearch/xpack/wildcard/mapper/WildcardDocValueFormatTests.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.wildcard.mapper;
+
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.wildcard.Wildcard;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+
+public class WildcardDocValueFormatTests extends ESSingleNodeTestCase {
+
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return List.of(Wildcard.class);
+    }
+
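+    /**
+     * wildcard fields also store their original values in binary doc values; unlike geo_shape, they are expected to
+     * keep using the ES819TSDB doc values format in logsdb mode.
+     */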
+    public void testWildcardDocValueUseES819Format() throws IOException {
+        String indexName = "test";
+        String fieldName = "field_name";
+        Settings settings = Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.getName()).build();
+        IndexService indexService = createIndex(indexName, settings, "doc", "@timestamp", "type=date", fieldName, "type=wildcard");
+
+        var indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE).source("""
+            {
+                "@timestamp": "2025-10-01T12:34:56.789",
+                "%field": "baz"
+            }
+            """.replace("%field", fieldName), XContentType.JSON);
+        var response = client().bulk(new BulkRequest().add(indexRequest)).actionGet();
+        assertFalse(response.hasFailures());
+        safeGet(indicesAdmin().refresh(new RefreshRequest(indexName).indicesOptions(IndicesOptions.lenientExpandOpenHidden())));
+
+        try (var searcher = indexService.getShard(0).acquireSearcher(indexName)) {
+            try (var indexReader = searcher.getIndexReader()) {
+                var leaves = indexReader.leaves();
+                assertThat(leaves.size(), equalTo(1));
+                FieldInfos fieldInfos = leaves.getFirst().reader().getFieldInfos();
+                FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldName);
+                assertNotNull(fieldInfo);
+                Map<String, String> attributes = fieldInfo.attributes();
+                assertThat(attributes, hasEntry("PerFieldDocValuesFormat.format", "ES819TSDB"));
+            }
+        }
+    }
+}