Skip to content

Commit 65426e8

Browse files
Improved bulk loading for binary doc values (#138631)
Improves bulk loading by decompressing all matching blocks into one byte array and appending that directly into the block builder.
1 parent f2d1c71 commit 65426e8

File tree

2 files changed

+129
-26
lines changed

2 files changed

+129
-26
lines changed

docs/changelog/138631.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138631
2+
summary: Improved bulk loading for binary doc values
3+
area: Codec
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 124 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -364,13 +364,13 @@ private BinaryDocValues getCompressedBinary(BinaryEntry entry) throws IOExceptio
364364
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
365365
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
366366

367-
final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
368-
final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
367+
final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
368+
final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);
369369
return new DenseBinaryDocValues(maxDoc) {
370370
final BinaryDecoder decoder = new BinaryDecoder(
371371
entry.compression.compressionMode().newDecompressor(),
372372
addresses,
373-
docRanges,
373+
docOffsets,
374374
data.clone(),
375375
entry.maxUncompressedChunkSize,
376376
entry.maxNumDocsInAnyBlock
@@ -391,13 +391,22 @@ public BlockLoader.Block tryRead(
391391
boolean toInt
392392
) throws IOException {
393393
int count = docs.count() - offset;
394-
try (var builder = factory.bytesRefs(count)) {
395-
for (int i = offset; i < docs.count(); i++) {
396-
doc = docs.get(i);
397-
var bytes = decoder.decode(doc, entry.numCompressedBlocks);
398-
builder.appendBytesRef(bytes);
394+
int firstDocId = docs.get(offset);
395+
int lastDocId = docs.get(count - 1);
396+
doc = lastDocId;
397+
398+
if (isDense(firstDocId, lastDocId, count)) {
399+
try (var builder = factory.singletonBytesRefs(count)) {
400+
decoder.decodeBulk(entry.numCompressedBlocks, firstDocId, count, builder);
401+
return builder.build();
402+
}
403+
} else {
404+
try (var builder = factory.bytesRefs(count)) {
405+
for (int i = offset; i < docs.count(); i++) {
406+
builder.appendBytesRef(decoder.decode(docs.get(i), entry.numCompressedBlocks));
407+
}
408+
return builder.build();
399409
}
400-
return builder.build();
401410
}
402411
}
403412
};
@@ -414,13 +423,13 @@ public BlockLoader.Block tryRead(
414423
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
415424
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
416425

417-
final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
418-
final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
426+
final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
427+
final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);
419428
return new SparseBinaryDocValues(disi) {
420429
final BinaryDecoder decoder = new BinaryDecoder(
421430
entry.compression.compressionMode().newDecompressor(),
422431
addresses,
423-
docRanges,
432+
docOffsets,
424433
data.clone(),
425434
entry.maxUncompressedChunkSize,
426435
entry.maxNumDocsInAnyBlock
@@ -439,7 +448,7 @@ public BytesRef binaryValue() throws IOException {
439448
static final class BinaryDecoder {
440449

441450
private final LongValues addresses;
442-
private final DirectMonotonicReader docRanges;
451+
private final DirectMonotonicReader docOffsets;
443452
private final IndexInput compressedData;
444453
// Cache of last uncompressed block
445454
private long lastBlockId = -1;
@@ -453,14 +462,14 @@ static final class BinaryDecoder {
453462
BinaryDecoder(
454463
Decompressor decompressor,
455464
LongValues addresses,
456-
DirectMonotonicReader docRanges,
465+
DirectMonotonicReader docOffsets,
457466
IndexInput compressedData,
458467
int biggestUncompressedBlockSize,
459468
int maxNumDocsInAnyBlock
460469
) {
461470
this.decompressor = decompressor;
462471
this.addresses = addresses;
463-
this.docRanges = docRanges;
472+
this.docOffsets = docOffsets;
464473
this.compressedData = compressedData;
465474
// pre-allocate a byte array large enough for the biggest uncompressed block needed.
466475
this.uncompressedBlock = new byte[biggestUncompressedBlockSize];
@@ -509,8 +518,12 @@ void deltaDecode(int[] arr, int length) {
509518
}
510519
}
511520

512-
long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int docNumber, int numBlocks) {
513-
long index = docRanges.binarySearch(lastBlockId + 1, numBlocks, docNumber);
521+
long findAndUpdateBlock(int docNumber, int numBlocks) {
522+
if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
523+
return lastBlockId;
524+
}
525+
526+
long index = docOffsets.binarySearch(lastBlockId + 1, numBlocks, docNumber);
514527
// If index is found, index is inclusive lower bound of docNum range, so docNum is in blockId == index
515528
if (index < 0) {
516529
// If index was not found, insertion point (-index - 1) will be upper bound of docNum range.
@@ -519,14 +532,25 @@ long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int d
519532
}
520533
assert index < numBlocks : "invalid range " + index + " for doc " + docNumber + " in numBlocks " + numBlocks;
521534

522-
startDocNumForBlock = docRanges.get(index);
523-
limitDocNumForBlock = docRanges.get(index + 1);
535+
startDocNumForBlock = docOffsets.get(index);
536+
limitDocNumForBlock = docOffsets.get(index + 1);
524537
return index;
525538
}
526539

540+
// If query is over adjacent values we can scan forward through blocks, rather than binary searching for the next block.
541+
long findAndUpdateBlockByScanning(int docNumber) {
542+
if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
543+
return lastBlockId;
544+
}
545+
long blockId = lastBlockId + 1;
546+
startDocNumForBlock = docOffsets.get(blockId);
547+
limitDocNumForBlock = docOffsets.get(blockId + 1);
548+
return blockId;
549+
}
550+
527551
BytesRef decode(int docNumber, int numBlocks) throws IOException {
528552
// docNumber, rather than docId, because these are dense and could be indices from a DISI
529-
long blockId = docNumber < limitDocNumForBlock ? lastBlockId : findAndUpdateBlock(docRanges, lastBlockId, docNumber, numBlocks);
553+
long blockId = findAndUpdateBlock(docNumber, numBlocks);
530554

531555
int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
532556
int idxInBlock = (int) (docNumber - startDocNumForBlock);
@@ -544,6 +568,80 @@ BytesRef decode(int docNumber, int numBlocks) throws IOException {
544568
uncompressedBytesRef.length = end - start;
545569
return uncompressedBytesRef;
546570
}
571+
572+
int computeMultipleBlockBufferSize(int count, int firstDoc, long firstBlockId, long numBlocks) throws IOException {
573+
IndexInput readAhead = compressedData.clone();
574+
int lastDoc = firstDoc + count - 1;
575+
int requiredBufferSize = 0;
576+
577+
for (long blockId = firstBlockId; blockId < numBlocks; blockId++) {
578+
long blockStartOffset = addresses.get(blockId);
579+
readAhead.seek(blockStartOffset);
580+
readAhead.readByte(); // skip BlockHeader
581+
int uncompressedBlockLength = readAhead.readVInt();
582+
requiredBufferSize += uncompressedBlockLength;
583+
584+
long blockLimit = docOffsets.get(blockId + 1);
585+
if (lastDoc < blockLimit) {
586+
break;
587+
}
588+
}
589+
return requiredBufferSize;
590+
}
591+
592+
void decodeBulk(int numBlocks, int firstDoc, int count, BlockLoader.SingletonBytesRefBuilder builder) throws IOException {
593+
int remainingCount = count;
594+
int nextDoc = firstDoc;
595+
int blockDocOffset = 0;
596+
int blockByteOffset = 0;
597+
598+
// Need to binary search forward for first blockId, but since query is dense, can scan from then on.
599+
// This block contains at least one value for range.
600+
long firstBlockId = findAndUpdateBlock(nextDoc, numBlocks);
601+
long[] offsets = new long[count + 1];
602+
int bufferSize = computeMultipleBlockBufferSize(count, firstDoc, firstBlockId, numBlocks);
603+
byte[] bytes = new byte[bufferSize];
604+
605+
while (remainingCount > 0) {
606+
long blockId = findAndUpdateBlockByScanning(nextDoc);
607+
int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
608+
int idxFirstDocInBlock = (int) (nextDoc - startDocNumForBlock);
609+
int countInBlock = Math.min(numDocsInBlock - idxFirstDocInBlock, remainingCount);
610+
611+
assert idxFirstDocInBlock < numDocsInBlock;
612+
assert countInBlock <= numDocsInBlock;
613+
614+
if (blockId != lastBlockId) {
615+
decompressBlock((int) blockId, numDocsInBlock);
616+
// uncompressedBytesRef and uncompressedDocStarts now populated
617+
lastBlockId = blockId;
618+
}
619+
620+
// Copy offsets for block into combined offset array
621+
int startOffset = uncompressedDocStarts[idxFirstDocInBlock];
622+
int endOffset = uncompressedDocStarts[idxFirstDocInBlock + countInBlock];
623+
int lenValuesInBlock = endOffset - startOffset;
624+
for (int i = 0; i < countInBlock; i++) {
625+
int byteOffsetInBlock = uncompressedDocStarts[idxFirstDocInBlock + i + 1] - startOffset;
626+
offsets[blockDocOffset + i + 1] = byteOffsetInBlock + blockByteOffset;
627+
}
628+
629+
// Copy uncompressedBlock bytes into buffer for multiple blocks
630+
System.arraycopy(uncompressedBlock, startOffset, bytes, blockByteOffset, lenValuesInBlock);
631+
632+
nextDoc += countInBlock;
633+
remainingCount -= countInBlock;
634+
blockDocOffset += countInBlock;
635+
blockByteOffset += lenValuesInBlock;
636+
}
637+
638+
int totalLen = Math.toIntExact(offsets[count]);
639+
if (totalLen == 0) {
640+
builder.appendBytesRefs(new byte[0], 0);
641+
} else {
642+
builder.appendBytesRefs(bytes, offsets);
643+
}
644+
}
547645
}
548646

549647
abstract static class DenseBinaryDocValues extends BinaryDocValues implements BlockLoader.OptionalColumnAtATimeReader {
@@ -1533,9 +1631,9 @@ private BinaryEntry readBinary(IndexInput meta, int version) throws IOException
15331631
entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
15341632
entry.addressesLength = meta.readLong();
15351633

1536-
entry.docRangeOffset = meta.readLong();
1537-
entry.docRangeMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
1538-
entry.docRangeLength = meta.readLong();
1634+
entry.docOffsetsOffset = meta.readLong();
1635+
entry.docOffsetMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
1636+
entry.docOffsetLength = meta.readLong();
15391637

15401638
entry.numCompressedBlocks = numCompressedChunks;
15411639
}
@@ -2230,14 +2328,14 @@ static class BinaryEntry {
22302328
int maxLength;
22312329
long addressesOffset;
22322330
long addressesLength;
2233-
long docRangeOffset;
2234-
long docRangeLength;
2331+
long docOffsetsOffset;
2332+
long docOffsetLength;
22352333
// compression mode
22362334
int maxUncompressedChunkSize;
22372335
int maxNumDocsInAnyBlock;
22382336
int numCompressedBlocks;
22392337
DirectMonotonicReader.Meta addressesMeta;
2240-
DirectMonotonicReader.Meta docRangeMeta;
2338+
DirectMonotonicReader.Meta docOffsetMeta;
22412339

22422340
BinaryEntry(BinaryDVCompressionMode compression) {
22432341
this.compression = compression;

0 commit comments

Comments
 (0)