Skip to content

Commit 184c51b

Browse files
authored
Use a new synthetic _id format for time-series datastreams (#137274)
This pull request follows #136810 and introduces a new format for document _id fields in time-series datastreams. In order to test this new format, the TSDB synthetic terms and postings format implementations are also changed. The current _id format is composed of a routing hash, the hashed value of the _tsid and a timestamp. While it is possible to extract the routing hash and the timestamp from a document _id value, it is not possible to extract the original _tsid value. This is an issue for synthetic _id as the document _id value is not indexed anymore: instead the synthetic _id is computed at runtime from the values of the routing hash, _tsid and timestamp. Therefore we need to be able to extract the routing hash/_tsid/timestamp values from the _id value and vice-versa. The format for the synthetic _id in this pull request has been changed to be, in order: (1) the _tsid (variable length); (2) Long.MAX_VALUE - timestamp (unsigned long on 8 bytes, encoded using big endian); (3) the routing hash _ts_routing_hash (4 bytes). Extracting the values from the _id is then used for routing GET and DELETE requests to the appropriate shard, or for setting the _tsid/timestamp/_ts_routing_hash fields in tombstone documents. Note that in searches, the document _id is built using the existing TsIdLoader (which has been adjusted). It is also important that the generated _id can be sorted lexicographically, as Lucene stops applying doc values updates when it seeks to a term that is greater than the one used in the soft-update. The ordering of the byte arrays representing the _id must match the ordering of documents in the segment, and to do that the timestamp value is stored in the array as Long.MAX_VALUE - timestamp. Docs with a higher timestamp are then sorted first. The SyntheticIdTermsEnum and SyntheticIdPostingsEnum introduced in #136810 had to be adjusted for the new _id format. We expect their implementation to be somewhat slow as it requires several lookups to work. 
In a different change we'll add a bloom filter on top of these enumerations to avoid costly lookups. Relates #136304
1 parent 136677b commit 184c51b

File tree

20 files changed

+1033
-275
lines changed

20 files changed

+1033
-275
lines changed

docs/changelog/137274.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137274
2+
summary: Use a new synthetic `_id` format for time-series datastreams
3+
area: TSDB
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBSyntheticIdsIT.java

Lines changed: 259 additions & 68 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,8 @@ public Iterator<Setting<?>> settings() {
705705
@Nullable
706706
private final IndexReshardingMetadata reshardingMetadata;
707707

708+
private final boolean useTimeSeriesSyntheticId;
709+
708710
private IndexMetadata(
709711
final Index index,
710712
final long version,
@@ -754,7 +756,8 @@ private IndexMetadata(
754756
@Nullable final IndexMetadataStats stats,
755757
@Nullable final Double writeLoadForecast,
756758
@Nullable Long shardSizeInBytesForecast,
757-
@Nullable IndexReshardingMetadata reshardingMetadata
759+
@Nullable IndexReshardingMetadata reshardingMetadata,
760+
final boolean useTimeSeriesSyntheticId
758761
) {
759762
this.index = index;
760763
this.version = version;
@@ -815,6 +818,7 @@ private IndexMetadata(
815818
this.shardSizeInBytesForecast = shardSizeInBytesForecast;
816819
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
817820
this.reshardingMetadata = reshardingMetadata;
821+
this.useTimeSeriesSyntheticId = useTimeSeriesSyntheticId;
818822
}
819823

820824
IndexMetadata withMappingMetadata(MappingMetadata mapping) {
@@ -870,7 +874,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
870874
this.stats,
871875
this.writeLoadForecast,
872876
this.shardSizeInBytesForecast,
873-
this.reshardingMetadata
877+
this.reshardingMetadata,
878+
this.useTimeSeriesSyntheticId
874879
);
875880
}
876881

@@ -933,7 +938,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
933938
this.stats,
934939
this.writeLoadForecast,
935940
this.shardSizeInBytesForecast,
936-
this.reshardingMetadata
941+
this.reshardingMetadata,
942+
this.useTimeSeriesSyntheticId
937943
);
938944
}
939945

@@ -1004,7 +1010,8 @@ public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) {
10041010
this.stats,
10051011
this.writeLoadForecast,
10061012
this.shardSizeInBytesForecast,
1007-
this.reshardingMetadata
1013+
this.reshardingMetadata,
1014+
this.useTimeSeriesSyntheticId
10081015
);
10091016
}
10101017

