Skip to content

Commit d52b0d8

Browse files
committed
Scan forward through chunk to find uncompressed size and allocate buffer
1 parent 9bed9c5 commit d52b0d8

File tree

1 file changed

+59
-34
lines changed

1 file changed

+59
-34
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,12 @@ void deltaDecode(int[] arr, int length) {
521521
}
522522
}
523523

524-
long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int docNumber, int numBlocks) {
525-
long index = docRanges.binarySearch(lastBlockId + 1, numBlocks, docNumber);
524+
long findAndUpdateBlock(int docNumber, int numBlocks) {
525+
if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
526+
return lastBlockId;
527+
}
528+
529+
long index = docOffsets.binarySearch(lastBlockId + 1, numBlocks, docNumber);
526530
// If index is found, index is inclusive lower bound of docNum range, so docNum is in blockId == index
527531
if (index < 0) {
528532
// If index was not found, insertion point (-index - 1) will be upper bound of docNum range.
@@ -531,16 +535,25 @@ long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int d
531535
}
532536
assert index < numBlocks : "invalid range " + index + " for doc " + docNumber + " in numBlocks " + numBlocks;
533537

534-
startDocNumForBlock = docRanges.get(index);
535-
limitDocNumForBlock = docRanges.get(index + 1);
538+
startDocNumForBlock = docOffsets.get(index);
539+
limitDocNumForBlock = docOffsets.get(index + 1);
536540
return index;
537541
}
538542

543+
// If query is over adjacent values we can scan forward through blocks, rather than binary searching for the next block.
544+
long findAndUpdateBlockByScanning(int docNumber) {
545+
if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
546+
return lastBlockId;
547+
}
548+
long blockId = lastBlockId + 1;
549+
startDocNumForBlock = docOffsets.get(blockId);
550+
limitDocNumForBlock = docOffsets.get(blockId + 1);
551+
return blockId;
552+
}
553+
539554
BytesRef decode(int docNumber, int numBlocks) throws IOException {
540555
// docNumber, rather than docId, because these are dense and could be indices from a DISI
541-
long blockId = docNumber < limitDocNumForBlock
542-
? lastBlockId
543-
: findAndUpdateBlock(docOffsets, lastBlockId, docNumber, numBlocks);
556+
long blockId = findAndUpdateBlock(docNumber, numBlocks);
544557

545558
int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
546559
int idxInBlock = (int) (docNumber - startDocNumForBlock);
@@ -559,20 +572,49 @@ BytesRef decode(int docNumber, int numBlocks) throws IOException {
559572
return uncompressedBytesRef;
560573
}
561574

575+
int computeMultipleBlockBufferSize(int count, int firstDoc, long firstBlockId) throws IOException {
576+
long lastBlockId = firstBlockId;
577+
578+
int remaining = count;
579+
int nextDoc = firstDoc;
580+
for (long blockId = firstBlockId; remaining > 0; blockId++, lastBlockId++) {
581+
long blockStart = docOffsets.get(blockId);
582+
long blockLimit = docOffsets.get(blockId + 1);
583+
int numDocsInBlock = (int) (blockLimit - blockStart);
584+
int idxFirstDocInBlock = (int) (nextDoc - blockStart);
585+
int countInBlock = Math.min(numDocsInBlock - idxFirstDocInBlock, remaining);
586+
remaining -= countInBlock;
587+
nextDoc += countInBlock;
588+
}
589+
590+
// We could use compressedData directly, but making a clone seems less error-prone.
591+
IndexInput readAhead = compressedData.clone();
592+
int requiredBufferSize = 0;
593+
for (long blockId = firstBlockId; blockId <= lastBlockId; blockId++) {
594+
long blockStartOffset = addresses.get(blockId);
595+
readAhead.seek(blockStartOffset);
596+
readAhead.readByte(); // skip BlockHeader
597+
int uncompressedBlockLength = readAhead.readVInt();
598+
requiredBufferSize += uncompressedBlockLength;
599+
}
600+
return requiredBufferSize;
601+
}
602+
562603
void decodeBulk(int numBlocks, int firstDoc, int count, BlockLoader.SingletonBytesRefBuilder builder) throws IOException {
563604
int remainingCount = count;
564605
int nextDoc = firstDoc;
565606
int blockDocOffset = 0;
566607
int blockByteOffset = 0;
608+
609+
// Need to binary search forward for first blockId, but since query is dense, can scan from then on.
610+
// This block contains at least one value for range.
611+
long firstBlockId = findAndUpdateBlock(nextDoc, numBlocks);
567612
long[] offsets = new long[count + 1];
568-
List<BytesRef> decompressedBlocks = new ArrayList<>();
613+
int bufferSize = computeMultipleBlockBufferSize(count, firstDoc, firstBlockId);
614+
byte[] bytes = new byte[bufferSize];
569615

570616
while (remainingCount > 0) {
571-
long blockId = nextDoc < limitDocNumForBlock
572-
? lastBlockId
573-
: findAndUpdateBlock(this.docOffsets, lastBlockId, nextDoc, numBlocks);
574-
assert blockId >= 0;
575-
617+
long blockId = findAndUpdateBlockByScanning(nextDoc);
576618
int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
577619
int idxFirstDocInBlock = (int) (nextDoc - startDocNumForBlock);
578620
int countInBlock = Math.min(numDocsInBlock - idxFirstDocInBlock, remainingCount);
@@ -595,38 +637,21 @@ void decodeBulk(int numBlocks, int firstDoc, int count, BlockLoader.SingletonByt
595637
offsets[blockDocOffset + i + 1] = byteOffsetInBlock + blockByteOffset;
596638
}
597639

640+
// Copy uncompressedBlock bytes into buffer for multiple blocks
641+
System.arraycopy(uncompressedBlock, startOffset, bytes, blockByteOffset, lenValuesInBlock);
642+
598643
nextDoc += countInBlock;
599644
remainingCount -= countInBlock;
600645
blockDocOffset += countInBlock;
601646
blockByteOffset += lenValuesInBlock;
602-
603-
if (remainingCount == 0) {
604-
// avoid making a copy if this was the last block to be decompressed
605-
decompressedBlocks.add(new BytesRef(uncompressedBlock, startOffset, lenValuesInBlock));
606-
} else {
607-
decompressedBlocks.add(
608-
new BytesRef(Arrays.copyOfRange(uncompressedBlock, startOffset, startOffset + lenValuesInBlock))
609-
);
610-
}
611647
}
612648

613649
int totalLen = Math.toIntExact(offsets[count]);
614650
if (totalLen == 0) {
615651
builder.appendBytesRefs(new byte[0], 0);
616652
} else {
617-
var allBytes = combinedBytes(totalLen, decompressedBlocks);
618-
builder.appendBytesRefs(allBytes, offsets);
619-
}
620-
}
621-
622-
static byte[] combinedBytes(int totalLen, List<BytesRef> allBytes) {
623-
byte[] all = new byte[totalLen];
624-
int byteOffset = 0;
625-
for (var bytes : allBytes) {
626-
System.arraycopy(bytes.bytes, bytes.offset, all, byteOffset, bytes.length);
627-
byteOffset += bytes.length;
653+
builder.appendBytesRefs(bytes, offsets);
628654
}
629-
return all;
630655
}
631656
}
632657

0 commit comments

Comments
 (0)