Skip to content

Commit 955316d

Browse files
committed
iter
1 parent 05f344e commit 955316d

File tree

2 files changed

+149
-28
lines changed

2 files changed

+149
-28
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 126 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,9 @@ BlockLoader.Block tryReadAHead(BlockLoader.BlockFactory factory, BlockLoader.Doc
487487

488488
@Override
489489
public void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException {
490-
termsEnum.lookupOrds(sortedOrds, uniqueCount, consumer);
490+
// termsEnum.lookupOrds(sortedOrds, uniqueCount, consumer);
491+
var r = new BulkOrdinalLookup(entry.termsDictEntry, data, merging);
492+
r.lookupOrds(sortedOrds, uniqueCount, consumer);
491493
}
492494
}
493495

@@ -644,7 +646,7 @@ private static class TermsDict extends BaseTermsEnum {
644646
final BytesRef term;
645647
long ord = -1;
646648

647-
BytesRef blockBuffer = null;
649+
final BytesRef blockBuffer;
648650
ByteArrayDataInput blockInput = null;
649651
long currentCompressedBlockStart = -1;
650652
long currentCompressedBlockEnd = -1;
@@ -715,6 +717,11 @@ public void seekExact(long ord) throws IOException {
715717
}
716718

717719
void lookupOrds(int[] sortedOrds, int uniqueCount, BlockLoader.BulkOrdinalLookup.TermConsumer consumer) throws IOException {
720+
final long firstBlockIndex = sortedOrds[0] >> TERMS_DICT_BLOCK_LZ4_SHIFT;
721+
final long firstBlockAddress = blockAddresses.get(firstBlockIndex);
722+
bytes.seek(firstBlockAddress);
723+
this.ord = (firstBlockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
724+
718725
for (int offset = 0; offset < uniqueCount; offset++) {
719726
int targetOrd = sortedOrds[offset];
720727
// Signed shift since ord is -1 when the terms enum is not positioned
@@ -726,36 +733,10 @@ void lookupOrds(int[] sortedOrds, int uniqueCount, BlockLoader.BulkOrdinalLookup
726733
bytes.seek(blockAddress);
727734
this.ord = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
728735
}
729-
730736
// Scan to the looked up ord
731737
while (this.ord < targetOrd) {
732738
next();
733739
}
734-
735-
// Scan to the looked up ord
736-
// for (this.ord++; this.ord < targetOrd; this.ord++) {
737-
// if (++this.ord >= entry.termsDictSize) {
738-
// return null;
739-
// }
740-
741-
// if ((this.ord & blockMask) == 0L) {
742-
// decompressBlock();
743-
// } else {
744-
// DataInput input = blockInput;
745-
// final int token = Byte.toUnsignedInt(input.readByte());
746-
// int prefixLength = token & 0x0F;
747-
// int suffixLength = 1 + (token >>> 4);
748-
// if (prefixLength == 15) {
749-
// prefixLength += input.readVInt();
750-
// }
751-
// if (suffixLength == 16) {
752-
// suffixLength += input.readVInt();
753-
// }
754-
// term.length = prefixLength + suffixLength;
755-
// input.readBytes(term.bytes, prefixLength, suffixLength);
756-
// }
757-
// }
758-
759740
consumer.onTerm(offset, term);
760741
}
761742
}
@@ -915,6 +896,123 @@ public int docFreq() throws IOException {
915896
}
916897
}
917898

899+
static final class BulkOrdinalLookup implements BlockLoader.BulkOrdinalLookup {
900+
901+
final TermsDictEntry entry;
902+
final LongValues blockAddresses;
903+
final IndexInput bytes;
904+
final long blockMask;
905+
final LongValues indexAddresses;
906+
final RandomAccessInput indexBytes;
907+
final BytesRef blockBuffer;
908+
909+
long currentCompressedBlockStart = -1;
910+
long currentCompressedBlockEnd = -1;
911+
912+
BulkOrdinalLookup(TermsDictEntry entry, IndexInput data, boolean merging) throws IOException {
913+
this.entry = entry;
914+
RandomAccessInput addressesSlice = data.randomAccessSlice(entry.termsAddressesOffset, entry.termsAddressesLength);
915+
blockAddresses = DirectMonotonicReader.getInstance(entry.termsAddressesMeta, addressesSlice, merging);
916+
bytes = data.slice("terms", entry.termsDataOffset, entry.termsDataLength);
917+
blockMask = (1L << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
918+
RandomAccessInput indexAddressesSlice = data.randomAccessSlice(
919+
entry.termsIndexAddressesOffset,
920+
entry.termsIndexAddressesLength
921+
);
922+
indexAddresses = DirectMonotonicReader.getInstance(entry.termsIndexAddressesMeta, indexAddressesSlice, merging);
923+
indexBytes = data.randomAccessSlice(entry.termsIndexOffset, entry.termsIndexLength);
924+
925+
// add the max term length for the dictionary
926+
// add 7 padding bytes can help decompression run faster.
927+
int bufferSize = entry.maxBlockLength + entry.maxTermLength + TermsDict.LZ4_DECOMPRESSOR_PADDING;
928+
blockBuffer = new BytesRef(new byte[bufferSize], 0, bufferSize);
929+
}
930+
931+
@Override
932+
public void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException {
933+
assert sortedOrds[sortedOrds.length - 1] < entry.termsDictSize;
934+
935+
BytesRef term = new BytesRef(entry.maxTermLength);
936+
937+
long blockIndex = sortedOrds[0] >> TERMS_DICT_BLOCK_LZ4_SHIFT;
938+
long blockAddress = blockAddresses.get(blockIndex);
939+
bytes.seek(blockAddress);
940+
long currentOrd = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
941+
942+
ByteArrayDataInput blockInput = null;
943+
for (int offset = 0; offset < uniqueCount; offset++) {
944+
int targetOrd = sortedOrds[offset];
945+
// Signed shift since ord is -1 when the terms enum is not positioned
946+
long currentBlockIndex = currentOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
947+
blockIndex = targetOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
948+
if (blockIndex != currentBlockIndex) {
949+
// The looked up ord belongs to a different block, seek again
950+
blockAddress = blockAddresses.get(blockIndex);
951+
bytes.seek(blockAddress);
952+
currentOrd = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
953+
}
954+
955+
// Scan to the looked up ord
956+
while (currentOrd < targetOrd) {
957+
currentOrd++;
958+
if ((currentOrd & blockMask) == 0L) {
959+
blockInput = decompressBlock(term, blockInput);
960+
} else {
961+
DataInput input = blockInput;
962+
final int token = Byte.toUnsignedInt(input.readByte());
963+
int prefixLength = token & 0x0F;
964+
int suffixLength = 1 + (token >>> 4);
965+
if (prefixLength == 15) {
966+
prefixLength += input.readVInt();
967+
}
968+
if (suffixLength == 16) {
969+
suffixLength += input.readVInt();
970+
}
971+
972+
term.length = prefixLength + suffixLength;
973+
input.readBytes(term.bytes, prefixLength, suffixLength);
974+
// if (currentOrd == targetOrd) {
975+
// term.length = prefixLength + suffixLength;
976+
// input.readBytes(term.bytes, prefixLength, suffixLength);
977+
// } else {
978+
// input.skipBytes(suffixLength);
979+
// }
980+
}
981+
}
982+
consumer.onTerm(offset, term);
983+
}
984+
}
985+
986+
private ByteArrayDataInput decompressBlock(BytesRef term, ByteArrayDataInput blockInput) throws IOException {
987+
// The first term is kept uncompressed, so no need to decompress block if only
988+
// look up the first term when doing seek block.
989+
term.length = bytes.readVInt();
990+
bytes.readBytes(term.bytes, 0, term.length);
991+
long offset = bytes.getFilePointer();
992+
if (offset < entry.termsDataLength - 1) {
993+
// Avoid decompress again if we are reading a same block.
994+
if (currentCompressedBlockStart != offset) {
995+
blockBuffer.offset = term.length;
996+
blockBuffer.length = bytes.readVInt();
997+
// Decompress the remaining of current block, using the first term as a dictionary
998+
System.arraycopy(term.bytes, 0, blockBuffer.bytes, 0, blockBuffer.offset);
999+
LZ4.decompress(bytes, blockBuffer.length, blockBuffer.bytes, blockBuffer.offset);
1000+
currentCompressedBlockStart = offset;
1001+
currentCompressedBlockEnd = bytes.getFilePointer();
1002+
} else {
1003+
// Skip decompression but need to re-seek to block end.
1004+
bytes.seek(currentCompressedBlockEnd);
1005+
}
1006+
1007+
// Reset the buffer.
1008+
return new ByteArrayDataInput(blockBuffer.bytes, blockBuffer.offset, blockBuffer.length);
1009+
} else {
1010+
return blockInput;
1011+
}
1012+
}
1013+
1014+
}
1015+
9181016
@Override
9191017
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
9201018
SortedNumericEntry entry = sortedNumerics.get(field.number);

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,10 @@ public void testForceMergeDenseCase() throws Exception {
158158
assertNotNull(tagsDV);
159159
var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
160160
assertNotNull(tagBytesDV);
161+
List<Integer> ordinals = new ArrayList<>();
161162
for (int i = 0; i < numDocs; i++) {
162163
assertEquals(i, hostNameDV.nextDoc());
164+
ordinals.add(hostNameDV.ordValue());
163165
int batchIndex = i / numHosts;
164166
assertEquals(batchIndex, hostNameDV.ordValue());
165167
String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
@@ -205,6 +207,27 @@ public void testForceMergeDenseCase() throws Exception {
205207
BytesRef tagBytesValue = tagBytesDV.binaryValue();
206208
assertTrue("unexpected bytes " + tagBytesValue, Arrays.binarySearch(tags, tagBytesValue.utf8ToString()) >= 0);
207209
}
210+
211+
var bulkOrdinalLookup = (BlockLoader.BulkOrdinalLookup) hostNameDV;
212+
{
213+
int[] sortedOrds = ordinals.stream().distinct().mapToInt(i -> i).toArray();
214+
var hosts = new ArrayList<>(numHosts);
215+
bulkOrdinalLookup.lookupOrds(sortedOrds, sortedOrds.length, (offset, term) -> hosts.add(term.utf8ToString()));
216+
for (int i = 0; i < hosts.size(); i++) {
217+
String expectedHostName = String.format(Locale.ROOT, "host-%03d", i);
218+
assertEquals(expectedHostName, hosts.get(i));
219+
}
220+
}
221+
{
222+
int offset = ordinals.size() - 3;
223+
int[] sortedOrds = ordinals.subList(offset, ordinals.size()).stream().distinct().mapToInt(i -> i).toArray();
224+
var hosts = new ArrayList<>(numHosts);
225+
bulkOrdinalLookup.lookupOrds(sortedOrds, sortedOrds.length, (o, term) -> hosts.add(term.utf8ToString()));
226+
for (int i = offset; i < hosts.size(); i++) {
227+
String expectedHostName = String.format(Locale.ROOT, "host-%03d", i);
228+
assertEquals(expectedHostName, hosts.get(i));
229+
}
230+
}
208231
}
209232
}
210233
}

0 commit comments

Comments
 (0)