Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
optimizedMergeEnabled,
BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1,
true,
NUMERIC_LARGE_BLOCK_SHIFT
NUMERIC_LARGE_BLOCK_SHIFT,
false
);
config.setCodec(new Elasticsearch93Lucene104Codec() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion TSID_SINGLE_PREFIX_BYTE_FEATURE_FLAG = def(9_080_00_0, Version.LUCENE_10_4_0);
public static final IndexVersion SKIPPERS_ENABLED_BY_DEFAULT_IN_LOGSDB = def(9_081_0_00, Version.LUCENE_10_4_0);
public static final IndexVersion STORE_IGNORED_WILDCARD_FIELDS_IN_BINARY_DOC_VALUES = def(9_082_0_00, Version.LUCENE_10_4_0);

public static final IndexVersion WRITE_TSID_PREFIX_PARTITION = def(9_083_0_00, Version.LUCENE_10_4_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.elasticsearch.cluster.routing.TsidBuilder;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -198,10 +199,19 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
}

if (useTSDBDocValuesFormat(field)) {
var indexCreatedVersion = mapperService.getIndexSettings().getIndexVersionCreated();
IndexSettings indexSettings = mapperService.getIndexSettings();
var indexCreatedVersion = indexSettings.getIndexVersionCreated();
boolean useLargeNumericBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeNumericBlockSize();
boolean useLargeBinaryBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeBinaryBlockSize();
return TSDBDocValuesFormatFactory.createDocValuesFormat(indexCreatedVersion, useLargeNumericBlockSize, useLargeBinaryBlockSize);
boolean writePartitions = indexSettings.getMode() == IndexMode.TIME_SERIES
&& TsidBuilder.useSingleBytePrefixLayout(indexCreatedVersion)
&& indexCreatedVersion.onOrAfter(IndexVersions.WRITE_TSID_PREFIX_PARTITION);
return TSDBDocValuesFormatFactory.createDocValuesFormat(
indexCreatedVersion,
useLargeNumericBlockSize,
useLargeBinaryBlockSize,
writePartitions
);
}

return docValuesFormat;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;

import java.io.IOException;

/**
* An extension for doc-values which is the primary sort key, and values are grouped by some prefix bytes. One example is `_tsid` in
* time-series, where time-series can be grouped by the first few bytes, and the query engine can process a slice of time-series
* containing all "continuous" time-series sharing the same prefix. This is necessary instead of querying a slice of docs because some
* aggregations such as rate require all data points from a single time-series to be processed by the same operator.
*/
public interface PartitionedDocValues {
    /**
     * A snapshot of the prefix groups present in a segment.
     *
     * @param prefixes      the prefix keys, one entry per group
     * @param startDocs     the first doc id of each corresponding prefix group
     * @param numPartitions the number of valid entries in {@code prefixes} and {@code startDocs}
     *                      (the arrays may be larger when an instance is reused)
     */
    record PrefixPartitions(int[] prefixes, int[] startDocs, int numPartitions) {

    }

    /**
     * Returns the prefix partitions recorded for this field's doc-values, or {@code null} when none were written.
     *
     * @param reused a previously returned instance whose arrays may be reused to avoid fresh allocations
     */
    @Nullable
    PrefixPartitions prefixPartitions(PrefixPartitions reused) throws IOException;

    /**
     * Returns {@code true} when these doc-values carry prefix-partition metadata.
     */
    boolean hasPrefixPartitions();

    /**
     * Determines whether every non-empty segment visible to the given searcher supports partitioning by tsid prefix.
     *
     * @param searcher the index searcher whose leaves are inspected
     * @return {@code true} if each segment with {@code _tsid} doc-values exposes prefix partitions; empty segments are ignored
     */
    static boolean canPartitionByTsidPrefix(IndexSearcher searcher) throws IOException {
        for (LeafReaderContext leaf : searcher.getLeafContexts()) {
            var tsidDocValues = leaf.reader().getSortedDocValues(TimeSeriesIdFieldMapper.NAME);
            if (tsidDocValues == null) {
                // segment has no _tsid doc-values (empty segment): it cannot veto partitioning
                continue;
            }
            boolean partitioned = tsidDocValues instanceof PartitionedDocValues pdv && pdv.hasPrefixPartitions();
            if (partitioned == false) {
                return false;
            }
        }
        return true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;

Expand Down Expand Up @@ -79,6 +80,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
private final DocOffsetsCodec.Encoder docOffsetsEncoder;
private final int blockBytesThreshold;
private final int blockCountThreshold;
private final boolean writePrefixPartitions;

ES819TSDBDocValuesConsumer(
BinaryDVCompressionMode binaryDVCompressionMode,
Expand All @@ -94,7 +96,8 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
String dataCodec,
String dataExtension,
String metaCodec,
String metaExtension
String metaExtension,
boolean writePrefixPartitions
) throws IOException {
this.binaryDVCompressionMode = binaryDVCompressionMode;
this.enablePerBlockCompression = enablePerBlockCompression;
Expand Down Expand Up @@ -136,6 +139,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
maxDoc = state.segmentInfo.maxDoc();
this.skipIndexIntervalSize = skipIndexIntervalSize;
this.enableOptimizedMerge = enableOptimizedMerge;
this.writePrefixPartitions = writePrefixPartitions;
success = true;
} finally {
if (success == false) {
Expand All @@ -158,7 +162,7 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
writeSkipIndex(field, producer);
}

writeField(field, producer, -1, null);
writeField(field, producer, -1, null, null);
}

private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDocsWithValue, long numValues) {
Expand All @@ -168,8 +172,13 @@ private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDo
&& (numDocsWithValue / maxOrd) >= minDocsPerOrdinalForOrdinalRangeEncoding;
}

private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd, OffsetsAccumulator offsetsAccumulator)
throws IOException {
private long[] writeField(
FieldInfo field,
TsdbDocValuesProducer valuesProducer,
long maxOrd,
OffsetsAccumulator offsetsAccumulator,
PrefixedPartitionsWriter partitionsWriter
) throws IOException {
int numDocsWithValue = 0;
long numValues = 0;

Expand Down Expand Up @@ -200,6 +209,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
if (maxOrd == 1) {
// Special case for maxOrd of 1, signal -1 that no blocks will be written
meta.writeInt(-1);
if (partitionsWriter != null) {
partitionsWriter.trackDoc(0, 0);
}
} else if (shouldEncodeOrdinalRange(field, maxOrd, numDocsWithValue, numValues)) {
assert offsetsAccumulator == null;
// When a field is sorted, use ordinal range encode for long runs of the same ordinal.
Expand All @@ -218,6 +230,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
);
long lastOrd = 0;
startDocs.add(0);
if (partitionsWriter != null) {
partitionsWriter.trackDoc(0, lastOrd);
}
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
if (disiAccumulator != null) {
disiAccumulator.addDocId(doc);
Expand All @@ -226,6 +241,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
if (nextOrd != lastOrd) {
lastOrd = nextOrd;
startDocs.add(doc);
if (partitionsWriter != null) {
partitionsWriter.trackDoc(doc, nextOrd);
}
}
}
startDocs.add(maxDoc);
Expand Down Expand Up @@ -255,7 +273,11 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
offsetsAccumulator.addDoc(count);
}
for (int i = 0; i < count; ++i) {
buffer[bufferSize++] = values.nextValue();
final long v = values.nextValue();
buffer[bufferSize++] = v;
if (partitionsWriter != null) {
partitionsWriter.trackDoc(doc, v);
}
if (bufferSize == numericBlockSize) {
indexWriter.add(data.getFilePointer() - valuesDataOffset);
if (maxOrd >= 0) {
Expand Down Expand Up @@ -694,13 +716,23 @@ public long cost() {
if (addTypeByte) {
meta.writeByte((byte) 0); // multiValued (0 = singleValued)
}
SortedDocValues sorted = valuesProducer.getSorted(field);
int maxOrd = sorted.getValueCount();
writeField(field, producer, maxOrd, null);
addTermsDict(DocValues.singleton(valuesProducer.getSorted(field)));
final SortedDocValues sorted = valuesProducer.getSorted(field);
final int maxOrd = sorted.getValueCount();
var partitionWriter = primarySortFieldNumber == field.number && writePrefixPartitions ? new PrefixedPartitionsWriter() : null;
addTermsDict(DocValues.singleton(sorted), partitionWriter);
if (partitionWriter != null) {
partitionWriter.prepareForTrackingDocs();
}
writeField(field, producer, maxOrd, null, partitionWriter);
if (primarySortFieldNumber == field.number) {
meta.writeByte(partitionWriter != null ? (byte) 1 : (byte) 0);
}
if (partitionWriter != null) {
partitionWriter.flush(data, meta);
}
}

private void addTermsDict(SortedSetDocValues values) throws IOException {
private void addTermsDict(SortedSetDocValues values, @Nullable PrefixedPartitionsWriter partitionWriter) throws IOException {
final long size = values.getValueCount();
meta.writeVLong(size);

Expand Down Expand Up @@ -755,6 +787,9 @@ private void addTermsDict(SortedSetDocValues values) throws IOException {
}
bufferedOutput.writeBytes(term.bytes, term.offset + prefixLength, suffixLength);
}
if (partitionWriter != null) {
partitionWriter.trackTerm(term, ord);
}
maxLength = Math.max(maxLength, term.length);
previous.copyBytes(term);
++ord;
Expand Down Expand Up @@ -860,16 +895,16 @@ private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valu
int numDocsWithField = valuesProducer.mergeStats.sumNumDocsWithField();
long numValues = valuesProducer.mergeStats.sumNumValues();
if (numDocsWithField == numValues) {
writeField(field, valuesProducer, maxOrd, null);
writeField(field, valuesProducer, maxOrd, null, null);
} else {
assert numValues > numDocsWithField;
try (var accumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField)) {
writeField(field, valuesProducer, maxOrd, accumulator);
writeField(field, valuesProducer, maxOrd, accumulator, null);
accumulator.build(meta, data);
}
}
} else {
long[] stats = writeField(field, valuesProducer, maxOrd, null);
long[] stats = writeField(field, valuesProducer, maxOrd, null, null);
int numDocsWithField = Math.toIntExact(stats[0]);
long numValues = stats[1];
assert numValues >= numDocsWithField;
Expand Down Expand Up @@ -1012,7 +1047,7 @@ public long cost() {
}
}, maxOrd);

addTermsDict(valuesProducer.getSortedSet(field));
addTermsDict(valuesProducer.getSortedSet(field), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues
static final int VERSION_START = 0;
static final int VERSION_BINARY_DV_COMPRESSION = 1;
static final int VERSION_NUMERIC_LARGE_BLOCKS = 2;
static final int VERSION_CURRENT = VERSION_NUMERIC_LARGE_BLOCKS;
static final int VERSION_PREFIX_PARTITIONS = 3;
static final int VERSION_CURRENT = VERSION_PREFIX_PARTITIONS;

static final int TERMS_DICT_BLOCK_LZ4_SHIFT = 6;
static final int TERMS_DICT_BLOCK_LZ4_SIZE = 1 << TERMS_DICT_BLOCK_LZ4_SHIFT;
Expand Down Expand Up @@ -142,6 +143,7 @@ private static boolean getOptimizedMergeEnabledDefault() {
final DocOffsetsCodec docOffsetsCodec;
final int blockBytesThreshold;
final int blockCountThreshold;
final boolean writePrefixPartitions;

public static ES819TSDBDocValuesFormat getInstance(boolean useLargeNumericBlock) {
return useLargeNumericBlock ? new ES819TSDBDocValuesFormat(NUMERIC_LARGE_BLOCK_SHIFT) : new ES819TSDBDocValuesFormat();
Expand Down Expand Up @@ -218,7 +220,8 @@ public ES819TSDBDocValuesFormat(
binaryDVCompressionMode,
enablePerBlockCompression,
numericBlockShift,
DocOffsetsCodec.GROUPED_VINT
DocOffsetsCodec.GROUPED_VINT,
false
);
}

Expand All @@ -230,7 +233,8 @@ public ES819TSDBDocValuesFormat(
BinaryDVCompressionMode binaryDVCompressionMode,
final boolean enablePerBlockCompression,
final int numericBlockShift,
DocOffsetsCodec docOffsetsCodec
DocOffsetsCodec docOffsetsCodec,
boolean writePrefixPartitions
) {
this(
codecName,
Expand All @@ -242,7 +246,8 @@ public ES819TSDBDocValuesFormat(
numericBlockShift,
docOffsetsCodec,
BINARY_DV_BLOCK_BYTES_THRESHOLD_DEFAULT,
BINARY_DV_BLOCK_COUNT_THRESHOLD_DEFAULT
BINARY_DV_BLOCK_COUNT_THRESHOLD_DEFAULT,
writePrefixPartitions
);
}

Expand All @@ -256,7 +261,8 @@ public ES819TSDBDocValuesFormat(
final int numericBlockShift,
DocOffsetsCodec docOffsetsCodec,
int blockBytesThreshold,
int blockCountThreshold
int blockCountThreshold,
boolean writePrefixPartitions
) {
super(codecName);
assert numericBlockShift == NUMERIC_BLOCK_SHIFT || numericBlockShift == NUMERIC_LARGE_BLOCK_SHIFT : numericBlockShift;
Expand All @@ -272,6 +278,7 @@ public ES819TSDBDocValuesFormat(
this.docOffsetsCodec = docOffsetsCodec;
this.blockBytesThreshold = blockBytesThreshold;
this.blockCountThreshold = blockCountThreshold;
this.writePrefixPartitions = writePrefixPartitions;
}

@Override
Expand All @@ -290,7 +297,8 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept
DATA_CODEC,
DATA_EXTENSION,
META_CODEC,
META_EXTENSION
META_EXTENSION,
writePrefixPartitions
);
}

Expand Down
Loading
Loading