2929import org .apache .lucene .store .ByteArrayDataOutput ;
3030import org .apache .lucene .store .ByteBuffersDataOutput ;
3131import org .apache .lucene .store .ByteBuffersIndexOutput ;
32- import org .apache .lucene .store .ChecksumIndexInput ;
3332import org .apache .lucene .store .Directory ;
3433import org .apache .lucene .store .IOContext ;
3534import org .apache .lucene .store .IndexOutput ;
4544import org .elasticsearch .common .compress .fsst .FSST ;
4645import org .elasticsearch .common .compress .fsst .ReservoirSampler ;
4746import org .elasticsearch .core .IOUtils ;
47+ import org .elasticsearch .index .codec .tsdb .BinaryDVCompressionMode ;
4848import org .elasticsearch .index .codec .tsdb .TSDBDocValuesEncoder ;
4949
5050import java .io .Closeable ;
@@ -68,6 +68,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
6868 private byte [] termsDictBuffer ;
6969 private final int skipIndexIntervalSize ;
7070 final boolean enableOptimizedMerge ;
71+ private final BinaryDVCompressionMode binaryDVCompressionMode ;
7172 private final SegmentWriteState state ;
7273
7374 ES819TSDBDocValuesConsumer (
@@ -77,9 +78,11 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
7778 String dataCodec ,
7879 String dataExtension ,
7980 String metaCodec ,
80- String metaExtension
81+ String metaExtension ,
82+ BinaryDVCompressionMode binaryDVCompressionMode
8183 ) throws IOException {
8284 this .termsDictBuffer = new byte [1 << 14 ];
85+ this .binaryDVCompressionMode = binaryDVCompressionMode ;
8386 this .state = state ;
8487 this .dir = state .directory ;
8588 this .context = state .context ;
@@ -279,7 +282,146 @@ public void mergeBinaryField(FieldInfo mergeFieldInfo, MergeState mergeState) th
279282 public void addBinaryField (FieldInfo field , DocValuesProducer valuesProducer ) throws IOException {
280283 meta .writeInt (field .number );
281284 meta .writeByte (ES819TSDBDocValuesFormat .BINARY );
285+ meta .writeByte (binaryDVCompressionMode .code );
286+ switch (binaryDVCompressionMode ) {
287+ case NO_COMPRESS -> doAddUncompressedBinary (field , valuesProducer );
288+ case COMPRESSED_WITH_FSST -> doAddCompressedBinaryFSST (field , valuesProducer );
289+ }
290+ }
291+
292+ public void doAddUncompressedBinary (FieldInfo field , DocValuesProducer valuesProducer ) throws IOException {
293+ meta .writeInt (field .number );
294+ meta .writeByte (ES819TSDBDocValuesFormat .BINARY );
295+
296+ if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer .mergeStats .supported ()) {
297+ final int numDocsWithField = tsdbValuesProducer .mergeStats .sumNumDocsWithField ();
298+ final int minLength = tsdbValuesProducer .mergeStats .minLength ();
299+ final int maxLength = tsdbValuesProducer .mergeStats .maxLength ();
300+
301+ assert numDocsWithField <= maxDoc ;
302+
303+ BinaryDocValues values = valuesProducer .getBinary (field );
304+ long start = data .getFilePointer ();
305+ meta .writeLong (start ); // dataOffset
306+
307+ OffsetsAccumulator offsetsAccumulator = null ;
308+ DISIAccumulator disiAccumulator = null ;
309+ try {
310+ if (numDocsWithField > 0 && numDocsWithField < maxDoc ) {
311+ disiAccumulator = new DISIAccumulator (dir , context , data , IndexedDISI .DEFAULT_DENSE_RANK_POWER );
312+ }
313+
314+ assert maxLength >= minLength ;
315+ if (maxLength > minLength ) {
316+ offsetsAccumulator = new OffsetsAccumulator (dir , context , data , numDocsWithField );
317+ }
318+
319+ for (int doc = values .nextDoc (); doc != DocIdSetIterator .NO_MORE_DOCS ; doc = values .nextDoc ()) {
320+ BytesRef v = values .binaryValue ();
321+ data .writeBytes (v .bytes , v .offset , v .length );
322+ if (disiAccumulator != null ) {
323+ disiAccumulator .addDocId (doc );
324+ }
325+ if (offsetsAccumulator != null ) {
326+ offsetsAccumulator .addDoc (v .length );
327+ }
328+ }
329+ meta .writeLong (data .getFilePointer () - start ); // dataLength
330+
331+ if (numDocsWithField == 0 ) {
332+ meta .writeLong (-2 ); // docsWithFieldOffset
333+ meta .writeLong (0L ); // docsWithFieldLength
334+ meta .writeShort ((short ) -1 ); // jumpTableEntryCount
335+ meta .writeByte ((byte ) -1 ); // denseRankPower
336+ } else if (numDocsWithField == maxDoc ) {
337+ meta .writeLong (-1 ); // docsWithFieldOffset
338+ meta .writeLong (0L ); // docsWithFieldLength
339+ meta .writeShort ((short ) -1 ); // jumpTableEntryCount
340+ meta .writeByte ((byte ) -1 ); // denseRankPower
341+ } else {
342+ long offset = data .getFilePointer ();
343+ meta .writeLong (offset ); // docsWithFieldOffset
344+ final short jumpTableEntryCount = disiAccumulator .build (data );
345+ meta .writeLong (data .getFilePointer () - offset ); // docsWithFieldLength
346+ meta .writeShort (jumpTableEntryCount );
347+ meta .writeByte (IndexedDISI .DEFAULT_DENSE_RANK_POWER );
348+ }
349+
350+ meta .writeInt (numDocsWithField );
351+ meta .writeInt (minLength );
352+ meta .writeInt (maxLength );
353+ if (offsetsAccumulator != null ) {
354+ offsetsAccumulator .build (meta , data );
355+ }
356+ } finally {
357+ IOUtils .close (disiAccumulator , offsetsAccumulator );
358+ }
359+ } else {
360+ BinaryDocValues values = valuesProducer .getBinary (field );
361+ long start = data .getFilePointer ();
362+ meta .writeLong (start ); // dataOffset
363+ int numDocsWithField = 0 ;
364+ int minLength = Integer .MAX_VALUE ;
365+ int maxLength = 0 ;
366+ for (int doc = values .nextDoc (); doc != DocIdSetIterator .NO_MORE_DOCS ; doc = values .nextDoc ()) {
367+ numDocsWithField ++;
368+ BytesRef v = values .binaryValue ();
369+ int length = v .length ;
370+ data .writeBytes (v .bytes , v .offset , v .length );
371+ minLength = Math .min (length , minLength );
372+ maxLength = Math .max (length , maxLength );
373+ }
374+ assert numDocsWithField <= maxDoc ;
375+ meta .writeLong (data .getFilePointer () - start ); // dataLength
376+
377+ if (numDocsWithField == 0 ) {
378+ meta .writeLong (-2 ); // docsWithFieldOffset
379+ meta .writeLong (0L ); // docsWithFieldLength
380+ meta .writeShort ((short ) -1 ); // jumpTableEntryCount
381+ meta .writeByte ((byte ) -1 ); // denseRankPower
382+ } else if (numDocsWithField == maxDoc ) {
383+ meta .writeLong (-1 ); // docsWithFieldOffset
384+ meta .writeLong (0L ); // docsWithFieldLength
385+ meta .writeShort ((short ) -1 ); // jumpTableEntryCount
386+ meta .writeByte ((byte ) -1 ); // denseRankPower
387+ } else {
388+ long offset = data .getFilePointer ();
389+ meta .writeLong (offset ); // docsWithFieldOffset
390+ values = valuesProducer .getBinary (field );
391+ final short jumpTableEntryCount = IndexedDISI .writeBitSet (values , data , IndexedDISI .DEFAULT_DENSE_RANK_POWER );
392+ meta .writeLong (data .getFilePointer () - offset ); // docsWithFieldLength
393+ meta .writeShort (jumpTableEntryCount );
394+ meta .writeByte (IndexedDISI .DEFAULT_DENSE_RANK_POWER );
395+ }
396+
397+ meta .writeInt (numDocsWithField );
398+ meta .writeInt (minLength );
399+ meta .writeInt (maxLength );
400+ if (maxLength > minLength ) {
401+ start = data .getFilePointer ();
402+ meta .writeLong (start );
403+ meta .writeVInt (ES819TSDBDocValuesFormat .DIRECT_MONOTONIC_BLOCK_SHIFT );
404+
405+ final DirectMonotonicWriter writer = DirectMonotonicWriter .getInstance (
406+ meta ,
407+ data ,
408+ numDocsWithField + 1 ,
409+ ES819TSDBDocValuesFormat .DIRECT_MONOTONIC_BLOCK_SHIFT
410+ );
411+ long addr = 0 ;
412+ writer .add (addr );
413+ values = valuesProducer .getBinary (field );
414+ for (int doc = values .nextDoc (); doc != DocIdSetIterator .NO_MORE_DOCS ; doc = values .nextDoc ()) {
415+ addr += values .binaryValue ().length ;
416+ writer .add (addr );
417+ }
418+ writer .finish ();
419+ meta .writeLong (data .getFilePointer () - start );
420+ }
421+ }
422+ }
282423
424+ public void doAddCompressedBinaryFSST (FieldInfo field , DocValuesProducer valuesProducer ) throws IOException {
283425 if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer .mergeStats .supported ()) {
284426 final int numDocsWithField = tsdbValuesProducer .mergeStats .sumNumDocsWithField ();
285427 final int minLength = tsdbValuesProducer .mergeStats .minLength ();
0 commit comments