diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java
new file mode 100644
index 0000000000000..5dc7ed47d7b80
--- /dev/null
+++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.benchmark.index.codec.tsdb;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.DocValuesFormat;
+import org.apache.lucene.codecs.lucene101.Lucene101Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortedNumericSortField;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.AsyncProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmarks force merging an index-sorted, TSDB-like index down to a single segment, once with the optimized doc
+ * values merge of {@link ES87TSDBDocValuesFormat} disabled and once with it enabled.
+ * <p>
+ * Uses {@link Mode#SingleShotTime}: only the first {@code forceMerge(1)} call on an index has segments to merge,
+ * so sampling repeated invocations would mostly time calls that have nothing left to do.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 0)
+@Measurement(iterations = 1)
+public class TSDBDocValuesMergeBenchmark {
+
+    @Param("20431204")
+    private int nDocs;
+
+    @Param("1000")
+    private int deltaTime;
+
+    @Param("42")
+    private int seed;
+
+    private static final String TIMESTAMP_FIELD = "@timestamp";
+    private static final String HOSTNAME_FIELD = "host.name";
+    private static final long BASE_TIMESTAMP = 1704067200000L;
+
+    private IndexWriter indexWriterWithoutOptimizedMerge;
+    private IndexWriter indexWriterWithOptimizedMerge;
+    private ExecutorService executorService;
+
+    public static void main(String[] args) throws RunnerException {
+        final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
+            .addProfiler(AsyncProfiler.class)
+            .build();
+
+        new Runner(options).run();
+    }
+
+    /** Builds one index per doc values format configuration; both are filled by the same document generator. */
+    @Setup(Level.Trial)
+    public void setup() throws IOException {
+        executorService = Executors.newSingleThreadExecutor();
+
+        final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
+        final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));
+
+        indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
+        indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
+    }
+
+    /**
+     * Indexes {@code nDocs} synthetic metric documents into {@code directory} and commits them.
+     *
+     * @param directory where the index is written
+     * @param optimizedMergeEnabled passed to {@link ES87TSDBDocValuesFormat} to switch the optimized merge on or off
+     * @return the still open writer; {@link #tearDown()} closes it together with its directory
+     */
+    private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException {
+
+        final IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
+        // NOTE: index sort config matching LogsDB's sort order
+        config.setIndexSort(
+            new Sort(
+                new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false),
+                new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true)
+            )
+        );
+        ES87TSDBDocValuesFormat docValuesFormat = new ES87TSDBDocValuesFormat(4096, optimizedMergeEnabled);
+        config.setCodec(new Lucene101Codec() {
+
+            @Override
+            public DocValuesFormat getDocValuesFormatForField(String field) {
+                return docValuesFormat;
+            }
+        });
+
+        long counter1 = 0;
+        long counter2 = 10_000_000;
+        long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
+        long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
+        int numHosts = 1000;
+        String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
+
+        final Random random = new Random(seed);
+        IndexWriter indexWriter = new IndexWriter(directory, config);
+        for (int i = 0; i < nDocs; i++) {
+            final Document doc = new Document();
+
+            final int batchIndex = i / numHosts;
+            final String hostName = "host-" + batchIndex;
+            // Slightly vary the timestamp in each document
+            final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
+
+            doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
+            doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
+            doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
+            doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
+            doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
+            doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge2Values.length]));
+            // NOTE(review): tags.length % (i + 1) equals tags.length for every i >= tags.length, so nearly all documents
+            // carry all tags; if a varying tag count was intended this probably wants i % (tags.length + 1) -- confirm.
+            int numTags = tags.length % (i + 1);
+            for (int j = 0; j < numTags; j++) {
+                doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
+            }
+
+            indexWriter.addDocument(doc);
+        }
+        indexWriter.commit();
+        return indexWriter;
+    }
+
+    @Benchmark
+    public void forceMergeWithoutOptimizedMerge() throws IOException {
+        forceMerge(indexWriterWithoutOptimizedMerge);
+    }
+
+    @Benchmark
+    public void forceMergeWithOptimizedMerge() throws IOException {
+        forceMerge(indexWriterWithOptimizedMerge);
+    }
+
+    private void forceMerge(final IndexWriter indexWriter) throws IOException {
+        indexWriter.forceMerge(1);
+    }
+
+    /** Releases the index writers, their directories and the executor created in {@link #setup()}. */
+    @TearDown(Level.Trial)
+    public void tearDown() throws IOException {
+        try {
+            closeIndex(indexWriterWithoutOptimizedMerge);
+        } finally {
+            try {
+                closeIndex(indexWriterWithOptimizedMerge);
+            } finally {
+                shutdownExecutor();
+            }
+        }
+    }
+
+    /** Closes {@code indexWriter} and the directory it writes to; a {@code null} writer (failed setup) is ignored. */
+    private static void closeIndex(final IndexWriter indexWriter) throws IOException {
+        if (indexWriter != null) {
+            final Directory directory = indexWriter.getDirectory();
+            try {
+                indexWriter.close();
+            } finally {
+                directory.close();
+            }
+        }
+    }
+
+    /** Stops the executor, waiting up to 30 seconds before forcing termination. */
+    private void shutdownExecutor() {
+        if (executorService != null) {
+            executorService.shutdown();
+            try {
+                if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
+                    executorService.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executorService.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
diff --git a/docs/changelog/125403.yaml b/docs/changelog/125403.yaml
new file mode 100644
index 0000000000000..d953dae4db4fe
--- /dev/null
+++ b/docs/changelog/125403.yaml
@@ -0,0 +1,5 @@
+pr: 125403
+summary: First step optimizing tsdb doc values codec merging
+area: Codec
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesConsumerUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesConsumerUtil.java
new file mode 100644
index 0000000000000..499555618e9d7
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesConsumerUtil.java
@@ -0,0 +1,669 @@
+/*
+ * Copyright Elasticsearch
B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb;
+
+import org.apache.lucene.codecs.DocValuesProducer;
+import org.apache.lucene.index.BaseTermsEnum;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.EmptyDocValuesProducer;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.ImpactsEnum;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.OrdinalMap;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.TermState;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.AttributeSource;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.packed.PackedInts;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mostly contains forked code from {@link org.apache.lucene.codecs.DocValuesConsumer}.
+ */
+class DocValuesConsumerUtil {
+
+    /** Result returned when the optimized merge cannot be used for a field. */
+    static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1);
+
+    /**
+     * Producer handed to the doc values consumer during an optimized merge. It carries the {@link MergeStats}
+     * gathered from the segments being merged.
+     */
+    abstract static class TsdbDocValuesProducer extends EmptyDocValuesProducer {
+
+        final MergeStats mergeStats;
+
+        TsdbDocValuesProducer(MergeStats mergeStats) {
+            this.mergeStats = mergeStats;
+        }
+
+    }
+
+    /**
+     * @param supported           whether the optimized merge can be used
+     * @param sumNumValues        number of values summed over all segments being merged
+     * @param sumNumDocsWithField number of documents with a value summed over all segments being merged
+     */
+    record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField) {}
+
+    /**
+     * Checks whether {@code fieldInfo} can be merged with the optimized merge and, if so, sums up the per segment
+     * value and document counts.
+     * <p>
+     * The optimized merge is only used when it is enabled, the index is sorted, no segment has live docs set and
+     * every segment that returns doc values for the field exposes them through {@link ES87TSDBDocValuesProducer}.
+     * Segments without a doc values producer, or without this field, contribute nothing to the sums; this mirrors
+     * how the merging producers in this class skip such segments.
+     *
+     * @param optimizedMergeEnabled whether the optimized merge is switched on
+     * @param mergeState            state of the running merge
+     * @param fieldInfo             the merged field
+     * @return merge stats with {@code supported == true}, or {@link #UNSUPPORTED}
+     */
+    static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo)
+        throws IOException {
+        if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) {
+            return UNSUPPORTED;
+        }
+
+        // Documents marked as deleted should be rare. Maybe in the case of noop operation?
+        for (int i = 0; i < mergeState.liveDocs.length; i++) {
+            if (mergeState.liveDocs[i] != null) {
+                return UNSUPPORTED;
+            }
+        }
+
+        long sumNumValues = 0;
+        int sumNumDocsWithField = 0;
+        // TODO bring back codec version check? (per field doc values producer sits between ES87TSDBDocValuesConsumer)
+        for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
+            DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
+            if (docValuesProducer == null) {
+                // segment without any doc values, the merging producers skip it as well
+                continue;
+            }
+            // Ask each segment with its own field info, like the merging producers do.
+            FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name);
+            if (readerFieldInfo == null || readerFieldInfo.getDocValuesType() != fieldInfo.getDocValuesType()) {
+                continue;
+            }
+            switch (fieldInfo.getDocValuesType()) {
+                case NUMERIC -> {
+                    var numeric = docValuesProducer.getNumeric(readerFieldInfo);
+                    if (numeric instanceof ES87TSDBDocValuesProducer.BaseNumericDocValues baseNumeric) {
+                        var entry = baseNumeric.entry;
+                        sumNumValues += entry.numValues;
+                        sumNumDocsWithField += entry.numDocsWithField;
+                    } else if (numeric != null) {
+                        return UNSUPPORTED;
+                    }
+                }
+                case SORTED_NUMERIC -> {
+                    var sortedNumeric = docValuesProducer.getSortedNumeric(readerFieldInfo);
+                    if (sortedNumeric instanceof ES87TSDBDocValuesProducer.BaseSortedNumericDocValues baseSortedNumericDocValues) {
+                        var entry = baseSortedNumericDocValues.entry;
+                        sumNumValues += entry.numValues;
+                        sumNumDocsWithField += entry.numDocsWithField;
+                    } else {
+                        var singleton = DocValues.unwrapSingleton(sortedNumeric);
+                        if (singleton instanceof ES87TSDBDocValuesProducer.BaseNumericDocValues baseNumeric) {
+                            var entry = baseNumeric.entry;
+                            sumNumValues += entry.numValues;
+                            sumNumDocsWithField += entry.numDocsWithField;
+                        } else if (sortedNumeric != null) {
+                            return UNSUPPORTED;
+                        }
+                    }
+                }
+                case SORTED -> {
+                    var sorted = docValuesProducer.getSorted(readerFieldInfo);
+                    if (sorted instanceof ES87TSDBDocValuesProducer.BaseSortedDocValues baseSortedDocValues) {
+                        var entry = baseSortedDocValues.entry;
+                        sumNumValues += entry.ordsEntry.numValues;
+                        sumNumDocsWithField += entry.ordsEntry.numDocsWithField;
+                    } else if (sorted != null) {
+                        return UNSUPPORTED;
+                    }
+                }
+                case SORTED_SET -> {
+                    var sortedSet = docValuesProducer.getSortedSet(readerFieldInfo);
+                    if (sortedSet instanceof ES87TSDBDocValuesProducer.BaseSortedSetDocValues baseSortedSet) {
+                        var entry = baseSortedSet.entry;
+                        sumNumValues += entry.ordsEntry.numValues;
+                        sumNumDocsWithField += entry.ordsEntry.numDocsWithField;
+                    } else {
+                        var singleton = DocValues.unwrapSingleton(sortedSet);
+                        if (singleton instanceof ES87TSDBDocValuesProducer.BaseSortedDocValues baseSorted) {
+                            var entry = baseSorted.entry;
+                            sumNumValues += entry.ordsEntry.numValues;
+                            sumNumDocsWithField += entry.ordsEntry.numDocsWithField;
+                        } else if (sortedSet != null) {
+                            return UNSUPPORTED;
+                        }
+                    }
+                }
+                default -> throw new IllegalStateException("unexpected doc values producer type: " + fieldInfo.getDocValuesType());
+            }
+        }
+
+        return new MergeStats(true, sumNumValues, sumNumDocsWithField);
+    }
+
+    /** Returns a producer whose {@code getNumeric} iterates the numeric values of all merged segments in merged doc id order. */
+    static DocValuesProducer mergeNumericProducer(MergeStats mergeStats, FieldInfo mergeFieldInfo, MergeState mergeState) {
+        return new TsdbDocValuesProducer(mergeStats) {
+
+            @Override
+            public NumericDocValues getNumeric(FieldInfo field) throws IOException {
+                List<NumericDocValuesSub> subs = new ArrayList<>();
+                assert mergeState.docMaps.length == mergeState.docValuesProducers.length;
+                for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
+                    NumericDocValues values = null;
+                    DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
+                    if (docValuesProducer != null) {
+                        FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name);
+                        if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.NUMERIC) {
+                            values = docValuesProducer.getNumeric(readerFieldInfo);
+                        }
+                    }
+                    if (values != null) {
+                        subs.add(new NumericDocValuesSub(mergeState.docMaps[i], values));
+                    }
+                }
+
+                return mergeNumericValues(subs, mergeState.needsIndexSort);
+            }
+        };
+    }
+
+    /** Merges {@code subs} into one forward-only iterator; {@code advance} and {@code advanceExact} are not supported. */
+    static NumericDocValues mergeNumericValues(List<NumericDocValuesSub> subs, boolean indexIsSorted) throws IOException {
+        long cost = 0;
+        for (NumericDocValuesSub sub : subs) {
+            cost += sub.values.cost();
+        }
+        final long finalCost = cost;
+
+        final DocIDMerger<NumericDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted);
+
+        return new NumericDocValues() {
+            private int docID = -1;
+            private NumericDocValuesSub current;
+
+            @Override
+            public int docID() {
+                return docID;
+            }
+
+            @Override
+            public int nextDoc() throws IOException {
+                current = docIDMerger.next();
+                if (current == null) {
+                    docID = NO_MORE_DOCS;
+                } else {
+                    docID = current.mappedDocID;
+                }
+                return docID;
+            }
+
+            @Override
+            public int advance(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean advanceExact(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long cost() {
+                return finalCost;
+            }
+
+            @Override
+            public long longValue() throws IOException {
+                return current.values.longValue();
+            }
+
+        };
+    }
+
+    /** One segment's numeric doc values, positioned by {@link DocIDMerger}. */
+    static class NumericDocValuesSub extends DocIDMerger.Sub {
+
+        final NumericDocValues values;
+
+        NumericDocValuesSub(MergeState.DocMap docMap, NumericDocValues values) {
+            super(docMap);
+            this.values = values;
+            assert values.docID() == -1;
+        }
+
+        @Override
+        public int nextDoc() throws IOException {
+            return values.nextDoc();
+        }
+    }
+
+    /** Returns a producer whose {@code getSortedNumeric} iterates the values of all merged segments in merged doc id order. */
+    static DocValuesProducer mergeSortedNumericProducer(MergeStats mergeStats, FieldInfo mergeFieldInfo, MergeState mergeState) {
+        return new TsdbDocValuesProducer(mergeStats) {
+
+            @Override
+            public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
+                List<SortedNumericDocValuesSub> subs = new ArrayList<>();
+                assert mergeState.docMaps.length == mergeState.docValuesProducers.length;
+                for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
+                    SortedNumericDocValues values = null;
+                    DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
+                    if (docValuesProducer != null) {
+                        FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name);
+                        if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_NUMERIC) {
+                            values = docValuesProducer.getSortedNumeric(readerFieldInfo);
+                        }
+                    }
+                    if (values != null) {
+                        subs.add(new SortedNumericDocValuesSub(mergeState.docMaps[i], values));
+                    }
+                }
+                return mergeSortedNumericValues(subs, mergeState.needsIndexSort);
+            }
+        };
+    }
+
+    /** Merges {@code subs} into one forward-only iterator; {@code advance} and {@code advanceExact} are not supported. */
+    static SortedNumericDocValues mergeSortedNumericValues(List<SortedNumericDocValuesSub> subs, boolean indexIsSorted)
+        throws IOException {
+        long cost = 0;
+        for (SortedNumericDocValuesSub sub : subs) {
+            cost += sub.values.cost();
+        }
+        final long finalCost = cost;
+
+        final DocIDMerger<SortedNumericDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted);
+
+        return new SortedNumericDocValues() {
+            private int docID = -1;
+            private SortedNumericDocValuesSub current;
+
+            @Override
+            public int docID() {
+                return docID;
+            }
+
+            @Override
+            public int nextDoc() throws IOException {
+                current = docIDMerger.next();
+                if (current == null) {
+                    docID = NO_MORE_DOCS;
+                } else {
+                    docID = current.mappedDocID;
+                }
+                return docID;
+            }
+
+            @Override
+            public int advance(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean advanceExact(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long cost() {
+                return finalCost;
+            }
+
+            @Override
+            public long nextValue() throws IOException {
+                return current.values.nextValue();
+            }
+
+            @Override
+            public int docValueCount() {
+                return current.values.docValueCount();
+            }
+
+        };
+    }
+
+    /** One segment's sorted numeric doc values, positioned by {@link DocIDMerger}. */
+    static class SortedNumericDocValuesSub extends DocIDMerger.Sub {
+
+        final SortedNumericDocValues values;
+
+        SortedNumericDocValuesSub(MergeState.DocMap docMap, SortedNumericDocValues values) {
+            super(docMap);
+            this.values = values;
+            assert values.docID() == -1;
+        }
+
+        @Override
+        public int nextDoc() throws IOException {
+            return values.nextDoc();
+        }
+    }
+
+    /**
+     * Returns a producer whose {@code getSorted} iterates the values of all merged segments in merged doc id order,
+     * with segment ordinals translated to the ordinals of an {@link OrdinalMap} built over all segments.
+     */
+    static DocValuesProducer mergeSortedProducer(MergeStats mergeStats, FieldInfo mergeFieldInfo, MergeState mergeState) {
+        return new TsdbDocValuesProducer(mergeStats) {
+
+            @Override
+            public SortedDocValues getSorted(FieldInfo field) throws IOException {
+                List<SortedDocValuesSub> subs = new ArrayList<>();
+                assert mergeState.docMaps.length == mergeState.docValuesProducers.length;
+
+                TermsEnum[] liveTerms = new TermsEnum[mergeState.docValuesProducers.length];
+                long[] weights = new long[liveTerms.length];
+                for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
+                    SortedDocValues values = null;
+                    DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
+                    if (docValuesProducer != null) {
+                        FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name);
+                        if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) {
+                            values = docValuesProducer.getSorted(readerFieldInfo);
+                        }
+                    }
+                    if (values == null) {
+                        // keep one sub per segment so that list positions line up with the ordinal map's segment numbers
+                        values = DocValues.emptySorted();
+                    }
+
+                    liveTerms[i] = values.termsEnum();
+                    weights[i] = values.getValueCount();
+                    subs.add(new SortedDocValuesSub(mergeState.docMaps[i], values));
+                }
+
+                final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT);
+                for (int i = 0; i < subs.size(); i++) {
+                    subs.get(i).map = map.getGlobalOrds(i);
+                }
+                return mergeSortedValues(subs, mergeState.needsIndexSort, map);
+            }
+        };
+    }
+
+    /** Merges {@code subs} into one forward-only iterator that reports ordinals of {@code map}. */
+    static SortedDocValues mergeSortedValues(List<SortedDocValuesSub> subs, boolean indexIsSorted, OrdinalMap map) throws IOException {
+        long cost = 0;
+        for (SortedDocValuesSub sub : subs) {
+            cost += sub.values.cost();
+        }
+        final long finalCost = cost;
+
+        final DocIDMerger<SortedDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted);
+
+        return new SortedDocValues() {
+            private int docID = -1;
+            private SortedDocValuesSub current;
+
+            @Override
+            public int docID() {
+                return docID;
+            }
+
+            @Override
+            public int nextDoc() throws IOException {
+                current = docIDMerger.next();
+                if (current == null) {
+                    docID = NO_MORE_DOCS;
+                } else {
+                    docID = current.mappedDocID;
+                }
+                return docID;
+            }
+
+            @Override
+            public int advance(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean advanceExact(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long cost() {
+                return finalCost;
+            }
+
+            @Override
+            public int ordValue() throws IOException {
+                int subOrd = current.values.ordValue();
+                assert subOrd != -1;
+                return (int) current.map.get(subOrd);
+            }
+
+            @Override
+            public BytesRef lookupOrd(int ord) throws IOException {
+                int segmentNumber = map.getFirstSegmentNumber(ord);
+                int segmentOrd = (int) map.getFirstSegmentOrd(ord);
+                return subs.get(segmentNumber).values.lookupOrd(segmentOrd);
+            }
+
+            @Override
+            public int getValueCount() {
+                return (int) map.getValueCount();
+            }
+
+            @Override
+            public TermsEnum termsEnum() throws IOException {
+                TermsEnum[] termsEnumSubs = new TermsEnum[subs.size()];
+                for (int sub = 0; sub < termsEnumSubs.length; ++sub) {
+                    termsEnumSubs[sub] = subs.get(sub).values.termsEnum();
+                }
+                return new MergedTermsEnum(map, termsEnumSubs);
+            }
+        };
+    }
+
+    /** One segment's sorted doc values plus the mapping from its ordinals to the merged ordinals. */
+    static class SortedDocValuesSub extends DocIDMerger.Sub {
+
+        LongValues map;
+        final SortedDocValues values;
+
+        SortedDocValuesSub(MergeState.DocMap docMap, SortedDocValues values) {
+            super(docMap);
+            this.values = values;
+            assert values.docID() == -1;
+        }
+
+        @Override
+        public int nextDoc() throws IOException {
+            return values.nextDoc();
+        }
+    }
+
+    /**
+     * Returns a producer whose {@code getSortedSet} iterates the values of all merged segments in merged doc id order,
+     * with segment ordinals translated to the ordinals of an {@link OrdinalMap} built over all segments.
+     */
+    static DocValuesProducer mergeSortedSetProducer(MergeStats mergeStats, FieldInfo mergeFieldInfo, MergeState mergeState) {
+        return new TsdbDocValuesProducer(mergeStats) {
+
+            @Override
+            public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+                List<SortedSetDocValuesSub> subs = new ArrayList<>();
+                assert mergeState.docMaps.length == mergeState.docValuesProducers.length;
+
+                TermsEnum[] liveTerms = new TermsEnum[mergeState.docValuesProducers.length];
+                long[] weights = new long[liveTerms.length];
+                for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
+                    SortedSetDocValues values = null;
+                    DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
+                    if (docValuesProducer != null) {
+                        FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name);
+                        if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) {
+                            values = docValuesProducer.getSortedSet(readerFieldInfo);
+                        }
+                    }
+                    if (values == null) {
+                        // keep one sub per segment so that list positions line up with the ordinal map's segment numbers
+                        values = DocValues.emptySortedSet();
+                    }
+                    liveTerms[i] = values.termsEnum();
+                    weights[i] = values.getValueCount();
+                    subs.add(new SortedSetDocValuesSub(mergeState.docMaps[i], values));
+                }
+
+                final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT);
+                for (int i = 0; i < subs.size(); i++) {
+                    subs.get(i).map = map.getGlobalOrds(i);
+                }
+                return mergeSortedSetValues(subs, mergeState.needsIndexSort, map);
+            }
+        };
+    }
+
+    /** Merges {@code subs} into one forward-only iterator that reports ordinals of {@code map}. */
+    static SortedSetDocValues mergeSortedSetValues(List<SortedSetDocValuesSub> subs, boolean indexIsSorted, OrdinalMap map)
+        throws IOException {
+        long cost = 0;
+        for (SortedSetDocValuesSub sub : subs) {
+            cost += sub.values.cost();
+        }
+        final long finalCost = cost;
+
+        final DocIDMerger<SortedSetDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted);
+
+        return new SortedSetDocValues() {
+            private int docID = -1;
+            private SortedSetDocValuesSub current;
+
+            @Override
+            public int docID() {
+                return docID;
+            }
+
+            @Override
+            public int nextDoc() throws IOException {
+                current = docIDMerger.next();
+                if (current == null) {
+                    docID = NO_MORE_DOCS;
+                } else {
+                    docID = current.mappedDocID;
+                }
+                return docID;
+            }
+
+            @Override
+            public int advance(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean advanceExact(int target) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long cost() {
+                return finalCost;
+            }
+
+            @Override
+            public long nextOrd() throws IOException {
+                long subOrd = current.values.nextOrd();
+                return current.map.get(subOrd);
+            }
+
+            @Override
+            public int docValueCount() {
+                return current.values.docValueCount();
+            }
+
+            @Override
+            public BytesRef lookupOrd(long ord) throws IOException {
+                int segmentNumber = map.getFirstSegmentNumber(ord);
+                // sorted set ordinals are longs, do not narrow the segment ordinal
+                long segmentOrd = map.getFirstSegmentOrd(ord);
+                return subs.get(segmentNumber).values.lookupOrd(segmentOrd);
+            }
+
+            @Override
+            public long getValueCount() {
+                return map.getValueCount();
+            }
+
+            @Override
+            public TermsEnum termsEnum() throws IOException {
+                TermsEnum[] termsEnumSubs = new TermsEnum[subs.size()];
+                for (int sub = 0; sub < termsEnumSubs.length; ++sub) {
+                    termsEnumSubs[sub] = subs.get(sub).values.termsEnum();
+                }
+                return new MergedTermsEnum(map, termsEnumSubs);
+            }
+        };
+    }
+
+    /** One segment's sorted set doc values plus the mapping from its ordinals to the merged ordinals. */
+    static class SortedSetDocValuesSub extends DocIDMerger.Sub {
+
+        LongValues map;
+        final SortedSetDocValues values;
+
+        SortedSetDocValuesSub(MergeState.DocMap docMap, SortedSetDocValues values) {
+            super(docMap);
+            this.values = values;
+            assert values.docID() == -1;
+        }
+
+        @Override
+        public int nextDoc() throws IOException {
+            return values.nextDoc();
+        }
+    }
+
+    /**
+     * Terms enum over the merged ordinal space. Only {@link #next()}, {@link #term()} and {@link #ord()} are
+     * implemented; every seek or postings related method throws {@link UnsupportedOperationException}.
+     */
+    static class MergedTermsEnum extends BaseTermsEnum {
+
+        private final TermsEnum[] subs;
+        private final OrdinalMap ordinalMap;
+        private final long valueCount;
+        private long ord = -1;
+        private BytesRef term;
+
+        MergedTermsEnum(OrdinalMap ordinalMap, TermsEnum[] subs) {
+            this.ordinalMap = ordinalMap;
+            this.subs = subs;
+            this.valueCount = ordinalMap.getValueCount();
+        }
+
+        @Override
+        public BytesRef term() throws IOException {
+            return term;
+        }
+
+        @Override
+        public long ord() throws IOException {
+            return ord;
+        }
+
+        @Override
+        public BytesRef next() throws IOException {
+            if (++ord >= valueCount) {
+                return null;
+            }
+            final int subNum = ordinalMap.getFirstSegmentNumber(ord);
+            final TermsEnum sub = subs[subNum];
+            final long subOrd = ordinalMap.getFirstSegmentOrd(ord);
+            // step the segment's enum forward until it reaches the segment ordinal of the merged term
+            do {
+                term = sub.next();
+            } while (sub.ord() < subOrd);
+            assert sub.ord() == subOrd;
+            return term;
+        }
+
+        @Override
+        public AttributeSource attributes() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public SeekStatus seekCeil(BytesRef text) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void seekExact(long ord) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int docFreq() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long totalTermFreq() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ImpactsEnum impacts(int flags) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TermState termState() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java
index dc73428a07c7c..485a9b9c20397 100644
---
a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java
@@ -19,17 +19,21 @@
 import org.apache.lucene.index.EmptyDocValuesProducer;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.CheckedIntConsumer;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.SortedSetSelector;
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
@@ -46,6 +50,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import static org.elasticsearch.index.codec.tsdb.DocValuesConsumerUtil.compatibleWithOptimizedMerge;
 import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
 import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.SKIP_INDEX_LEVEL_SHIFT;
 import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.SKIP_INDEX_MAX_LEVEL;
@@ -55,11 +60,14 @@ final class ES87TSDBDocValuesConsumer extends DocValuesConsumer {
 
     IndexOutput data, meta;
     final int maxDoc;
+    final Directory dir;
+    final boolean enableOptimizedMerge;
     private byte[] termsDictBuffer;
     private final int skipIndexIntervalSize;
 
     ES87TSDBDocValuesConsumer(
         SegmentWriteState state,
+        boolean enableOptimizedMerge,
         int skipIndexIntervalSize,
         String dataCodec,
         String dataExtension,
@@ -89,6 +97,8 @@ final class ES87TSDBDocValuesConsumer extends DocValuesConsumer {
             );
             maxDoc = state.segmentInfo.maxDoc();
             this.skipIndexIntervalSize = skipIndexIntervalSize;
+            this.enableOptimizedMerge = enableOptimizedMerge;
+            this.dir = state.directory;
             success = true;
         } finally {
             if (success == false) {
@@ -101,51 +111,56 @@ final class ES87TSDBDocValuesConsumer extends DocValuesConsumer {
     public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
         meta.writeInt(field.number);
         meta.writeByte(ES87TSDBDocValuesFormat.NUMERIC);
-        DocValuesProducer producer = new EmptyDocValuesProducer() {
-            @Override
-            public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
-                return DocValues.singleton(valuesProducer.getNumeric(field));
-            }
-        };
+        final DocValuesProducer producer;
+        if (valuesProducer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdb) {
+            producer = new DocValuesConsumerUtil.TsdbDocValuesProducer(tsdb.mergeStats) {
+                @Override
+                public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
+                    return DocValues.singleton(valuesProducer.getNumeric(field));
+                }
+            };
+        } else {
+            producer = new EmptyDocValuesProducer() {
+                @Override
+                public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
+                    return DocValues.singleton(valuesProducer.getNumeric(field));
+                }
+            };
+        }
         if (field.docValuesSkipIndexType() != DocValuesSkipIndexType.NONE) {
             writeSkipIndex(field, producer);
         }
 
-        writeField(field, producer, -1);
+        writeField(field, producer, -1, null);
     }
 
-    private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, long maxOrd) throws IOException {
+    private long[] writeField(
+        FieldInfo field,
+        DocValuesProducer valuesProducer,
+        long maxOrd,
+        CheckedIntConsumer<IOException> docCountConsumer
+    ) throws IOException {
         int numDocsWithValue = 0;
         long numValues = 0;
 
-        SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
-        for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
-            numDocsWithValue++;
-            final int count = values.docValueCount();
-            numValues += count;
-        }
-
-        if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
-            meta.writeLong(-2); // docsWithFieldOffset
-            meta.writeLong(0L); // docsWithFieldLength
-            meta.writeShort((short) -1); // jumpTableEntryCount
-            meta.writeByte((byte) -1); // denseRankPower
-        } else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
-            meta.writeLong(-1); // docsWithFieldOffset
-            meta.writeLong(0L); // docsWithFieldLength
-            meta.writeShort((short) -1); // jumpTableEntryCount
-            meta.writeByte((byte) -1); // denseRankPower
-        } else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
-            long offset = data.getFilePointer();
-            meta.writeLong(offset); // docsWithFieldOffset
+        SortedNumericDocValues values;
+        if (valuesProducer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdbDocValuesProducer) {
+            numDocsWithValue = tsdbDocValuesProducer.mergeStats.sumNumDocsWithField();
+            numValues = tsdbDocValuesProducer.mergeStats.sumNumValues();
+        } else {
             values = valuesProducer.getSortedNumeric(field);
-            final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
-            meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
-            meta.writeShort(jumpTableEntryCount);
-            meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+            for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
+                numDocsWithValue++;
+                final int count = values.docValueCount();
+                numValues += count;
+            }
         }
         meta.writeLong(numValues);
