Skip to content

Commit a201f5c

Browse files
authored
ES819 Binary doc values: compact doc offsets using bit packing (#142772)
The doc offsets are currently delta encoded and stored as grouped vints. For some queries, reading grouped vints is a relatively expensive operation. This PR replaces the grouped vint encoding with a more simplistic bit-packing method. Because this PR changes the on-disk format, a new doc values format is added that extends ES819TSDBDocValuesFormat. Only new indices will use this new doc values format. This is required because otherwise, in mixed clusters, bwc doc values format issues would still occur (even with a new codec version). This is because there is no mechanism that prevents shard recovery from a newer node with a higher codec version to an older node with a lower codec version. Tying the new doc values format to the index version avoids this problem.
1 parent 92ccd73 commit a201f5c

25 files changed

+712
-83
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.elasticsearch.common.logging.LogConfigurator;
2929
import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
3030
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
31-
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
31+
import org.elasticsearch.index.codec.tsdb.es819.ES819Version3TSDBDocValuesFormat;
3232
import org.openjdk.jmh.annotations.Benchmark;
3333
import org.openjdk.jmh.annotations.BenchmarkMode;
3434
import org.openjdk.jmh.annotations.Fork;
@@ -54,6 +54,8 @@
5454
import java.util.concurrent.TimeUnit;
5555
import java.util.function.Supplier;
5656

57+
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.NUMERIC_LARGE_BLOCK_SHIFT;
58+
5759
@BenchmarkMode(Mode.SingleShotTime)
5860
@OutputTimeUnit(TimeUnit.MILLISECONDS)
5961
@State(Scope.Benchmark)
@@ -258,12 +260,13 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
258260
);
259261
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
260262
config.setMergePolicy(new LogByteSizeMergePolicy());
261-
var docValuesFormat = new ES819TSDBDocValuesFormat(
263+
var docValuesFormat = new ES819Version3TSDBDocValuesFormat(
262264
4096,
263265
512,
264266
optimizedMergeEnabled,
265267
BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1,
266-
true
268+
true,
269+
NUMERIC_LARGE_BLOCK_SHIFT
267270
);
268271
config.setCodec(new Elasticsearch92Lucene103Codec() {
269272
@Override

docs/changelog/142772.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: Codec
2+
issues: []
3+
pr: 142772
4+
summary: "ES819 Binary doc values: compact doc offsets using bit packing"
5+
type: enhancement

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,7 @@
458458
with
459459
org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat,
460460
org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat,
461+
org.elasticsearch.index.codec.tsdb.es819.ES819Version3TSDBDocValuesFormat,
461462
org.elasticsearch.index.codec.bloomfilter.ES94BloomFilterDocValuesFormat;
462463
provides org.apache.lucene.codecs.KnnVectorsFormat
463464
with

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ private static Version parseUnchecked(String version) {
222222
public static final IndexVersion DISK_BBQ_QUANTIZE_BITS = def(9_069_0_00, Version.LUCENE_10_3_2);
223223
public static final IndexVersion ID_FIELD_USE_ES812_POSTINGS_FORMAT = def(9_070_0_00, Version.LUCENE_10_3_2);
224224
public static final IndexVersion TIME_SERIES_USE_SYNTHETIC_ID_94 = def(9_071_0_00, Version.LUCENE_10_3_2);
225+
public static final IndexVersion TIME_SERIES_DOC_VALUES_FORMAT_VERSION_3 = def(9_072_0_00, Version.LUCENE_10_3_2);
225226

226227
/*
227228
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.elasticsearch.index.codec.bloomfilter.ES94BloomFilterDocValuesFormat;
2323
import org.elasticsearch.index.codec.postings.ES812PostingsFormat;
2424
import org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat;
25-
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
25+
import org.elasticsearch.index.codec.tsdb.es819.TSDBDocValuesFormatFactory;
2626
import org.elasticsearch.index.codec.vectors.es93.ES93HnswVectorsFormat;
2727
import org.elasticsearch.index.mapper.CompletionFieldMapper;
2828
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -67,8 +67,6 @@ public class PerFieldFormatSupplier {
6767

6868
private static final DocValuesFormat docValuesFormat = new Lucene90DocValuesFormat();
6969
private final KnnVectorsFormat knnVectorsFormat;
70-
private static final ES819TSDBDocValuesFormat tsdbDocValuesFormat = ES819TSDBDocValuesFormat.getInstance(false);
71-
private static final ES819TSDBDocValuesFormat tsdbDocValuesFormatLargeNumericBlock = ES819TSDBDocValuesFormat.getInstance(true);
7270
private static final ES812PostingsFormat es812PostingsFormat = new ES812PostingsFormat();
7371
private static final PostingsFormat completionPostingsFormat = PostingsFormat.forName("Completion101");
7472

@@ -197,9 +195,9 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
197195
}
198196

199197
if (useTSDBDocValuesFormat(field)) {
200-
return (mapperService != null && mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeBlockSize())
201-
? tsdbDocValuesFormatLargeNumericBlock
202-
: tsdbDocValuesFormat;
198+
var indexCreatedVersion = mapperService.getIndexSettings().getIndexVersionCreated();
199+
boolean useLargeBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeBlockSize();
200+
return TSDBDocValuesFormatFactory.createDocValuesFormat(indexCreatedVersion, useLargeBlockSize);
203201
}
204202

205203
return docValuesFormat;
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.codec.tsdb.es819;

import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.GroupVIntUtil;
import org.apache.lucene.util.packed.PackedInts;

import java.io.IOException;
import java.util.Arrays;

/**
 * Represents a codec for encoding and decoding document offsets.
 * Each codec defines custom strategies for compression and decompression
 * of document offsets for compressed binary doc values.
 *
 * <p>Both codecs first delta-encode the offsets in place (each entry becomes the
 * difference from its predecessor) and differ only in how the resulting deltas
 * are serialized. A matching {@link Decoder} must always be used to read data
 * written by the corresponding {@link Encoder}.
 */
enum DocOffsetsCodec {

    /**
     * A codec that uses delta encoding and bit-packing for storage of document offsets.
     * Writes a single header byte holding the number of bits per delta, followed by the
     * deltas packed MSB-first into a contiguous bit stream.
     */
    BITPACKING {
        @Override
        public Encoder getEncoder() {
            return (docOffsets, numDocsInCurrentBlock, output) -> {
                // One offset per doc plus a trailing end offset for the last doc's length.
                int numOffsets = numDocsInCurrentBlock + 1;
                // delta encode in place, back to front so each subtraction reads the
                // still-unmodified predecessor
                int maxDelta = 0;
                for (int i = numOffsets - 1; i > 0; i--) {
                    docOffsets[i] -= docOffsets[i - 1];
                    maxDelta = Math.max(maxDelta, docOffsets[i]);
                }
                // NOTE(review): maxDelta does not include docOffsets[0]; this assumes the
                // first offset of a block is always 0 — confirm against the consumer that
                // fills docOffsets. maxDelta is a non-negative int, so bitsPerValue <= 31.
                int bitsPerValue = maxDelta == 0 ? 0 : PackedInts.bitsRequired(maxDelta);
                output.writeByte((byte) bitsPerValue);
                if (bitsPerValue > 0) {
                    // Pack values MSB-first into a 64-bit accumulator, flushing whole
                    // bytes as they become available (at most 7 residual bits are kept,
                    // so 7 + 31 bits always fits in the long).
                    long accumulator = 0;
                    int bitsInAccumulator = 0;
                    for (int i = 0; i < numOffsets; i++) {
                        accumulator = (accumulator << bitsPerValue) | docOffsets[i];
                        bitsInAccumulator += bitsPerValue;
                        while (bitsInAccumulator >= 8) {
                            bitsInAccumulator -= 8;
                            output.writeByte((byte) (accumulator >>> bitsInAccumulator));
                        }
                    }
                    // Flush the final partial byte, left-aligned and zero-padded.
                    if (bitsInAccumulator > 0) {
                        output.writeByte((byte) (accumulator << (8 - bitsInAccumulator)));
                    }
                }
            };
        }

        @Override
        public Decoder getDecoder() {
            return (docOffsets, numDocsInBlock, input) -> {
                int numOffsets = numDocsInBlock + 1;
                int bitsPerValue = input.readByte() & 0xFF;
                if (bitsPerValue == 0) {
                    // All deltas were zero: every offset is identical to the first (0).
                    Arrays.fill(docOffsets, 0, numOffsets, 0);
                } else {
                    // Mirror of the encoder: refill the accumulator a byte at a time and
                    // extract fixed-width values MSB-first. totalBytes accounts for the
                    // zero-padded final byte.
                    int totalBits = numOffsets * bitsPerValue;
                    int totalBytes = (totalBits + 7) / 8;
                    long accumulator = 0;
                    int bitsInAccumulator = 0;
                    int offsetIndex = 0;
                    // Safe since bitsPerValue <= 31 (see encoder).
                    int mask = (1 << bitsPerValue) - 1;
                    for (int i = 0; i < totalBytes && offsetIndex < numOffsets; i++) {
                        accumulator = (accumulator << 8) | (input.readByte() & 0xFF);
                        bitsInAccumulator += 8;
                        while (bitsInAccumulator >= bitsPerValue && offsetIndex < numOffsets) {
                            bitsInAccumulator -= bitsPerValue;
                            docOffsets[offsetIndex++] = (int) ((accumulator >>> bitsInAccumulator) & mask);
                        }
                    }
                }
                // Undo the delta encoding to recover absolute offsets.
                deltaDecode(docOffsets, numOffsets);
            };
        }
    },
    /**
     * A codec that uses grouped VInts for storage of document offsets.
     * This is the legacy encoding previously hard-coded in the consumer.
     */
    GROUPED_VINT {
        @Override
        public Encoder getEncoder() {
            return (docOffsets, numDocsInCurrentBlock, output) -> {
                int numOffsets = numDocsInCurrentBlock + 1;
                // delta encode in place, back to front (see BITPACKING)
                for (int i = numOffsets - 1; i > 0; i--) {
                    docOffsets[i] -= docOffsets[i - 1];
                }
                output.writeGroupVInts(docOffsets, numOffsets);
            };
        }

        @Override
        public Decoder getDecoder() {
            return (docOffsets, numDocsInBlock, input) -> {
                int numOffsets = numDocsInBlock + 1;
                GroupVIntUtil.readGroupVInts(input, docOffsets, numOffsets);
                // Undo the delta encoding to recover absolute offsets.
                deltaDecode(docOffsets, numOffsets);
            };
        }
    };

    public abstract Encoder getEncoder();

    public abstract Decoder getDecoder();

    /**
     * An encoder to store doc offsets in a more space-efficient format for storage.
     */
    @FunctionalInterface
    public interface Encoder {
        /**
         * Encodes doc offsets in a more space efficient format for storage.
         * Note: the {@code docOffsets} array is modified in place (delta encoded).
         *
         * @param docOffsets an array of document offsets to encode
         * @param numDocsInCurrentBlock the number of documents in the current block to encode
         * @param output the {@link DataOutput} to which the encoded data is written
         * @throws IOException if an I/O error occurs during the encoding process
         */
        void encode(int[] docOffsets, int numDocsInCurrentBlock, DataOutput output) throws IOException;
    }

    /**
     * A decoder to decode the format on disk to doc offsets.
     * A decoder performs the operations that an encoder performs in reverse order.
     */
    @FunctionalInterface
    public interface Decoder {
        /**
         * Decodes the format on disk to doc offsets.
         *
         * @param docOffsets the array to store decoded document offsets
         * @param numDocsInBlock the number of documents in the block to be decoded
         * @param input the input source containing encoded data to be decoded
         * @throws IOException if an I/O error occurs during decoding
         */
        void decode(int[] docOffsets, int numDocsInBlock, DataInput input) throws IOException;
    }

    // Borrowed from TSDBDocValuesEncoder.decodeDelta: turns deltas back into a
    // running prefix sum, in place.
    // The `sum` variable helps compiler optimize method, should not be removed.
    void deltaDecode(int[] arr, int length) {
        int sum = 0;
        for (int i = 0; i < length; ++i) {
            sum += arr[i];
            arr[i] = sum;
        }
    }

}

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,12 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
7878
final SegmentWriteState state;
7979
final BinaryDVCompressionMode binaryDVCompressionMode;
8080
private final boolean enablePerBlockCompression; // only false for testing
81+
private final DocOffsetsCodec.Encoder docOffsetsEncoder;
8182

8283
ES819TSDBDocValuesConsumer(
8384
BinaryDVCompressionMode binaryDVCompressionMode,
8485
final boolean enablePerBlockCompression,
86+
DocOffsetsCodec.Encoder docOffsetsEncoder,
8587
SegmentWriteState state,
8688
int skipIndexIntervalSize,
8789
int minDocsPerOrdinalForOrdinalRangeEncoding,
@@ -94,6 +96,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
9496
) throws IOException {
9597
this.binaryDVCompressionMode = binaryDVCompressionMode;
9698
this.enablePerBlockCompression = enablePerBlockCompression;
99+
this.docOffsetsEncoder = docOffsetsEncoder;
97100
this.state = state;
98101
this.termsDictBuffer = new byte[1 << 14];
99102
this.dir = state.directory;
@@ -584,7 +587,7 @@ public void flushData() throws IOException {
584587
maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength);
585588
maxNumDocsInAnyBlock = Math.max(maxNumDocsInAnyBlock, numDocsInCurrentBlock);
586589

587-
compressOffsets(data, numDocsInCurrentBlock);
590+
docOffsetsEncoder.encode(docOffsets, numDocsInCurrentBlock, data);
588591

589592
if (shouldCompress) {
590593
compress(block, uncompressedBlockLength, data);
@@ -597,15 +600,6 @@ public void flushData() throws IOException {
597600
numDocsInCurrentBlock = uncompressedBlockLength = 0;
598601
}
599602

600-
void compressOffsets(DataOutput output, int numDocsInCurrentBlock) throws IOException {
601-
int numOffsets = numDocsInCurrentBlock + 1;
602-
// delta encode
603-
for (int i = numOffsets - 1; i > 0; i--) {
604-
docOffsets[i] -= docOffsets[i - 1];
605-
}
606-
output.writeGroupVInts(docOffsets, numOffsets);
607-
}
608-
609603
void compress(byte[] data, int uncompressedLength, DataOutput output) throws IOException {
610604
ByteBuffer inputBuffer = ByteBuffer.wrap(data, 0, uncompressedLength);
611605
ByteBuffersDataInput input = new ByteBuffersDataInput(List.of(inputBuffer));

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesFormat {
3939

4040
static final int NUMERIC_BLOCK_SHIFT = 7;
41-
static final int NUMERIC_LARGE_BLOCK_SHIFT = 9;
41+
public static final int NUMERIC_LARGE_BLOCK_SHIFT = 9;
4242
static final int DIRECT_MONOTONIC_BLOCK_SHIFT = 16;
4343
static final String CODEC_NAME = "ES819TSDB";
4444
static final String DATA_CODEC = "ES819TSDBDocValuesData";
@@ -73,7 +73,7 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues
7373
public static final int BLOCK_COUNT_THRESHOLD = 1024;
7474

7575
// number of documents in an interval
76-
private static final int DEFAULT_SKIP_INDEX_INTERVAL_SIZE = 4096;
76+
static final int DEFAULT_SKIP_INDEX_INTERVAL_SIZE = 4096;
7777
// bytes on an interval:
7878
// * 1 byte : number of levels
7979
// * 16 bytes: min / max value,
@@ -139,6 +139,7 @@ private static boolean getOptimizedMergeEnabledDefault() {
139139
final boolean enableOptimizedMerge;
140140
final BinaryDVCompressionMode binaryDVCompressionMode;
141141
final boolean enablePerBlockCompression;
142+
final DocOffsetsCodec docOffsetsCodec;
142143

143144
public static ES819TSDBDocValuesFormat getInstance(boolean useLargeNumericBlock) {
144145
return useLargeNumericBlock ? new ES819TSDBDocValuesFormat(NUMERIC_LARGE_BLOCK_SHIFT) : new ES819TSDBDocValuesFormat();
@@ -207,7 +208,29 @@ public ES819TSDBDocValuesFormat(
207208
final boolean enablePerBlockCompression,
208209
final int numericBlockShift
209210
) {
210-
super(CODEC_NAME);
211+
this(
212+
CODEC_NAME,
213+
skipIndexIntervalSize,
214+
minDocsPerOrdinalForRangeEncoding,
215+
enableOptimizedMerge,
216+
binaryDVCompressionMode,
217+
enablePerBlockCompression,
218+
numericBlockShift,
219+
DocOffsetsCodec.GROUPED_VINT
220+
);
221+
}
222+
223+
public ES819TSDBDocValuesFormat(
224+
String codecName,
225+
int skipIndexIntervalSize,
226+
int minDocsPerOrdinalForRangeEncoding,
227+
boolean enableOptimizedMerge,
228+
BinaryDVCompressionMode binaryDVCompressionMode,
229+
final boolean enablePerBlockCompression,
230+
final int numericBlockShift,
231+
DocOffsetsCodec docOffsetsCodec
232+
) {
233+
super(codecName);
211234
assert numericBlockShift == NUMERIC_BLOCK_SHIFT || numericBlockShift == NUMERIC_LARGE_BLOCK_SHIFT : numericBlockShift;
212235
if (skipIndexIntervalSize < 2) {
213236
throw new IllegalArgumentException("skipIndexIntervalSize must be > 1, got [" + skipIndexIntervalSize + "]");
@@ -218,13 +241,15 @@ public ES819TSDBDocValuesFormat(
218241
this.binaryDVCompressionMode = binaryDVCompressionMode;
219242
this.enablePerBlockCompression = enablePerBlockCompression;
220243
this.numericBlockShift = numericBlockShift;
244+
this.docOffsetsCodec = docOffsetsCodec;
221245
}
222246

223247
@Override
224248
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
225249
return new ES819TSDBDocValuesConsumer(
226250
binaryDVCompressionMode,
227251
enablePerBlockCompression,
252+
docOffsetsCodec.getEncoder(),
228253
state,
229254
skipIndexIntervalSize,
230255
minDocsPerOrdinalForRangeEncoding,
@@ -239,6 +264,6 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept
239264

240265
@Override
241266
public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException {
242-
return new ES819TSDBDocValuesProducer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION);
267+
return new ES819TSDBDocValuesProducer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION, docOffsetsCodec.getDecoder());
243268
}
244269
}

0 commit comments

Comments
 (0)