Skip to content

Commit 5658fce

Browse files
authored
Write prefix partition for tsid in tsdb codec (#144617)
Follow-up to #143955, which introduced a single-byte metric prefix in the tsid layout. This PR writes prefix partition metadata for the _tsid field. The _tsid field is grouped by its first 2 bytes - the metric prefix byte (byte-0) plus one random byte (byte-1) - yielding up to 256 partitions per metric. The partition records the starting document for each prefix group, allowing the query engine to slice data so that each slice contains only time-series sharing the same prefix. This enables ESQL to partition work across slices without splitting any individual time-series - a requirement for aggregations like rate. This should reduce memory usage and improve performance compared to time-interval partitioning, which requires multiple queries over fragmented data. The compute engine is not wired up yet, so no improvements are expected for now; however, this change may cause a small regression in indexing throughput and storage overhead, which is expected to be trivial. Relates #143955
1 parent 924581c commit 5658fce

17 files changed

+721
-65
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,8 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
264264
optimizedMergeEnabled,
265265
BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1,
266266
true,
267-
NUMERIC_LARGE_BLOCK_SHIFT
267+
NUMERIC_LARGE_BLOCK_SHIFT,
268+
false
268269
);
269270
config.setCodec(new Elasticsearch93Lucene104Codec() {
270271
@Override

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private static Version parseUnchecked(String version) {
241241
public static final IndexVersion TSID_SINGLE_PREFIX_BYTE_FEATURE_FLAG = def(9_080_00_0, Version.LUCENE_10_4_0);
242242
public static final IndexVersion SKIPPERS_ENABLED_BY_DEFAULT_IN_LOGSDB = def(9_081_0_00, Version.LUCENE_10_4_0);
243243
public static final IndexVersion STORE_IGNORED_WILDCARD_FIELDS_IN_BINARY_DOC_VALUES = def(9_082_0_00, Version.LUCENE_10_4_0);
244-
244+
public static final IndexVersion WRITE_TSID_PREFIX_PARTITION = def(9_083_0_00, Version.LUCENE_10_4_0);
245245
/*
246246
* STOP! READ THIS FIRST! No, really,
247247
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.codecs.KnnVectorsFormat;
1414
import org.apache.lucene.codecs.PostingsFormat;
1515
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
16+
import org.elasticsearch.cluster.routing.TsidBuilder;
1617
import org.elasticsearch.common.util.BigArrays;
1718
import org.elasticsearch.core.Nullable;
1819
import org.elasticsearch.index.IndexMode;
@@ -198,10 +199,19 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
198199
}
199200

200201
if (useTSDBDocValuesFormat(field)) {
201-
var indexCreatedVersion = mapperService.getIndexSettings().getIndexVersionCreated();
202+
IndexSettings indexSettings = mapperService.getIndexSettings();
203+
var indexCreatedVersion = indexSettings.getIndexVersionCreated();
202204
boolean useLargeNumericBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeNumericBlockSize();
203205
boolean useLargeBinaryBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeBinaryBlockSize();
204-
return TSDBDocValuesFormatFactory.createDocValuesFormat(indexCreatedVersion, useLargeNumericBlockSize, useLargeBinaryBlockSize);
206+
boolean writePartitions = indexSettings.getMode() == IndexMode.TIME_SERIES
207+
&& TsidBuilder.useSingleBytePrefixLayout(indexCreatedVersion)
208+
&& indexCreatedVersion.onOrAfter(IndexVersions.WRITE_TSID_PREFIX_PARTITION);
209+
return TSDBDocValuesFormatFactory.createDocValuesFormat(
210+
indexCreatedVersion,
211+
useLargeNumericBlockSize,
212+
useLargeBinaryBlockSize,
213+
writePartitions
214+
);
205215
}
206216

207217
return docValuesFormat;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb;
11+
12+
import org.apache.lucene.index.LeafReaderContext;
13+
import org.apache.lucene.search.IndexSearcher;
14+
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* An extension for doc-values which is the primary sort key, and values are grouped by some prefix bytes. One example is `_tsid` in
21+
* time-series, where time-series can be grouped by the first few bytes, and the query engine can process a slice of time-series
22+
* containing all "continuous" time-series sharing the same prefix. This is necessary instead of querying a slice of docs because some
23+
* aggregations such as rate require all data points from a single time-series to be processed by the same operator.
24+
*/
25+
public interface PartitionedDocValues {
26+
/**
27+
* @param prefixes the prefix keys
28+
* @param startDocs the startDocs of corresponding prefix keys
29+
* @param numPartitions the actual number of prefixes available in the partition
30+
*/
31+
record PrefixPartitions(int[] prefixes, int[] startDocs, int numPartitions) {
32+
33+
}
34+
35+
/**
36+
* Returns the prefixed partition from the doc-values of this field if exists.
37+
* @param reused an existing prefix partitions can be reused to avoid allocating memory
38+
*/
39+
@Nullable
40+
PrefixPartitions prefixPartitions(PrefixPartitions reused) throws IOException;
41+
42+
/**
43+
* The number of bits used in prefix partitions
44+
*/
45+
int prefixPartitionBits();
46+
47+
/**
48+
* Check if the given index searcher can be partitioned by tsid prefix.
49+
* @param searcher the index searcher to check
50+
* @return true if all non-empty segments support tsid prefix partitioning
51+
*/
52+
static boolean canPartitionByTsidPrefix(IndexSearcher searcher) throws IOException {
53+
for (LeafReaderContext leafContext : searcher.getLeafContexts()) {
54+
var sortedDV = leafContext.reader().getSortedDocValues(TimeSeriesIdFieldMapper.NAME);
55+
// empty segment
56+
if (sortedDV == null) {
57+
continue;
58+
}
59+
if (sortedDV instanceof PartitionedDocValues partition && partition.prefixPartitionBits() > 0) {
60+
continue;
61+
}
62+
return false;
63+
}
64+
return true;
65+
}
66+
}

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.lucene.util.packed.DirectMonotonicWriter;
4545
import org.apache.lucene.util.packed.PackedInts;
4646
import org.elasticsearch.core.IOUtils;
47+
import org.elasticsearch.core.Nullable;
4748
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
4849
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
4950

@@ -79,6 +80,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
7980
private final DocOffsetsCodec.Encoder docOffsetsEncoder;
8081
private final int blockBytesThreshold;
8182
private final int blockCountThreshold;
83+
private final boolean writePrefixPartitions;
8284

8385
ES819TSDBDocValuesConsumer(
8486
BinaryDVCompressionMode binaryDVCompressionMode,
@@ -94,7 +96,8 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
9496
String dataCodec,
9597
String dataExtension,
9698
String metaCodec,
97-
String metaExtension
99+
String metaExtension,
100+
boolean writePrefixPartitions
98101
) throws IOException {
99102
this.binaryDVCompressionMode = binaryDVCompressionMode;
100103
this.enablePerBlockCompression = enablePerBlockCompression;
@@ -136,6 +139,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
136139
maxDoc = state.segmentInfo.maxDoc();
137140
this.skipIndexIntervalSize = skipIndexIntervalSize;
138141
this.enableOptimizedMerge = enableOptimizedMerge;
142+
this.writePrefixPartitions = writePrefixPartitions;
139143
success = true;
140144
} finally {
141145
if (success == false) {
@@ -158,7 +162,7 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
158162
writeSkipIndex(field, producer);
159163
}
160164

161-
writeField(field, producer, -1, null);
165+
writeField(field, producer, -1, null, null);
162166
}
163167

164168
private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDocsWithValue, long numValues) {
@@ -168,8 +172,13 @@ private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDo
168172
&& (numDocsWithValue / maxOrd) >= minDocsPerOrdinalForOrdinalRangeEncoding;
169173
}
170174

171-
private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd, OffsetsAccumulator offsetsAccumulator)
172-
throws IOException {
175+
private long[] writeField(
176+
FieldInfo field,
177+
TsdbDocValuesProducer valuesProducer,
178+
long maxOrd,
179+
OffsetsAccumulator offsetsAccumulator,
180+
PrefixedPartitionsWriter partitionsWriter
181+
) throws IOException {
173182
int numDocsWithValue = 0;
174183
long numValues = 0;
175184

@@ -200,6 +209,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
200209
if (maxOrd == 1) {
201210
// Special case for maxOrd of 1, signal -1 that no blocks will be written
202211
meta.writeInt(-1);
212+
if (partitionsWriter != null) {
213+
partitionsWriter.trackDoc(0, 0);
214+
}
203215
} else if (shouldEncodeOrdinalRange(field, maxOrd, numDocsWithValue, numValues)) {
204216
assert offsetsAccumulator == null;
205217
// When a field is sorted, use ordinal range encode for long runs of the same ordinal.
@@ -218,6 +230,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
218230
);
219231
long lastOrd = 0;
220232
startDocs.add(0);
233+
if (partitionsWriter != null) {
234+
partitionsWriter.trackDoc(0, lastOrd);
235+
}
221236
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
222237
if (disiAccumulator != null) {
223238
disiAccumulator.addDocId(doc);
@@ -226,6 +241,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
226241
if (nextOrd != lastOrd) {
227242
lastOrd = nextOrd;
228243
startDocs.add(doc);
244+
if (partitionsWriter != null) {
245+
partitionsWriter.trackDoc(doc, nextOrd);
246+
}
229247
}
230248
}
231249
startDocs.add(maxDoc);
@@ -255,7 +273,11 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
255273
offsetsAccumulator.addDoc(count);
256274
}
257275
for (int i = 0; i < count; ++i) {
258-
buffer[bufferSize++] = values.nextValue();
276+
final long v = values.nextValue();
277+
buffer[bufferSize++] = v;
278+
if (partitionsWriter != null) {
279+
partitionsWriter.trackDoc(doc, v);
280+
}
259281
if (bufferSize == numericBlockSize) {
260282
indexWriter.add(data.getFilePointer() - valuesDataOffset);
261283
if (maxOrd >= 0) {
@@ -694,13 +716,23 @@ public long cost() {
694716
if (addTypeByte) {
695717
meta.writeByte((byte) 0); // multiValued (0 = singleValued)
696718
}
697-
SortedDocValues sorted = valuesProducer.getSorted(field);
698-
int maxOrd = sorted.getValueCount();
699-
writeField(field, producer, maxOrd, null);
700-
addTermsDict(DocValues.singleton(valuesProducer.getSorted(field)));
719+
final SortedDocValues sorted = valuesProducer.getSorted(field);
720+
final int maxOrd = sorted.getValueCount();
721+
var partitionWriter = primarySortFieldNumber == field.number && writePrefixPartitions ? new PrefixedPartitionsWriter() : null;
722+
addTermsDict(DocValues.singleton(sorted), partitionWriter);
723+
if (partitionWriter != null) {
724+
partitionWriter.prepareForTrackingDocs();
725+
}
726+
writeField(field, producer, maxOrd, null, partitionWriter);
727+
if (primarySortFieldNumber == field.number) {
728+
meta.writeByte(partitionWriter != null ? (byte) 1 : (byte) 0);
729+
}
730+
if (partitionWriter != null) {
731+
partitionWriter.flush(data, meta);
732+
}
701733
}
702734

703-
private void addTermsDict(SortedSetDocValues values) throws IOException {
735+
private void addTermsDict(SortedSetDocValues values, @Nullable PrefixedPartitionsWriter partitionWriter) throws IOException {
704736
final long size = values.getValueCount();
705737
meta.writeVLong(size);
706738

@@ -755,6 +787,9 @@ private void addTermsDict(SortedSetDocValues values) throws IOException {
755787
}
756788
bufferedOutput.writeBytes(term.bytes, term.offset + prefixLength, suffixLength);
757789
}
790+
if (partitionWriter != null) {
791+
partitionWriter.trackTerm(term, ord);
792+
}
758793
maxLength = Math.max(maxLength, term.length);
759794
previous.copyBytes(term);
760795
++ord;
@@ -860,16 +895,16 @@ private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valu
860895
int numDocsWithField = valuesProducer.mergeStats.sumNumDocsWithField();
861896
long numValues = valuesProducer.mergeStats.sumNumValues();
862897
if (numDocsWithField == numValues) {
863-
writeField(field, valuesProducer, maxOrd, null);
898+
writeField(field, valuesProducer, maxOrd, null, null);
864899
} else {
865900
assert numValues > numDocsWithField;
866901
try (var accumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField)) {
867-
writeField(field, valuesProducer, maxOrd, accumulator);
902+
writeField(field, valuesProducer, maxOrd, accumulator, null);
868903
accumulator.build(meta, data);
869904
}
870905
}
871906
} else {
872-
long[] stats = writeField(field, valuesProducer, maxOrd, null);
907+
long[] stats = writeField(field, valuesProducer, maxOrd, null, null);
873908
int numDocsWithField = Math.toIntExact(stats[0]);
874909
long numValues = stats[1];
875910
assert numValues >= numDocsWithField;
@@ -1012,7 +1047,7 @@ public long cost() {
10121047
}
10131048
}, maxOrd);
10141049

1015-
addTermsDict(valuesProducer.getSortedSet(field));
1050+
addTermsDict(valuesProducer.getSortedSet(field), null);
10161051
}
10171052

10181053
@Override

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues
5454
static final int VERSION_START = 0;
5555
static final int VERSION_BINARY_DV_COMPRESSION = 1;
5656
static final int VERSION_NUMERIC_LARGE_BLOCKS = 2;
57-
static final int VERSION_CURRENT = VERSION_NUMERIC_LARGE_BLOCKS;
57+
// version 3 was introduced for large binary/numeric blocks
58+
static final int VERSION_PREFIX_PARTITIONS = 4;
59+
static final int VERSION_CURRENT = VERSION_PREFIX_PARTITIONS;
5860

5961
static final int TERMS_DICT_BLOCK_LZ4_SHIFT = 6;
6062
static final int TERMS_DICT_BLOCK_LZ4_SIZE = 1 << TERMS_DICT_BLOCK_LZ4_SHIFT;
@@ -142,6 +144,7 @@ private static boolean getOptimizedMergeEnabledDefault() {
142144
final DocOffsetsCodec docOffsetsCodec;
143145
final int blockBytesThreshold;
144146
final int blockCountThreshold;
147+
final boolean writePrefixPartitions;
145148

146149
public static ES819TSDBDocValuesFormat getInstance(boolean useLargeNumericBlock) {
147150
return useLargeNumericBlock ? new ES819TSDBDocValuesFormat(NUMERIC_LARGE_BLOCK_SHIFT) : new ES819TSDBDocValuesFormat();
@@ -218,7 +221,8 @@ public ES819TSDBDocValuesFormat(
218221
binaryDVCompressionMode,
219222
enablePerBlockCompression,
220223
numericBlockShift,
221-
DocOffsetsCodec.GROUPED_VINT
224+
DocOffsetsCodec.GROUPED_VINT,
225+
false
222226
);
223227
}
224228

@@ -230,7 +234,8 @@ public ES819TSDBDocValuesFormat(
230234
BinaryDVCompressionMode binaryDVCompressionMode,
231235
final boolean enablePerBlockCompression,
232236
final int numericBlockShift,
233-
DocOffsetsCodec docOffsetsCodec
237+
DocOffsetsCodec docOffsetsCodec,
238+
boolean writePrefixPartitions
234239
) {
235240
this(
236241
codecName,
@@ -242,7 +247,8 @@ public ES819TSDBDocValuesFormat(
242247
numericBlockShift,
243248
docOffsetsCodec,
244249
BINARY_DV_BLOCK_BYTES_THRESHOLD_DEFAULT,
245-
BINARY_DV_BLOCK_COUNT_THRESHOLD_DEFAULT
250+
BINARY_DV_BLOCK_COUNT_THRESHOLD_DEFAULT,
251+
writePrefixPartitions
246252
);
247253
}
248254

@@ -256,7 +262,8 @@ public ES819TSDBDocValuesFormat(
256262
final int numericBlockShift,
257263
DocOffsetsCodec docOffsetsCodec,
258264
int blockBytesThreshold,
259-
int blockCountThreshold
265+
int blockCountThreshold,
266+
boolean writePrefixPartitions
260267
) {
261268
super(codecName);
262269
assert numericBlockShift == NUMERIC_BLOCK_SHIFT || numericBlockShift == NUMERIC_LARGE_BLOCK_SHIFT : numericBlockShift;
@@ -272,6 +279,7 @@ public ES819TSDBDocValuesFormat(
272279
this.docOffsetsCodec = docOffsetsCodec;
273280
this.blockBytesThreshold = blockBytesThreshold;
274281
this.blockCountThreshold = blockCountThreshold;
282+
this.writePrefixPartitions = writePrefixPartitions;
275283
}
276284

277285
@Override
@@ -290,7 +298,8 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept
290298
DATA_CODEC,
291299
DATA_EXTENSION,
292300
META_CODEC,
293-
META_EXTENSION
301+
META_EXTENSION,
302+
writePrefixPartitions
294303
);
295304
}
296305

0 commit comments

Comments
 (0)