From 74ff36f7697e8866ea8ad8192cb952b019021a09 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 9 Apr 2025 07:50:16 +0200 Subject: [PATCH 1/3] [8.x] First step optimizing tsdb doc values codec merging. Backporting #125403 to the 8.x branch. The doc values codec iterates a few times over the doc value instance that needs to be written to disk. In case when merging and index sorting is enabled, this is much more expensive, as each time the doc values instance is iterated a merge sorting is performed (in order to get the doc ids of new segment in order of index sorting). There are several reasons why the doc value instance is iterated multiple times: * To compute stats (num values, number of docs with value) required for writing values to disk. * To write bitset that indicate which documents have a value. (indexed disi, jump table) * To write the actual values to disk. * To write the addresses to disk (in case docs have multiple values) This applies for numeric doc values, but also for the ordinals of sorted (set) doc values. This PR addresses solving the first reason why doc value instance needs to be iterated. This is done only when in case of merging and when the segments to be merged with are also of type es87 doc values, codec version is the same and there are no deletes. Note this optimized merged is behind a feature flag for now. --- .../tsdb/TSDBDocValuesMergeBenchmark.java | 196 +++++ docs/changelog/125403.yaml | 5 + server/src/main/java/module-info.java | 1 + .../index/codec/Elasticsearch816Codec.java | 4 +- .../perfield/XPerFieldDocValuesFormat.java | 365 +++++++++ .../codec/perfield/XPerFieldMergeState.java | 252 ++++++ .../tsdb/es819/DocValuesConsumerUtil.java | 93 +++ .../es819/ES819TSDBDocValuesConsumer.java | 99 ++- .../tsdb/es819/ES819TSDBDocValuesFormat.java | 22 +- .../es819/ES819TSDBDocValuesProducer.java | 16 +- .../tsdb/es819/TsdbDocValuesProducer.java | 66 ++ .../codec/tsdb/es819/XDocValuesConsumer.java | 759 ++++++++++++++++++ .../codec/tsdb/DocValuesCodecDuelTests.java | 19 +- .../tsdb/ES87TSDBDocValuesFormatTests.java | 18 +- .../codec/tsdb/TsdbDocValueBwcTests.java | 184 +++-- .../es819/ES819TSDBDocValuesFormatTests.java | 381 ++++++++- 16 files changed, 2379 insertions(+), 101 deletions(-) create mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java create mode 100644 docs/changelog/125403.yaml create mode 100644 server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java create mode 100644 server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java create mode 100644 server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java create mode 100644 server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java create mode 100644 server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java new file mode 100644 index 0000000000000..e3eb3405038dd --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java @@ -0,0 +1,196 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.benchmark.index.codec.tsdb; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; +import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.profile.AsyncProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Fork(1) +@Threads(1) +@Warmup(iterations = 0) +@Measurement(iterations = 1) +public class TSDBDocValuesMergeBenchmark { + + static { + // For Elasticsearch900Lucene101Codec: + LogConfigurator.loadLog4jPlugins(); + LogConfigurator.configureESLogging(); + LogConfigurator.setNodeName("test"); + } + + @Param("20431204") + private int nDocs; + + @Param("1000") + private int deltaTime; + + @Param("42") + private int seed; + + private static final String TIMESTAMP_FIELD = "@timestamp"; + private static final String HOSTNAME_FIELD = "host.name"; + private static final long BASE_TIMESTAMP = 1704067200000L; + + private IndexWriter indexWriterWithoutOptimizedMerge; + private IndexWriter indexWriterWithOptimizedMerge; + private ExecutorService executorService; + + public static void main(String[] args) throws RunnerException { + final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName()) + .addProfiler(AsyncProfiler.class) + .build(); + + new Runner(options).run(); + } + + @Setup(Level.Trial) + public void setup() throws IOException { + executorService = Executors.newSingleThreadExecutor(); + + final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-")); + final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-")); + + indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false); + indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true); + } + + private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException { + final var iwc = createIndexWriterConfig(optimizedMergeEnabled); + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + int numHosts = 1000; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + final Random random = new Random(seed); + IndexWriter indexWriter = new IndexWriter(directory, iwc); + for (int i = 0; i < nDocs; i++) { + final Document doc = new Document(); + + final int batchIndex = i / numHosts; + final String hostName = "host-" + batchIndex; + // Slightly vary the timestamp in each document + final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime); + + doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName))); + doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp)); + doc.add(new SortedNumericDocValuesField("counter_1", counter1++)); + doc.add(new SortedNumericDocValuesField("counter_2", counter2++)); + doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length])); + int numTags = tags.length % (i + 1); + for (int j = 0; j < numTags; j++) { + doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + + indexWriter.addDocument(doc); + } + indexWriter.commit(); + return indexWriter; + } + + @Benchmark + public void forceMergeWithoutOptimizedMerge() throws IOException { + forceMerge(indexWriterWithoutOptimizedMerge); + } + + @Benchmark + public void forceMergeWithOptimizedMerge() throws IOException { + forceMerge(indexWriterWithOptimizedMerge); + } + + private void forceMerge(final IndexWriter indexWriter) throws IOException { + indexWriter.forceMerge(1); + } + + @TearDown(Level.Trial) + public void tearDown() { + if (executorService != null) { + executorService.shutdown(); + try { + if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeEnabled) { + var config = new IndexWriterConfig(new StandardAnalyzer()); + // NOTE: index sort config matching LogsDB's sort order + config.setIndexSort( + new Sort( + new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false), + new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true) + ) + ); + config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + config.setMergePolicy(new LogByteSizeMergePolicy()); + var docValuesFormat = new ES819TSDBDocValuesFormat(4096, optimizedMergeEnabled); + config.setCodec(new Elasticsearch900Lucene101Codec() { + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }); + return config; + } +} diff --git a/docs/changelog/125403.yaml b/docs/changelog/125403.yaml new file mode 100644 index 0000000000000..d953dae4db4fe --- /dev/null +++ b/docs/changelog/125403.yaml @@ -0,0 +1,5 @@ +pr: 125403 +summary: First step optimizing tsdb doc values codec merging +area: Codec +type: enhancement +issues: [] diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index b24e63c700cbc..c90c6f7986a80 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -479,4 +479,5 @@ exports org.elasticsearch.inference.configuration; exports org.elasticsearch.monitor.metrics; exports org.elasticsearch.plugins.internal.rewriter to org.elasticsearch.inference; + exports org.elasticsearch.index.codec.perfield; } diff --git a/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch816Codec.java b/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch816Codec.java index 00711c7ecc306..a62f34acc0b9a 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch816Codec.java +++ b/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch816Codec.java @@ -17,9 +17,9 @@ import org.apache.lucene.codecs.lucene912.Lucene912Codec; import org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat; import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat; -import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat; import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat; import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat; +import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat; /** @@ -39,7 +39,7 @@ public PostingsFormat getPostingsFormatForField(String field) { }; private final DocValuesFormat defaultDVFormat; - private final DocValuesFormat docValuesFormat = new PerFieldDocValuesFormat() { + private final DocValuesFormat docValuesFormat = new XPerFieldDocValuesFormat() { @Override public DocValuesFormat getDocValuesFormatForField(String field) { return Elasticsearch816Codec.this.getDocValuesFormatForField(field); diff --git a/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java new file mode 100644 index 0000000000000..2d61c023ebe4a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java @@ -0,0 +1,365 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.perfield; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.core.SuppressForbidden; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Map; + +/** + * Fork of {@link PerFieldDocValuesFormat} to allow access FieldsReader's fields field, otherwise no changes. + */ +public abstract class XPerFieldDocValuesFormat extends DocValuesFormat { + /** Name of this {@link DocValuesFormat}. */ + public static final String PER_FIELD_NAME = "ESPerFieldDV819"; + + /** {@link FieldInfo} attribute name used to store the format name for each field. */ + // FORK note: usage of PerFieldDocValuesFormat is needed for bwc purposes. + // (Otherwise, we load no fields from indices that use PerFieldDocValuesFormat) + public static final String PER_FIELD_FORMAT_KEY = PerFieldDocValuesFormat.class.getSimpleName() + ".format"; + + /** {@link FieldInfo} attribute name used to store the segment suffix name for each field. */ + // FORK note: usage of PerFieldDocValuesFormat is needed for bwc purposes. + public static final String PER_FIELD_SUFFIX_KEY = PerFieldDocValuesFormat.class.getSimpleName() + ".suffix"; + + /** Sole constructor. */ + protected XPerFieldDocValuesFormat() { + super(PER_FIELD_NAME); + } + + @Override + public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + return new FieldsWriter(state); + } + + record ConsumerAndSuffix(DocValuesConsumer consumer, int suffix) implements Closeable { + @Override + public void close() throws IOException { + consumer.close(); + } + } + + @SuppressForbidden(reason = "forked from Lucene") + private class FieldsWriter extends DocValuesConsumer { + + private final Map formats = new HashMap<>(); + private final Map suffixes = new HashMap<>(); + + private final SegmentWriteState segmentWriteState; + + FieldsWriter(SegmentWriteState state) { + segmentWriteState = state; + } + + @Override + public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addNumericField(field, valuesProducer); + } + + @Override + public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addBinaryField(field, valuesProducer); + } + + @Override + public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addSortedField(field, valuesProducer); + } + + @Override + public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addSortedNumericField(field, valuesProducer); + } + + @Override + public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addSortedSetField(field, valuesProducer); + } + + @Override + public void merge(MergeState mergeState) throws IOException { + Map> consumersToField = new IdentityHashMap<>(); + + // Group each consumer by the fields it handles + for (FieldInfo fi : mergeState.mergeFieldInfos) { + if (fi.getDocValuesType() == DocValuesType.NONE) { + continue; + } + // merge should ignore current format for the fields being merged + DocValuesConsumer consumer = getInstance(fi, true); + Collection fieldsForConsumer = consumersToField.get(consumer); + if (fieldsForConsumer == null) { + fieldsForConsumer = new ArrayList<>(); + consumersToField.put(consumer, fieldsForConsumer); + } + fieldsForConsumer.add(fi.name); + } + + // Delegate the merge to the appropriate consumer + for (Map.Entry> e : consumersToField.entrySet()) { + e.getKey().merge(XPerFieldMergeState.restrictFields(mergeState, e.getValue())); + } + } + + private DocValuesConsumer getInstance(FieldInfo field) throws IOException { + return getInstance(field, false); + } + + /** + * DocValuesConsumer for the given field. + * + * @param field - FieldInfo object. + * @param ignoreCurrentFormat - ignore the existing format attributes. + * @return DocValuesConsumer for the field. + * @throws IOException if there is a low-level IO error + */ + private DocValuesConsumer getInstance(FieldInfo field, boolean ignoreCurrentFormat) throws IOException { + DocValuesFormat format = null; + if (field.getDocValuesGen() != -1) { + String formatName = null; + if (ignoreCurrentFormat == false) { + formatName = field.getAttribute(PER_FIELD_FORMAT_KEY); + } + // this means the field never existed in that segment, yet is applied updates + if (formatName != null) { + format = DocValuesFormat.forName(formatName); + } + } + if (format == null) { + format = getDocValuesFormatForField(field.name); + } + if (format == null) { + throw new IllegalStateException("invalid null DocValuesFormat for field=\"" + field.name + "\""); + } + final String formatName = format.getName(); + + field.putAttribute(PER_FIELD_FORMAT_KEY, formatName); + Integer suffix = null; + + ConsumerAndSuffix consumer = formats.get(format); + if (consumer == null) { + // First time we are seeing this format; create a new instance + + if (field.getDocValuesGen() != -1) { + String suffixAtt = null; + if (ignoreCurrentFormat == false) { + suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY); + } + // even when dvGen is != -1, it can still be a new field, that never + // existed in the segment, and therefore doesn't have the recorded + // attributes yet. + if (suffixAtt != null) { + suffix = Integer.valueOf(suffixAtt); + } + } + + if (suffix == null) { + // bump the suffix + suffix = suffixes.get(formatName); + if (suffix == null) { + suffix = 0; + } else { + suffix = suffix + 1; + } + } + suffixes.put(formatName, suffix); + + final String segmentSuffix = getFullSegmentSuffix( + segmentWriteState.segmentSuffix, + getSuffix(formatName, Integer.toString(suffix)) + ); + consumer = new ConsumerAndSuffix(format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix)), suffix); + formats.put(format, consumer); + } else { + // we've already seen this format, so just grab its suffix + assert suffixes.containsKey(formatName); + suffix = consumer.suffix; + } + + field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix)); + // TODO: we should only provide the "slice" of FIS + // that this DVF actually sees ... + return consumer.consumer; + } + + @Override + public void close() throws IOException { + // Close all subs + IOUtils.close(formats.values()); + } + } + + static String getSuffix(String formatName, String suffix) { + return formatName + "_" + suffix; + } + + static String getFullSegmentSuffix(String outerSegmentSuffix, String segmentSuffix) { + if (outerSegmentSuffix.length() == 0) { + return segmentSuffix; + } else { + return outerSegmentSuffix + "_" + segmentSuffix; + } + } + + @SuppressForbidden(reason = "forked from Lucene") + public static class FieldsReader extends DocValuesProducer { + + private final IntObjectHashMap fields = new IntObjectHashMap<>(); + private final Map formats = new HashMap<>(); + + // clone for merge + FieldsReader(FieldsReader other) { + Map oldToNew = new IdentityHashMap<>(); + // First clone all formats + for (Map.Entry ent : other.formats.entrySet()) { + DocValuesProducer values = ent.getValue().getMergeInstance(); + formats.put(ent.getKey(), values); + oldToNew.put(ent.getValue(), values); + } + + // Then rebuild fields: + for (IntObjectHashMap.IntObjectCursor ent : other.fields) { + DocValuesProducer producer = oldToNew.get(ent.value); + assert producer != null; + fields.put(ent.key, producer); + } + } + + FieldsReader(final SegmentReadState readState) throws IOException { + + // Init each unique format: + boolean success = false; + try { + // Read field name -> format name + for (FieldInfo fi : readState.fieldInfos) { + if (fi.getDocValuesType() != DocValuesType.NONE) { + final String fieldName = fi.name; + final String formatName = fi.getAttribute(PER_FIELD_FORMAT_KEY); + if (formatName != null) { + // null formatName means the field is in fieldInfos, but has no docvalues! + final String suffix = fi.getAttribute(PER_FIELD_SUFFIX_KEY); + if (suffix == null) { + throw new IllegalStateException("missing attribute: " + PER_FIELD_SUFFIX_KEY + " for field: " + fieldName); + } + DocValuesFormat format = DocValuesFormat.forName(formatName); + String segmentSuffix = getFullSegmentSuffix(readState.segmentSuffix, getSuffix(formatName, suffix)); + if (formats.containsKey(segmentSuffix) == false) { + formats.put(segmentSuffix, format.fieldsProducer(new SegmentReadState(readState, segmentSuffix))); + } + fields.put(fi.number, formats.get(segmentSuffix)); + } + } + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(formats.values()); + } + } + } + + // FORK note: the reason why PerFieldDocValuesFormat is forked: + public DocValuesProducer getDocValuesProducer(FieldInfo field) { + return fields.get(field.number); + } + + public Map getFormats() { + return formats; + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getNumeric(field); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getBinary(field); + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getSorted(field); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getSortedNumeric(field); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getSortedSet(field); + } + + @Override + public void close() throws IOException { + IOUtils.close(formats.values()); + } + + @Override + public void checkIntegrity() throws IOException { + for (DocValuesProducer format : formats.values()) { + format.checkIntegrity(); + } + } + + @Override + public DocValuesProducer getMergeInstance() { + return new FieldsReader(this); + } + + @Override + public String toString() { + return "PerFieldDocValues(formats=" + formats.size() + ")"; + } + } + + @Override + public final DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException { + return new FieldsReader(state); + } + + /** + * Returns the doc values format that should be used for writing new segments of field + * . + * + *