+        meta.writeInt(numDocsWithValue);
 
+        // TODO: which IOContext should be used here?
+        IndexOutput disiTempOutput = null;
+        IndexedDISIBuilder docIdSetBuilder = null;
         if (numValues > 0) {
             // Special case for maxOrd of 1, signal -1 that no blocks will be written
             meta.writeInt(maxOrd != 1 ? ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
@@ -165,8 +180,20 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon
                 final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
                 values = valuesProducer.getSortedNumeric(field);
                 final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
+
+                if (numDocsWithValue != 0 && numDocsWithValue != maxDoc) {
+                    disiTempOutput = dir.createTempOutput(data.getName(), "disi", IOContext.DEFAULT);
+                    docIdSetBuilder = new IndexedDISIBuilder(disiTempOutput, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+                }
+
                 for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
+                    if (docIdSetBuilder != null) {
+                        docIdSetBuilder.addDocId(doc);
+                    }
                     final int count = values.docValueCount();
+                    if (docCountConsumer != null) {
+                        docCountConsumer.accept(count);
+                    }
                     for (int i = 0; i < count; ++i) {
                         buffer[bufferSize++] = values.nextValue();
                         if (bufferSize == ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
@@ -205,10 +232,59 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon
             meta.writeLong(valuesDataOffset);
             meta.writeLong(valuesDataLength);
         }
+        if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
+            meta.writeLong(-2); // docsWithFieldOffset
+            meta.writeLong(0L); // docsWithFieldLength
+            meta.writeShort((short) -1); // jumpTableEntryCount
+            meta.writeByte((byte) -1); // denseRankPower
+        } else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
+            meta.writeLong(-1); // docsWithFieldOffset
+            meta.writeLong(0L); // docsWithFieldLength
+            meta.writeShort((short) -1); // jumpTableEntryCount
+            meta.writeByte((byte) -1); // denseRankPower
+        } else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
+            long offset = data.getFilePointer();
+            meta.writeLong(offset); // docsWithFieldOffset
+            final short jumpTableEntryCount;
+            if (maxOrd != 1 && docIdSetBuilder != null) {
+                // NOTE(review): looks like an exception before this point leaves disiTempOutput open and its temp file behind -- confirm.
+                jumpTableEntryCount = docIdSetBuilder.build();
+                String skipListTempFileName = disiTempOutput.getName();
+                disiTempOutput.close();
+                try (
+                    // TODO: which IOContext should be used here?
+                    var addressDataInput = dir.openInput(skipListTempFileName, IOContext.DEFAULT)
+                ) {
+                    data.copyBytes(addressDataInput, addressDataInput.length());
+                }
+                org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(dir, skipListTempFileName);
+            } else {
+                values = valuesProducer.getSortedNumeric(field);
+                jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+            }
+            meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
+            meta.writeShort(jumpTableEntryCount);
+            meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+        }
 
         return new long[] { numDocsWithValue, numValues };
     }
 