@@ -1066,7 +1073,8 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind
10661073
this.stats,
10671074
this.writeLoadForecast,
10681075
this.shardSizeInBytesForecast,
1069-
this.reshardingMetadata
1076+
this.reshardingMetadata,
1077+
this.useTimeSeriesSyntheticId
10701078
);
10711079
}
10721080

@@ -1123,7 +1131,8 @@ public IndexMetadata withIncrementedVersion() {
11231131
this.stats,
11241132
this.writeLoadForecast,
11251133
this.shardSizeInBytesForecast,
1126-
this.reshardingMetadata
1134+
this.reshardingMetadata,
1135+
this.useTimeSeriesSyntheticId
11271136
);
11281137
}
11291138

@@ -1314,6 +1323,13 @@ public Instant getTimeSeriesEnd() {
13141323
return timeSeriesEnd;
13151324
}
13161325

1326+
/**
1327+
* @return whether the index is a time-series index that uses synthetic ids or not.
1328+
*/
1329+
public boolean useTimeSeriesSyntheticId() {
1330+
return useTimeSeriesSyntheticId;
1331+
}
1332+
13171333
/**
13181334
* Return the concrete mapping for this index or {@code null} if this index has no mappings at all.
13191335
*/
@@ -2497,6 +2513,16 @@ IndexMetadata build(boolean repair) {
24972513
String indexModeString = settings.get(IndexSettings.MODE.getKey());
24982514
final IndexMode indexMode = indexModeString != null ? IndexMode.fromString(indexModeString.toLowerCase(Locale.ROOT)) : null;
24992515
final boolean isTsdb = indexMode == IndexMode.TIME_SERIES;
2516+
boolean useTimeSeriesSyntheticId = false;
2517+
if (isTsdb
2518+
&& IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG
2519+
&& indexCreatedVersion.onOrAfter(IndexVersions.TIME_SERIES_USE_SYNTHETIC_ID)) {
2520+
var setting = settings.get(IndexSettings.USE_SYNTHETIC_ID.getKey());
2521+
if (setting != null && setting.equalsIgnoreCase(Boolean.TRUE.toString())) {
2522+
assert IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG;
2523+
useTimeSeriesSyntheticId = true;
2524+
}
2525+
}
25002526
return new IndexMetadata(
25012527
new Index(index, uuid),
25022528
version,
@@ -2546,7 +2572,8 @@ IndexMetadata build(boolean repair) {
25462572
stats,
25472573
indexWriteLoadForecast,
25482574
shardSizeInBytesForecast,
2549-
reshardingMetadata
2575+
reshardingMetadata,
2576+
useTimeSeriesSyntheticId
25502577
);
25512578
}
25522579

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.index.IndexVersion;
3030
import org.elasticsearch.index.IndexVersions;
3131
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
32+
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
3233
import org.elasticsearch.transport.Transports;
3334
import org.elasticsearch.xcontent.XContentParser;
3435
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -321,6 +322,7 @@ public abstract static class ExtractFromSource extends IndexRouting {
321322
protected final XContentParserConfiguration parserConfig;
322323
private final IndexMode indexMode;
323324
private final boolean trackTimeSeriesRoutingHash;
325+
private final boolean useTimeSeriesSyntheticId;
324326
private final boolean addIdWithRoutingHash;
325327
private int hash = Integer.MAX_VALUE;
326328

@@ -333,6 +335,7 @@ public abstract static class ExtractFromSource extends IndexRouting {
333335
assert indexMode != null : "Index mode must be set for ExtractFromSource routing";
334336
this.trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES
335337
&& metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
338+
this.useTimeSeriesSyntheticId = metadata.useTimeSeriesSyntheticId();
336339
addIdWithRoutingHash = indexMode == IndexMode.LOGSDB;
337340
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(includePaths), null, true);
338341
}
@@ -417,10 +420,19 @@ private int idToHash(String id) {
417420
if (idBytes.length < 4) {
418421
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
419422
}
420-
// For TSDB, the hash is stored as the id prefix.
421-
// For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id,
422-
// see IndexRequest#autoGenerateTimeBasedId.
423-
return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? idBytes.length - 9 : 0));
423+
int hash;
424+
if (addIdWithRoutingHash) {
425+
// For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id,
426+
// see IndexRequest#autoGenerateTimeBasedId.
427+
hash = ByteUtils.readIntLE(idBytes, idBytes.length - 9);
428+
} else if (useTimeSeriesSyntheticId) {
429+
// For TSDB with synthetic ids, the hash is stored as the id suffix.
430+
hash = TsidExtractingIdFieldMapper.extractRoutingHashFromSyntheticId(new BytesRef(idBytes));
431+
} else {
432+
// For TSDB, the hash is stored as the id prefix.
433+
hash = ByteUtils.readIntLE(idBytes, 0);
434+
}
435+
return hashToShardId(hash);
424436
}
425437

