Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.benchmark.index.codec.tsdb;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.lucene101.Lucene101Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * JMH benchmark that times a single {@code forceMerge(1)} over an index whose doc values are written with
 * {@link ES87TSDBDocValuesFormat} and whose metric/tag fields are sparse: every generated document carries
 * {@code host.name} and {@code @timestamp}, while {@code counter_1}, {@code gauge_1} and {@code tags} exist
 * only on even documents and {@code counter_2}, {@code gauge_2} and {@code other_tag} only on odd documents.
 * <p>
 * The index is built once per trial in {@link #setup()}. Because the merge consumes that state (a second
 * {@code forceMerge(1)} on the same writer would have little or nothing left to merge), the benchmark runs in
 * {@link Mode#SingleShotTime} so that exactly one invocation is measured per iteration.
 */
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Threads(1)
@Warmup(iterations = 0)
@Measurement(iterations = 1)
public class TSDBDocValuesSparseMergeBenchmark {

    /** Number of documents indexed during setup. */
    @Param("20431204")
    private int nDocs;

    /** Spacing between consecutive timestamps of the same host; also the exclusive upper bound of the random jitter. */
    @Param("1000")
    private int deltaTime;

    /** Seed of the random generator that produces the timestamp jitter, so runs generate the same data. */
    @Param("42")
    private int seed;

    private static final String TIMESTAMP_FIELD = "@timestamp";
    private static final String HOSTNAME_FIELD = "host.name";
    private static final long BASE_TIMESTAMP = 1704067200000L;

    /** Temporary on-disk location of the benchmark index; deleted in {@link #tearDown()}. */
    private Path indexPath;
    /** Directory backing {@link #indexWriter}; closed in {@link #tearDown()}. */
    private Directory directory;
    private IndexWriter indexWriter;
    /** Created in setup and shut down in tear-down; nothing in this class submits work to it. */
    private ExecutorService executorService;

    /**
     * Runs this benchmark from the command line with the async profiler attached.
     *
     * @param args ignored
     * @throws RunnerException if JMH fails to run the benchmark
     */
    public static void main(String[] args) throws RunnerException {
        final Options options = new OptionsBuilder().include(TSDBDocValuesSparseMergeBenchmark.class.getSimpleName())
            .addProfiler(AsyncProfiler.class)
            .build();

        new Runner(options).run();
    }

    /**
     * Creates a temporary directory and fills it with the benchmark index.
     *
     * @throws IOException if the temporary directory or the index cannot be created
     */
    @Setup(Level.Trial)
    public void setup() throws IOException {
        executorService = Executors.newSingleThreadExecutor();

        indexPath = Files.createTempDirectory("temp1-");
        directory = FSDirectory.open(indexPath);
        indexWriter = createIndex(directory);
    }

    /**
     * Indexes {@code nDocs} synthetic documents into {@code directory} and commits them.
     *
     * @param directory the directory to write the index into
     * @return the still-open writer, ready for the merge under test
     * @throws IOException if indexing or committing fails
     */
    private IndexWriter createIndex(final Directory directory) throws IOException {
        final IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
        // NOTE: index sort config matching LogsDB's sort order
        config.setIndexSort(
            new Sort(
                new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false),
                new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true)
            )
        );
        // Every field, regardless of name, is written with the TSDB doc values format.
        final ES87TSDBDocValuesFormat docValuesFormat = new ES87TSDBDocValuesFormat();
        config.setCodec(new Lucene101Codec() {

            @Override
            public DocValuesFormat getDocValuesFormatForField(String field) {
                return docValuesFormat;
            }
        });

        long counter1 = 0;
        long counter2 = 10_000_000;
        final long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
        final long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
        // Each host name is shared by this many consecutive documents.
        final int docsPerHost = 1000;
        final String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };

        final Random random = new Random(seed);
        final IndexWriter writer = new IndexWriter(directory, config);
        for (int i = 0; i < nDocs; i++) {
            final Document doc = new Document();

            final int batchIndex = i / docsPerHost;
            final String hostName = "host-" + batchIndex;
            // Slightly vary the timestamp in each document; the product is computed as a long so that a large
            // deltaTime cannot overflow int arithmetic.
            final long timestamp = BASE_TIMESTAMP + ((long) (i % docsPerHost) * deltaTime) + random.nextInt(0, deltaTime);

            doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
            doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));

            if (i % 2 == 0) {
                // Even documents: counter_1, gauge_1 and the multi-valued tags field.
                doc.add(new NumericDocValuesField("counter_1", counter1++));
                doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
                // NOTE(review): this evaluates to 0, 2, 3, 1 for i = 0, 2, 4, 6 and to tags.length for every
                // even i >= 8; the operands may be swapped - confirm intent. Kept unchanged so the generated
                // data set stays the same.
                final int numTags = tags.length % (i + 1);
                for (int j = 0; j < numTags; j++) {
                    doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
                }
            } else {
                // Odd documents: counter_2, gauge_2 and the single-valued other_tag field.
                doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
                doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge2Values.length]));
                doc.add(new SortedDocValuesField("other_tag", new BytesRef(tags[i % tags.length])));
            }

            writer.addDocument(doc);
        }
        writer.commit();
        return writer;
    }

    /**
     * The measured operation: merges the index built during setup down to a single segment.
     *
     * @throws IOException if the merge fails
     */
    @Benchmark
    public void forceMerge() throws IOException {
        forceMerge(indexWriter);
    }

    private void forceMerge(final IndexWriter indexWriter) throws IOException {
        indexWriter.forceMerge(1);
    }

    /**
     * Releases everything {@link #setup()} acquired: closes the writer and its directory, deletes the temporary
     * index from disk and shuts the executor down. Each step runs even if an earlier one throws.
     *
     * @throws IOException if closing the index or deleting its files fails
     */
    @TearDown(Level.Trial)
    public void tearDown() throws IOException {
        try {
            closeAndDeleteIndex();
        } finally {
            shutdownExecutor();
        }
    }

    /** Closes the writer, then the directory, then removes the temporary index files. */
    private void closeAndDeleteIndex() throws IOException {
        try {
            if (indexWriter != null) {
                indexWriter.close();
            }
        } finally {
            try {
                if (directory != null) {
                    directory.close();
                }
            } finally {
                if (indexPath != null) {
                    deleteRecursively(indexPath);
                }
            }
        }
    }

    /** Deletes {@code root} and everything below it; children are removed before their parent directories. */
    private static void deleteRecursively(final Path root) throws IOException {
        if (Files.notExists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse natural order puts deeper paths first, so directories are empty by the time they are deleted.
            for (Path path : walk.sorted(Comparator.reverseOrder()).toList()) {
                Files.deleteIfExists(path);
            }
        }
    }

    /** Shuts the executor down, waiting up to 30 seconds before forcing termination. */
    private void shutdownExecutor() {
        if (executorService == null) {
            return;
        }
        executorService.shutdown();
        try {
            if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.LongsRef;
import org.apache.lucene.util.RoaringDocIdSet;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.compress.LZ4;
import org.apache.lucene.util.packed.DirectMonotonicWriter;
Expand Down Expand Up @@ -124,28 +125,11 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon
final int count = values.docValueCount();
numValues += count;
}

if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getSortedNumeric(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
meta.writeLong(numValues);

// TODO: write bwc tests
// TODO: write DISI to temp file and append it later to data part:
var docIdSetBuilder = new RoaringDocIdSet.Builder(maxDoc);
if (numValues > 0) {
// Special case for maxOrd of 1, signal -1 that no blocks will be written
meta.writeInt(maxOrd != 1 ? ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
Expand All @@ -166,6 +150,7 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon
values = valuesProducer.getSortedNumeric(field);
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
docIdSetBuilder.add(doc);
final int count = values.docValueCount();
for (int i = 0; i < count; ++i) {
buffer[bufferSize++] = values.nextValue();
Expand Down Expand Up @@ -205,6 +190,35 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon
meta.writeLong(valuesDataOffset);
meta.writeLong(valuesDataLength);
}
if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
final short jumpTableEntryCount;
if (maxOrd != 1) {
var bitSet = docIdSetBuilder.build();
var iterator = bitSet.iterator();
if (iterator == null) {
iterator = DocIdSetIterator.empty();
}
jumpTableEntryCount = IndexedDISI.writeBitSet(iterator, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
} else {
values = valuesProducer.getSortedNumeric(field);
jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}

return new long[] { numDocsWithValue, numValues };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class ES87TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesF
static final String META_CODEC = "ES87TSDBDocValuesMetadata";
static final String META_EXTENSION = "dvm";
static final int VERSION_START = 0;
static final int VERSION_CURRENT = VERSION_START;
static final int VERSION_DISI_CHANGE = 1;
static final int VERSION_CURRENT = VERSION_DISI_CHANGE;
static final byte NUMERIC = 0;
static final byte BINARY = 1;
static final byte SORTED = 2;
Expand Down
Loading