+    /**
+     * Merges a numeric field. When {@code compatibleWithOptimizedMerge} reports the merge as supported, the field is
+     * written from a merging producer that carries the merge stats; otherwise the merge implementation of the super
+     * class is used.
+     */
+    @Override
+    public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
+        var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo);
+        if (result.supported()) {
+            addNumericField(mergeFieldInfo, DocValuesConsumerUtil.mergeNumericProducer(result, mergeFieldInfo, mergeState));
+        } else {
+            super.mergeNumericField(mergeFieldInfo, mergeState);
+        }
+    }
+
     @Override
     public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
         meta.writeInt(field.number);
@@ -284,45 +360,102 @@ public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) th
         doAddSortedField(field, valuesProducer, false);
     }
 
+    /**
+     * Merges a sorted field. When {@code compatibleWithOptimizedMerge} reports the merge as supported, the field is
+     * written from a merging producer that carries the merge stats; otherwise the merge implementation of the super
+     * class is used.
+     */
+    @Override
+    public void mergeSortedField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
+        var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo);
+        if (result.supported()) {
+            addSortedField(mergeFieldInfo, DocValuesConsumerUtil.mergeSortedProducer(result, mergeFieldInfo, mergeState));
+        } else {
+            super.mergeSortedField(mergeFieldInfo, mergeState);
+        }
+    }
+
     private void doAddSortedField(FieldInfo field, DocValuesProducer valuesProducer, boolean addTypeByte) throws IOException {
-        DocValuesProducer producer = new EmptyDocValuesProducer() {
-            @Override
-            public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
-                SortedDocValues sorted = valuesProducer.getSorted(field);
-                NumericDocValues sortedOrds = new NumericDocValues() {
-                    @Override
-                    public long longValue() throws IOException {
-                        return sorted.ordValue();
-                    }
+        final DocValuesProducer producer;
+        if (valuesProducer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdb) {
+            producer = new DocValuesConsumerUtil.TsdbDocValuesProducer(tsdb.mergeStats) {
+                @Override
+                public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
+                    SortedDocValues sorted = valuesProducer.getSorted(field);
+                    NumericDocValues sortedOrds = new NumericDocValues() {
+                        @Override
+                        public long longValue() throws IOException {
+                            return sorted.ordValue();
+                        }
 
-                    @Override
-                    public boolean advanceExact(int target) throws IOException {
-                        return sorted.advanceExact(target);
-                    }
+                        @Override
+                        public boolean advanceExact(int target) throws IOException {
+                            return sorted.advanceExact(target);
+                        }
 
-                    @Override
-                    public int docID() {
-                        return sorted.docID();
-                    }
+                        @Override
+                        public int docID() {
+                            return sorted.docID();
+                        }
 
-                    @Override
-                    public int nextDoc() throws IOException {
-                        return sorted.nextDoc();
-                    }
+                        @Override
+                        public int nextDoc() throws IOException {
+                            return sorted.nextDoc();
+                        }
 
-                    @Override
-                    public int advance(int target) throws IOException {
-                        return sorted.advance(target);
-                    }
+                        @Override
+                        public int advance(int target) throws IOException {
+                            return sorted.advance(target);
+                        }
 
-                    @Override
-                    public long
cost() { - return sorted.cost(); - } - }; - return DocValues.singleton(sortedOrds); - } - }; + @Override + public long cost() { + return sorted.cost(); + } + }; + return DocValues.singleton(sortedOrds); + } + }; + } else { + producer = new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + SortedDocValues sorted = valuesProducer.getSorted(field); + NumericDocValues sortedOrds = new NumericDocValues() { + @Override + public long longValue() throws IOException { + return sorted.ordValue(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + return sorted.advanceExact(target); + } + + @Override + public int docID() { + return sorted.docID(); + } + + @Override + public int nextDoc() throws IOException { + return sorted.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + return sorted.advance(target); + } + + @Override + public long cost() { + return sorted.cost(); + } + }; + return DocValues.singleton(sortedOrds); + } + }; + } if (field.docValuesSkipIndexType() != DocValuesSkipIndexType.NONE) { writeSkipIndex(field, producer); } @@ -331,7 +453,7 @@ public long cost() { } SortedDocValues sorted = valuesProducer.getSorted(field); int maxOrd = sorted.getValueCount(); - writeField(field, producer, maxOrd); + writeField(field, producer, maxOrd, null); addTermsDict(DocValues.singleton(valuesProducer.getSorted(field))); } @@ -490,36 +612,101 @@ private void writeSortedNumericField(FieldInfo field, DocValuesProducer valuesPr if (maxOrd > -1) { meta.writeByte((byte) 1); // multiValued (1 = multiValued) } - long[] stats = writeField(field, valuesProducer, maxOrd); - int numDocsWithField = Math.toIntExact(stats[0]); - long numValues = stats[1]; - assert numValues >= numDocsWithField; - meta.writeInt(numDocsWithField); - if (numValues > numDocsWithField) { - long start = data.getFilePointer(); - meta.writeLong(start); - 
meta.writeVInt(ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT); - - final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance( - meta, - data, - numDocsWithField + 1L, - ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT - ); - long addr = 0; - addressesWriter.add(addr); - SortedNumericDocValues values = valuesProducer.getSortedNumeric(field); - for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { - addr += values.docValueCount(); + if (valuesProducer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdbDocValuesProducer) { + int numDocsWithField = tsdbDocValuesProducer.mergeStats.sumNumDocsWithField(); + long numValues = tsdbDocValuesProducer.mergeStats.sumNumValues(); + if (numDocsWithField == numValues) { + writeField(field, valuesProducer, maxOrd, null); + assert numValues >= numDocsWithField; + } else { + assert numValues >= numDocsWithField; + + var addressMetaBuffer = new ByteBuffersDataOutput(); + String addressDataOutputName = null; + try ( + var addressMetaOutput = new ByteBuffersIndexOutput(addressMetaBuffer, "meta-temp", "meta-temp"); + // TODO: which IOContext should be used here? 
+ var addressDataOutput = dir.createTempOutput(data.getName(), "address-data", IOContext.DEFAULT) + ) { + addressDataOutputName = addressDataOutput.getName(); + final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance( + addressMetaOutput, + addressDataOutput, + numDocsWithField + 1L, + ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT + ); + long[] addr = new long[1]; + addressesWriter.add(addr[0]); + writeField(field, valuesProducer, maxOrd, docValueCount -> { + addr[0] += docValueCount; + addressesWriter.add(addr[0]); + }); + addressesWriter.finish(); + + long start = data.getFilePointer(); + meta.writeLong(start); + meta.writeVInt(ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT); + addressMetaBuffer.copyTo(meta); + addressDataOutput.close(); + try ( + // TODO: which IOContext should be used here? + var addressDataInput = dir.openInput(addressDataOutput.getName(), IOContext.DEFAULT) + ) { + data.copyBytes(addressDataInput, addressDataInput.length()); + meta.writeLong(data.getFilePointer() - start); + } + } finally { + if (addressDataOutputName != null) { + org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(dir, addressDataOutputName); + } + } + } + } else { + long[] stats = writeField(field, valuesProducer, maxOrd, null); + int numDocsWithField = Math.toIntExact(stats[0]); + long numValues = stats[1]; + assert numValues >= numDocsWithField; + if (numValues > numDocsWithField) { + long start = data.getFilePointer(); + meta.writeLong(start); + meta.writeVInt(ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT); + + final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance( + meta, + data, + numDocsWithField + 1L, + ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT + ); + long addr = 0; addressesWriter.add(addr); + SortedNumericDocValues values = valuesProducer.getSortedNumeric(field); + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + addr += 
values.docValueCount(); + addressesWriter.add(addr); + } + addressesWriter.finish(); + meta.writeLong(data.getFilePointer() - start); } - addressesWriter.finish(); - meta.writeLong(data.getFilePointer() - start); } } - private static boolean isSingleValued(SortedSetDocValues values) throws IOException { + @Override + public void mergeSortedNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + addSortedNumericField(mergeFieldInfo, DocValuesConsumerUtil.mergeSortedNumericProducer(result, mergeFieldInfo, mergeState)); + } else { + super.mergeSortedNumericField(mergeFieldInfo, mergeState); + } + } + + private static boolean isSingleValued(FieldInfo field, DocValuesProducer producer) throws IOException { + if (producer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdb) { + return tsdb.mergeStats.sumNumValues() == tsdb.mergeStats.sumNumDocsWithField(); + } + + var values = producer.getSortedSet(field); if (DocValues.unwrapSingleton(values) != null) { return true; } @@ -535,78 +722,155 @@ private static boolean isSingleValued(SortedSetDocValues values) throws IOExcept return true; } + @Override + public void mergeSortedSetField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + addSortedSetField(mergeFieldInfo, DocValuesConsumerUtil.mergeSortedSetProducer(result, mergeFieldInfo, mergeState)); + } else { + super.mergeSortedSetField(mergeFieldInfo, mergeState); + } + } + @Override public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(SORTED_SET); - if (isSingleValued(valuesProducer.getSortedSet(field))) { - doAddSortedField(field, new EmptyDocValuesProducer() { - @Override - public 
SortedDocValues getSorted(FieldInfo field) throws IOException { - return SortedSetSelector.wrap(valuesProducer.getSortedSet(field), SortedSetSelector.Type.MIN); - } - }, true); + if (isSingleValued(field, valuesProducer)) { + if (valuesProducer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdb) { + doAddSortedField(field, new DocValuesConsumerUtil.TsdbDocValuesProducer(tsdb.mergeStats) { + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + return SortedSetSelector.wrap(valuesProducer.getSortedSet(field), SortedSetSelector.Type.MIN); + } + }, true); + } else { + doAddSortedField(field, new EmptyDocValuesProducer() { + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + return SortedSetSelector.wrap(valuesProducer.getSortedSet(field), SortedSetSelector.Type.MIN); + } + }, true); + } return; } SortedSetDocValues values = valuesProducer.getSortedSet(field); long maxOrd = values.getValueCount(); - writeSortedNumericField(field, new EmptyDocValuesProducer() { - @Override - public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { - SortedSetDocValues values = valuesProducer.getSortedSet(field); - return new SortedNumericDocValues() { + if (valuesProducer instanceof DocValuesConsumerUtil.TsdbDocValuesProducer tsdb) { + writeSortedNumericField(field, new DocValuesConsumerUtil.TsdbDocValuesProducer(tsdb.mergeStats) { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + SortedSetDocValues values = valuesProducer.getSortedSet(field); + return new SortedNumericDocValues() { - long[] ords = LongsRef.EMPTY_LONGS; - int i, docValueCount; + long[] ords = LongsRef.EMPTY_LONGS; + int i, docValueCount; - @Override - public long nextValue() { - return ords[i++]; - } + @Override + public long nextValue() { + return ords[i++]; + } - @Override - public int docValueCount() { - return docValueCount; - } + @Override + public int 
docValueCount() { + return docValueCount; + } - @Override - public boolean advanceExact(int target) { - throw new UnsupportedOperationException(); - } + @Override + public boolean advanceExact(int target) { + throw new UnsupportedOperationException(); + } - @Override - public int docID() { - return values.docID(); - } + @Override + public int docID() { + return values.docID(); + } - @Override - public int nextDoc() throws IOException { - int doc = values.nextDoc(); - if (doc != NO_MORE_DOCS) { - docValueCount = values.docValueCount(); - ords = ArrayUtil.grow(ords, docValueCount); - for (int j = 0; j < docValueCount; j++) { - ords[j] = values.nextOrd(); + @Override + public int nextDoc() throws IOException { + int doc = values.nextDoc(); + if (doc != NO_MORE_DOCS) { + docValueCount = values.docValueCount(); + ords = ArrayUtil.grow(ords, docValueCount); + for (int j = 0; j < docValueCount; j++) { + ords[j] = values.nextOrd(); + } + i = 0; } - i = 0; + return doc; } - return doc; - } - @Override - public int advance(int target) throws IOException { - throw new UnsupportedOperationException(); - } + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } - @Override - public long cost() { - return values.cost(); - } - }; - } - }, maxOrd); + @Override + public long cost() { + return values.cost(); + } + }; + } + }, maxOrd); + } else { + writeSortedNumericField(field, new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + SortedSetDocValues values = valuesProducer.getSortedSet(field); + return new SortedNumericDocValues() { + + long[] ords = LongsRef.EMPTY_LONGS; + int i, docValueCount; + + @Override + public long nextValue() { + return ords[i++]; + } + + @Override + public int docValueCount() { + return docValueCount; + } + + @Override + public boolean advanceExact(int target) { + throw new UnsupportedOperationException(); + } + + 
@Override + public int docID() { + return values.docID(); + } + + @Override + public int nextDoc() throws IOException { + int doc = values.nextDoc(); + if (doc != NO_MORE_DOCS) { + docValueCount = values.docValueCount(); + ords = ArrayUtil.grow(ords, docValueCount); + for (int j = 0; j < docValueCount; j++) { + ords[j] = values.nextOrd(); + } + i = 0; + } + return doc; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return values.cost(); + } + }; + } + }, maxOrd); + } addTermsDict(valuesProducer.getSortedSet(field)); } diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java index 496c41b42869a..ca8fc2e9774ee 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java @@ -13,6 +13,7 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; +import org.elasticsearch.common.util.FeatureFlag; import java.io.IOException; @@ -75,25 +76,48 @@ public class ES87TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesF } } + // Default for escape hatch: + static final boolean OPTIMIZED_MERGE_ENABLE_DEFAULT; + static final FeatureFlag TSDB_DOC_VALUES_OPTIMIZED_MERGE = new FeatureFlag("tsdb_doc_values_optimized_merge"); + static final String OPTIMIZED_MERGE_ENABLED_NAME = ES87TSDBDocValuesConsumer.class.getName() + ".enableOptimizedMerge"; + + static { + boolean optimizedMergeDefault = TSDB_DOC_VALUES_OPTIMIZED_MERGE.isEnabled(); + OPTIMIZED_MERGE_ENABLE_DEFAULT = Boolean.parseBoolean( + System.getProperty(OPTIMIZED_MERGE_ENABLED_NAME, Boolean.toString(optimizedMergeDefault)) + ); + } + private final int 
skipIndexIntervalSize; + // TODO: remove escape hatch? It is useful now for testing/benchmarking, but the optimized merge logic currently does some scary things. + private final boolean enableOptimizedMerge; /** Default constructor. */ public ES87TSDBDocValuesFormat() { - this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE); + this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE, OPTIMIZED_MERGE_ENABLE_DEFAULT); } /** Doc values fields format with specified skipIndexIntervalSize. */ - public ES87TSDBDocValuesFormat(int skipIndexIntervalSize) { + public ES87TSDBDocValuesFormat(int skipIndexIntervalSize, boolean enableOptimizedMerge) { super(CODEC_NAME); if (skipIndexIntervalSize < 2) { throw new IllegalArgumentException("skipIndexIntervalSize must be > 1, got [" + skipIndexIntervalSize + "]"); } this.skipIndexIntervalSize = skipIndexIntervalSize; + this.enableOptimizedMerge = enableOptimizedMerge; } @Override public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { - return new ES87TSDBDocValuesConsumer(state, skipIndexIntervalSize, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION); + return new ES87TSDBDocValuesConsumer( + state, + enableOptimizedMerge, + skipIndexIntervalSize, + DATA_CODEC, + DATA_EXTENSION, + META_CODEC, + META_EXTENSION + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java index 7edef480dd7b8..3a5fa6b4ec8b2 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java @@ -50,15 +50,15 @@ import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.TERMS_DICT_BLOCK_LZ4_SHIFT; public class ES87TSDBDocValuesProducer extends DocValuesProducer { - private final IntObjectHashMap numerics; + final IntObjectHashMap numerics; private final IntObjectHashMap
binaries; - private final IntObjectHashMap sorted; - private final IntObjectHashMap sortedSets; - private final IntObjectHashMap sortedNumerics; + final IntObjectHashMap sorted; + final IntObjectHashMap sortedSets; + final IntObjectHashMap sortedNumerics; private final IntObjectHashMap skippers; private final IndexInput data; private final int maxDoc; - private final int version; + final int version; private final boolean merging; ES87TSDBDocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) @@ -359,7 +359,7 @@ public long cost() { }; } - private abstract class BaseSortedDocValues extends SortedDocValues { + abstract class BaseSortedDocValues extends SortedDocValues { final SortedEntry entry; final TermsEnum termsEnum; @@ -395,7 +395,7 @@ public TermsEnum termsEnum() throws IOException { } } - private abstract static class BaseSortedSetDocValues extends SortedSetDocValues { + abstract static class BaseSortedSetDocValues extends SortedSetDocValues { final SortedSetEntry entry; final IndexInput data; @@ -904,11 +904,8 @@ private static DocValuesSkipperEntry readDocValueSkipperMeta(IndexInput meta) th } private static void readNumeric(IndexInput meta, NumericEntry entry) throws IOException { - entry.docsWithFieldOffset = meta.readLong(); - entry.docsWithFieldLength = meta.readLong(); - entry.jumpTableEntryCount = meta.readShort(); - entry.denseRankPower = meta.readByte(); entry.numValues = meta.readLong(); + entry.numDocsWithField = meta.readInt(); if (entry.numValues > 0) { final int indexBlockShift = meta.readInt(); // Special case, -1 means there are no blocks, so no need to load the metadata for it @@ -925,6 +922,10 @@ private static void readNumeric(IndexInput meta, NumericEntry entry) throws IOEx entry.valuesOffset = meta.readLong(); entry.valuesLength = meta.readLong(); } + entry.docsWithFieldOffset = meta.readLong(); + entry.docsWithFieldLength = meta.readLong(); + entry.jumpTableEntryCount = 
meta.readShort(); + entry.denseRankPower = meta.readByte(); } private BinaryEntry readBinary(IndexInput meta) throws IOException { @@ -959,7 +960,6 @@ private static SortedNumericEntry readSortedNumeric(IndexInput meta) throws IOEx private static SortedNumericEntry readSortedNumeric(IndexInput meta, SortedNumericEntry entry) throws IOException { readNumeric(meta, entry); - entry.numDocsWithField = meta.readInt(); if (entry.numDocsWithField != entry.numValues) { entry.addressesOffset = meta.readLong(); final int blockShift = meta.readVInt(); @@ -1031,7 +1031,7 @@ private NumericDocValues getNumeric(NumericEntry entry, long maxOrd) throws IOEx // Special case for maxOrd 1, no need to read blocks and use ordinal 0 as only value if (entry.docsWithFieldOffset == -1) { // Special case when all docs have a value - return new NumericDocValues() { + return new BaseNumericDocValues(entry) { private final int maxDoc = ES87TSDBDocValuesProducer.this.maxDoc; private int doc = -1; @@ -1080,7 +1080,7 @@ public long cost() { entry.denseRankPower, entry.numValues ); - return new NumericDocValues() { + return new BaseNumericDocValues(entry) { @Override public int advance(int target) throws IOException { @@ -1125,7 +1125,7 @@ public long longValue() { final int bitsPerOrd = maxOrd >= 0 ? 
PackedInts.bitsRequired(maxOrd - 1) : -1; if (entry.docsWithFieldOffset == -1) { // dense - return new NumericDocValues() { + return new BaseNumericDocValues(entry) { private final int maxDoc = ES87TSDBDocValuesProducer.this.maxDoc; private int doc = -1; @@ -1192,7 +1192,7 @@ public long longValue() throws IOException { entry.denseRankPower, entry.numValues ); - return new NumericDocValues() { + return new BaseNumericDocValues(entry) { private final TSDBDocValuesEncoder decoder = new TSDBDocValuesEncoder(ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE); private long currentBlockIndex = -1; @@ -1247,6 +1247,15 @@ public long longValue() throws IOException { } } + abstract static class BaseNumericDocValues extends NumericDocValues { + + final NumericEntry entry; + + BaseNumericDocValues(NumericEntry entry) { + this.entry = entry; + } + } + private NumericValues getValues(NumericEntry entry, final long maxOrd) throws IOException { assert entry.numValues > 0; final RandomAccessInput indexSlice = data.randomAccessSlice(entry.indexOffset, entry.indexLength); @@ -1283,7 +1292,49 @@ long advance(long index) throws IOException { private SortedNumericDocValues getSortedNumeric(SortedNumericEntry entry, long maxOrd) throws IOException { if (entry.numValues == entry.numDocsWithField) { - return DocValues.singleton(getNumeric(entry, maxOrd)); + var numeric = getNumeric(entry, maxOrd); + if (merging) { + return new BaseSortedNumericDocValues(entry) { + + @Override + public long nextValue() throws IOException { + return numeric.longValue(); + } + + @Override + public int docValueCount() { + return 1; + } + + @Override + public boolean advanceExact(int target) throws IOException { + return numeric.advanceExact(target); + } + + @Override + public int docID() { + return numeric.docID(); + } + + @Override + public int nextDoc() throws IOException { + return numeric.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + return numeric.advance(target); + } + + 
@Override + public long cost() { + return numeric.cost(); + } + }; + } else { + // Required, otherwise the search / compute engine can't optimize for the case where each document has exactly one value: + return DocValues.singleton(numeric); + } } final RandomAccessInput addressesInput = data.randomAccessSlice(entry.addressesOffset, entry.addressesLength); @@ -1293,7 +1344,7 @@ private SortedNumericDocValues getSortedNumeric(SortedNumericEntry entry, long m if (entry.docsWithFieldOffset == -1) { // dense - return new SortedNumericDocValues() { + return new BaseSortedNumericDocValues(entry) { int doc = -1; long start, end; @@ -1354,7 +1405,7 @@ public int docValueCount() { entry.denseRankPower, entry.numDocsWithField ); - return new SortedNumericDocValues() { + return new BaseSortedNumericDocValues(entry) { boolean set; long start, end; @@ -1413,9 +1464,18 @@ private void set() { } } + abstract static class BaseSortedNumericDocValues extends SortedNumericDocValues { + + final SortedNumericEntry entry; + + BaseSortedNumericDocValues(SortedNumericEntry entry) { + this.entry = entry; + } + } + private record DocValuesSkipperEntry(long offset, long length, long minValue, long maxValue, int docCount, int maxDocId) {} - private static class NumericEntry { + static class NumericEntry { long docsWithFieldOffset; long docsWithFieldLength; short jumpTableEntryCount; @@ -1426,6 +1486,7 @@ private static class NumericEntry { DirectMonotonicReader.Meta indexMeta; long valuesOffset; long valuesLength; + int numDocsWithField; } private static class BinaryEntry { @@ -1443,19 +1504,18 @@ private static class BinaryEntry { DirectMonotonicReader.Meta addressesMeta; } - private static class SortedNumericEntry extends NumericEntry { - int numDocsWithField; + static class SortedNumericEntry extends NumericEntry { DirectMonotonicReader.Meta addressesMeta; long addressesOffset; long addressesLength; } - private static class SortedEntry { + static class SortedEntry { NumericEntry ordsEntry;
TermsDictEntry termsDictEntry; } - private static class SortedSetEntry { + static class SortedSetEntry { SortedEntry singleValueEntry; SortedNumericEntry ordsEntry; TermsDictEntry termsDictEntry; diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/IndexedDISIBuilder.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/IndexedDISIBuilder.java new file mode 100644 index 0000000000000..4c8c639b5cca4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/IndexedDISIBuilder.java @@ -0,0 +1,172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb; + +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BitSetIterator; +import org.apache.lucene.util.FixedBitSet; + +import java.io.IOException; + +/** + * Fork of {@link org.apache.lucene.codecs.lucene90.IndexedDISI#writeBitSet(DocIdSetIterator, IndexOutput)} but that allows + * building jump list iteratively with one docid at a time instead of relying on docidset iterator. 
+ */ +final class IndexedDISIBuilder { + + private static final int BLOCK_SIZE = 65536; // The number of docIDs that a single block represents + + private static final int DENSE_BLOCK_LONGS = BLOCK_SIZE / Long.SIZE; // 1024 + public static final byte DEFAULT_DENSE_RANK_POWER = 9; // Every 512 docIDs / 8 longs + + static final int MAX_ARRAY_LENGTH = (1 << 12) - 1; + + final IndexOutput out; + final byte denseRankPower; + final long origo; + + int totalCardinality = 0; + int blockCardinality = 0; + final FixedBitSet buffer = new FixedBitSet(1 << 16); + int[] jumps = new int[ArrayUtil.oversize(1, Integer.BYTES * 2)]; + int prevBlock = -1; + int jumpBlockIndex = 0; + + IndexedDISIBuilder(IndexOutput out, byte denseRankPower) { + this.out = out; + this.denseRankPower = denseRankPower; + + this.origo = out.getFilePointer(); // All jumps are relative to the origo + if ((denseRankPower < 7 || denseRankPower > 15) && denseRankPower != -1) { + throw new IllegalArgumentException( + "Acceptable values for denseRankPower are 7-15 (every 128-32768 docIDs). 
" + + "The provided power was " + + denseRankPower + + " (every " + + (int) Math.pow(2, denseRankPower) + + " docIDs)" + ); + } + } + + void addDocId(int doc) throws IOException { + final int block = doc >>> 16; + if (prevBlock != -1 && block != prevBlock) { + // Track offset+index from previous block up to current + jumps = addJumps(jumps, out.getFilePointer() - origo, totalCardinality, jumpBlockIndex, prevBlock + 1); + jumpBlockIndex = prevBlock + 1; + // Flush block + flush(prevBlock, buffer, blockCardinality, denseRankPower, out); + // Reset for next block + buffer.clear(); + totalCardinality += blockCardinality; + blockCardinality = 0; + } + buffer.set(doc & 0xFFFF); + blockCardinality++; + prevBlock = block; + } + + short build() throws IOException { + if (blockCardinality > 0) { + jumps = addJumps(jumps, out.getFilePointer() - origo, totalCardinality, jumpBlockIndex, prevBlock + 1); + totalCardinality += blockCardinality; + flush(prevBlock, buffer, blockCardinality, denseRankPower, out); + buffer.clear(); + prevBlock++; + } + final int lastBlock = prevBlock == -1 ? 0 : prevBlock; // There will always be at least 1 block (NO_MORE_DOCS) + // Last entry is a SPARSE with blockIndex == 32767 and the single entry 65535, which becomes the + // docID NO_MORE_DOCS + // To avoid creating 65K jump-table entries, only a single entry is created pointing to the + // offset of the + // NO_MORE_DOCS block, with the jumpBlockIndex set to the logical EMPTY block after all real + // blocks. 
+ jumps = addJumps(jumps, out.getFilePointer() - origo, totalCardinality, lastBlock, lastBlock + 1); + buffer.set(DocIdSetIterator.NO_MORE_DOCS & 0xFFFF); + flush(DocIdSetIterator.NO_MORE_DOCS >>> 16, buffer, 1, denseRankPower, out); + // offset+index jump-table stored at the end + return flushBlockJumps(jumps, lastBlock + 1, out); + } + + // Adds entries to the offset & index jump-table for blocks + private static int[] addJumps(int[] jumps, long offset, int index, int startBlock, int endBlock) { + assert offset < Integer.MAX_VALUE : "Logically the offset should not exceed 2^30 but was >= Integer.MAX_VALUE"; + jumps = ArrayUtil.grow(jumps, (endBlock + 1) * 2); + for (int b = startBlock; b < endBlock; b++) { + jumps[b * 2] = index; + jumps[b * 2 + 1] = (int) offset; + } + return jumps; + } + + private static void flush(int block, FixedBitSet buffer, int cardinality, byte denseRankPower, IndexOutput out) throws IOException { + assert block >= 0 && block < BLOCK_SIZE; + out.writeShort((short) block); + assert cardinality > 0 && cardinality <= BLOCK_SIZE; + out.writeShort((short) (cardinality - 1)); + if (cardinality > MAX_ARRAY_LENGTH) { + if (cardinality != BLOCK_SIZE) { // all docs are set + if (denseRankPower != -1) { + final byte[] rank = createRank(buffer, denseRankPower); + out.writeBytes(rank, rank.length); + } + for (long word : buffer.getBits()) { + out.writeLong(word); + } + } + } else { + BitSetIterator it = new BitSetIterator(buffer, cardinality); + for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) { + out.writeShort((short) doc); + } + } + } + + // Flushes the offset & index jump-table for blocks. 
This should be the last data written to out + // This method returns the blockCount for the blocks reachable for the jump_table or -1 for no + // jump-table + private static short flushBlockJumps(int[] jumps, int blockCount, IndexOutput out) throws IOException { + if (blockCount == 2) { // Jumps with a single real entry + NO_MORE_DOCS is just wasted space so we ignore + // that + blockCount = 0; + } + for (int i = 0; i < blockCount; i++) { + out.writeInt(jumps[i * 2]); // index + out.writeInt(jumps[i * 2 + 1]); // offset + } + // As there are at most 32k blocks, the count is a short + // The jumpTableOffset will be at lastPos - (blockCount * Long.BYTES) + return (short) blockCount; + } + + // Creates a DENSE rank-entry (the number of set bits up to a given point) for the buffer. + // One rank-entry for every {@code 2^denseRankPower} bits, with each rank-entry using 2 bytes. + // Represented as a byte[] for fast flushing and mirroring of the retrieval representation. + private static byte[] createRank(FixedBitSet buffer, byte denseRankPower) { + final int longsPerRank = 1 << (denseRankPower - 6); + final int rankMark = longsPerRank - 1; + final int rankIndexShift = denseRankPower - 7; // 6 for the long (2^6) + 1 for 2 bytes/entry + final byte[] rank = new byte[DENSE_BLOCK_LONGS >> rankIndexShift]; + final long[] bits = buffer.getBits(); + int bitCount = 0; + for (int word = 0; word < DENSE_BLOCK_LONGS; word++) { + if ((word & rankMark) == 0) { // Every longsPerRank longs + rank[word >> rankIndexShift] = (byte) (bitCount >> 8); + rank[(word >> rankIndexShift) + 1] = (byte) (bitCount & 0xFF); + } + bitCount += Long.bitCount(bits[word]); + } + return rank; + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java index 71ca8a37098f5..89b6b214f346a 100644 --- 
a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java @@ -13,32 +13,40 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.index.StoredFields; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.tests.index.BaseDocValuesFormatTestCase; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.hamcrest.Matchers.equalTo; @@ -246,4 +254,217 @@ public void testManyDocsWithManyValues() throws 
Exception { } } } + + public void testForceMergeDenseCase() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + int numDocs = 256 + random().nextInt(1024); + int numHosts = numDocs / 20; + for (int i = 0; i < numDocs; i++) { + var d = new Document(); + + int batchIndex = i / numHosts; + String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + long timestamp = baseTimestamp + (1000L * i); + + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp)); + d.add(new NumericDocValuesField("counter_1", counter1++)); + d.add(new SortedNumericDocValuesField("counter_2", counter2++)); + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length])); + int numTags = 1 + random().nextInt(8); + for (int j = 0; j < numTags; j++) { + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + + iw.addDocument(d); + if (i % 100 == 0) { + iw.commit(); + } + } + iw.commit(); + + iw.forceMerge(1); + + // For asserting using binary search later on: + Arrays.sort(gauge2Values); + + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var 
hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var counterOneDV = leaf.getNumericDocValues("counter_1"); + assertNotNull(counterOneDV); + var counterTwoDV = leaf.getSortedNumericDocValues("counter_2"); + assertNotNull(counterTwoDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var gaugeTwoDV = leaf.getSortedNumericDocValues("gauge_2"); + assertNotNull(gaugeTwoDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + int batchIndex = i / numHosts; + assertEquals(batchIndex, hostNameDV.ordValue()); + String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + assertEquals(expectedHostName, hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString()); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + (1000L * numDocs); + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + + assertEquals(i, counterOneDV.nextDoc()); + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + + assertEquals(i, counterTwoDV.nextDoc()); + assertEquals(1, counterTwoDV.docValueCount()); + long counterTwoValue = counterTwoDV.nextValue(); + assertTrue("unexpected counter [" + counterTwoValue + "]", counterTwoValue > 0 && counterTwoValue <= counter2); + + assertEquals(i, gaugeOneDV.nextDoc()); + assertEquals(1, gaugeOneDV.docValueCount()); + long gaugeOneValue = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + 
gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0); + + assertEquals(i, gaugeTwoDV.nextDoc()); + assertEquals(1, gaugeTwoDV.docValueCount()); + long gaugeTwoValue = gaugeTwoDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0); + + assertEquals(i, tagsDV.nextDoc()); + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + + public void testWithNoValueMultiValue() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + int numRounds = 32 + random().nextInt(32); + int numDocsPerRound = 64 + random().nextInt(64); + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + { + long timestamp = baseTimestamp; + for (int i = 0; i < numRounds; i++) { + int r = random().nextInt(10); + for (int j = 0; j < numDocsPerRound; j++) { + var d = new Document(); + // host in reverse, otherwise merging will detect that segments are already ordered and will use sequential docid + // merger: + String hostName = String.format(Locale.ROOT, "host-%03d", numRounds - i); + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp++)); + + if (r % 10 == 5) { + // sometimes no values + } else if (r % 10 > 5) { + // often single value: + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[j % 
gauge1Values.length])); + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j % tags.length]))); + } else { + // otherwise multiple values: + int numValues = 2 + random().nextInt(4); + for (int k = 0; k < numValues; k++) { + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[(j + k) % gauge1Values.length])); + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(j + k) % tags.length]))); + } + } + iw.addDocument(d); + } + iw.commit(); + } + iw.forceMerge(1); + } + + int numDocs = numRounds * numDocsPerRound; + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); + assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + numDocs; + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + if (gaugeOneDV.advanceExact(i)) { + for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { + long value = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); + } + } + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < 
tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + } + + private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) { + var config = new IndexWriterConfig(); + config.setIndexSort( + new Sort( + new SortField(hostnameField, SortField.Type.STRING, false), + new SortedNumericSortField(timestampField, SortField.Type.LONG, true) + ) + ); + config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + config.setMergePolicy(new LogByteSizeMergePolicy()); + config.setCodec(getCodec()); + return config; + } } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatVariableSkipIntervalTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatVariableSkipIntervalTests.java index 099b59808ef4a..8a4a5c59d1a9c 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatVariableSkipIntervalTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatVariableSkipIntervalTests.java @@ -32,13 +32,13 @@ public class ES87TSDBDocValuesFormatVariableSkipIntervalTests extends BaseDocVal @Override protected Codec getCodec() { // small interval size to test with many intervals - return TestUtil.alwaysDocValuesFormat(new ES87TSDBDocValuesFormat(random().nextInt(4, 16))); + return TestUtil.alwaysDocValuesFormat(new ES87TSDBDocValuesFormat(random().nextInt(4, 16), false)); } public void testSkipIndexIntervalSize() { IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> new ES87TSDBDocValuesFormat(random().nextInt(Integer.MIN_VALUE, 2)) + () -> new ES87TSDBDocValuesFormat(random().nextInt(Integer.MIN_VALUE, 2), false) ); assertTrue(ex.getMessage().contains("skipIndexIntervalSize must be > 
1")); }