diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
index f2bb92cffadd1..408675de196ba 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
@@ -431,14 +431,17 @@ BlockLoader.Block tryReadAHead(BlockLoader.BlockFactory factory, BlockLoader.Doc
         };
     }
 
-    abstract class BaseSortedDocValues extends SortedDocValues implements BlockLoader.OptionalColumnAtATimeReader {
+    abstract class BaseSortedDocValues extends SortedDocValues
+        implements
+            BlockLoader.OptionalColumnAtATimeReader,
+            BlockLoader.BulkOrdinalLookup {
 
         final SortedEntry entry;
-        final TermsEnum termsEnum;
+        final TermsDict termsEnum;
 
         BaseSortedDocValues(SortedEntry entry) throws IOException {
             this.entry = entry;
-            this.termsEnum = termsEnum();
+            this.termsEnum = (TermsDict) termsEnum();
         }
 
         @Override
@@ -481,6 +484,11 @@ public BlockLoader.Block tryRead(
         BlockLoader.Block tryReadAHead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
             return null;
         }
+
+        @Override
+        public void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException {
+            termsEnum.lookupOrds(sortedOrds, uniqueCount, consumer);
+        }
     }
 
     abstract static class BaseDenseNumericValues extends NumericDocValues implements BlockLoader.OptionalColumnAtATimeReader {
@@ -636,7 +644,7 @@ private static class TermsDict extends BaseTermsEnum {
         final BytesRef term;
         long ord = -1;
 
-        BytesRef blockBuffer = null;
+        final BytesRef blockBuffer;
         ByteArrayDataInput blockInput = null;
         long currentCompressedBlockStart = -1;
         long currentCompressedBlockEnd = -1;
@@ -666,7 +674,11 @@ public BytesRef next() throws IOException {
             if (++ord >= entry.termsDictSize) {
                 return null;
             }
+            readTerm();
+            return term;
+        }
 
+        void readTerm() throws IOException {
             if ((ord & blockMask) == 0L) {
                 decompressBlock();
             } else {
@@ -683,7 +695,6 @@ public BytesRef next() throws IOException {
                 term.length = prefixLength + suffixLength;
                 input.readBytes(term.bytes, prefixLength, suffixLength);
             }
-            return term;
         }
 
         @Override
@@ -706,6 +717,37 @@ public void seekExact(long ord) throws IOException {
             }
         }
 
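+        /**
+         * Reads the terms for the given ascending ordinals, seeking at most once per block:
+         * consecutive ordinals that fall into the block the enum is already positioned on are
+         * resolved by scanning forward rather than by re-seeking and re-decompressing.
+         */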
+        void lookupOrds(int[] sortedOrds, int uniqueCount, BlockLoader.BulkOrdinalLookup.TermConsumer consumer) throws IOException {
+            final long firstBlockIndex = sortedOrds[0] >> TERMS_DICT_BLOCK_LZ4_SHIFT;
+            final long firstBlockAddress = blockAddresses.get(firstBlockIndex);
+            bytes.seek(firstBlockAddress);
+            this.ord = (firstBlockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
+
+            for (int offset = 0; offset < uniqueCount; offset++) {
+                int targetOrd = sortedOrds[offset];
+                // Signed shift since ord is -1 when the terms enum is not positioned
+                final long currentBlockIndex = this.ord >> TERMS_DICT_BLOCK_LZ4_SHIFT;
+                final long blockIndex = targetOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
+                if (blockIndex != currentBlockIndex) {
+                    // The looked-up ord belongs to a different block, seek again
+                    final long blockAddress = blockAddresses.get(blockIndex);
+                    bytes.seek(blockAddress);
+                    this.ord = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
+                }
+                // Scan to the looked-up ord
+                while (this.ord < targetOrd) {
+                    ord++;
+                    readTerm();
+                }
+                consumer.onTerm(offset, term);
+            }
+        }
+
         private BytesRef getTermFromIndex(long index) throws IOException {
             assert index >= 0 && index <= (entry.termsDictSize - 1) >>> entry.termsDictIndexShift;
             final long start = indexAddresses.get(index);
@@ -861,6 +898,128 @@ public int docFreq() throws IOException {
         }
     }
 
+    static final class BulkOrdinalLookup implements BlockLoader.BulkOrdinalLookup {
+
+        final TermsDictEntry entry;
+        final LongValues blockAddresses;
+        final IndexInput bytes;
+        final long blockMask;
+        final LongValues indexAddresses;
+        final RandomAccessInput indexBytes;
+        final BytesRef blockBuffer;
+
+        long currentCompressedBlockStart = -1;
+        long currentCompressedBlockEnd = -1;
+
+        BulkOrdinalLookup(TermsDictEntry entry, IndexInput data, boolean merging) throws IOException {
+            this.entry = entry;
+            RandomAccessInput addressesSlice = data.randomAccessSlice(entry.termsAddressesOffset, entry.termsAddressesLength);
+            blockAddresses = DirectMonotonicReader.getInstance(entry.termsAddressesMeta, addressesSlice, merging);
+            bytes = data.slice("terms", entry.termsDataOffset, entry.termsDataLength);
+            blockMask = (1L << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
+            RandomAccessInput indexAddressesSlice = data.randomAccessSlice(
+                entry.termsIndexAddressesOffset,
+                entry.termsIndexAddressesLength
+            );
+            indexAddresses = DirectMonotonicReader.getInstance(entry.termsIndexAddressesMeta, indexAddressesSlice, merging);
+            indexBytes = data.randomAccessSlice(entry.termsIndexOffset, entry.termsIndexLength);
+
+            // Add the max term length for the dictionary term that heads each block;
+            // adding 7 padding bytes helps decompression run faster.
+            int bufferSize = entry.maxBlockLength + entry.maxTermLength + TermsDict.LZ4_DECOMPRESSOR_PADDING;
+            blockBuffer = new BytesRef(new byte[bufferSize], 0, bufferSize);
+        }
+
+        @Override
+        public void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException {
+            assert sortedOrds[sortedOrds.length - 1] < entry.termsDictSize;
+
+            BytesRef term = new BytesRef(entry.maxTermLength);
+
+            long blockIndex = sortedOrds[0] >> TERMS_DICT_BLOCK_LZ4_SHIFT;
+            long blockAddress = blockAddresses.get(blockIndex);
+            bytes.seek(blockAddress);
+            long currentOrd = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
+
+            ByteArrayDataInput blockInput = null;
+            for (int offset = 0; offset < uniqueCount; offset++) {
+                int targetOrd = sortedOrds[offset];
+                // Signed shift since ord is -1 when the terms enum is not positioned
+                long currentBlockIndex = currentOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
+                blockIndex = targetOrd >> TERMS_DICT_BLOCK_LZ4_SHIFT;
+                if (blockIndex != currentBlockIndex) {
+                    // The looked-up ord belongs to a different block, seek again
+                    blockAddress = blockAddresses.get(blockIndex);
+                    bytes.seek(blockAddress);
+                    currentOrd = (blockIndex << TERMS_DICT_BLOCK_LZ4_SHIFT) - 1;
+                }
+
+                // Scan to the looked-up ord
+                while (currentOrd < targetOrd) {
+                    currentOrd++;
+                    if ((currentOrd & blockMask) == 0L) {
+                        blockInput = decompressBlock(term, blockInput);
+                    } else {
+                        DataInput input = blockInput;
+                        final int token = Byte.toUnsignedInt(input.readByte());
+                        int prefixLength = token & 0x0F;
+                        int suffixLength = 1 + (token >>> 4);
+                        if (prefixLength == 15) {
+                            prefixLength += input.readVInt();
+                        }
+                        if (suffixLength == 16) {
+                            suffixLength += input.readVInt();
+                        }
+
+                        term.length = prefixLength + suffixLength;
+                        input.readBytes(term.bytes, prefixLength, suffixLength);
+                        // if (currentOrd == targetOrd) {
+                        //     term.length = prefixLength + suffixLength;
+                        //     input.readBytes(term.bytes, prefixLength, suffixLength);
+                        // } else {
+                        //     input.skipBytes(suffixLength);
+                        // }
+                    }
+                }
+                consumer.onTerm(offset, term);
+            }
+        }
+
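+        /**
+         * Reads the uncompressed first term of the block at the current file pointer into {@code term}
+         * and decompresses the remaining suffix entries, reusing the previously decompressed block
+         * when the same block is read twice in a row. Returns the input to read suffix entries from.
+         */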
+        private ByteArrayDataInput decompressBlock(BytesRef term, ByteArrayDataInput blockInput) throws IOException {
+            // The first term is kept uncompressed, so there is no need to decompress the block
+            // when only its first term is needed (e.g. when seeking from block to block).
+            term.length = bytes.readVInt();
+            bytes.readBytes(term.bytes, 0, term.length);
+            long offset = bytes.getFilePointer();
+            if (offset < entry.termsDataLength - 1) {
+                // Avoid decompressing again if we are re-reading the same block.
+                if (currentCompressedBlockStart != offset) {
+                    blockBuffer.offset = term.length;
+                    blockBuffer.length = bytes.readVInt();
+                    // Decompress the remainder of the current block, using the first term as a dictionary
+                    System.arraycopy(term.bytes, 0, blockBuffer.bytes, 0, blockBuffer.offset);
+                    LZ4.decompress(bytes, blockBuffer.length, blockBuffer.bytes, blockBuffer.offset);
+                    currentCompressedBlockStart = offset;
+                    currentCompressedBlockEnd = bytes.getFilePointer();
+                } else {
+                    // Skip decompression but re-seek to the block end.
+                    bytes.seek(currentCompressedBlockEnd);
+                }
+
+                // Reset the buffer.
+                return new ByteArrayDataInput(blockBuffer.bytes, blockBuffer.offset, blockBuffer.length);
+            } else {
+                return blockInput;
+            }
+        }
+
+    }
+
     @Override
     public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
         SortedNumericEntry entry = sortedNumerics.get(field.number);
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
index 5f2bd15abaa34..f7f842f0505d7 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
@@ -82,6 +82,25 @@ BlockLoader.Block tryRead(
     ) throws IOException;
     }
 
+    /**
+     * Bulk lookup of term dictionary ordinals. The {@link BytesRef} handed to the consumer may be
+     * reused between invocations, so callers that retain a term must copy it.
+     */
+    interface BulkOrdinalLookup {
+
+        /**
+         * Looks up the terms of the first {@code uniqueCount} ordinals in {@code sortedOrds}, which
+         * must be sorted in ascending order. The consumer receives each ordinal's position within
+         * {@code sortedOrds} together with its term.
+         */
+        void lookupOrds(int[] sortedOrds, int uniqueCount, TermConsumer consumer) throws IOException;
+
+        @FunctionalInterface
+        interface TermConsumer {
+            void onTerm(int offset, BytesRef term) throws IOException;
+        }
+    }
+
     interface RowStrideReader extends Reader {
         /**
          * Reads the values of the given document into the builder.
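Reviewer note (not part of the patch): a minimal sketch of how a caller might drive the new interface. BulkLookupExample and distinctTerms are illustrative names only; the sketch sorts and deduplicates the ordinals up front, and copies each term out because the BytesRef may be reused between callbacks.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.elasticsearch.index.mapper.BlockLoader;

final class BulkLookupExample {
    /** Resolves a batch of ordinals to their terms via a single forward scan. */
    static List<String> distinctTerms(BlockLoader.BulkOrdinalLookup lookup, int[] ords) throws IOException {
        int[] sortedOrds = Arrays.stream(ords).distinct().sorted().toArray();
        List<String> terms = new ArrayList<>(sortedOrds.length);
        // The BytesRef may be reused between callbacks; utf8ToString() makes a copy.
        lookup.lookupOrds(sortedOrds, sortedOrds.length, (offset, term) -> terms.add(term.utf8ToString()));
        return terms;
    }
}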
diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv
index 4eb5140004ea6..266bfbbd3bf78 100644
--- a/server/src/main/resources/transport/upper_bounds/8.18.csv
+++ b/server/src/main/resources/transport/upper_bounds/8.18.csv
@@ -1 +1 @@
-initial_elasticsearch_8_18_6,8840008
+transform_check_for_dangling_tasks,8840011
diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv
index 476468b203875..3600b3f8c633a 100644
--- a/server/src/main/resources/transport/upper_bounds/8.19.csv
+++ b/server/src/main/resources/transport/upper_bounds/8.19.csv
@@ -1 +1 @@
-initial_elasticsearch_8_19_3,8841067
+transform_check_for_dangling_tasks,8841070
diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv
index f8f50cc6d7839..c11e6837bb813 100644
--- a/server/src/main/resources/transport/upper_bounds/9.0.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.0.csv
@@ -1 +1 @@
-initial_elasticsearch_9_0_6,9000015
+transform_check_for_dangling_tasks,9000018
diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv
index 5a65f2e578156..80b97d85f7511 100644
--- a/server/src/main/resources/transport/upper_bounds/9.1.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.1.csv
@@ -1 +1 @@
-initial_elasticsearch_9_1_4,9112007
+transform_check_for_dangling_tasks,9112009
diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv
index e24f914a1d1ca..2147eab66c207 100644
--- a/server/src/main/resources/transport/upper_bounds/9.2.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
-ml_inference_endpoint_cache,9157000
+initial_9.2.0,9185000
diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv
new file mode 100644
index 0000000000000..2147eab66c207
--- /dev/null
+++ b/server/src/main/resources/transport/upper_bounds/9.3.csv
@@ -0,0 +1 @@
+initial_9.2.0,9185000
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
index ea29b1cbf1356..8de44854d6edb 100644
--- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
@@ -158,8 +158,10 @@ public void testForceMergeDenseCase() throws Exception {
             assertNotNull(tagsDV);
             var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
             assertNotNull(tagBytesDV);
+            List<Integer> ordinals = new ArrayList<>();
             for (int i = 0; i < numDocs; i++) {
                 assertEquals(i, hostNameDV.nextDoc());
+                ordinals.add(hostNameDV.ordValue());
                 int batchIndex = i / numHosts;
                 assertEquals(batchIndex, hostNameDV.ordValue());
                 String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
@@ -205,6 +207,27 @@ public void testForceMergeDenseCase() throws Exception {
                 BytesRef tagBytesValue = tagBytesDV.binaryValue();
                 assertTrue("unexpected bytes " + tagBytesValue, Arrays.binarySearch(tags, tagBytesValue.utf8ToString()) >= 0);
             }
+
+            var bulkOrdinalLookup = (BlockLoader.BulkOrdinalLookup) hostNameDV;
+            {
+                int[] sortedOrds = ordinals.stream().distinct().mapToInt(i -> i).toArray();
+                var hosts = new ArrayList<>(numHosts);
+                bulkOrdinalLookup.lookupOrds(sortedOrds, sortedOrds.length, (offset, term) -> hosts.add(term.utf8ToString()));
+                for (int i = 0; i < hosts.size(); i++) {
+                    String expectedHostName = String.format(Locale.ROOT, "host-%03d", i);
+                    assertEquals(expectedHostName, hosts.get(i));
+                }
+            }
+            {
+                int offset = ordinals.size() - 3;
+                int[] sortedOrds = ordinals.subList(offset, ordinals.size()).stream().distinct().mapToInt(i -> i).toArray();
+                var hosts = new ArrayList<>(numHosts);
+                bulkOrdinalLookup.lookupOrds(sortedOrds, sortedOrds.length, (o, term) -> hosts.add(term.utf8ToString()));
+                for (int i = 0; i < hosts.size(); i++) {
+                    String expectedHostName = String.format(Locale.ROOT, "host-%03d", sortedOrds[i]);
+                    assertEquals(expectedHostName, hosts.get(i));
+                }
+            }
         }
     }
 }
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilder.java
index af308000d5ebd..3b5cb58dc512a 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilder.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilder.java
@@ -200,10 +200,17 @@ BytesRefBlock buildRegularBlock() {
             blockFactory.adjustBreaker(offsetsAndLength);
             breakerSize += offsetsAndLength;
             int[] offsets = new int[uniqueCount + 1];
-            for (int o = 0; o < uniqueCount; o++) {
-                BytesRef v = docValues.lookupOrd(sortedOrds[o]);
-                offsets[o] = copies.length();
-                copies.append(v);
+            if (docValues instanceof BlockLoader.BulkOrdinalLookup bulkOrdinalLookup) {
+                bulkOrdinalLookup.lookupOrds(sortedOrds, uniqueCount, (ord, term) -> {
+                    offsets[ord] = copies.length();
+                    copies.append(term);
+                });
+            } else {
+                for (int o = 0; o < uniqueCount; o++) {
+                    BytesRef v = docValues.lookupOrd(sortedOrds[o]);
+                    offsets[o] = copies.length();
+                    copies.append(v);
+                }
             }
             offsets[uniqueCount] = copies.length();
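Editor's note (not part of the patch): both TermsDict.readTerm and BulkOrdinalLookup.lookupOrds decode every non-first entry of a block from a single token byte plus optional VInt extensions. A self-contained sketch of that scheme follows; SuffixDecodeExample and decodeNextTerm are illustrative names, and the sketch assumes term.bytes is sized to hold the longest term.

import java.io.IOException;

import org.apache.lucene.store.DataInput;
import org.apache.lucene.util.BytesRef;

final class SuffixDecodeExample {
    /**
     * Decodes the next prefix-compressed entry: the token's low nibble is the number of bytes
     * shared with the previous term, the high nibble is (suffix length - 1); the sentinel
     * values 15 and 16 mean the length continues in a VInt.
     */
    static void decodeNextTerm(DataInput input, BytesRef term) throws IOException {
        final int token = Byte.toUnsignedInt(input.readByte());
        int prefixLength = token & 0x0F;      // e.g. token 0x42 -> prefix of 2 bytes
        int suffixLength = 1 + (token >>> 4); //      token 0x42 -> suffix of 5 bytes
        if (prefixLength == 15) {
            prefixLength += input.readVInt();
        }
        if (suffixLength == 16) {
            suffixLength += input.readVInt();
        }
        // term.bytes must already hold the previous term; keep its first prefixLength bytes
        // and append the new suffix after them.
        term.length = prefixLength + suffixLength;
        input.readBytes(term.bytes, prefixLength, suffixLength);
    }
}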