Skip to content

Commit b4710ab

Browse files
committed
Optimize number of doc_value iterations in non-merging case
1 parent eb0c342 commit b4710ab

File tree

1 file changed

+36
-80
lines changed

1 file changed

+36
-80
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 36 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,13 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
157157
if (numValues > 0) {
158158
assert numDocsWithValue > 0;
159159
// Special case: when maxOrd is 1, write -1 to signal that no blocks will be written
160-
meta.writeInt(maxOrd != 1 ? ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
160+
meta.writeInt(maxOrd != 1 ? DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
161161
final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
162162
final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
163163
meta,
164164
new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
165165
1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
166-
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
166+
DIRECT_MONOTONIC_BLOCK_SHIFT
167167
);
168168

169169
final long valuesDataOffset = data.getFilePointer();
@@ -348,8 +348,9 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
348348
int numDocsWithField = 0;
349349
int minLength = Integer.MAX_VALUE;
350350
int maxLength = 0;
351-
352351
var sampler = new ReservoirSampler();
352+
353+
// Iteration 1: minLength, maxLength, numDocs, sample
353354
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
354355
numDocsWithField++;
355356
BytesRef v = values.binaryValue();
@@ -361,18 +362,33 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
361362

362363
// Build encoder from sample
363364
FSST.SymbolTable symbolTable = FSST.SymbolTable.buildSymbolTable(sampler.getSample());
364-
try (CompressedOffsetWriter offsetWriter = new CompressedOffsetWriter(numDocsWithField)) {
365+
366+
DISIAccumulator disiAccumulator = null;
367+
OffsetsAccumulator offsetsAccumulator = null;
368+
try {
369+
if (numDocsWithField > 0 && numDocsWithField < maxDoc) {
370+
disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
371+
}
372+
assert maxLength >= minLength;
373+
offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField);
374+
CompressedOffsetWriter offsetWriter = new CompressedOffsetWriter(offsetsAccumulator);
375+
376+
// Compress lines (second pass over the binary values)
365377
values = valuesProducer.getBinary(field);
366378
long start = data.getFilePointer();
367379
meta.writeLong(start); // dataOffset
368-
369-
try (var bulkCompresser = new BulkCompressBufferer(data, symbolTable, offsetWriter)) {
380+
try (var bulkCompressor = new BulkCompressBufferer(data, symbolTable, offsetWriter)) {
381+
// Iteration 2: compress lines
370382
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
371383
BytesRef v = values.binaryValue();
372-
bulkCompresser.addLine(v.bytes, v.offset, v.length);
384+
bulkCompressor.addLine(v.bytes, v.offset, v.length);
385+
if (disiAccumulator != null) {
386+
disiAccumulator.addDocId(doc);
387+
}
373388
}
374389
}
375390

391+
// Write metadata
376392
assert numDocsWithField <= maxDoc;
377393
meta.writeLong(data.getFilePointer() - start); // dataLength
378394

@@ -389,8 +405,7 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
389405
} else {
390406
long offset = data.getFilePointer();
391407
meta.writeLong(offset); // docsWithFieldOffset
392-
values = valuesProducer.getBinary(field);
393-
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
408+
final short jumpTableEntryCount = disiAccumulator.build(data);
394409
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
395410
meta.writeShort(jumpTableEntryCount);
396411
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
@@ -412,91 +427,32 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
412427
if (maxCompressedLength > minCompressedLength) {
413428
start = data.getFilePointer();
414429
meta.writeLong(start);
415-
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
416-
offsetWriter.writeOffsetsToMetadata();
430+
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
431+
// Emit the offsets collected by offsetsAccumulator into meta and data
432+
offsetsAccumulator.build(meta, data);
417433
meta.writeLong(data.getFilePointer() - start);
418434
}
435+
} finally {
436+
IOUtils.close(disiAccumulator, offsetsAccumulator);
419437
}
420438
}
421439
}
422440

423-
private class CompressedOffsetWriter implements Closeable, FSST.OffsetWriter {
424-
final IndexOutput tempBinaryOffsets;
425-
final int numDocsWithFields;
441+
private static class CompressedOffsetWriter implements FSST.OffsetWriter {
426442
int maxCompressedLength = 0;
427443
int minCompressedLength = Integer.MAX_VALUE;
444+
private final OffsetsAccumulator delegate;
428445

429-
CompressedOffsetWriter(int numDocsWithFields) throws IOException {
430-
this.numDocsWithFields = numDocsWithFields;
431-
tempBinaryOffsets = state.directory.createTempOutput(
432-
state.segmentInfo.name,
433-
"binary_pointers",
434-
state.context
435-
);
436-
boolean success = false;
437-
try {
438-
CodecUtil.writeHeader(
439-
tempBinaryOffsets,
440-
ES819TSDBDocValuesFormat.META_CODEC + "FilePointers",
441-
ES819TSDBDocValuesFormat.VERSION_CURRENT
442-
);
443-
success = true;
444-
} finally {
445-
if (success == false) {
446-
IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
447-
}
448-
}
446+
private CompressedOffsetWriter(OffsetsAccumulator delegate) {
447+
this.delegate = delegate;
449448
}
450449

451450
public void addLen(int compressedLen) throws IOException {
452451
assert compressedLen >= 0;
453-
tempBinaryOffsets.writeVInt(compressedLen);
452+
delegate.addDoc(compressedLen);
454453
minCompressedLength = Math.min(compressedLen, minCompressedLength);
455454
maxCompressedLength = Math.max(compressedLen, maxCompressedLength);
456455
}
457-
458-
void writeOffsetsToMetadata() throws IOException {
459-
CodecUtil.writeFooter(tempBinaryOffsets);
460-
IOUtils.close(tempBinaryOffsets);
461-
462-
try (
463-
ChecksumIndexInput lengthFileInput = state.directory.openChecksumInput(tempBinaryOffsets.getName());
464-
) {
465-
CodecUtil.checkHeader(
466-
lengthFileInput,
467-
ES819TSDBDocValuesFormat.META_CODEC + "FilePointers",
468-
ES819TSDBDocValuesFormat.VERSION_CURRENT,
469-
ES819TSDBDocValuesFormat.VERSION_CURRENT
470-
);
471-
Throwable priorE = null;
472-
try {
473-
final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
474-
meta,
475-
data,
476-
numDocsWithFields + 1,
477-
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
478-
);
479-
long addr = 0;
480-
writer.add(addr);
481-
for (int i = 0; i < numDocsWithFields ; ++i) {
482-
addr += lengthFileInput.readVInt();
483-
writer.add(addr);
484-
}
485-
writer.finish();
486-
} catch (Throwable e) {
487-
priorE = e;
488-
} finally {
489-
CodecUtil.checkFooter(lengthFileInput, priorE);
490-
}
491-
}
492-
}
493-
494-
@Override
495-
public void close() throws IOException {
496-
if (tempBinaryOffsets != null) {
497-
IOUtils.close(tempBinaryOffsets, () -> state.directory.deleteFile(tempBinaryOffsets.getName()));
498-
}
499-
}
500456
}
501457

502458
@Override
@@ -744,13 +700,13 @@ private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valu
744700
if (numValues > numDocsWithField) {
745701
long start = data.getFilePointer();
746702
meta.writeLong(start);
747-
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
703+
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
748704

749705
final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance(
750706
meta,
751707
data,
752708
numDocsWithField + 1L,
753-
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
709+
DIRECT_MONOTONIC_BLOCK_SHIFT
754710
);
755711
long addr = 0;
756712
addressesWriter.add(addr);

0 commit comments

Comments
 (0)