The field to format mapping is written to the index, so this method is only invoked when + * writing, not when reading. + */ + public abstract DocValuesFormat getDocValuesFormatForField(String field); +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java new file mode 100644 index 0000000000000..5eaca2cd098f7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java @@ -0,0 +1,252 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.perfield; + +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.Terms; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** Fork of org.apache.lucene.codecs.perfield.PerFieldMergeState, because of {@link XPerFieldDocValuesFormat} */ +final class XPerFieldMergeState { + + /** + * Create a new MergeState from the given {@link MergeState} instance with restricted fields. + * + * @param fields The fields to keep in the new instance. + * @return The new MergeState with restricted fields + */ + static MergeState restrictFields(MergeState in, Collection fields) { + var fieldInfos = new FieldInfos[in.fieldInfos.length]; + for (int i = 0; i < in.fieldInfos.length; i++) { + fieldInfos[i] = new FilterFieldInfos(in.fieldInfos[i], fields); + } + var fieldsProducers = new FieldsProducer[in.fieldsProducers.length]; + for (int i = 0; i < in.fieldsProducers.length; i++) { + fieldsProducers[i] = in.fieldsProducers[i] == null ? null : new FilterFieldsProducer(in.fieldsProducers[i], fields); + } + var mergeFieldInfos = new FilterFieldInfos(in.mergeFieldInfos, fields); + return new MergeState( + in.docMaps, + in.segmentInfo, + mergeFieldInfos, + in.storedFieldsReaders, + in.termVectorsReaders, + in.normsProducers, + in.docValuesProducers, + fieldInfos, + in.liveDocs, + fieldsProducers, + in.pointsReaders, + in.knnVectorsReaders, + in.maxDocs, + in.infoStream, + in.intraMergeTaskExecutor, + in.needsIndexSort + ); + } + + private static class FilterFieldInfos extends FieldInfos { + private final Set filteredNames; + private final List filtered; + + // Copy of the private fields from FieldInfos + // Renamed so as to be less confusing about which fields we're referring to + private final boolean filteredHasPostings; + private final boolean filteredHasProx; + private final boolean filteredHasPayloads; + private final boolean filteredHasOffsets; + private final boolean filteredHasFreq; + private final boolean filteredHasNorms; + private final boolean filteredHasDocValues; + private final boolean filteredHasPointValues; + + FilterFieldInfos(FieldInfos src, Collection filterFields) { + // Copy all the input FieldInfo objects since the field numbering must be kept consistent + super(toArray(src)); + + boolean hasPostings = false; + boolean hasProx = false; + boolean hasPayloads = false; + boolean hasOffsets = false; + boolean hasFreq = false; + boolean hasNorms = false; + boolean hasDocValues = false; + boolean hasPointValues = false; + + this.filteredNames = new HashSet<>(filterFields); + this.filtered = new ArrayList<>(filterFields.size()); + for (FieldInfo fi : src) { + if (this.filteredNames.contains(fi.name)) { + this.filtered.add(fi); + hasPostings |= fi.getIndexOptions() != IndexOptions.NONE; + hasProx |= fi.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0; + hasFreq |= fi.getIndexOptions() != IndexOptions.DOCS; + hasOffsets |= fi.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0; + hasNorms |= fi.hasNorms(); + hasDocValues |= fi.getDocValuesType() != DocValuesType.NONE; + hasPayloads |= fi.hasPayloads(); + hasPointValues |= (fi.getPointDimensionCount() != 0); + } + } + + this.filteredHasPostings = hasPostings; + this.filteredHasProx = hasProx; + this.filteredHasPayloads = hasPayloads; + this.filteredHasOffsets = hasOffsets; + this.filteredHasFreq = hasFreq; + this.filteredHasNorms = hasNorms; + this.filteredHasDocValues = hasDocValues; + this.filteredHasPointValues = hasPointValues; + } + + private static FieldInfo[] toArray(FieldInfos src) { + FieldInfo[] res = new FieldInfo[src.size()]; + int i = 0; + for (FieldInfo fi : src) { + res[i++] = fi; + } + return res; + } + + @Override + public Iterator iterator() { + return filtered.iterator(); + } + + @Override + public boolean hasFreq() { + return filteredHasFreq; + } + + @Override + public boolean hasPostings() { + return filteredHasPostings; + } + + @Override + public boolean hasProx() { + return filteredHasProx; + } + + @Override + public boolean hasPayloads() { + return filteredHasPayloads; + } + + @Override + public boolean hasOffsets() { + return filteredHasOffsets; + } + + @Override + public boolean hasNorms() { + return filteredHasNorms; + } + + @Override + public boolean hasDocValues() { + return filteredHasDocValues; + } + + @Override + public boolean hasPointValues() { + return filteredHasPointValues; + } + + @Override + public int size() { + return filtered.size(); + } + + @Override + public FieldInfo fieldInfo(String fieldName) { + if (filteredNames.contains(fieldName) == false) { + // Throw IAE to be consistent with fieldInfo(int) which throws it as well on invalid numbers + throw new IllegalArgumentException( + "The field named '" + + fieldName + + "' is not accessible in the current " + + "merge context, available ones are: " + + filteredNames + ); + } + return super.fieldInfo(fieldName); + } + + @Override + public FieldInfo fieldInfo(int fieldNumber) { + FieldInfo res = super.fieldInfo(fieldNumber); + if (filteredNames.contains(res.name) == false) { + throw new IllegalArgumentException( + "The field named '" + + res.name + + "' numbered '" + + fieldNumber + + "' is not " + + "accessible in the current merge context, available ones are: " + + filteredNames + ); + } + return res; + } + } + + private static class FilterFieldsProducer extends FieldsProducer { + private final FieldsProducer in; + private final List filtered; + + FilterFieldsProducer(FieldsProducer in, Collection filterFields) { + this.in = in; + this.filtered = new ArrayList<>(filterFields); + } + + @Override + public Iterator iterator() { + return filtered.iterator(); + } + + @Override + public Terms terms(String field) throws IOException { + if (filtered.contains(field) == false) { + throw new IllegalArgumentException( + "The field named '" + field + "' is not accessible in the current " + "merge context, available ones are: " + filtered + ); + } + return in.terms(field); + } + + @Override + public int size() { + return filtered.size(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void checkIntegrity() throws IOException { + in.checkIntegrity(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java new file mode 100644 index 0000000000000..d6dae9ea882f9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.MergeState; +import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; + +/** + * Contains logic to determine whether optimized merge can occur. + */ +class DocValuesConsumerUtil { + + static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1); + + record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField) {} + + static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) { + if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) { + return UNSUPPORTED; + } + + // Documents marked as deleted should be rare. Maybe in the case of noop operation? + for (int i = 0; i < mergeState.liveDocs.length; i++) { + if (mergeState.liveDocs[i] != null) { + return UNSUPPORTED; + } + } + + long sumNumValues = 0; + int sumNumDocsWithField = 0; + + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer instanceof XPerFieldDocValuesFormat.FieldsReader perFieldReader) { + var wrapped = perFieldReader.getDocValuesProducer(fieldInfo); + if (wrapped instanceof ES819TSDBDocValuesProducer tsdbDocValuesProducer) { + switch (fieldInfo.getDocValuesType()) { + case NUMERIC -> { + var entry = tsdbDocValuesProducer.numerics.get(fieldInfo.number); + if (entry != null) { + sumNumValues += entry.numValues; + sumNumDocsWithField += entry.numDocsWithField; + } + } + case SORTED_NUMERIC -> { + var entry = tsdbDocValuesProducer.sortedNumerics.get(fieldInfo.number); + if (entry != null) { + sumNumValues += entry.numValues; + sumNumDocsWithField += entry.numDocsWithField; + } + } + case SORTED -> { + var entry = tsdbDocValuesProducer.sorted.get(fieldInfo.number); + if (entry != null) { + sumNumValues += entry.ordsEntry.numValues; + sumNumDocsWithField += entry.ordsEntry.numDocsWithField; + } + } + case SORTED_SET -> { + var entry = tsdbDocValuesProducer.sortedSets.get(fieldInfo.number); + if (entry != null) { + if (entry.singleValueEntry != null) { + sumNumValues += entry.singleValueEntry.ordsEntry.numValues; + sumNumDocsWithField += entry.singleValueEntry.ordsEntry.numDocsWithField; + } else { + sumNumValues += entry.ordsEntry.numValues; + sumNumDocsWithField += entry.ordsEntry.numDocsWithField; + } + } + } + default -> throw new IllegalStateException("unexpected doc values producer type: " + fieldInfo.getDocValuesType()); + } + } else { + return UNSUPPORTED; + } + } else { + return UNSUPPORTED; + } + } + + return new MergeStats(true, sumNumValues, sumNumDocsWithField); + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java index d008e5298f8ec..adfac1502d0b5 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java @@ -10,14 +10,13 @@ package org.elasticsearch.index.codec.tsdb.es819; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene90.IndexedDISI; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DocValues; -import org.apache.lucene.index.EmptyDocValuesProducer; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.MergeState; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.SortedDocValues; @@ -44,17 +43,25 @@ import java.io.IOException; import java.util.Arrays; +import static org.elasticsearch.index.codec.tsdb.es819.DocValuesConsumerUtil.compatibleWithOptimizedMerge; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SORTED_SET; -final class ES819TSDBDocValuesConsumer extends DocValuesConsumer { +final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer { IndexOutput data, meta; final int maxDoc; private byte[] termsDictBuffer; - - ES819TSDBDocValuesConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) - throws IOException { + final boolean enableOptimizedMerge; + + ES819TSDBDocValuesConsumer( + SegmentWriteState state, + boolean enableOptimizedMerge, + String dataCodec, + String dataExtension, + String metaCodec, + String metaExtension + ) throws IOException { this.termsDictBuffer = new byte[1 << 14]; boolean success = false; try { @@ -77,6 +84,7 @@ final class ES819TSDBDocValuesConsumer extends DocValuesConsumer { state.segmentSuffix ); maxDoc = state.segmentInfo.maxDoc(); + this.enableOptimizedMerge = enableOptimizedMerge; success = true; } finally { if (success == false) { @@ -89,7 +97,7 @@ final class ES819TSDBDocValuesConsumer extends DocValuesConsumer { public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(ES819TSDBDocValuesFormat.NUMERIC); - DocValuesProducer producer = new EmptyDocValuesProducer() { + var producer = new TsdbDocValuesProducer(valuesProducer) { @Override public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { return DocValues.singleton(valuesProducer.getNumeric(field)); @@ -99,15 +107,21 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti writeField(field, producer, -1); } - private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, long maxOrd) throws IOException { + private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd) throws IOException { int numDocsWithValue = 0; long numValues = 0; - SortedNumericDocValues values = valuesProducer.getSortedNumeric(field); - for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { - numDocsWithValue++; - final int count = values.docValueCount(); - numValues += count; + SortedNumericDocValues values; + if (valuesProducer.mergeStats.supported()) { + numDocsWithValue = valuesProducer.mergeStats.sumNumDocsWithField(); + numValues = valuesProducer.mergeStats.sumNumValues(); + } else { + values = valuesProducer.getSortedNumeric(field); + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + numDocsWithValue++; + final int count = values.docValueCount(); + numValues += count; + } } meta.writeLong(numValues); @@ -196,6 +210,16 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon return new long[] { numDocsWithValue, numValues }; } + @Override + public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeNumericField(result, mergeFieldInfo, mergeState); + } else { + super.mergeNumericField(mergeFieldInfo, mergeState); + } + } + @Override public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); @@ -271,8 +295,18 @@ public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) th doAddSortedField(field, valuesProducer, false); } + @Override + public void mergeSortedField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeSortedField(result, mergeFieldInfo, mergeState); + } else { + super.mergeSortedField(mergeFieldInfo, mergeState); + } + } + private void doAddSortedField(FieldInfo field, DocValuesProducer valuesProducer, boolean addTypeByte) throws IOException { - DocValuesProducer producer = new EmptyDocValuesProducer() { + var producer = new TsdbDocValuesProducer(valuesProducer) { @Override public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { SortedDocValues sorted = valuesProducer.getSorted(field); @@ -464,10 +498,10 @@ private void writeTermsIndex(SortedSetDocValues values) throws IOException { public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(ES819TSDBDocValuesFormat.SORTED_NUMERIC); - writeSortedNumericField(field, valuesProducer, -1); + writeSortedNumericField(field, new TsdbDocValuesProducer(valuesProducer), -1); } - private void writeSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer, long maxOrd) throws IOException { + private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd) throws IOException { if (maxOrd > -1) { meta.writeByte((byte) 1); // multiValued (1 = multiValued) } @@ -499,7 +533,22 @@ private void writeSortedNumericField(FieldInfo field, DocValuesProducer valuesPr } } - private static boolean isSingleValued(SortedSetDocValues values) throws IOException { + @Override + public void mergeSortedNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeSortedNumericField(result, mergeFieldInfo, mergeState); + } else { + super.mergeSortedNumericField(mergeFieldInfo, mergeState); + } + } + + private static boolean isSingleValued(FieldInfo field, TsdbDocValuesProducer producer) throws IOException { + if (producer.mergeStats.supported()) { + return producer.mergeStats.sumNumValues() == producer.mergeStats.sumNumDocsWithField(); + } + + var values = producer.getSortedSet(field); if (DocValues.unwrapSingleton(values) != null) { return true; } @@ -515,13 +564,23 @@ private static boolean isSingleValued(SortedSetDocValues values) throws IOExcept return true; } + @Override + public void mergeSortedSetField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeSortedSetField(result, mergeFieldInfo, mergeState); + } else { + super.mergeSortedSetField(mergeFieldInfo, mergeState); + } + } + @Override public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(SORTED_SET); - if (isSingleValued(valuesProducer.getSortedSet(field))) { - doAddSortedField(field, new EmptyDocValuesProducer() { + if (isSingleValued(field, new TsdbDocValuesProducer(valuesProducer))) { + doAddSortedField(field, new TsdbDocValuesProducer(valuesProducer) { @Override public SortedDocValues getSorted(FieldInfo field) throws IOException { return SortedSetSelector.wrap(valuesProducer.getSortedSet(field), SortedSetSelector.Type.MIN); @@ -532,7 +591,7 @@ public SortedDocValues getSorted(FieldInfo field) throws IOException { SortedSetDocValues values = valuesProducer.getSortedSet(field); long maxOrd = values.getValueCount(); - writeSortedNumericField(field, new EmptyDocValuesProducer() { + writeSortedNumericField(field, new TsdbDocValuesProducer(valuesProducer) { @Override public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { SortedSetDocValues values = valuesProducer.getSortedSet(field); diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java index e3249bf816afb..d19ec2e6bf841 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java @@ -13,6 +13,7 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; +import org.elasticsearch.common.util.FeatureFlag; import java.io.IOException; @@ -56,14 +57,33 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues static final int TERMS_DICT_REVERSE_INDEX_SIZE = 1 << TERMS_DICT_REVERSE_INDEX_SHIFT; static final int TERMS_DICT_REVERSE_INDEX_MASK = TERMS_DICT_REVERSE_INDEX_SIZE - 1; + // Default for escape hatch: + static final boolean OPTIMIZED_MERGE_ENABLE_DEFAULT; + static final FeatureFlag TSDB_DOC_VALUES_OPTIMIZED_MERGE = new FeatureFlag("tsdb_doc_values_optimized_merge"); + static final String OPTIMIZED_MERGE_ENABLED_NAME = ES819TSDBDocValuesConsumer.class.getName() + ".enableOptimizedMerge"; + + static { + boolean optimizedMergeDefault = TSDB_DOC_VALUES_OPTIMIZED_MERGE.isEnabled(); + OPTIMIZED_MERGE_ENABLE_DEFAULT = Boolean.parseBoolean( + System.getProperty(OPTIMIZED_MERGE_ENABLED_NAME, Boolean.toString(optimizedMergeDefault)) + ); + } + + private final boolean enableOptimizedMerge; + /** Default constructor. */ public ES819TSDBDocValuesFormat() { + this(OPTIMIZED_MERGE_ENABLE_DEFAULT); + } + + public ES819TSDBDocValuesFormat(boolean enableOptimizedMerge) { super(CODEC_NAME); + this.enableOptimizedMerge = enableOptimizedMerge; } @Override public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { - return new ES819TSDBDocValuesConsumer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION); + return new ES819TSDBDocValuesConsumer(state, enableOptimizedMerge, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java index 9c55a809359b2..5c486d63e8902 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java @@ -46,11 +46,11 @@ import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.TERMS_DICT_BLOCK_LZ4_SHIFT; final class ES819TSDBDocValuesProducer extends DocValuesProducer { - private final IntObjectHashMap numerics; + final IntObjectHashMap numerics; private final IntObjectHashMap binaries; - private final IntObjectHashMap sorted; - private final IntObjectHashMap sortedSets; - private final IntObjectHashMap sortedNumerics; + final IntObjectHashMap sorted; + final IntObjectHashMap sortedSets; + final IntObjectHashMap sortedNumerics; private final IndexInput data; private final int maxDoc; final int version; @@ -1303,7 +1303,7 @@ private void set() { private record DocValuesSkipperEntry(long offset, long length, long minValue, long maxValue, int docCount, int maxDocId) {} - private static class NumericEntry { + static class NumericEntry { long docsWithFieldOffset; long docsWithFieldLength; short jumpTableEntryCount; @@ -1333,18 +1333,18 @@ private static class BinaryEntry { DirectMonotonicReader.Meta addressesMeta; } - private static class SortedNumericEntry extends NumericEntry { + static class SortedNumericEntry extends NumericEntry { DirectMonotonicReader.Meta addressesMeta; long addressesOffset; long addressesLength; } - private static class SortedEntry { + static class SortedEntry { NumericEntry ordsEntry; TermsDictEntry termsDictEntry; } - private static class SortedSetEntry { + static class SortedSetEntry { SortedEntry singleValueEntry; SortedNumericEntry ordsEntry; TermsDictEntry termsDictEntry; diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java new file mode 100644 index 0000000000000..b8100f2e635f8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.EmptyDocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; + +import java.io.IOException; + +class TsdbDocValuesProducer extends EmptyDocValuesProducer { + + final DocValuesConsumerUtil.MergeStats mergeStats; + final DocValuesProducer actual; + + TsdbDocValuesProducer(DocValuesConsumerUtil.MergeStats mergeStats) { + this.mergeStats = mergeStats; + this.actual = null; + } + + TsdbDocValuesProducer(DocValuesProducer valuesProducer) { + if (valuesProducer instanceof TsdbDocValuesProducer tsdb) { + mergeStats = tsdb.mergeStats; + } else { + mergeStats = DocValuesConsumerUtil.UNSUPPORTED; + } + this.actual = valuesProducer; + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + if (actual != null) { + return actual.getSorted(field); + } else { + return super.getSorted(field); + } + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + if (actual != null) { + return actual.getSortedSet(field); + } else { + return super.getSortedSet(field); + } + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + if (actual != null) { + return actual.getSortedNumeric(field); + } else { + return super.getSortedNumeric(field); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java new file mode 100644 index 0000000000000..af6fc2587a49a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java @@ -0,0 +1,759 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BaseTermsEnum; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilteredTermsEnum; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OrdinalMap; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.AttributeSource; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LongBitSet; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.packed.PackedInts; +import org.elasticsearch.index.codec.tsdb.es819.DocValuesConsumerUtil.MergeStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Forks the merging logic from {@link DocValuesConsumer} that {@link ES819TSDBDocValuesConsumer} needs. + * This class should be removed when merging logic in {@link DocValuesConsumer} becomes accessible / overwritable in Lucene. + */ +public abstract class XDocValuesConsumer extends DocValuesConsumer { + + /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ + protected XDocValuesConsumer() {} + + /** Tracks state of one numeric sub-reader that we are merging */ + private static class NumericDocValuesSub extends DocIDMerger.Sub { + + final NumericDocValues values; + + NumericDocValuesSub(MergeState.DocMap docMap, NumericDocValues values) { + super(docMap); + this.values = values; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the numeric docvalues from MergeState. + * + *

The default implementation calls {@link #addNumericField}, passing a DocValuesProducer that + * merges and filters deleted documents on the fly. + */ + public void mergeNumericField(MergeStats mergeStats, final FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + addNumericField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public NumericDocValues getNumeric(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong fieldInfo"); + } + + List subs = new ArrayList<>(); + assert mergeState.docMaps.length == mergeState.docValuesProducers.length; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + NumericDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.NUMERIC) { + values = docValuesProducer.getNumeric(readerFieldInfo); + } + } + if (values != null) { + subs.add(new NumericDocValuesSub(mergeState.docMaps[i], values)); + } + } + + return mergeNumericValues(subs, mergeState.needsIndexSort); + } + }); + } + + private static NumericDocValues mergeNumericValues(List subs, boolean indexIsSorted) throws IOException { + long cost = 0; + for (NumericDocValuesSub sub : subs) { + cost += sub.values.cost(); + } + final long finalCost = cost; + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, indexIsSorted); + + return new NumericDocValues() { + private int docID = -1; + private NumericDocValuesSub current; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID = NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public long longValue() throws IOException { + return current.values.longValue(); + } + }; + } + + /** Tracks state of one sorted numeric sub-reader that we are merging */ + private static class SortedNumericDocValuesSub extends DocIDMerger.Sub { + + final SortedNumericDocValues values; + + SortedNumericDocValuesSub(MergeState.DocMap docMap, SortedNumericDocValues values) { + super(docMap); + this.values = values; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the sorted docvalues from toMerge. + * + *

The default implementation calls {@link #addSortedNumericField}, passing iterables that + * filter deleted documents. + */ + public void mergeSortedNumericField(MergeStats mergeStats, FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + + addSortedNumericField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + List subs = new ArrayList<>(); + long cost = 0; + boolean allSingletons = true; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + SortedNumericDocValues values = null; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_NUMERIC) { + values = docValuesProducer.getSortedNumeric(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedNumeric(); + } + cost += values.cost(); + if (allSingletons && DocValues.unwrapSingleton(values) == null) { + allSingletons = false; + } + subs.add(new SortedNumericDocValuesSub(mergeState.docMaps[i], values)); + } + + if (allSingletons) { + // All subs are single-valued. + // We specialize for that case since it makes it easier for codecs to optimize + // for single-valued fields. + List singleValuedSubs = new ArrayList<>(); + for (SortedNumericDocValuesSub sub : subs) { + final NumericDocValues singleValuedValues = DocValues.unwrapSingleton(sub.values); + assert singleValuedValues != null; + singleValuedSubs.add(new NumericDocValuesSub(sub.docMap, singleValuedValues)); + } + return DocValues.singleton(mergeNumericValues(singleValuedSubs, mergeState.needsIndexSort)); + } + + final long finalCost = cost; + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + + return new SortedNumericDocValues() { + + private int docID = -1; + private SortedNumericDocValuesSub currentSub; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + currentSub = docIDMerger.next(); + if (currentSub == null) { + docID = NO_MORE_DOCS; + } else { + docID = currentSub.mappedDocID; + } + + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docValueCount() { + return currentSub.values.docValueCount(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public long nextValue() throws IOException { + return currentSub.values.nextValue(); + } + }; + } + }); + } + + /** + * A merged {@link TermsEnum}. This helps avoid relying on the default terms enum, which calls + * {@link SortedDocValues#lookupOrd(int)} or {@link SortedSetDocValues#lookupOrd(long)} on every + * call to {@link TermsEnum#next()}. + */ + private static class MergedTermsEnum extends BaseTermsEnum { + + private final TermsEnum[] subs; + private final OrdinalMap ordinalMap; + private final long valueCount; + private long ord = -1; + private BytesRef term; + + MergedTermsEnum(OrdinalMap ordinalMap, TermsEnum[] subs) { + this.ordinalMap = ordinalMap; + this.subs = subs; + this.valueCount = ordinalMap.getValueCount(); + } + + @Override + public BytesRef term() throws IOException { + return term; + } + + @Override + public long ord() throws IOException { + return ord; + } + + @Override + public BytesRef next() throws IOException { + if (++ord >= valueCount) { + return null; + } + final int subNum = ordinalMap.getFirstSegmentNumber(ord); + final TermsEnum sub = subs[subNum]; + final long subOrd = ordinalMap.getFirstSegmentOrd(ord); + do { + term = sub.next(); + } while (sub.ord() < subOrd); + assert sub.ord() == subOrd; + return term; + } + + @Override + public AttributeSource attributes() { + throw new UnsupportedOperationException(); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(long ord) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long totalTermFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TermState termState() throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** Tracks state of one sorted sub-reader that we are merging */ + private static class SortedDocValuesSub extends DocIDMerger.Sub { + + final SortedDocValues values; + final LongValues map; + + SortedDocValuesSub(MergeState.DocMap docMap, SortedDocValues values, LongValues map) { + super(docMap); + this.values = values; + this.map = map; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the sorted docvalues from toMerge. + * + *

The default implementation calls {@link #addSortedField}, passing an Iterable that merges + * ordinals and values and filters deleted documents . + */ + public void mergeSortedField(MergeStats mergeStats, FieldInfo fieldInfo, final MergeState mergeState) throws IOException { + List toMerge = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) { + values = docValuesProducer.getSorted(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySorted(); + } + toMerge.add(values); + } + + final int numReaders = toMerge.size(); + final SortedDocValues[] dvs = toMerge.toArray(new SortedDocValues[numReaders]); + + // step 1: iterate thru each sub and mark terms still in use + TermsEnum[] liveTerms = new TermsEnum[dvs.length]; + long[] weights = new long[liveTerms.length]; + for (int sub = 0; sub < numReaders; sub++) { + SortedDocValues dv = dvs[sub]; + Bits liveDocs = mergeState.liveDocs[sub]; + if (liveDocs == null) { + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); + } else { + LongBitSet bitset = new LongBitSet(dv.getValueCount()); + int docID; + while ((docID = dv.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs.get(docID)) { + int ord = dv.ordValue(); + if (ord >= 0) { + bitset.set(ord); + } + } + } + liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); + weights[sub] = bitset.cardinality(); + } + } + + // step 2: create ordinal map (this conceptually does the "merging") + final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT); + + // step 3: add field + addSortedField(fieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public SortedDocValues getSorted(FieldInfo fieldInfoIn) throws IOException { + if (fieldInfoIn != fieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + + List subs = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) { + values = docValuesProducer.getSorted(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySorted(); + } + + subs.add(new SortedDocValuesSub(mergeState.docMaps[i], values, map.getGlobalOrds(i))); + } + + return mergeSortedValues(subs, mergeState.needsIndexSort, map); + } + }); + } + + private static SortedDocValues mergeSortedValues(List subs, boolean indexIsSorted, OrdinalMap map) + throws IOException { + long cost = 0; + for (SortedDocValuesSub sub : subs) { + cost += sub.values.cost(); + } + final long finalCost = cost; + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, indexIsSorted); + + return new SortedDocValues() { + private int docID = -1; + private SortedDocValuesSub current; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID = NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int ordValue() throws IOException { + int subOrd = current.values.ordValue(); + assert subOrd != -1; + return (int) current.map.get(subOrd); + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public int getValueCount() { + return (int) map.getValueCount(); + } + + @Override + public BytesRef lookupOrd(int ord) throws IOException { + int segmentNumber = map.getFirstSegmentNumber(ord); + int segmentOrd = (int) map.getFirstSegmentOrd(ord); + return subs.get(segmentNumber).values.lookupOrd(segmentOrd); + } + + @Override + public TermsEnum termsEnum() throws IOException { + TermsEnum[] termsEnurmSubs = new TermsEnum[subs.size()]; + for (int sub = 0; sub < termsEnurmSubs.length; ++sub) { + termsEnurmSubs[sub] = subs.get(sub).values.termsEnum(); + } + return new MergedTermsEnum(map, termsEnurmSubs); + } + }; + } + + /** Tracks state of one sorted set sub-reader that we are merging */ + private static class SortedSetDocValuesSub extends DocIDMerger.Sub { + + final SortedSetDocValues values; + final LongValues map; + + SortedSetDocValuesSub(MergeState.DocMap docMap, SortedSetDocValues values, LongValues map) { + super(docMap); + this.values = values; + this.map = map; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + + @Override + public String toString() { + return "SortedSetDocValuesSub(mappedDocID=" + mappedDocID + " values=" + values + ")"; + } + } + + /** + * Merges the sortedset docvalues from toMerge. + * + *

The default implementation calls {@link #addSortedSetField}, passing an Iterable that merges + * ordinals and values and filters deleted documents . + */ + public void mergeSortedSetField(MergeStats mergeStats, FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + + List toMerge = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedSetDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (fieldInfo != null && fieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) { + values = docValuesProducer.getSortedSet(fieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedSet(); + } + toMerge.add(values); + } + + // step 1: iterate thru each sub and mark terms still in use + TermsEnum[] liveTerms = new TermsEnum[toMerge.size()]; + long[] weights = new long[liveTerms.length]; + for (int sub = 0; sub < liveTerms.length; sub++) { + SortedSetDocValues dv = toMerge.get(sub); + Bits liveDocs = mergeState.liveDocs[sub]; + if (liveDocs == null) { + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); + } else { + LongBitSet bitset = new LongBitSet(dv.getValueCount()); + int docID; + while ((docID = dv.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs.get(docID)) { + for (int i = 0; i < dv.docValueCount(); i++) { + bitset.set(dv.nextOrd()); + } + } + } + liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); + weights[sub] = bitset.cardinality(); + } + } + + // step 2: create ordinal map (this conceptually does the "merging") + final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT); + + // step 3: add field + addSortedSetField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public SortedSetDocValues getSortedSet(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + List subs = new ArrayList<>(); + + long cost = 0; + boolean allSingletons = true; + + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedSetDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) { + values = docValuesProducer.getSortedSet(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedSet(); + } + cost += values.cost(); + if (allSingletons && DocValues.unwrapSingleton(values) == null) { + allSingletons = false; + } + subs.add(new SortedSetDocValuesSub(mergeState.docMaps[i], values, map.getGlobalOrds(i))); + } + + if (allSingletons) { + // All subs are single-valued. + // We specialize for that case since it makes it easier for codecs to optimize + // for single-valued fields. + List singleValuedSubs = new ArrayList<>(); + for (SortedSetDocValuesSub sub : subs) { + final SortedDocValues singleValuedValues = DocValues.unwrapSingleton(sub.values); + assert singleValuedValues != null; + singleValuedSubs.add(new SortedDocValuesSub(sub.docMap, singleValuedValues, sub.map)); + } + return DocValues.singleton(mergeSortedValues(singleValuedSubs, mergeState.needsIndexSort, map)); + } + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + + final long finalCost = cost; + + return new SortedSetDocValues() { + private int docID = -1; + private SortedSetDocValuesSub currentSub; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + currentSub = docIDMerger.next(); + if (currentSub == null) { + docID = NO_MORE_DOCS; + } else { + docID = currentSub.mappedDocID; + } + + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long nextOrd() throws IOException { + long subOrd = currentSub.values.nextOrd(); + return currentSub.map.get(subOrd); + } + + @Override + public int docValueCount() { + return currentSub.values.docValueCount(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public BytesRef lookupOrd(long ord) throws IOException { + int segmentNumber = map.getFirstSegmentNumber(ord); + long segmentOrd = map.getFirstSegmentOrd(ord); + return toMerge.get(segmentNumber).lookupOrd(segmentOrd); + } + + @Override + public long getValueCount() { + return map.getValueCount(); + } + + @Override + public TermsEnum termsEnum() throws IOException { + TermsEnum[] subs = new TermsEnum[toMerge.size()]; + for (int sub = 0; sub < subs.length; ++sub) { + subs[sub] = toMerge.get(sub).termsEnum(); + } + return new MergedTermsEnum(map, subs); + } + }; + } + }); + } + + // TODO: seek-by-ord to nextSetBit + static class BitsFilteredTermsEnum extends FilteredTermsEnum { + final LongBitSet liveTerms; + + BitsFilteredTermsEnum(TermsEnum in, LongBitSet liveTerms) { + super(in, false); // <-- not passing false here wasted about 3 hours of my time!!!!!!!!!!!!! + assert liveTerms != null; + this.liveTerms = liveTerms; + } + + @Override + protected AcceptStatus accept(BytesRef term) throws IOException { + if (liveTerms.get(ord())) { + return AcceptStatus.YES; + } else { + return AcceptStatus.NO; + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java index ea6d944a1271c..734671a2760eb 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.index.codec.tsdb; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; @@ -24,6 +26,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.index.codec.Elasticsearch816Codec; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; import org.elasticsearch.test.ESTestCase; @@ -51,9 +54,19 @@ public void testDuel() throws IOException { baselineConfig.setMergePolicy(mergePolicy); baselineConfig.setCodec(TestUtil.alwaysDocValuesFormat(new Lucene90DocValuesFormat())); var contenderConf = newIndexWriterConfig(); - contenderConf.setCodec( - TestUtil.alwaysDocValuesFormat(rarely() ? new TestES87TSDBDocValuesFormat() : new ES819TSDBDocValuesFormat()) - ); + contenderConf.setMergePolicy(mergePolicy); + Codec codec = new Elasticsearch816Codec() { + + final DocValuesFormat docValuesFormat = randomBoolean() + ? new ES819TSDBDocValuesFormat() + : new TestES87TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; + contenderConf.setCodec(codec); contenderConf.setMergePolicy(mergePolicy); try ( diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java index f6cb7ab4c9a15..6812861986fa2 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.SortedDocValuesField; @@ -33,8 +34,9 @@ import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.tests.index.BaseDocValuesFormatTestCase; import org.apache.lucene.tests.index.RandomIndexWriter; -import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.index.codec.Elasticsearch816Codec; import java.io.IOException; import java.util.ArrayList; @@ -50,6 +52,12 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase { private static final int NUM_DOCS = 10; + static { + // For Elasticsearch900Lucene101Codec: + LogConfigurator.loadLog4jPlugins(); + LogConfigurator.configureESLogging(); + } + static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat { TestES87TSDBDocValuesFormat() { @@ -62,7 +70,13 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept } } - private final Codec codec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); + private final Codec codec = new Elasticsearch816Codec() { + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new TestES87TSDBDocValuesFormat(); + } + }; @Override protected Codec getCodec() { diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java index 32b2a90322911..7620a3969b4ef 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.index.codec.tsdb; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; @@ -31,6 +32,9 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.index.codec.Elasticsearch814Codec; +import org.elasticsearch.index.codec.Elasticsearch816Codec; +import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; import org.elasticsearch.test.ESTestCase; @@ -45,8 +49,30 @@ public class TsdbDocValueBwcTests extends ESTestCase { public void testMixedIndex() throws Exception { - Codec oldCodec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); - Codec newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + var oldCodec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); + var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + testMixedIndex(oldCodec, newCodec); + } + + public void testMixedIndex816To900Lucene101() throws Exception { + var oldCodec = new Elasticsearch814Codec() { + + final DocValuesFormat docValuesFormat = new TestES87TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; + var newCodec = new Elasticsearch816Codec() { + + final DocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; testMixedIndex(oldCodec, newCodec); } @@ -101,55 +127,63 @@ void testMixedIndex(Codec oldCodec, Codec newCodec) throws IOException, NoSuchFi } } // Check documents before force merge: - try (var iw = new IndexWriter(dir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField, newCodec))) { - try (var reader = DirectoryReader.open(iw)) { - assertOldDocValuesFormatVersion(reader); - - var hostNameDV = MultiDocValues.getSortedValues(reader, hostnameField); - assertNotNull(hostNameDV); - var timestampDV = MultiDocValues.getSortedNumericValues(reader, timestampField); - assertNotNull(timestampDV); - var counterOneDV = MultiDocValues.getNumericValues(reader, "counter_1"); - if (counterOneDV == null) { - counterOneDV = DocValues.emptyNumeric(); - } - var gaugeOneDV = MultiDocValues.getSortedNumericValues(reader, "gauge_1"); - if (gaugeOneDV == null) { - gaugeOneDV = DocValues.emptySortedNumeric(); + try (var reader = DirectoryReader.open(dir)) { + assertOldDocValuesFormatVersion(reader); + // Assert per field format field info attributes: + // (XPerFieldDocValuesFormat must produce the same attributes as PerFieldDocValuesFormat for BWC. + // Otherwise, doc values fields may disappear) + for (var leaf : reader.leaves()) { + for (var fieldInfo : leaf.reader().getFieldInfos()) { + assertThat(fieldInfo.attributes(), Matchers.aMapWithSize(2)); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.suffix", "0")); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.format", "ES87TSDB")); } - var tagsDV = MultiDocValues.getSortedSetValues(reader, "tags"); - if (tagsDV == null) { - tagsDV = DocValues.emptySortedSet(); - } - for (int i = 0; i < numDocs; i++) { - assertEquals(i, hostNameDV.nextDoc()); - String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); - assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + } - assertEquals(i, timestampDV.nextDoc()); - long timestamp = timestampDV.nextValue(); - long lowerBound = baseTimestamp; - long upperBound = baseTimestamp + numDocs; - assertTrue( - "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", - timestamp >= lowerBound && timestamp < upperBound - ); - if (counterOneDV.advanceExact(i)) { - long counterOneValue = counterOneDV.longValue(); - assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); - } - if (gaugeOneDV.advanceExact(i)) { - for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { - long value = gaugeOneDV.nextValue(); - assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); - } + var hostNameDV = MultiDocValues.getSortedValues(reader, hostnameField); + assertNotNull(hostNameDV); + var timestampDV = MultiDocValues.getSortedNumericValues(reader, timestampField); + assertNotNull(timestampDV); + var counterOneDV = MultiDocValues.getNumericValues(reader, "counter_1"); + if (counterOneDV == null) { + counterOneDV = DocValues.emptyNumeric(); + } + var gaugeOneDV = MultiDocValues.getSortedNumericValues(reader, "gauge_1"); + if (gaugeOneDV == null) { + gaugeOneDV = DocValues.emptySortedNumeric(); + } + var tagsDV = MultiDocValues.getSortedSetValues(reader, "tags"); + if (tagsDV == null) { + tagsDV = DocValues.emptySortedSet(); + } + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); + assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.nextValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + numDocs; + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + if (counterOneDV.advanceExact(i)) { + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + } + if (gaugeOneDV.advanceExact(i)) { + for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { + long value = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); } - if (tagsDV.advanceExact(i)) { - for (int j = 0; j < tagsDV.docValueCount(); j++) { - long ordinal = tagsDV.nextOrd(); - String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); - assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); - } + } + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); } } } @@ -165,6 +199,15 @@ void testMixedIndex(Codec oldCodec, Codec newCodec) throws IOException, NoSuchFi assertEquals(numDocs, reader.maxDoc()); assertNewDocValuesFormatVersion(reader); var leaf = reader.leaves().get(0).reader(); + // Assert per field format field info attributes: + // (XPerFieldDocValuesFormat must produce the same attributes as PerFieldDocValuesFormat for BWC. + // Otherwise, doc values fields may disappear) + for (var fieldInfo : leaf.getFieldInfos()) { + assertThat(fieldInfo.attributes(), Matchers.aMapWithSize(2)); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.suffix", "0")); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.format", "ES819TSDB")); + } + var hostNameDV = leaf.getSortedDocValues(hostnameField); assertNotNull(hostNameDV); var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); @@ -249,6 +292,7 @@ private void assertOldDocValuesFormatVersion(DirectoryReader reader) throws NoSu var dvReader = leaf.getDocValuesReader(); var field = getFormatsFieldFromPerFieldFieldsReader(dvReader.getClass()); Map formats = (Map) field.get(dvReader); + assertThat(formats, Matchers.aMapWithSize(1)); var tsdbDvReader = (DocValuesProducer) formats.get("ES87TSDB_0"); tsdbDvReader.checkIntegrity(); assertThat(tsdbDvReader, Matchers.instanceOf(ES87TSDBDocValuesProducer.class)); @@ -257,25 +301,39 @@ private void assertOldDocValuesFormatVersion(DirectoryReader reader) throws NoSu private void assertNewDocValuesFormatVersion(DirectoryReader reader) throws NoSuchFieldException, IllegalAccessException, IOException, ClassNotFoundException { - if (System.getSecurityManager() != null) { - // With jvm version 24 entitlements are used and security manager is nog longer used. - // Making this assertion work with security manager requires granting the entire test codebase privileges to use - // suppressAccessChecks and suppressAccessChecks. This is undesired from a security manager perspective. - logger.info("not asserting doc values format version, because security manager is used"); - return; - } for (var leafReaderContext : reader.leaves()) { var leaf = (SegmentReader) leafReaderContext.reader(); var dvReader = leaf.getDocValuesReader(); - var field = getFormatsFieldFromPerFieldFieldsReader(dvReader.getClass()); - Map formats = (Map) field.get(dvReader); - var tsdbDvReader = (DocValuesProducer) formats.get("ES819TSDB_0"); - tsdbDvReader.checkIntegrity(); - assertThat( - tsdbDvReader, - Matchers.instanceOf(Class.forName("org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer")) - ); + dvReader.checkIntegrity(); + + if (dvReader instanceof XPerFieldDocValuesFormat.FieldsReader perFieldDvReader) { + var formats = perFieldDvReader.getFormats(); + assertThat(formats, Matchers.aMapWithSize(1)); + var tsdbDvReader = formats.get("ES819TSDB_0"); + tsdbDvReader.checkIntegrity(); + assertThat( + tsdbDvReader, + Matchers.instanceOf(Class.forName("org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer")) + ); + } else { + if (System.getSecurityManager() != null) { + // With jvm version 24 entitlements are used and security manager is nog longer used. + // Making this assertion work with security manager requires granting the entire test codebase privileges to use + // suppressAccessChecks and suppressAccessChecks. This is undesired from a security manager perspective. + logger.info("not asserting doc values format version, because security manager is used"); + continue; + } + var field = getFormatsFieldFromPerFieldFieldsReader(dvReader.getClass()); + Map formats = (Map) field.get(dvReader); + assertThat(formats, Matchers.aMapWithSize(1)); + var tsdbDvReader = (DocValuesProducer) formats.get("ES819TSDB_0"); + tsdbDvReader.checkIntegrity(); + assertThat( + tsdbDvReader, + Matchers.instanceOf(Class.forName("org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer")) + ); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java index 3eb187bed9fb8..1e213d586b20d 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java @@ -10,16 +10,393 @@ package org.elasticsearch.index.codec.tsdb.es819; import org.apache.lucene.codecs.Codec; -import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.index.codec.Elasticsearch816Codec; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests; +import java.util.Arrays; +import java.util.Locale; + public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests { - private final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + private final Codec codec = new Elasticsearch816Codec() { + + final ES819TSDBDocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; @Override protected Codec getCodec() { return codec; } + public void testForceMergeDenseCase() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + int numDocs = 256 + random().nextInt(1024); + int numHosts = numDocs / 20; + for (int i = 0; i < numDocs; i++) { + var d = new Document(); + + int batchIndex = i / numHosts; + String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + long timestamp = baseTimestamp + (1000L * i); + + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp)); + d.add(new NumericDocValuesField("counter_1", counter1++)); + d.add(new SortedNumericDocValuesField("counter_2", counter2++)); + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length])); + int numTags = 1 + random().nextInt(8); + for (int j = 0; j < numTags; j++) { + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + + iw.addDocument(d); + if (i % 100 == 0) { + iw.commit(); + } + } + iw.commit(); + + iw.forceMerge(1); + + // For asserting using binary search later on: + Arrays.sort(gauge2Values); + + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var counterOneDV = leaf.getNumericDocValues("counter_1"); + assertNotNull(counterOneDV); + var counterTwoDV = leaf.getSortedNumericDocValues("counter_2"); + assertNotNull(counterTwoDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var gaugeTwoDV = leaf.getSortedNumericDocValues("gauge_2"); + assertNotNull(gaugeTwoDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + int batchIndex = i / numHosts; + assertEquals(batchIndex, hostNameDV.ordValue()); + String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + assertEquals(expectedHostName, hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString()); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + (1000L * numDocs); + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + + assertEquals(i, counterOneDV.nextDoc()); + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + + assertEquals(i, counterTwoDV.nextDoc()); + assertEquals(1, counterTwoDV.docValueCount()); + long counterTwoValue = counterTwoDV.nextValue(); + assertTrue("unexpected counter [" + counterTwoValue + "]", counterTwoValue > 0 && counterTwoValue <= counter2); + + assertEquals(i, gaugeOneDV.nextDoc()); + assertEquals(1, gaugeOneDV.docValueCount()); + long gaugeOneValue = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0); + + assertEquals(i, gaugeTwoDV.nextDoc()); + assertEquals(1, gaugeTwoDV.docValueCount()); + long gaugeTwoValue = gaugeTwoDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0); + + assertEquals(i, tagsDV.nextDoc()); + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + + public void testForceMergeSparseCase() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + int numDocs = 256 + random().nextInt(1024); + int numHosts = numDocs / 20; + for (int i = 0; i < numDocs; i++) { + var d = new Document(); + + int batchIndex = i / numHosts; + String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + long timestamp = baseTimestamp + (1000L * i); + + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp)); + + if (random().nextBoolean()) { + d.add(new NumericDocValuesField("counter_1", counter1++)); + } + if (random().nextBoolean()) { + d.add(new SortedNumericDocValuesField("counter_2", counter2++)); + } + if (random().nextBoolean()) { + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + } + if (random().nextBoolean()) { + d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length])); + } + if (random().nextBoolean()) { + int numTags = 1 + random().nextInt(8); + for (int j = 0; j < numTags; j++) { + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + } + if (random().nextBoolean()) { + int randomIndex = random().nextInt(tags.length); + d.add(new SortedDocValuesField("other_tag", new BytesRef(tags[randomIndex]))); + } + + iw.addDocument(d); + if (i % 100 == 0) { + iw.commit(); + } + } + iw.commit(); + + iw.forceMerge(1); + + // For asserting using binary search later on: + Arrays.sort(gauge2Values); + + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var counterOneDV = leaf.getNumericDocValues("counter_1"); + assertNotNull(counterOneDV); + var counterTwoDV = leaf.getSortedNumericDocValues("counter_2"); + assertNotNull(counterTwoDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var gaugeTwoDV = leaf.getSortedNumericDocValues("gauge_2"); + assertNotNull(gaugeTwoDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + var otherTagDV = leaf.getSortedDocValues("other_tag"); + assertNotNull(otherTagDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + int batchIndex = i / numHosts; + assertEquals(batchIndex, hostNameDV.ordValue()); + String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + assertEquals(expectedHostName, hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString()); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + (1000L * numDocs); + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + + if (counterOneDV.advanceExact(i)) { + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + } + + if (counterTwoDV.advanceExact(i)) { + assertEquals(1, counterTwoDV.docValueCount()); + long counterTwoValue = counterTwoDV.nextValue(); + assertTrue("unexpected counter [" + counterTwoValue + "]", counterTwoValue > 0 && counterTwoValue <= counter2); + } + + if (gaugeOneDV.advanceExact(i)) { + assertEquals(1, gaugeOneDV.docValueCount()); + long gaugeOneValue = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0); + } + + if (gaugeTwoDV.advanceExact(i)) { + assertEquals(1, gaugeTwoDV.docValueCount()); + long gaugeTwoValue = gaugeTwoDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0); + } + + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + if (otherTagDV.advanceExact(i)) { + int ordinal = otherTagDV.ordValue(); + String actualTag = otherTagDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + + public void testWithNoValueMultiValue() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + int numRounds = 32 + random().nextInt(32); + int numDocsPerRound = 64 + random().nextInt(64); + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + { + long timestamp = baseTimestamp; + for (int i = 0; i < numRounds; i++) { + int r = random().nextInt(10); + for (int j = 0; j < numDocsPerRound; j++) { + var d = new Document(); + // host in reverse, otherwise merging will detect that segments are already ordered and will use sequential docid + // merger: + String hostName = String.format(Locale.ROOT, "host-%03d", numRounds - i); + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp++)); + + if (r % 10 == 5) { + // sometimes no values + } else if (r % 10 > 5) { + // often single value: + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[j % gauge1Values.length])); + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j % tags.length]))); + } else { + // otherwise multiple values: + int numValues = 2 + random().nextInt(4); + for (int k = 0; k < numValues; k++) { + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[(j + k) % gauge1Values.length])); + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(j + k) % tags.length]))); + } + } + iw.addDocument(d); + } + iw.commit(); + } + iw.forceMerge(1); + } + + int numDocs = numRounds * numDocsPerRound; + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); + assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + numDocs; + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + if (gaugeOneDV.advanceExact(i)) { + for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { + long value = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); + } + } + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + } + + private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) { + var config = new IndexWriterConfig(); + config.setIndexSort( + new Sort( + new SortField(hostnameField, SortField.Type.STRING, false), + new SortedNumericSortField(timestampField, SortField.Type.LONG, true) + ) + ); + config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + config.setMergePolicy(new LogByteSizeMergePolicy()); + config.setCodec(getCodec()); + return config; + } + } From 01ead9f30c8c2b1eee69ccec069ebecf1e36174e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 15 Apr 2025 11:46:07 +0200 Subject: [PATCH 2/3] fixed compile errors in benchmark --- .../index/codec/tsdb/TSDBDocValuesMergeBenchmark.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java index e3eb3405038dd..2add2e4db356f 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java @@ -26,7 +26,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.logging.LogConfigurator; -import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; +import org.elasticsearch.index.codec.Elasticsearch816Codec; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -55,7 +55,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@BenchmarkMode(Mode.SampleTime) +@BenchmarkMode(Mode.SingleShotTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) @Fork(1) @@ -183,8 +183,8 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE ); config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); config.setMergePolicy(new LogByteSizeMergePolicy()); - var docValuesFormat = new ES819TSDBDocValuesFormat(4096, optimizedMergeEnabled); - config.setCodec(new Elasticsearch900Lucene101Codec() { + var docValuesFormat = new ES819TSDBDocValuesFormat(optimizedMergeEnabled); + config.setCodec(new Elasticsearch816Codec() { @Override public DocValuesFormat getDocValuesFormatForField(String field) { From c4e647856e844be773b370744913d685a8d72de2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 15 Apr 2025 18:21:25 +0200 Subject: [PATCH 3/3] Fix DocValuesConsumerUtil (#126836) The compatibleWithOptimizedMerge() method doesn't handle codec readers that are wrapped by our source pruning filter codec reader. This change addresses that. Failing to detect this means that the optimized merge will not kick in. --- .../index/codec/FilterDocValuesProducer.java | 70 +++++++++++++++++++ .../tsdb/es819/DocValuesConsumerUtil.java | 5 ++ .../RecoverySourcePruneMergePolicy.java | 48 +------------ 3 files changed, 76 insertions(+), 47 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/codec/FilterDocValuesProducer.java diff --git a/server/src/main/java/org/elasticsearch/index/codec/FilterDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/FilterDocValuesProducer.java new file mode 100644 index 0000000000000..d8b5b11115178 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/FilterDocValuesProducer.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; + +import java.io.IOException; + +/** + * Implementation that allows wrapping another {@link DocValuesProducer} and alter behaviour of the wrapped instance. + */ +public abstract class FilterDocValuesProducer extends DocValuesProducer { + private final DocValuesProducer in; + + protected FilterDocValuesProducer(DocValuesProducer in) { + this.in = in; + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + return in.getNumeric(field); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + return in.getBinary(field); + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + return in.getSorted(field); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + return in.getSortedNumeric(field); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + return in.getSortedSet(field); + } + + @Override + public void checkIntegrity() throws IOException { + in.checkIntegrity(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + public DocValuesProducer getIn() { + return in; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java index d6dae9ea882f9..9a9e2831d78da 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java @@ -12,6 +12,7 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.MergeState; +import org.elasticsearch.index.codec.FilterDocValuesProducer; import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; /** @@ -40,6 +41,10 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me for (int i = 0; i < mergeState.docValuesProducers.length; i++) { DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer instanceof FilterDocValuesProducer filterDocValuesProducer) { + docValuesProducer = filterDocValuesProducer.getIn(); + } + if (docValuesProducer instanceof XPerFieldDocValuesFormat.FieldsReader perFieldReader) { var wrapped = perFieldReader.getDocValuesProducer(fieldInfo); if (wrapped instanceof ES819TSDBDocValuesProducer tsdbDocValuesProducer) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java index c24ad42d1d61a..d406eea9a3578 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java @@ -11,7 +11,6 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.StoredFieldsReader; -import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CodecReader; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FilterCodecReader; @@ -19,9 +18,6 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.OneMergeWrappingMergePolicy; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.search.ConjunctionUtils; import org.apache.lucene.search.DocIdSetIterator; @@ -33,6 +29,7 @@ import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSetIterator; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.codec.FilterDocValuesProducer; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.search.internal.FilterStoredFieldVisitor; @@ -176,49 +173,6 @@ public CacheHelper getReaderCacheHelper() { return null; } - private static class FilterDocValuesProducer extends DocValuesProducer { - private final DocValuesProducer in; - - FilterDocValuesProducer(DocValuesProducer in) { - this.in = in; - } - - @Override - public NumericDocValues getNumeric(FieldInfo field) throws IOException { - return in.getNumeric(field); - } - - @Override - public BinaryDocValues getBinary(FieldInfo field) throws IOException { - return in.getBinary(field); - } - - @Override - public SortedDocValues getSorted(FieldInfo field) throws IOException { - return in.getSorted(field); - } - - @Override - public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { - return in.getSortedNumeric(field); - } - - @Override - public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { - return in.getSortedSet(field); - } - - @Override - public void checkIntegrity() throws IOException { - in.checkIntegrity(); - } - - @Override - public void close() throws IOException { - in.close(); - } - } - private abstract static class FilterStoredFieldsReader extends StoredFieldsReader { protected final StoredFieldsReader in;