Skip to content

Commit 331a654

Browse files
committed
Working bulk loading
1 parent 6783135 commit 331a654

File tree

1 file changed

+107
-22
lines changed

1 file changed

+107
-22
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 107 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@
5454
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
5555

5656
import java.io.IOException;
57+
import java.util.ArrayList;
58+
import java.util.Arrays;
59+
import java.util.List;
5760

5861
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_JUMP_LENGTH_PER_LEVEL;
5962
import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_MAX_LEVEL;
@@ -364,13 +367,13 @@ private BinaryDocValues getCompressedBinary(BinaryEntry entry) throws IOExceptio
364367
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
365368
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
366369

367-
final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
368-
final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
370+
final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
371+
final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);
369372
return new DenseBinaryDocValues(maxDoc) {
370373
final BinaryDecoder decoder = new BinaryDecoder(
371374
entry.compression.compressionMode().newDecompressor(),
372375
addresses,
373-
docRanges,
376+
docOffsets,
374377
data.clone(),
375378
entry.maxUncompressedChunkSize,
376379
entry.maxNumDocsInAnyBlock
@@ -391,13 +394,22 @@ public BlockLoader.Block tryRead(
391394
boolean toInt
392395
) throws IOException {
393396
int count = docs.count() - offset;
394-
try (var builder = factory.bytesRefs(count)) {
395-
for (int i = offset; i < docs.count(); i++) {
396-
doc = docs.get(i);
397-
var bytes = decoder.decode(doc, entry.numCompressedBlocks);
398-
builder.appendBytesRef(bytes);
397+
int firstDocId = docs.get(offset);
398+
int lastDocId = docs.get(count - 1);
399+
doc = lastDocId;
400+
401+
if (isDense(firstDocId, lastDocId, count)) {
402+
try (var builder = factory.singletonBytesRefs(count)) {
403+
decoder.decodeBulk(entry.numCompressedBlocks, firstDocId, count, builder);
404+
return builder.build();
405+
}
406+
} else {
407+
try (var builder = factory.bytesRefs(count)) {
408+
for (int i = offset; i < docs.count(); i++) {
409+
builder.appendBytesRef(decoder.decode(docs.get(i), entry.numCompressedBlocks));
410+
}
411+
return builder.build();
399412
}
400-
return builder.build();
401413
}
402414
}
403415
};
@@ -414,13 +426,13 @@ public BlockLoader.Block tryRead(
414426
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
415427
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
416428

417-
final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
418-
final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
429+
final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
430+
final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);
419431
return new SparseBinaryDocValues(disi) {
420432
final BinaryDecoder decoder = new BinaryDecoder(
421433
entry.compression.compressionMode().newDecompressor(),
422434
addresses,
423-
docRanges,
435+
docOffsets,
424436
data.clone(),
425437
entry.maxUncompressedChunkSize,
426438
entry.maxNumDocsInAnyBlock
@@ -439,7 +451,7 @@ public BytesRef binaryValue() throws IOException {
439451
static final class BinaryDecoder {
440452

441453
private final LongValues addresses;
442-
private final DirectMonotonicReader docRanges;
454+
private final DirectMonotonicReader docOffsets;
443455
private final IndexInput compressedData;
444456
// Cache of last uncompressed block
445457
private long lastBlockId = -1;
@@ -453,14 +465,14 @@ static final class BinaryDecoder {
453465
BinaryDecoder(
454466
Decompressor decompressor,
455467
LongValues addresses,
456-
DirectMonotonicReader docRanges,
468+
DirectMonotonicReader docOffsets,
457469
IndexInput compressedData,
458470
int biggestUncompressedBlockSize,
459471
int maxNumDocsInAnyBlock
460472
) {
461473
this.decompressor = decompressor;
462474
this.addresses = addresses;
463-
this.docRanges = docRanges;
475+
this.docOffsets = docOffsets;
464476
this.compressedData = compressedData;
465477
// pre-allocate a byte array large enough for the biggest uncompressed block needed.
466478
this.uncompressedBlock = new byte[biggestUncompressedBlockSize];
@@ -526,7 +538,7 @@ long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int d
526538

527539
BytesRef decode(int docNumber, int numBlocks) throws IOException {
528540
// docNumber, rather than docId, because these are dense and could be indices from a DISI
529-
long blockId = docNumber < limitDocNumForBlock ? lastBlockId : findAndUpdateBlock(docRanges, lastBlockId, docNumber, numBlocks);
541+
long blockId = docNumber < limitDocNumForBlock ? lastBlockId : findAndUpdateBlock(docOffsets, lastBlockId, docNumber, numBlocks);
530542

531543
int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
532544
int idxInBlock = (int) (docNumber - startDocNumForBlock);
@@ -544,6 +556,79 @@ BytesRef decode(int docNumber, int numBlocks) throws IOException {
544556
uncompressedBytesRef.length = end - start;
545557
return uncompressedBytesRef;
546558
}
559+
560+
/**
 * Decodes the values for {@code count} consecutive doc numbers starting at {@code firstDoc}
 * and appends them to {@code builder} in a single bulk call.
 * <p>
 * Values may span several compressed blocks. Each needed block is decompressed once; its
 * per-doc start offsets are rebased into one combined offset array while the decompressed
 * bytes are collected and finally concatenated into a single byte array for the builder.
 *
 * @param numBlocks total number of compressed blocks available (upper bound for the block search)
 * @param firstDoc  first doc number to decode (doc number, not doc id — see {@code decode})
 * @param count     number of consecutive docs to decode; must be dense from {@code firstDoc}
 * @param builder   receives the concatenated bytes plus the per-value offset array
 * @throws IOException if reading or decompressing the underlying data fails
 */
void decodeBulk(
    int numBlocks,
    int firstDoc,
    int count,
    BlockLoader.SingletonBytesRefBuilder builder
) throws IOException {
    int remainingCount = count;
    int nextDoc = firstDoc;
    // offsets[i] = end offset (exclusive) of value i-1 in the combined byte array; offsets[0] stays 0.
    long[] offsets = new long[count + 1];
    int docsAdded = 0;
    // Running length of all value bytes copied so far; used to rebase per-block offsets.
    int currBlockByteOffset = 0;
    // Decompressed value ranges, one entry per block visited, concatenated at the end.
    List<BytesRef> decompressedBlocks = new ArrayList<>();

    while (remainingCount > 0) {

        // Reuse the cached block when nextDoc is still inside it; otherwise seek forward.
        // NOTE(review): assumes findAndUpdateBlock advances startDocNumForBlock/limitDocNumForBlock
        // to the block containing nextDoc — confirm against its definition (not visible here).
        long blockId = nextDoc < limitDocNumForBlock ? lastBlockId : findAndUpdateBlock(this.docOffsets, lastBlockId, nextDoc, numBlocks);
        assert blockId >= 0;

        int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
        int idxFirstDocInBlock = (int) (nextDoc - startDocNumForBlock);
        // Take as many docs from this block as remain in it, capped by what we still need.
        int countInBlock = Math.min(numDocsInBlock - idxFirstDocInBlock, remainingCount);

        assert idxFirstDocInBlock < numDocsInBlock;
        assert countInBlock <= numDocsInBlock;

        if (blockId != lastBlockId) {
            decompressBlock((int) blockId, numDocsInBlock);
            // uncompressedBytesRef and uncompressedDocStarts now populated
            lastBlockId = blockId;
        }

        // Copy offsets for block into combined offset array
        int startOffset = uncompressedDocStarts[idxFirstDocInBlock];
        int endOffset = uncompressedDocStarts[idxFirstDocInBlock + countInBlock];
        int lenValuesInBlock = endOffset - startOffset;
        int offsetIdx = 0;
        // Rebase each doc's end offset from block-local to combined-array coordinates.
        for (int i = idxFirstDocInBlock; i < idxFirstDocInBlock + countInBlock; i++) {
            offsets[docsAdded + offsetIdx+1] = uncompressedDocStarts[i+1] - startOffset + currBlockByteOffset;
            offsetIdx++;
        }

        nextDoc += countInBlock;
        remainingCount -= countInBlock;
        docsAdded += countInBlock;
        currBlockByteOffset += lenValuesInBlock;

        if (remainingCount == 0) {
            // avoid making a copy if this was the last block to be decompressed
            // (safe because combinedBytes() copies out of uncompressedBlock before
            // any further decompression can overwrite it)
            decompressedBlocks.add(new BytesRef(uncompressedBlock, startOffset, lenValuesInBlock));
        } else {
            // uncompressedBlock is reused by the next decompressBlock call, so snapshot this range.
            decompressedBlocks.add(new BytesRef(Arrays.copyOfRange(uncompressedBlock, startOffset, startOffset + lenValuesInBlock)));
        }
    }

    // offsets[count] is the total byte length of all values; toIntExact guards 2 GiB overflow.
    int totalLen = Math.toIntExact(offsets[count]);
    if (totalLen == 0) {
        // All values empty (or count == 0): hand the builder an empty backing array.
        builder.appendBytesRefs(new byte[0], 0);
    } else {
        var allBytes = combinedBytes(totalLen, decompressedBlocks);
        builder.appendBytesRefs(allBytes, offsets);
    }
}
622+
623+
static byte[] combinedBytes(int totalLen, List<BytesRef> allBytes) {
624+
byte[] all = new byte[totalLen];
625+
int byteOffset = 0;
626+
for (var bytes : allBytes) {
627+
System.arraycopy(bytes.bytes, bytes.offset, all, byteOffset, bytes.length);
628+
byteOffset += bytes.length;
629+
}
630+
return all;
631+
}
547632
}
548633

549634
abstract static class DenseBinaryDocValues extends BinaryDocValues implements BlockLoader.OptionalColumnAtATimeReader {
@@ -1533,9 +1618,9 @@ private BinaryEntry readBinary(IndexInput meta, int version) throws IOException
15331618
entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
15341619
entry.addressesLength = meta.readLong();
15351620

1536-
entry.docRangeOffset = meta.readLong();
1537-
entry.docRangeMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
1538-
entry.docRangeLength = meta.readLong();
1621+
entry.docOffsetsOffset = meta.readLong();
1622+
entry.docOffsetMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
1623+
entry.docOffsetLength = meta.readLong();
15391624

15401625
entry.numCompressedBlocks = numCompressedChunks;
15411626
}
@@ -2230,14 +2315,14 @@ static class BinaryEntry {
22302315
int maxLength;
22312316
long addressesOffset;
22322317
long addressesLength;
2233-
long docRangeOffset;
2234-
long docRangeLength;
2318+
long docOffsetsOffset;
2319+
long docOffsetLength;
22352320
// compression mode
22362321
int maxUncompressedChunkSize;
22372322
int maxNumDocsInAnyBlock;
22382323
int numCompressedBlocks;
22392324
DirectMonotonicReader.Meta addressesMeta;
2240-
DirectMonotonicReader.Meta docRangeMeta;
2325+
DirectMonotonicReader.Meta docOffsetMeta;
22412326

22422327
BinaryEntry(BinaryDVCompressionMode compression) {
22432328
this.compression = compression;

0 commit comments

Comments
 (0)