426438
@Override
@@ -510,7 +522,6 @@ public static class ForIndexDimensions extends ExtractFromSource {
510522

511523
@Override
512524
protected int hashSource(IndexRequest indexRequest) {
513-
// System.out.println("hashSource for tsid");
514525
BytesRef tsid = indexRequest.tsid();
515526
if (tsid == null) {
516527
tsid = buildTsid(indexRequest.getContentType(), indexRequest.indexSource().bytes());

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
import org.apache.lucene.index.LeafReaderContext;
1515
import org.apache.lucene.util.BytesRef;
1616
import org.apache.lucene.util.CloseableThreadLocal;
17-
import org.elasticsearch.common.util.ByteUtils;
1817
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1918
import org.elasticsearch.core.Assertions;
19+
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
2020

2121
import java.io.IOException;
2222
import java.util.Base64;
@@ -153,22 +153,30 @@ public static DocIdAndVersion timeSeriesLoadDocIdAndVersion(IndexReader reader,
153153
* This allows this method to know whether there is no document with the specified id without loading the docid for
154154
* the specified id.
155155
*
156-
* @param reader The reader load docid, version and seqno from.
157-
* @param uid The term that describes the uid of the document to load docid, version and seqno for.
158-
* @param id The id that contains the encoded timestamp. The timestamp is used to skip checking the id for entire segments.
159-
* @param loadSeqNo Whether to load sequence number from _seq_no doc values field.
156+
* @param reader The reader to load docid, version and seqno from.
157+
* @param uid The term that describes the uid of the document to load docid, version and seqno for.
158+
* @param id The id that contains the encoded timestamp. The timestamp is used to skip checking the id for entire segments.
159+
* @param loadSeqNo Whether to load sequence number from _seq_no doc values field.
160+
* @param useSyntheticId Whether the id is a synthetic (true) or standard (false) document id.
160161
* @return the internal doc ID and version for the specified term from the specified reader or
161162
* returning <code>null</code> if no document was found for the specified id
162163
* @throws IOException In case of an i/o related failure
163164
*/
164-
public static DocIdAndVersion timeSeriesLoadDocIdAndVersion(IndexReader reader, BytesRef uid, String id, boolean loadSeqNo)
165-
throws IOException {
166-
byte[] idAsBytes = Base64.getUrlDecoder().decode(id);
167-
assert idAsBytes.length == 20;
168-
// id format: [4 bytes (basic hash routing fields), 8 bytes prefix of 128 murmurhash dimension fields, 8 bytes
169-
// @timestamp)
170-
long timestamp = ByteUtils.readLongBE(idAsBytes, 12);
171-
165+
public static DocIdAndVersion timeSeriesLoadDocIdAndVersion(
166+
IndexReader reader,
167+
BytesRef uid,
168+
String id,
169+
boolean loadSeqNo,
170+
boolean useSyntheticId
171+
) throws IOException {
172+
final long timestamp;
173+
if (useSyntheticId) {
174+
assert uid.equals(new BytesRef(Base64.getUrlDecoder().decode(id)));
175+
timestamp = TsidExtractingIdFieldMapper.extractTimestampFromSyntheticId(uid);
176+
} else {
177+
byte[] idAsBytes = Base64.getUrlDecoder().decode(id);
178+
timestamp = TsidExtractingIdFieldMapper.extractTimestampFromId(idAsBytes);
179+
}
172180
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, true);
173181
List<LeafReaderContext> leaves = reader.leaves();
174182
// iterate in default order, the segments should be sorted by DataStream#TIMESERIES_LEAF_READERS_SORTER

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,19 @@ public boolean isES87TSDBCodecEnabled() {
690690
false,
691691
new Setting.Validator<>() {
692692
@Override
693-
public void validate(Boolean value) {}
693+
public void validate(Boolean enabled) {
694+
if (enabled) {
695+
if (TSDB_SYNTHETIC_ID_FEATURE_FLAG == false) {
696+
throw new IllegalArgumentException(
697+
String.format(
698+
Locale.ROOT,
699+
"The setting [%s] is only permitted when the feature flag is enabled.",
700+
USE_SYNTHETIC_ID.getKey()
701+
)
702+
);
703+
}
704+
}
705+
}
694706

695707
@Override
696708
public void validate(Boolean enabled, Map<Setting<?>, Object> settings) {
@@ -983,7 +995,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
983995
private final boolean recoverySourceEnabled;
984996
private final boolean recoverySourceSyntheticEnabled;
985997
private final boolean useDocValuesSkipper;
986-
private final boolean tsdbSyntheticId;
998+
private final boolean useTimeSeriesSyntheticId;
987999

9881000
/**
9891001
* The maximum number of refresh listeners allows on this shard.
@@ -1170,8 +1182,28 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11701182
&& scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);
11711183
useDocValuesSkipper = DOC_VALUES_SKIPPER && scopedSettings.get(USE_DOC_VALUES_SKIPPER);
11721184
seqNoIndexOptions = scopedSettings.get(SEQ_NO_INDEX_OPTIONS_SETTING);
1173-
tsdbSyntheticId = TSDB_SYNTHETIC_ID_FEATURE_FLAG && scopedSettings.get(USE_SYNTHETIC_ID);
1174-
assert tsdbSyntheticId == false || mode == IndexMode.TIME_SERIES : mode;
1185+
final var useSyntheticId = IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG && scopedSettings.get(USE_SYNTHETIC_ID);
1186+
if (indexMetadata.useTimeSeriesSyntheticId() != useSyntheticId) {
1187+
assert false;
1188+
throw new IllegalArgumentException(
1189+
String.format(
1190+
Locale.ROOT,
1191+
"The setting [%s] is set to [%s] but index metadata has a different value [%s].",
1192+
USE_SYNTHETIC_ID.getKey(),
1193+
useSyntheticId,
1194+
indexMetadata.useTimeSeriesSyntheticId()
1195+
)
1196+
);
1197+
}
1198+
if (useSyntheticId) {
1199+
assert TSDB_SYNTHETIC_ID_FEATURE_FLAG;
1200+
assert indexMetadata.useTimeSeriesSyntheticId();
1201+
assert indexMetadata.getIndexMode() == IndexMode.TIME_SERIES : indexMetadata.getIndexMode();
1202+
assert indexMetadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_USE_SYNTHETIC_ID);
1203+
useTimeSeriesSyntheticId = true;
1204+
} else {
1205+
useTimeSeriesSyntheticId = false;
1206+
}
11751207
if (recoverySourceSyntheticEnabled) {
11761208
if (DiscoveryNode.isStateless(settings)) {
11771209
throw new IllegalArgumentException("synthetic recovery source is only allowed in stateful");
@@ -1907,8 +1939,8 @@ public boolean useDocValuesSkipper() {
19071939
/**
19081940
* @return Whether the index is a time-series index that uses synthetic ids.
19091941
*/
1910-
public boolean useTsdbSyntheticId() {
1911-
return tsdbSyntheticId;
1942+
public boolean useTimeSeriesSyntheticId() {
1943+
return useTimeSeriesSyntheticId;
19121944
}
19131945

19141946
/**

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ private static Version parseUnchecked(String version) {
192192

193193
public static final IndexVersion REENABLED_TIMESTAMP_DOC_VALUES_SPARSE_INDEX = def(9_042_0_00, Version.LUCENE_10_3_1);
194194
public static final IndexVersion SKIPPERS_ENABLED_BY_DEFAULT = def(9_043_0_00, Version.LUCENE_10_3_1);
195+
public static final IndexVersion TIME_SERIES_USE_SYNTHETIC_ID = def(9_044_0_00, Version.LUCENE_10_3_1);
195196

196197
/*
197198
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/codec/CodecService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays)
6767
for (String codec : Codec.availableCodecs()) {
6868
codecs.put(codec, Codec.forName(codec));
6969
}
70-
final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTsdbSyntheticId();
70+
final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTimeSeriesSyntheticId();
7171
assert useTsdbSyntheticId == false || mapperService.getIndexSettings().getMode() == IndexMode.TIME_SERIES;
7272

7373
this.codecs = codecs.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {

0 commit comments

Comments
 (0)