Skip to content

Commit e9dcb50

Browse files
authored
[8.x] First step optimizing tsdb doc values codec merging. (#126827)
* [8.x] First step optimizing tsdb doc values codec merging. Backporting #125403 to the 8.x branch. The doc values codec iterates a few times over the doc value instance that needs to be written to disk. When merging with index sorting enabled, this is much more expensive, as each time the doc values instance is iterated a merge sort is performed (in order to get the doc ids of the new segment in index-sorting order). There are several reasons why the doc value instance is iterated multiple times: * To compute stats (num values, number of docs with a value) required for writing values to disk. * To write the bitset that indicates which documents have a value (indexed disi, jump table). * To write the actual values to disk. * To write the addresses to disk (in case docs have multiple values). This applies to numeric doc values, but also to the ordinals of sorted (set) doc values. This PR addresses the first reason the doc value instance needs to be iterated. This is done only in the case of merging, and only when the segments to be merged are also of type es87 doc values, the codec version is the same, and there are no deletes. Note this optimized merge is behind a feature flag for now. * fixed compile errors in benchmark * Fix DocValuesConsumerUtil (#126836) The compatibleWithOptimizedMerge() method doesn't handle codec readers that are wrapped by our source pruning filter codec reader. This change addresses that. Failing to detect this means that the optimized merge will not kick in.
1 parent f8b3505 commit e9dcb50

18 files changed

+2455
-148
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.index.codec.tsdb;
11+
12+
import org.apache.lucene.analysis.standard.StandardAnalyzer;
13+
import org.apache.lucene.codecs.DocValuesFormat;
14+
import org.apache.lucene.document.Document;
15+
import org.apache.lucene.document.SortedDocValuesField;
16+
import org.apache.lucene.document.SortedNumericDocValuesField;
17+
import org.apache.lucene.document.SortedSetDocValuesField;
18+
import org.apache.lucene.index.IndexWriter;
19+
import org.apache.lucene.index.IndexWriterConfig;
20+
import org.apache.lucene.index.LogByteSizeMergePolicy;
21+
import org.apache.lucene.search.Sort;
22+
import org.apache.lucene.search.SortField;
23+
import org.apache.lucene.search.SortedNumericSortField;
24+
import org.apache.lucene.store.Directory;
25+
import org.apache.lucene.store.FSDirectory;
26+
import org.apache.lucene.util.BytesRef;
27+
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.common.logging.LogConfigurator;
29+
import org.elasticsearch.index.codec.Elasticsearch816Codec;
30+
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
31+
import org.openjdk.jmh.annotations.Benchmark;
32+
import org.openjdk.jmh.annotations.BenchmarkMode;
33+
import org.openjdk.jmh.annotations.Fork;
34+
import org.openjdk.jmh.annotations.Level;
35+
import org.openjdk.jmh.annotations.Measurement;
36+
import org.openjdk.jmh.annotations.Mode;
37+
import org.openjdk.jmh.annotations.OutputTimeUnit;
38+
import org.openjdk.jmh.annotations.Param;
39+
import org.openjdk.jmh.annotations.Scope;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.annotations.State;
42+
import org.openjdk.jmh.annotations.TearDown;
43+
import org.openjdk.jmh.annotations.Threads;
44+
import org.openjdk.jmh.annotations.Warmup;
45+
import org.openjdk.jmh.profile.AsyncProfiler;
46+
import org.openjdk.jmh.runner.Runner;
47+
import org.openjdk.jmh.runner.RunnerException;
48+
import org.openjdk.jmh.runner.options.Options;
49+
import org.openjdk.jmh.runner.options.OptionsBuilder;
50+
51+
import java.io.IOException;
52+
import java.nio.file.Files;
53+
import java.util.Random;
54+
import java.util.concurrent.ExecutorService;
55+
import java.util.concurrent.Executors;
56+
import java.util.concurrent.TimeUnit;
57+
58+
@BenchmarkMode(Mode.SingleShotTime)
59+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
60+
@State(Scope.Benchmark)
61+
@Fork(1)
62+
@Threads(1)
63+
@Warmup(iterations = 0)
64+
@Measurement(iterations = 1)
65+
public class TSDBDocValuesMergeBenchmark {
66+
67+
static {
68+
// For Elasticsearch900Lucene101Codec:
69+
LogConfigurator.loadLog4jPlugins();
70+
LogConfigurator.configureESLogging();
71+
LogConfigurator.setNodeName("test");
72+
}
73+
74+
@Param("20431204")
75+
private int nDocs;
76+
77+
@Param("1000")
78+
private int deltaTime;
79+
80+
@Param("42")
81+
private int seed;
82+
83+
private static final String TIMESTAMP_FIELD = "@timestamp";
84+
private static final String HOSTNAME_FIELD = "host.name";
85+
private static final long BASE_TIMESTAMP = 1704067200000L;
86+
87+
private IndexWriter indexWriterWithoutOptimizedMerge;
88+
private IndexWriter indexWriterWithOptimizedMerge;
89+
private ExecutorService executorService;
90+
91+
public static void main(String[] args) throws RunnerException {
92+
final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
93+
.addProfiler(AsyncProfiler.class)
94+
.build();
95+
96+
new Runner(options).run();
97+
}
98+
99+
@Setup(Level.Trial)
100+
public void setup() throws IOException {
101+
executorService = Executors.newSingleThreadExecutor();
102+
103+
final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
104+
final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));
105+
106+
indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
107+
indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
108+
}
109+
110+
private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException {
111+
final var iwc = createIndexWriterConfig(optimizedMergeEnabled);
112+
long counter1 = 0;
113+
long counter2 = 10_000_000;
114+
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
115+
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
116+
int numHosts = 1000;
117+
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
118+
119+
final Random random = new Random(seed);
120+
IndexWriter indexWriter = new IndexWriter(directory, iwc);
121+
for (int i = 0; i < nDocs; i++) {
122+
final Document doc = new Document();
123+
124+
final int batchIndex = i / numHosts;
125+
final String hostName = "host-" + batchIndex;
126+
// Slightly vary the timestamp in each document
127+
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
128+
129+
doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
130+
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
131+
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
132+
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
133+
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
134+
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
135+
int numTags = tags.length % (i + 1);
136+
for (int j = 0; j < numTags; j++) {
137+
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
138+
}
139+
140+
indexWriter.addDocument(doc);
141+
}
142+
indexWriter.commit();
143+
return indexWriter;
144+
}
145+
146+
@Benchmark
147+
public void forceMergeWithoutOptimizedMerge() throws IOException {
148+
forceMerge(indexWriterWithoutOptimizedMerge);
149+
}
150+
151+
@Benchmark
152+
public void forceMergeWithOptimizedMerge() throws IOException {
153+
forceMerge(indexWriterWithOptimizedMerge);
154+
}
155+
156+
private void forceMerge(final IndexWriter indexWriter) throws IOException {
157+
indexWriter.forceMerge(1);
158+
}
159+
160+
@TearDown(Level.Trial)
161+
public void tearDown() {
162+
if (executorService != null) {
163+
executorService.shutdown();
164+
try {
165+
if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
166+
executorService.shutdownNow();
167+
}
168+
} catch (InterruptedException e) {
169+
executorService.shutdownNow();
170+
Thread.currentThread().interrupt();
171+
}
172+
}
173+
}
174+
175+
private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeEnabled) {
176+
var config = new IndexWriterConfig(new StandardAnalyzer());
177+
// NOTE: index sort config matching LogsDB's sort order
178+
config.setIndexSort(
179+
new Sort(
180+
new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false),
181+
new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true)
182+
)
183+
);
184+
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
185+
config.setMergePolicy(new LogByteSizeMergePolicy());
186+
var docValuesFormat = new ES819TSDBDocValuesFormat(optimizedMergeEnabled);
187+
config.setCodec(new Elasticsearch816Codec() {
188+
189+
@Override
190+
public DocValuesFormat getDocValuesFormatForField(String field) {
191+
return docValuesFormat;
192+
}
193+
});
194+
return config;
195+
}
196+
}

