Merged

Changes from 22 commits

74 commits
74880a0
Copy binary compression from LUCENE-9211
parkertimmins Oct 23, 2025
a973713
Initial version of block withs variable number values
parkertimmins Oct 23, 2025
3fc95dc
Fix issue with index output unclosed
parkertimmins Oct 23, 2025
c302cc2
Changes docRanges to single limit per block, plus start of 0
parkertimmins Oct 23, 2025
99748c8
Factor block address and block doc offset to accumulator class
parkertimmins Oct 23, 2025
fa2ea11
Rename offset accumulator
parkertimmins Oct 24, 2025
b67dd58
Change lz4 to zstd
parkertimmins Oct 24, 2025
638dbbc
Fix direct monotonic reader size
parkertimmins Oct 24, 2025
fdf3428
Fix docRangeLen bug, use for non-logsdb wildcards
parkertimmins Oct 24, 2025
36b3e10
Change offset encoding from zstd to numeric
parkertimmins Oct 24, 2025
eeded36
[CI] Auto commit changes from spotless
Oct 24, 2025
2d8e6dc
Fix missing compression in es819 format
parkertimmins Oct 25, 2025
efa270f
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Oct 25, 2025
c4d67e5
Store offsets rather than lengths
parkertimmins Oct 25, 2025
06a2035
[CI] Auto commit changes from spotless
Oct 25, 2025
7ccb18d
Remove forbidden APIs
parkertimmins Oct 25, 2025
a57e0d4
[CI] Auto commit changes from spotless
Oct 25, 2025
f156e55
Binary search to find block containing docNum
parkertimmins Oct 27, 2025
91e5842
[CI] Auto commit changes from spotless
Oct 27, 2025
401a041
do not mmap temp offset files
parkertimmins Oct 27, 2025
ad55bc3
feedback
parkertimmins Oct 27, 2025
4d4e153
[CI] Auto commit changes from spotless
Oct 27, 2025
f1ff182
Move zstd (de)compressor to separate class
parkertimmins Oct 27, 2025
9d2f237
Combine doAddCompressedBinary and doAddUncompressedBinary
parkertimmins Oct 27, 2025
2269f9c
[CI] Auto commit changes from spotless
Oct 27, 2025
1c4e9dc
feedback
parkertimmins Oct 28, 2025
3ddb649
Add WildcardRollingUpgradeIT
parkertimmins Oct 28, 2025
dbcd1c6
need new compressor/decompressor for new block writer
parkertimmins Oct 29, 2025
5537d8c
[CI] Auto commit changes from spotless
Oct 29, 2025
d7fce75
Cleanup binaryWriter interface
parkertimmins Oct 29, 2025
bb8361c
Revert "[CI] Auto commit changes from spotless"
parkertimmins Oct 29, 2025
aa3d44f
Revert "Add WildcardRollingUpgradeIT"
parkertimmins Oct 29, 2025
2c1f143
[CI] Auto commit changes from spotless
Oct 29, 2025
636c150
Update code lookup to support other compressors
parkertimmins Oct 29, 2025
09898ff
feedback
parkertimmins Oct 29, 2025
8b8b50b
Update bwc tests
parkertimmins Oct 29, 2025
8a82c23
cleanup
parkertimmins Oct 29, 2025
cef255f
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Oct 29, 2025
718ffc6
Fix test broken from merge
parkertimmins Oct 29, 2025
ebda5b0
Update docs/changelog/137139.yaml
parkertimmins Oct 30, 2025
9fc23f1
Move block address and doc_range accumulators into BlockMetadataAccum…
parkertimmins Oct 30, 2025
49e5425
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Oct 30, 2025
80525bf
Unit tests that require multiple doc value blocks
parkertimmins Oct 31, 2025
b1d4b17
Test values near the size of a block
parkertimmins Oct 31, 2025
e332619
Self close BlockMetadataAcc if throw during construction
parkertimmins Oct 31, 2025
60ebfaa
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 3, 2025
1209e78
Update tsdb doc_values bwc test to mention version 1
parkertimmins Nov 3, 2025
80c14a3
Update docs/changelog/137139.yaml
parkertimmins Nov 3, 2025
602c203
Disable compression for geo_shape type
parkertimmins Nov 4, 2025
d6293d9
Test that wildcard uses ES819 docs encoding and geo_shape does not
parkertimmins Nov 4, 2025
982386e
[CI] Auto commit changes from spotless
Nov 4, 2025
e61b8c2
Add feature flag for binary dv compression
parkertimmins Nov 6, 2025
a225b98
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 7, 2025
f6fd5bd
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 7, 2025
5fe2c80
Add block count threshold in addition to size threshold
parkertimmins Nov 7, 2025
51b21ae
[CI] Auto commit changes from spotless
Nov 7, 2025
07eeb5a
Add test for very small binary values
parkertimmins Nov 7, 2025
d56d12f
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 14, 2025
980df97
Use groupVarInt instead of TSDB encoder
parkertimmins Nov 14, 2025
21a98ac
Dont test bulk loading if compressed, as not implemented
parkertimmins Nov 14, 2025
2239732
[CI] Auto commit changes from spotless
Nov 14, 2025
15823e8
Fix broken merge
parkertimmins Nov 14, 2025
25dcb56
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 14, 2025
200e14c
Revert to using TSDBDocValueEncoder for offsets
parkertimmins Nov 15, 2025
5ca24b4
Better naming and minor optmization
parkertimmins Nov 15, 2025
7f8fa16
Dont need to grow offsets array
parkertimmins Nov 15, 2025
91c23ee
And back to GroupedVarInt, this time with better delta decoding
parkertimmins Nov 17, 2025
92c8050
Add header to control whether block is compressed or uncompressed
parkertimmins Nov 17, 2025
016352a
Handle isCompressed in ES819DocValuesProducer, add bwc tests
parkertimmins Nov 17, 2025
8a2af81
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 17, 2025
026406b
[CI] Auto commit changes from spotless
Nov 17, 2025
d27bb8b
Skip bulk loading tests if compressed
parkertimmins Nov 17, 2025
db68af6
review feedback
parkertimmins Nov 18, 2025
50d9a26
Merge branch 'main' into parker/compressed-binary-doc-values
parkertimmins Nov 19, 2025
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -257,7 +258,7 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
);
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
config.setMergePolicy(new LogByteSizeMergePolicy());
var docValuesFormat = new ES819TSDBDocValuesFormat(4096, 512, optimizedMergeEnabled);
var docValuesFormat = new ES819TSDBDocValuesFormat(4096, 512, optimizedMergeEnabled, BinaryDVCompressionMode.COMPRESSED_WITH_ZSTD);
config.setCodec(new Elasticsearch92Lucene103Codec() {
@Override
public DocValuesFormat getDocValuesFormatForField(String field) {
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb;

public enum BinaryDVCompressionMode {

NO_COMPRESS((byte) 0),
COMPRESSED_WITH_ZSTD((byte) 1);

public final byte code;

BinaryDVCompressionMode(byte code) {
this.code = code;
}

public static BinaryDVCompressionMode fromMode(byte mode) {
return switch (mode) {
case 0 -> NO_COMPRESS;
case 1 -> COMPRESSED_WITH_ZSTD;
default -> throw new IllegalStateException("unknown compression mode [" + mode + "]");
};
}
}
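
The mode byte written into the doc-values metadata is decoded with fromMode on the read side. A minimal sketch of the round trip follows; meta and metaIn stand in for the real output/input streams and are illustrative, not code from this PR:

// Writer side: the consumer records which compression mode was used for this field.
meta.writeByte(BinaryDVCompressionMode.COMPRESSED_WITH_ZSTD.code);
// Reader side: the producer restores the mode and picks the matching decode path.
BinaryDVCompressionMode mode = BinaryDVCompressionMode.fromMode(metaIn.readByte());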
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.es819;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.elasticsearch.core.IOUtils;

import java.io.Closeable;
import java.io.IOException;

/**
 * Like OffsetsAccumulator, builds offsets and stores them in a DirectMonotonicWriter, but writes to a temp file
 * rather than directly to a DirectMonotonicWriter because the number of values is not known up front. If the
 * number of values is known, prefer OffsetsWriter.
*/
final class DelayedOffsetAccumulator implements Closeable {
private final Directory dir;
private final long startOffset;

private int numValues = 0;
private final IndexOutput tempOutput;
private final String suffix;

DelayedOffsetAccumulator(Directory dir, IOContext context, IndexOutput data, String suffix, long startOffset) throws IOException {
this.dir = dir;
this.startOffset = startOffset;
this.suffix = suffix;

boolean success = false;
try {
tempOutput = dir.createTempOutput(data.getName(), suffix, context);
CodecUtil.writeHeader(tempOutput, ES819TSDBDocValuesFormat.META_CODEC + suffix, ES819TSDBDocValuesFormat.VERSION_CURRENT);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't

Contributor: this should be tested

}
}
}

public void addDoc(long delta) throws IOException {
tempOutput.writeVLong(delta);
numValues++;
}

public void build(IndexOutput meta, IndexOutput data) throws IOException {
CodecUtil.writeFooter(tempOutput);
IOUtils.close(tempOutput);

// write the offsets info to the meta file by reading from temp file
try (ChecksumIndexInput tempInput = dir.openChecksumInput(tempOutput.getName())) {
CodecUtil.checkHeader(
tempInput,
ES819TSDBDocValuesFormat.META_CODEC + suffix,
ES819TSDBDocValuesFormat.VERSION_CURRENT,
ES819TSDBDocValuesFormat.VERSION_CURRENT
);
Throwable priorE = null;
try {
final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
meta,
data,
numValues + 1,
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);

long offset = startOffset;
writer.add(offset);
for (int i = 0; i < numValues; ++i) {
offset += tempInput.readVLong();
writer.add(offset);
}
writer.finish();
} catch (Throwable e) {
priorE = e;
} finally {
CodecUtil.checkFooter(tempInput, priorE);
}
}
}

@Override
public void close() throws IOException {
if (tempOutput != null) {
IOUtils.close(tempOutput, () -> dir.deleteFile(tempOutput.getName()));
}
}
}
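
A minimal usage sketch of DelayedOffsetAccumulator, following the same pattern the consumer uses further down; dir, context, data, meta and the lengthDeltas loop are assumed/illustrative, not part of this file:

try (DelayedOffsetAccumulator acc = new DelayedOffsetAccumulator(dir, context, data, "example-offsets", data.getFilePointer())) {
    for (long lengthDelta : lengthDeltas) { // e.g. the byte length of each flushed block
        acc.addDoc(lengthDelta);            // buffered as a VLong in the temp file
    }
    acc.build(meta, data);                  // replays the temp file into a DirectMonotonicWriter (numValues + 1 offsets)
}                                           // close() also deletes the temp file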
@@ -27,8 +27,10 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.SortedSetSelector;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
@@ -41,15 +43,20 @@
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.index.codec.tsdb.es819.DocValuesConsumerUtil.compatibleWithOptimizedMerge;
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE;
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_LEVEL_SHIFT;
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_MAX_LEVEL;
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SORTED_SET;
Expand All @@ -65,8 +72,11 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
private final int minDocsPerOrdinalForOrdinalRangeEncoding;
final boolean enableOptimizedMerge;
private final int primarySortFieldNumber;
final SegmentWriteState state;
final BinaryDVCompressionMode binaryDVCompressionMode;

ES819TSDBDocValuesConsumer(
BinaryDVCompressionMode binaryDVCompressionMode,
SegmentWriteState state,
int skipIndexIntervalSize,
int minDocsPerOrdinalForOrdinalRangeEncoding,
@@ -76,6 +86,8 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
String metaCodec,
String metaExtension
) throws IOException {
this.binaryDVCompressionMode = binaryDVCompressionMode;
this.state = state;
this.termsDictBuffer = new byte[1 << 14];
this.dir = state.directory;
this.minDocsPerOrdinalForOrdinalRangeEncoding = minDocsPerOrdinalForOrdinalRangeEncoding;
@@ -315,7 +327,14 @@ public void mergeBinaryField(FieldInfo mergeFieldInfo, MergeState mergeState) th
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
meta.writeInt(field.number);
meta.writeByte(ES819TSDBDocValuesFormat.BINARY);
meta.writeByte(binaryDVCompressionMode.code);
switch (binaryDVCompressionMode) {
case NO_COMPRESS -> doAddUncompressedBinary(field, valuesProducer);
case COMPRESSED_WITH_ZSTD -> doAddCompressedBinary(field, valuesProducer);
}
}

public void doAddUncompressedBinary(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer.mergeStats.supported()) {
final int numDocsWithField = tsdbValuesProducer.mergeStats.sumNumDocsWithField();
final int minLength = tsdbValuesProducer.mergeStats.minLength();
@@ -444,6 +463,181 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
}
}

public void doAddCompressedBinary(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
try (CompressedBinaryBlockWriter blockWriter = new CompressedBinaryBlockWriter()) {

Contributor: [nit] could use some comments here, explaining what each chunk of code does

BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
int numDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
numDocsWithField++;
BytesRef v = values.binaryValue();
blockWriter.addDoc(v);
int length = v.length;
minLength = Math.min(length, minLength);
maxLength = Math.max(length, maxLength);
}
blockWriter.flushData();

assert numDocsWithField <= maxDoc;
meta.writeLong(data.getFilePointer() - start); // dataLength

if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getBinary(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}

meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);

blockWriter.writeMetaData();
}
}

private class CompressedBinaryBlockWriter implements Closeable {
static final int MIN_BLOCK_BYTES = 256 * 1024;
static final int START_BLOCK_DOCS = 1024;
static final int ZSTD_LEVEL = 1;

final Zstd814StoredFieldsFormat.ZstdCompressor compressor = new Zstd814StoredFieldsFormat.ZstdCompressor(ZSTD_LEVEL);

final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
final long[] docOffsetsCompressBuffer = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
int[] docOffsets = new int[START_BLOCK_DOCS];

int uncompressedBlockLength = 0;
int maxUncompressedBlockLength = 0;
int numDocsInCurrentBlock = 0;

byte[] block = BytesRef.EMPTY_BYTES;
int totalChunks = 0;
long maxPointer = 0;
int maxNumDocsInAnyBlock = 0;

final DelayedOffsetAccumulator blockAddressAcc;
final DelayedOffsetAccumulator blockDocRangeAcc;

CompressedBinaryBlockWriter() throws IOException {
long blockAddressesStart = data.getFilePointer();
blockAddressAcc = new DelayedOffsetAccumulator(state.directory, state.context, data, "block-addresses", blockAddressesStart);

try {
blockDocRangeAcc = new DelayedOffsetAccumulator(state.directory, state.context, data, "block-doc-ranges", 0);
} catch (IOException e) {
blockAddressAcc.close();
throw e;
}
}

void addDoc(BytesRef v) throws IOException {
block = ArrayUtil.grow(block, uncompressedBlockLength + v.length);
System.arraycopy(v.bytes, v.offset, block, uncompressedBlockLength, v.length);
uncompressedBlockLength += v.length;

numDocsInCurrentBlock++;
docOffsets = ArrayUtil.grow(docOffsets, numDocsInCurrentBlock + 1); // need one extra since writing start for next block
docOffsets[numDocsInCurrentBlock] = uncompressedBlockLength;

if (uncompressedBlockLength > MIN_BLOCK_BYTES) {
flushData();
}
}

private void flushData() throws IOException {
if (numDocsInCurrentBlock > 0) {
totalChunks++;
long thisBlockStartPointer = data.getFilePointer();

// write length of string data
data.writeInt(uncompressedBlockLength);

maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength);
maxNumDocsInAnyBlock = Math.max(maxNumDocsInAnyBlock, numDocsInCurrentBlock);

compressOffsets(data, numDocsInCurrentBlock);
compress(block, uncompressedBlockLength, data);

blockDocRangeAcc.addDoc(numDocsInCurrentBlock);
numDocsInCurrentBlock = 0;

uncompressedBlockLength = 0;
maxPointer = data.getFilePointer();
long blockLenBytes = maxPointer - thisBlockStartPointer;
blockAddressAcc.addDoc(blockLenBytes);
}
}
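
For orientation, the per-block layout produced by flushData() above can be summarized as follows (reconstructed from the code, comments only):

// [int]   uncompressedBlockLength   - length of the concatenated values in this block
// [bytes] encoded docOffsets        - numDocsInCurrentBlock + 1 offsets, written by compressOffsets()
// [bytes] compressed value data     - zstd-compressed copy of `block`
// The block's total byte length goes to blockAddressAcc and its doc count to blockDocRangeAcc,
// which is what later allows a binary search for the block containing a given docNum.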

void compressOffsets(DataOutput output, int numDocsInCurrentBlock) throws IOException {

Member: Should we encode the lengths using GroupVIntUtil#writeGroupVInts instead? I'm not sure TSDBDocValuesEncoder is suitable for encoding these offsets. Also, always padding 128 offsets may be wasteful.

Contributor Author: Good point, especially after limiting the number of docs per block to 1024 the padding could be a concern. Sounds good, I'll give this a try 👍

Contributor Author: Hmm, so I'm seeing a slow-down with readGroupVInts on some benchmark queries. Mostly small decreases that could be noise, but some in the 25-40% range that are concerning. I'd think that GroupVIntUtil would be quite fast. Is there possibly something I'm missing in the decompression code that could speed it up? I'm currently benchmarking with uncompressed offsets to get a baseline for offset (de)compression.

int batchStart = 0;
int numOffsets = numDocsInCurrentBlock + 1;
while (batchStart < numOffsets) {
int batchLength = Math.min(numOffsets - batchStart, NUMERIC_BLOCK_SIZE);
for (int i = 0; i < batchLength; i++) {
docOffsetsCompressBuffer[i] = docOffsets[batchStart + i];
}
if (batchLength < docOffsetsCompressBuffer.length) {
Arrays.fill(docOffsetsCompressBuffer, batchLength, docOffsetsCompressBuffer.length, 0);
}
encoder.encode(docOffsetsCompressBuffer, output);
batchStart += batchLength;
}
}
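
The thread above discusses replacing TSDBDocValuesEncoder with group-varint for the offsets (see also commits 980df97 and 91c23ee). A rough sketch of that alternative, assuming the bulk writeGroupVInts/readGroupVInts methods on DataOutput/DataInput available in recent Lucene releases; the helper class, names and exact signatures are illustrative, not the code this PR ships:

import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;

import java.io.IOException;

final class GroupVIntOffsetCodec { // hypothetical helper, for illustration only
    static void write(DataOutput out, int[] docOffsets, int numOffsets) throws IOException {
        long[] deltas = new long[numOffsets];
        for (int i = 0; i < numOffsets; i++) {
            deltas[i] = docOffsets[i] - (i == 0 ? 0 : docOffsets[i - 1]); // offsets are monotonic, so deltas stay small
        }
        out.writeGroupVInts(deltas, numOffsets); // no padding to a fixed batch size
    }

    static void read(DataInput in, long[] offsets, int numOffsets) throws IOException {
        in.readGroupVInts(offsets, numOffsets);
        for (int i = 1; i < numOffsets; i++) {
            offsets[i] += offsets[i - 1]; // prefix-sum after the bulk read ("better delta decoding")
        }
    }
}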

void compress(byte[] data, int uncompressedLength, DataOutput output) throws IOException {

Member: If we use Zstd directly, should we also handle cases where compression does not reduce storage and store the raw bytes instead?

Contributor Author: I like the idea of not compressing if it doesn't help. This would still apply with non-direct Zstd, right? I guess for non-direct Zstd we'd need a separate output buffer to check the length before sending it to the output.

I discussed this with Martijn and he suggested adding a signal byte now, which says whether or not the data is compressed. It would always be set to true for now, but we can support false once we add direct Zstd and enable this optimization. What do you think?

ByteBuffer inputBuffer = ByteBuffer.wrap(data, 0, uncompressedLength);
ByteBuffersDataInput input = new ByteBuffersDataInput(List.of(inputBuffer));
compressor.compress(input, output);

Member: Should we use Zstd from NativeAccess directly to avoid copying data to an intermediate buffer before the native buffer?

Contributor Author: My only concern is that this currently uses Lucene's Compressor/CompressionMode, which will make it easy to add other compressors. On the other hand, as we previously spoke about, it might make sense to use LZ4 to partially decompress blocks. If that is the case, we may not want to use the Compressor interface ... though I'm actually not sure either way.

Anyway, I split a hacky version of this off here, and will benchmark it to see if it's worth doing.

Contributor Author: I ran some benchmarks on the above hacky version and got some weird results. Some of the queries got a nice throughput increase. The weird part is that the Store Size increased by an amount that was not reflected in the output of disk_usage. There must be a bug in my version that is causing this.

To keep this PR small(er), what do you think about switching to NativeAccess directly in a separate PR?

}
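
A minimal sketch of the "signal byte" idea from the thread above: only keep the compressed form when it actually saves space, and record which form was written so the producer can decode accordingly. It reuses the surrounding class's compressor field; compressOrStoreRaw and the flag values are illustrative, not the code in this PR:

void compressOrStoreRaw(byte[] bytes, int uncompressedLength, DataOutput output) throws IOException {
    ByteBuffersDataOutput scratch = new ByteBuffersDataOutput();
    ByteBuffer inputBuffer = ByteBuffer.wrap(bytes, 0, uncompressedLength);
    compressor.compress(new ByteBuffersDataInput(List.of(inputBuffer)), scratch);
    if (scratch.size() < uncompressedLength) {
        output.writeByte((byte) 1);                      // 1 = block stored zstd-compressed
        scratch.copyTo(output);
    } else {
        output.writeByte((byte) 0);                      // 0 = compression did not help, block stored raw
        output.writeBytes(bytes, 0, uncompressedLength);
    }
}

A later commit in this PR (92c8050) adds such a header so the producer knows whether a block is compressed.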

void writeMetaData() throws IOException {
if (totalChunks == 0) {
return;
}

long dataAddressesStart = data.getFilePointer();

meta.writeLong(dataAddressesStart);
meta.writeVInt(totalChunks);
meta.writeVInt(maxUncompressedBlockLength);
meta.writeVInt(maxNumDocsInAnyBlock);
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);

blockAddressAcc.build(meta, data);
long dataDocRangeStart = data.getFilePointer();
long addressesLength = dataDocRangeStart - dataAddressesStart;
meta.writeLong(addressesLength);

meta.writeLong(dataDocRangeStart);
blockDocRangeAcc.build(meta, data);
long docRangesLen = data.getFilePointer() - dataDocRangeStart;
meta.writeLong(docRangesLen);
}

@Override
public void close() throws IOException {
blockDocRangeAcc.close();
blockAddressAcc.close();
}
}
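
For reference, a sketch of how a producer could read back what writeMetaData() emits, mirroring the write order exactly; metaIn is an IndexInput over the meta file, names are illustrative, and the real ES819 producer may differ. This only applies when totalChunks > 0, since writeMetaData() returns early otherwise:

long dataAddressesStart = metaIn.readLong();
int totalChunks = metaIn.readVInt();
int maxUncompressedBlockLength = metaIn.readVInt();
int maxNumDocsInAnyBlock = metaIn.readVInt();
int blockShift = metaIn.readVInt();
DirectMonotonicReader.Meta addressesMeta = DirectMonotonicReader.loadMeta(metaIn, totalChunks + 1, blockShift); // from blockAddressAcc.build
long addressesLength = metaIn.readLong();
long dataDocRangeStart = metaIn.readLong();
DirectMonotonicReader.Meta docRangesMeta = DirectMonotonicReader.loadMeta(metaIn, totalChunks + 1, blockShift); // from blockDocRangeAcc.build
long docRangesLength = metaIn.readLong();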

@Override
public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
meta.writeInt(field.number);