@@ -431,14 +431,17 @@ BlockLoader.Block tryReadAHead(BlockLoader.BlockFactory factory, BlockLoader.Doc
};
}

abstract class BaseSortedDocValues extends SortedDocValues implements BlockLoader.OptionalColumnAtATimeReader {
abstract class BaseSortedDocValues extends SortedDocValues
implements
BlockLoader.OptionalColumnAtATimeReader,
BlockLoader.BulkOrdinalLookup {

final SortedEntry entry;
final TermsEnum termsEnum;
final TermsDict termsEnum;

BaseSortedDocValues(SortedEntry entry) throws IOException {
this.entry = entry;
this.termsEnum = termsEnum();
this.termsEnum = (TermsDict) termsEnum();
}

@Override
@@ -481,6 +484,11 @@ public BlockLoader.Block tryRead(
BlockLoader.Block tryReadAHead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
return null;
}

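// Bulk lookups delegate to the terms dictionary, which resolves sorted ordinals
// in a single forward scan (see TermsDict#lookupOrds below).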
@Override
public void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException {
termsEnum.lookupOrds(sortedOrds, uniqueCount, consumer);
}
}

abstract static class BaseDenseNumericValues extends NumericDocValues implements BlockLoader.OptionalColumnAtATimeReader {
@@ -636,7 +644,7 @@ private static class TermsDict extends BaseTermsEnum {
final BytesRef term;
long ord = -1;

BytesRef blockBuffer = null;
final BytesRef blockBuffer;
ByteArrayDataInput blockInput = null;
long currentCompressedBlockStart = -1;
long currentCompressedBlockEnd = -1;
@@ -666,7 +674,11 @@ public BytesRef next() throws IOException {
if (++ord >= entry.termsDictSize) {
return null;
}
readTerm();
return term;
}

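// Decodes the term at the current ord: either the uncompressed first term of a
// newly decompressed block, or a prefix+suffix delta against the previous term.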
void readTerm() throws IOException {
if ((ord & blockMask) == 0L) {
decompressBlock();
} else {
@@ -683,7 +695,6 @@ public BytesRef next() throws IOException {
term.length = prefixLength + suffixLength;
input.readBytes(term.bytes, prefixLength, suffixLength);
}
return term;
}

@Override
@@ -706,6 +717,32 @@ public void seekExact(long ord) throws IOException {
}
}

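/**
 * Resolves a batch of ascending, duplicate-free ordinals to their terms in a
 * single forward pass: seek to the block containing the first ord, scan ahead,
 * and re-seek only when a target ord falls in a different block.
 */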
void lookupOrds(int[] sortedOrds, int uniqueCount, BlockLoader.BulkOrdinalLookup.TermConsumer consumer) throws IOException {
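// Position one ord before the first target's block so the scan below advances into it.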
final long firstBlockIndex = sortedOrds[0] >> TERMS_DICT_BLOCK_LZ4_SHIFT;
final long firstBlockAddress = blockAddresses.get(firstBlockIndex);
bytes.seek(firstBlockAddress);
this.ord = (firstBlockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;

for (int offset = 0; offset < uniqueCount; offset++) {
int targetOrd = sortedOrds[offset];
// Signed shift since ord is -1 when the terms enum is not positioned
final long currentBlockIndex = this.ord >> TERMS_DICT_BLOCK_LZ4_SHIFT;
final long blockIndex = targetOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
if (blockIndex != currentBlockIndex) {
// The looked-up ord belongs to a different block; seek again.
final long blockAddress = blockAddresses.get(blockIndex);
bytes.seek(blockAddress);
this.ord = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
}
// Scan forward to the looked-up ord.
while (this.ord < targetOrd) {
ord++;
readTerm();
}
consumer.onTerm(offset, term);
}
}

private BytesRef getTermFromIndex(long index) throws IOException {
assert index >= 0 && index <= (entry.termsDictSize - 1) >>> entry.termsDictIndexShift;
final long start = indexAddresses.get(index);
@@ -861,6 +898,123 @@ public int docFreq() throws IOException {
}
}

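/**
 * Stand-alone bulk ordinal-to-term lookup over the on-disk terms dictionary,
 * independent of any TermsEnum. It decompresses one LZ4 block at a time,
 * prefix-decodes the terms within it, and caches the last decompressed block
 * to avoid repeated decompression across calls.
 */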
static final class BulkOrdinalLookup implements BlockLoader.BulkOrdinalLookup {

final TermsDictEntry entry;
final LongValues blockAddresses;
final IndexInput bytes;
final long blockMask;
final LongValues indexAddresses;
final RandomAccessInput indexBytes;
final BytesRef blockBuffer;

long currentCompressedBlockStart = -1;
long currentCompressedBlockEnd = -1;

BulkOrdinalLookup(TermsDictEntry entry, IndexInput data, boolean merging) throws IOException {
this.entry = entry;
RandomAccessInput addressesSlice = data.randomAccessSlice(entry.termsAddressesOffset, entry.termsAddressesLength);
blockAddresses = DirectMonotonicReader.getInstance(entry.termsAddressesMeta, addressesSlice, merging);
bytes = data.slice("terms", entry.termsDataOffset, entry.termsDataLength);
blockMask = (1L << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
RandomAccessInput indexAddressesSlice = data.randomAccessSlice(
entry.termsIndexAddressesOffset,
entry.termsIndexAddressesLength
);
indexAddresses = DirectMonotonicReader.getInstance(entry.termsIndexAddressesMeta, indexAddressesSlice, merging);
indexBytes = data.randomAccessSlice(entry.termsIndexOffset, entry.termsIndexLength);

// Size the buffer for the largest block plus the largest term (which serves as
// the LZ4 dictionary); 7 bytes of padding help decompression run faster.
int bufferSize = entry.maxBlockLength + entry.maxTermLength + TermsDict.LZ4_DECOMPRESSOR_PADDING;
blockBuffer = new BytesRef(new byte[bufferSize], 0, bufferSize);
}

@Override
public void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException {
assert sortedOrds[uniqueCount - 1] < entry.termsDictSize;

BytesRef term = new BytesRef(entry.maxTermLength);

long blockIndex = sortedOrds[0] >> TERMS_DICT_BLOCK_LZ4_SHIFT;
long blockAddress = blockAddresses.get(blockIndex);
bytes.seek(blockAddress);
long currentOrd = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;

ByteArrayDataInput blockInput = null;
for (int offset = 0; offset < uniqueCount; offset++) {
int targetOrd = sortedOrds[offset];
// Signed shift since currentOrd is -1 when positioned before the first block
long currentBlockIndex = currentOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
blockIndex = targetOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
if (blockIndex != currentBlockIndex) {
// The looked-up ord belongs to a different block; seek again.
blockAddress = blockAddresses.get(blockIndex);
bytes.seek(blockAddress);
currentOrd = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
}

// Scan forward to the looked-up ord.
while (currentOrd < targetOrd) {
currentOrd++;
if ((currentOrd & blockMask) == 0L) {
blockInput = decompressBlock(term, blockInput);
} else {
DataInput input = blockInput;
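// Prefix-compressed term: the token's low 4 bits hold the shared-prefix length
// and its high 4 bits hold (suffix length - 1); a maxed-out nibble means the
// remainder follows as a vInt.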
final int token = Byte.toUnsignedInt(input.readByte());
int prefixLength = token & 0x0F;
int suffixLength = 1 + (token >>> 4);
if (prefixLength == 15) {
prefixLength += input.readVInt();
}
if (suffixLength == 16) {
suffixLength += input.readVInt();
}

term.length = prefixLength + suffixLength;
input.readBytes(term.bytes, prefixLength, suffixLength);
// Note: every term must be fully materialized even when it is not a target,
// because each term's prefix refers to the previous term's bytes.
}
}
consumer.onTerm(offset, term);
}
}

private ByteArrayDataInput decompressBlock(BytesRef term, ByteArrayDataInput blockInput) throws IOException {
// The first term of each block is stored uncompressed, so when seeking to a
// block there is no need to decompress it just to read its first term.
term.length = bytes.readVInt();
bytes.readBytes(term.bytes, 0, term.length);
long offset = bytes.getFilePointer();
if (offset < entry.termsDataLength - 1) {
// Avoid decompressing again if we are still reading the same block.
if (currentCompressedBlockStart != offset) {
blockBuffer.offset = term.length;
blockBuffer.length = bytes.readVInt();
// Decompress the remainder of the current block, using the first term as a dictionary
System.arraycopy(term.bytes, 0, blockBuffer.bytes, 0, blockBuffer.offset);
LZ4.decompress(bytes, blockBuffer.length, blockBuffer.bytes, blockBuffer.offset);
currentCompressedBlockStart = offset;
currentCompressedBlockEnd = bytes.getFilePointer();
} else {
// Skip decompression, but re-seek to the end of the block.
bytes.seek(currentCompressedBlockEnd);
}

// Return a fresh input positioned over the decompressed suffix data.
return new ByteArrayDataInput(blockBuffer.bytes, blockBuffer.offset, blockBuffer.length);
} else {
return blockInput;
}
}

}

@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
SortedNumericEntry entry = sortedNumerics.get(field.number);
@@ -82,6 +82,16 @@ BlockLoader.Block tryRead(
) throws IOException;
}

interface BulkOrdinalLookup {

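/**
 * Resolves the first {@code uniqueCount} entries of {@code sortedOrds} (which
 * must be ascending and duplicate-free) to their terms, invoking the consumer
 * once per ordinal with its offset in the array and the decoded term.
 */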
void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException;

@FunctionalInterface
interface TermConsumer {
void onTerm(int offset, BytesRef term) throws IOException;
}
}

interface RowStrideReader extends Reader {
/**
* Reads the values of the given document into the builder.
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/8.18.csv
@@ -1 +1 @@
initial_elasticsearch_8_18_6,8840008
transform_check_for_dangling_tasks,8840011
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/8.19.csv
@@ -1 +1 @@
initial_elasticsearch_8_19_3,8841067
transform_check_for_dangling_tasks,8841070
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.0.csv
@@ -1 +1 @@
initial_elasticsearch_9_0_6,9000015
transform_check_for_dangling_tasks,9000018
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.1.csv
@@ -1 +1 @@
initial_elasticsearch_9_1_4,9112007
transform_check_for_dangling_tasks,9112009
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
ml_inference_endpoint_cache,9157000
initial_9.2.0,9185000
1 change: 1 addition & 0 deletions server/src/main/resources/transport/upper_bounds/9.3.csv
@@ -0,0 +1 @@
initial_9.2.0,9185000
@@ -158,8 +158,10 @@ public void testForceMergeDenseCase() throws Exception {
assertNotNull(tagsDV);
var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
assertNotNull(tagBytesDV);
List<Integer> ordinals = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
assertEquals(i, hostNameDV.nextDoc());
ordinals.add(hostNameDV.ordValue());
int batchIndex = i / numHosts;
assertEquals(batchIndex, hostNameDV.ordValue());
String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
@@ -205,6 +207,27 @@ public void testForceMergeDenseCase() throws Exception {
BytesRef tagBytesValue = tagBytesDV.binaryValue();
assertTrue("unexpected bytes " + tagBytesValue, Arrays.binarySearch(tags, tagBytesValue.utf8ToString()) >= 0);
}

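// The merged leaf's SortedDocValues should also expose the bulk ordinal lookup API.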
var bulkOrdinalLookup = (BlockLoader.BulkOrdinalLookup) hostNameDV;
{
int[] sortedOrds = ordinals.stream().distinct().mapToInt(i -> i).toArray();
var hosts = new ArrayList<String>(numHosts);
bulkOrdinalLookup.lookupOrds(sortedOrds, sortedOrds.length, (offset, term) -> hosts.add(term.utf8ToString()));
for (int i = 0; i < hosts.size(); i++) {
String expectedHostName = String.format(Locale.ROOT, "host-%03d", i);
assertEquals(expectedHostName, hosts.get(i));
}
}
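// Also verify a lookup that starts mid-dictionary, using only the last few ordinals.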
{
int offset = ordinals.size() - 3;
int[] sortedOrds = ordinals.subList(offset, ordinals.size()).stream().distinct().mapToInt(i -> i).toArray();
var hosts = new ArrayList<String>(numHosts);
bulkOrdinalLookup.lookupOrds(sortedOrds, sortedOrds.length, (o, term) -> hosts.add(term.utf8ToString()));
for (int i = 0; i < hosts.size(); i++) {
String expectedHostName = String.format(Locale.ROOT, "host-%03d", sortedOrds[i]);
assertEquals(expectedHostName, hosts.get(i));
}
}
}
}
}
@@ -200,10 +200,17 @@ BytesRefBlock buildRegularBlock() {
blockFactory.adjustBreaker(offsetsAndLength);
breakerSize += offsetsAndLength;
int[] offsets = new int[uniqueCount + 1];
for (int o = 0; o < uniqueCount; o++) {
BytesRef v = docValues.lookupOrd(sortedOrds[o]);
offsets[o] = copies.length();
copies.append(v);
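// Prefer the bulk path: it resolves all ordinals in one sequential pass over the
// terms dictionary instead of per-ordinal lookupOrd calls that may re-seek and
// re-decompress the same blocks.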
if (docValues instanceof BlockLoader.BulkOrdinalLookup bulkOrdinalLookup) {
bulkOrdinalLookup.lookupOrds(sortedOrds, uniqueCount, (ord, term) -> {
offsets[ord] = copies.length();
copies.append(term);
});
} else {
for (int o = 0; o < uniqueCount; o++) {
BytesRef v = docValues.lookupOrd(sortedOrds[o]);
offsets[o] = copies.length();
copies.append(v);
}
}
offsets[uniqueCount] = copies.length();