docs/changelog/125403.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125403
2+
summary: First step optimizing tsdb doc values codec merging
3+
area: Codec
4+
type: enhancement
5+
issues: []

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,4 +479,5 @@
479479
exports org.elasticsearch.inference.configuration;
480480
exports org.elasticsearch.monitor.metrics;
481481
exports org.elasticsearch.plugins.internal.rewriter to org.elasticsearch.inference;
482+
exports org.elasticsearch.index.codec.perfield;
482483
}

server/src/main/java/org/elasticsearch/index/codec/Elasticsearch816Codec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
import org.apache.lucene.codecs.lucene912.Lucene912Codec;
1818
import org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat;
1919
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;
20-
import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat;
2120
import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat;
2221
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
22+
import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat;
2323
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;
2424

2525
/**
@@ -39,7 +39,7 @@ public PostingsFormat getPostingsFormatForField(String field) {
3939
};
4040

4141
private final DocValuesFormat defaultDVFormat;
42-
private final DocValuesFormat docValuesFormat = new PerFieldDocValuesFormat() {
42+
private final DocValuesFormat docValuesFormat = new XPerFieldDocValuesFormat() {
4343
@Override
4444
public DocValuesFormat getDocValuesFormatForField(String field) {
4545
return Elasticsearch816Codec.this.getDocValuesFormatForField(field);
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec;
11+
12+
import org.apache.lucene.codecs.DocValuesProducer;
13+
import org.apache.lucene.index.BinaryDocValues;
14+
import org.apache.lucene.index.FieldInfo;
15+
import org.apache.lucene.index.NumericDocValues;
16+
import org.apache.lucene.index.SortedDocValues;
17+
import org.apache.lucene.index.SortedNumericDocValues;
18+
import org.apache.lucene.index.SortedSetDocValues;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Implementation that allows wrapping another {@link DocValuesProducer} and alter behaviour of the wrapped instance.
24+
*/
25+
public abstract class FilterDocValuesProducer extends DocValuesProducer {
26+
private final DocValuesProducer in;
27+
28+
protected FilterDocValuesProducer(DocValuesProducer in) {
29+
this.in = in;
30+
}
31+
32+
@Override
33+
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
34+
return in.getNumeric(field);
35+
}
36+
37+
@Override
38+
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
39+
return in.getBinary(field);
40+
}
41+
42+
@Override
43+
public SortedDocValues getSorted(FieldInfo field) throws IOException {
44+
return in.getSorted(field);
45+
}
46+
47+
@Override
48+
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
49+
return in.getSortedNumeric(field);
50+
}
51+
52+
@Override
53+
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
54+
return in.getSortedSet(field);
55+
}
56+
57+
@Override
58+
public void checkIntegrity() throws IOException {
59+
in.checkIntegrity();
60+
}
61+
62+
@Override
63+
public void close() throws IOException {
64+
in.close();
65+
}
66+
67+
public DocValuesProducer getIn() {
68+
return in;
69+
}
70+
}

0 commit comments

Comments
 (0)