
Commit e45360f

Copy uncompressed addBinaryDocValues
1 parent e1b7d50 commit e45360f

File tree

3 files changed: +341 -3 lines changed

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.codec.tsdb;
+
+public enum BinaryDVCompressionMode {
+
+    NO_COMPRESS((byte) 0),
+    COMPRESSED_WITH_LZ4((byte) 1);
+
+    public final byte code;
+
+    BinaryDVCompressionMode(byte code) {
+        this.code = code;
+    }
+
+    static BinaryDVCompressionMode fromMode(byte mode) {
+        return switch (mode) {
+            case 0 -> NO_COMPRESS;
+            case 1 -> COMPRESSED_WITH_LZ4;
+            default -> throw new IllegalStateException("unknown compression mode [" + mode + "]");
+        };
+    }
+}
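For orientation (an editorial sketch, not part of the commit): the mode byte the consumer writes to the meta file is intended to round-trip through fromMode. The class below is hypothetical and sits in the same package only so the package-private fromMode is reachable.

package org.elasticsearch.index.codec.tsdb;

// Hypothetical illustration: exercises the code-byte round trip used by the consumer below.
public class BinaryDVCompressionModeRoundTrip {
    public static void main(String[] args) {
        for (BinaryDVCompressionMode mode : BinaryDVCompressionMode.values()) {
            byte written = mode.code; // what addBinaryField writes into the meta file
            BinaryDVCompressionMode readBack = BinaryDVCompressionMode.fromMode(written);
            if (readBack != mode) {
                throw new AssertionError("round trip failed for " + mode);
            }
        }
        // Any other byte value fails fast with IllegalStateException in fromMode.
        System.out.println("mode bytes round-trip cleanly");
    }
}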

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 304 additions & 0 deletions
@@ -9,10 +9,12 @@
 
 package org.elasticsearch.index.codec.tsdb.es819;
 
+import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.DocValuesProducer;
 import org.apache.lucene.codecs.lucene90.IndexedDISI;
 import org.apache.lucene.index.BinaryDocValues;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.DocValuesSkipIndexType;
 import org.apache.lucene.index.FieldInfo;
@@ -29,6 +31,7 @@
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
@@ -41,8 +44,10 @@
 import org.apache.lucene.util.packed.DirectMonotonicWriter;
 import org.apache.lucene.util.packed.PackedInts;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
 import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,9 +70,12 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
     private final int minDocsPerOrdinalForOrdinalRangeEncoding;
     final boolean enableOptimizedMerge;
     private final int primarySortFieldNumber;
+    final SegmentWriteState state;
+    final BinaryDVCompressionMode binaryDVCompressionMode;
 
     ES819TSDBDocValuesConsumer(
         SegmentWriteState state,
+        BinaryDVCompressionMode binaryDVCompressionMode,
         int skipIndexIntervalSize,
         int minDocsPerOrdinalForOrdinalRangeEncoding,
         boolean enableOptimizedMerge,
@@ -76,6 +84,8 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
         String metaCodec,
         String metaExtension
     ) throws IOException {
+        this.state = state;
+        this.binaryDVCompressionMode = binaryDVCompressionMode;
         this.termsDictBuffer = new byte[1 << 14];
         this.dir = state.directory;
         this.minDocsPerOrdinalForOrdinalRangeEncoding = minDocsPerOrdinalForOrdinalRangeEncoding;
@@ -315,7 +325,143 @@ public void mergeBinaryField(FieldInfo mergeFieldInfo, MergeState mergeState) th
     public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
         meta.writeInt(field.number);
         meta.writeByte(ES819TSDBDocValuesFormat.BINARY);
+        meta.writeByte(binaryDVCompressionMode.code);
+        switch (binaryDVCompressionMode) {
+            case NO_COMPRESS -> doAddUncompressedBinary(field, valuesProducer);
+            case COMPRESSED_WITH_LZ4 -> doAddCompressedBinaryLZ4(field, valuesProducer);
+        }
+    }
+
+    public void doAddUncompressedBinary(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
+        if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer.mergeStats.supported()) {
+            final int numDocsWithField = tsdbValuesProducer.mergeStats.sumNumDocsWithField();
+            final int minLength = tsdbValuesProducer.mergeStats.minLength();
+            final int maxLength = tsdbValuesProducer.mergeStats.maxLength();
+
+            assert numDocsWithField <= maxDoc;
+
+            BinaryDocValues values = valuesProducer.getBinary(field);
+            long start = data.getFilePointer();
+            meta.writeLong(start); // dataOffset
+
+            OffsetsAccumulator offsetsAccumulator = null;
+            DISIAccumulator disiAccumulator = null;
+            try {
+                if (numDocsWithField > 0 && numDocsWithField < maxDoc) {
+                    disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+                }
+
+                assert maxLength >= minLength;
+                if (maxLength > minLength) {
+                    offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField);
+                }
+
+                for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
+                    BytesRef v = values.binaryValue();
+                    data.writeBytes(v.bytes, v.offset, v.length);
+                    if (disiAccumulator != null) {
+                        disiAccumulator.addDocId(doc);
+                    }
+                    if (offsetsAccumulator != null) {
+                        offsetsAccumulator.addDoc(v.length);
+                    }
+                }
+                meta.writeLong(data.getFilePointer() - start); // dataLength
+
+                if (numDocsWithField == 0) {
+                    meta.writeLong(-2); // docsWithFieldOffset
+                    meta.writeLong(0L); // docsWithFieldLength
+                    meta.writeShort((short) -1); // jumpTableEntryCount
+                    meta.writeByte((byte) -1); // denseRankPower
+                } else if (numDocsWithField == maxDoc) {
+                    meta.writeLong(-1); // docsWithFieldOffset
+                    meta.writeLong(0L); // docsWithFieldLength
+                    meta.writeShort((short) -1); // jumpTableEntryCount
+                    meta.writeByte((byte) -1); // denseRankPower
+                } else {
+                    long offset = data.getFilePointer();
+                    meta.writeLong(offset); // docsWithFieldOffset
+                    final short jumpTableEntryCount = disiAccumulator.build(data);
+                    meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
+                    meta.writeShort(jumpTableEntryCount);
+                    meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+                }
+
+                meta.writeInt(numDocsWithField);
+                meta.writeInt(minLength);
+                meta.writeInt(maxLength);
+                if (offsetsAccumulator != null) {
+                    offsetsAccumulator.build(meta, data);
+                }
+            } finally {
+                IOUtils.close(disiAccumulator, offsetsAccumulator);
+            }
+        } else {
+            BinaryDocValues values = valuesProducer.getBinary(field);
+            long start = data.getFilePointer();
+            meta.writeLong(start); // dataOffset
+            int numDocsWithField = 0;
+            int minLength = Integer.MAX_VALUE;
+            int maxLength = 0;
+            for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
+                numDocsWithField++;
+                BytesRef v = values.binaryValue();
+                int length = v.length;
+                data.writeBytes(v.bytes, v.offset, v.length);
+                minLength = Math.min(length, minLength);
+                maxLength = Math.max(length, maxLength);
+            }
+            assert numDocsWithField <= maxDoc;
+            meta.writeLong(data.getFilePointer() - start); // dataLength
+
+            if (numDocsWithField == 0) {
+                meta.writeLong(-2); // docsWithFieldOffset
+                meta.writeLong(0L); // docsWithFieldLength
+                meta.writeShort((short) -1); // jumpTableEntryCount
+                meta.writeByte((byte) -1); // denseRankPower
+            } else if (numDocsWithField == maxDoc) {
+                meta.writeLong(-1); // docsWithFieldOffset
+                meta.writeLong(0L); // docsWithFieldLength
+                meta.writeShort((short) -1); // jumpTableEntryCount
+                meta.writeByte((byte) -1); // denseRankPower
+            } else {
+                long offset = data.getFilePointer();
+                meta.writeLong(offset); // docsWithFieldOffset
+                values = valuesProducer.getBinary(field);
+                final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+                meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
+                meta.writeShort(jumpTableEntryCount);
+                meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+            }
+
+            meta.writeInt(numDocsWithField);
+            meta.writeInt(minLength);
+            meta.writeInt(maxLength);
+            if (maxLength > minLength) {
+                start = data.getFilePointer();
+                meta.writeLong(start);
+                meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
+
+                final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
+                    meta,
+                    data,
+                    numDocsWithField + 1,
+                    ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
+                );
+                long addr = 0;
+                writer.add(addr);
+                values = valuesProducer.getBinary(field);
+                for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
+                    addr += values.binaryValue().length;
+                    writer.add(addr);
+                }
+                writer.finish();
+                meta.writeLong(data.getFilePointer() - start);
+            }
+        }
+    }
 
+    public void doAddCompressedBinaryLZ4(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
         if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer.mergeStats.supported()) {
             final int numDocsWithField = tsdbValuesProducer.mergeStats.sumNumDocsWithField();
             final int minLength = tsdbValuesProducer.mergeStats.minLength();
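Editorial side note on the uncompressed path above, not code from the commit: the docsWithFieldOffset long doubles as a sentinel (-2 for no values, -1 for all documents, otherwise the offset of an IndexedDISI bit set). A minimal reader-side sketch with an assumed class name:

// Illustration only: how a reader can interpret the docsWithFieldOffset sentinel written above.
final class DocsWithFieldSentinel {
    static String describe(long docsWithFieldOffset) {
        if (docsWithFieldOffset == -2) {
            return "empty field: no document has a value";
        } else if (docsWithFieldOffset == -1) {
            return "dense field: every doc 0..maxDoc-1 has a value";
        } else {
            return "sparse field: IndexedDISI bit set at data offset " + docsWithFieldOffset;
        }
    }
}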
@@ -444,6 +590,164 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
         }
     }
 
+    static final int BINARY_BLOCK_SHIFT = 5;
+    static final int BINARY_DOCS_PER_COMPRESSED_BLOCK = 1 << BINARY_BLOCK_SHIFT;
+
+    private class CompressedBinaryBlockWriter implements Closeable {
+        final LZ4.FastCompressionHashTable ht = new LZ4.FastCompressionHashTable();
+        int uncompressedBlockLength = 0;
+        int maxUncompressedBlockLength = 0;
+        int numDocsInCurrentBlock = 0;
+        final int[] docLengths = new int[BINARY_DOCS_PER_COMPRESSED_BLOCK];
+        byte[] block = BytesRef.EMPTY_BYTES;
+        int totalChunks = 0;
+        long maxPointer = 0;
+        final long blockAddressesStart;
+
+        final IndexOutput tempBinaryOffsets;
+
+        CompressedBinaryBlockWriter() throws IOException {
+            tempBinaryOffsets = EndiannessReverserUtil.createTempOutput(
+                state.directory,
+                state.segmentInfo.name,
+                "binary_pointers",
+                state.context
+            );
+            boolean success = false;
+            try {
+                CodecUtil.writeHeader(
+                    tempBinaryOffsets,
+                    ES819TSDBDocValuesFormat.META_CODEC + "FilePointers",
+                    ES819TSDBDocValuesFormat.VERSION_CURRENT
+                );
+                blockAddressesStart = data.getFilePointer();
+                success = true;
+            } finally {
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
+                }
+            }
+        }
+
+        void addDoc(int doc, BytesRef v) throws IOException {
+            docLengths[numDocsInCurrentBlock] = v.length;
+            block = ArrayUtil.grow(block, uncompressedBlockLength + v.length);
+            System.arraycopy(v.bytes, v.offset, block, uncompressedBlockLength, v.length);
+            uncompressedBlockLength += v.length;
+            numDocsInCurrentBlock++;
+            if (numDocsInCurrentBlock == BINARY_DOCS_PER_COMPRESSED_BLOCK) {
+                flushData();
+            }
+        }
+
+        private void flushData() throws IOException {
+            if (numDocsInCurrentBlock > 0) {
+                // Write offset to this block to temporary offsets file
+                totalChunks++;
+                long thisBlockStartPointer = data.getFilePointer();
+
+                // Optimisation - check if all lengths are same
+                boolean allLengthsSame = true;
+                for (int i = 1; i < BINARY_DOCS_PER_COMPRESSED_BLOCK; i++) {
+                    if (docLengths[i] != docLengths[i - 1]) {
+                        allLengthsSame = false;
+                        break;
+                    }
+                }
+                if (allLengthsSame) {
+                    // Only write one value shifted. Steal a bit to indicate all other lengths are the same
+                    int onlyOneLength = (docLengths[0] << 1) | 1;
+                    data.writeVInt(onlyOneLength);
+                } else {
+                    for (int i = 0; i < BINARY_DOCS_PER_COMPRESSED_BLOCK; i++) {
+                        if (i == 0) {
+                            // Write first value shifted and steal a bit to indicate other lengths are to follow
+                            int multipleLengths = (docLengths[0] << 1);
+                            data.writeVInt(multipleLengths);
+                        } else {
+                            data.writeVInt(docLengths[i]);
+                        }
+                    }
+                }
+                maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength);
+                LZ4.compress(block, 0, uncompressedBlockLength, EndiannessReverserUtil.wrapDataOutput(data), ht);
+                numDocsInCurrentBlock = 0;
+                // Ensure initialized with zeroes because full array is always written
+                Arrays.fill(docLengths, 0);
+                uncompressedBlockLength = 0;
+                maxPointer = data.getFilePointer();
+                tempBinaryOffsets.writeVLong(maxPointer - thisBlockStartPointer);
+            }
+        }
+
+        void writeMetaData() throws IOException {
+            if (totalChunks == 0) {
+                return;
+            }
+
+            long startDMW = data.getFilePointer();
+            meta.writeLong(startDMW);
+
+            meta.writeVInt(totalChunks);
+            meta.writeVInt(BINARY_BLOCK_SHIFT);
+            meta.writeVInt(maxUncompressedBlockLength);
+            meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
+
+            CodecUtil.writeFooter(tempBinaryOffsets);
+            IOUtils.close(tempBinaryOffsets);
+            // write the compressed block offsets info to the meta file by reading from temp file
+            try (
+                ChecksumIndexInput filePointersIn = EndiannessReverserUtil.openChecksumInput(
+                    state.directory,
+                    tempBinaryOffsets.getName(),
+                    IOContext.READONCE
+                )
+            ) {
+                CodecUtil.checkHeader(
+                    filePointersIn,
+                    ES819TSDBDocValuesFormat.META_CODEC + "FilePointers",
+                    ES819TSDBDocValuesFormat.VERSION_CURRENT,
+                    ES819TSDBDocValuesFormat.VERSION_CURRENT
+                );
+                Throwable priorE = null;
+                try {
+                    final DirectMonotonicWriter filePointers = DirectMonotonicWriter.getInstance(
+                        meta,
+                        data,
+                        totalChunks,
+                        ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
+                    );
+                    long fp = blockAddressesStart;
+                    for (int i = 0; i < totalChunks; ++i) {
+                        filePointers.add(fp);
+                        fp += filePointersIn.readVLong();
+                    }
+                    if (maxPointer < fp) {
+                        throw new CorruptIndexException(
+                            "File pointers don't add up (" + fp + " vs expected " + maxPointer + ")",
+                            filePointersIn
+                        );
+                    }
+                    filePointers.finish();
+                } catch (Throwable e) {
+                    priorE = e;
+                } finally {
+                    CodecUtil.checkFooter(filePointersIn, priorE);
+                }
+            }
+            // Write the length of the DMW block in the data
+            meta.writeLong(data.getFilePointer() - startDMW);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (tempBinaryOffsets != null) {
+                IOUtils.close(tempBinaryOffsets, () -> state.directory.deleteFile(tempBinaryOffsets.getName()));
+            }
+        }
+    }
+    // END: Copied from LUCENE-9211
+
     @Override
     public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
         meta.writeInt(field.number);
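Finally, an editorial, JDK-only sketch (assumed class and method names, plain ints standing in for Lucene's vInts) of the length header that flushData writes before each LZ4 block: the low bit of the first value signals whether one shared length or all 32 per-document lengths follow.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: mirrors the bit-stealing length header of CompressedBinaryBlockWriter.flushData().
public class BlockLengthHeaderSketch {

    static List<Integer> encode(int[] docLengths) {
        List<Integer> out = new ArrayList<>();
        boolean allLengthsSame = true;
        for (int i = 1; i < docLengths.length; i++) {
            if (docLengths[i] != docLengths[i - 1]) {
                allLengthsSame = false;
                break;
            }
        }
        if (allLengthsSame) {
            out.add((docLengths[0] << 1) | 1); // low bit set: one shared length for the whole block
        } else {
            out.add(docLengths[0] << 1);       // low bit clear: the remaining lengths follow individually
            for (int i = 1; i < docLengths.length; i++) {
                out.add(docLengths[i]);
            }
        }
        return out;
    }

    static int[] decode(List<Integer> in, int docsPerBlock) {
        int[] lengths = new int[docsPerBlock];
        int first = in.get(0);
        if ((first & 1) != 0) {
            Arrays.fill(lengths, first >>> 1); // shared length for every doc in the block
        } else {
            lengths[0] = first >>> 1;
            for (int i = 1; i < docsPerBlock; i++) {
                lengths[i] = in.get(i);
            }
        }
        return lengths;
    }

    public static void main(String[] args) {
        int[] same = new int[32];
        Arrays.fill(same, 16);
        int[] varied = new int[32];
        for (int i = 0; i < 32; i++) {
            varied[i] = i + 1;
        }
        if (!Arrays.equals(decode(encode(same), 32), same) || !Arrays.equals(decode(encode(varied), 32), varied)) {
            throw new AssertionError("length header round trip failed");
        }
        System.out.println("length headers round-trip; uniform blocks need 1 value instead of 32");
    }
}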
