Skip to content

Commit da4f894

Browse files
authored
Introduce basic support for synthetic ids (#136810)
This change introduces basic support for the synthetic id feature, in which document identifiers (_id) are not indexed but instead computed at runtime from other document fields (_tsid and @timestamp). This pull request adds a new tsdb_synthetic_id flag along with a new index.mapping.use_synthetic_id index setting that can only be applied to time series indices. When both are enabled, the index uses a specialized Lucene codec that ensures the _id field is never indexed, and also overrides the field informations when a segment is opened to expose the _id field as if it has postings. This way it is still possible to soft-update Lucene documents using a term value while not paying the price of indexing such a field. Note that the synthetic _id field is still declared as stored since we plan to use the stored value in the future in a bloom filter. This change also introduces a new posting format to support synthetic _id. This implementation is not important for now: it only exists to demonstrate that doc values updates can work with a synthetic id posting format. The implementation uses the current _id format of TSDB indices which is hashed from a routing hash value, the tsid and the timestamp. This format cannot be unhashed to extract the real _tsid value and is also not naturally sorted, something that Lucene expects when applying doc values updates. To work around this the integration test only updates the first documents of the segments. In a follow up we'd like to change the _id format for TSDS to be [_tsid, @timestamp] and maybe also include the routing hash. Relates ES-13191 Relates #136304
1 parent 93d08aa commit da4f894

File tree

16 files changed

+1120
-12
lines changed

16 files changed

+1120
-12
lines changed
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams;
11+
12+
import org.apache.lucene.tests.util.LuceneTestCase;
13+
import org.elasticsearch.action.DocWriteRequest;
14+
import org.elasticsearch.action.DocWriteResponse;
15+
import org.elasticsearch.action.admin.indices.diskusage.AnalyzeIndexDiskUsageRequest;
16+
import org.elasticsearch.action.admin.indices.diskusage.AnalyzeIndexDiskUsageTestUtils;
17+
import org.elasticsearch.action.admin.indices.diskusage.IndexDiskUsageStats;
18+
import org.elasticsearch.action.admin.indices.diskusage.TransportAnalyzeIndexDiskUsageAction;
19+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
20+
import org.elasticsearch.action.bulk.BulkItemResponse;
21+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
22+
import org.elasticsearch.cluster.metadata.Template;
23+
import org.elasticsearch.common.compress.CompressedXContent;
24+
import org.elasticsearch.common.time.DateFormatter;
25+
import org.elasticsearch.index.IndexMode;
26+
import org.elasticsearch.index.IndexSettings;
27+
import org.elasticsearch.index.mapper.IdFieldMapper;
28+
import org.elasticsearch.plugins.Plugin;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.test.InternalSettingsPlugin;
31+
import org.elasticsearch.test.junit.annotations.TestLogging;
32+
import org.elasticsearch.xcontent.XContentBuilder;
33+
import org.elasticsearch.xcontent.XContentFactory;
34+
35+
import java.io.IOException;
36+
import java.time.Instant;
37+
import java.util.ArrayList;
38+
import java.util.Collection;
39+
import java.util.List;
40+
import java.util.Locale;
41+
import java.util.Map;
42+
import java.util.Random;
43+
44+
import static org.elasticsearch.common.time.FormatNames.STRICT_DATE_OPTIONAL_TIME;
45+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
46+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
47+
import static org.hamcrest.Matchers.containsString;
48+
import static org.hamcrest.Matchers.equalTo;
49+
import static org.hamcrest.Matchers.notNullValue;
50+
51+
/**
52+
* Test suite for time series indices that use synthetic ids for documents.
53+
* <p>
54+
* Synthetic _id fields are not indexed in Lucene, instead they are generated on demand by concatenating the values of two other fields of
55+
* the document (typically the {@code @timestamp} and {@code _tsid} fields).
56+
* </p>
57+
*/
58+
@LuceneTestCase.SuppressCodecs("*") // requires codecs used in production only
59+
public class TSDBSyntheticIdsIT extends ESIntegTestCase {
60+
61+
private static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern(STRICT_DATE_OPTIONAL_TIME.getName());
62+
63+
@Override
64+
protected Collection<Class<? extends Plugin>> nodePlugins() {
65+
var plugins = new ArrayList<>(super.nodePlugins());
66+
plugins.add(InternalSettingsPlugin.class);
67+
plugins.add(DataStreamsPlugin.class);
68+
return plugins;
69+
}
70+
71+
public void testInvalidIndexMode() {
72+
assumeTrue("Test should only run with feature flag", IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG);
73+
final var indexName = randomIdentifier();
74+
var randomNonTsdbIndexMode = randomValueOtherThan(IndexMode.TIME_SERIES, () -> randomFrom(IndexMode.values()));
75+
76+
var exception = expectThrows(
77+
IllegalArgumentException.class,
78+
() -> createIndex(
79+
indexName,
80+
indexSettings(1, 0).put(IndexSettings.MODE.getKey(), randomNonTsdbIndexMode)
81+
.put(IndexSettings.USE_SYNTHETIC_ID.getKey(), true)
82+
.build()
83+
)
84+
);
85+
assertThat(
86+
exception.getMessage(),
87+
containsString(
88+
"The setting ["
89+
+ IndexSettings.USE_SYNTHETIC_ID.getKey()
90+
+ "] is only permitted when [index.mode] is set to [TIME_SERIES]. Current mode: ["
91+
+ randomNonTsdbIndexMode.getName().toUpperCase(Locale.ROOT)
92+
+ "]."
93+
)
94+
);
95+
}
96+
97+
@TestLogging(reason = "debug", value = "org.elasticsearch.index.engine.Engine:TRACE")
98+
public void testSyntheticId() throws Exception {
99+
assumeTrue("Test should only run with feature flag", IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG);
100+
final var indexName = randomIdentifier();
101+
putDataStreamTemplate(random(), indexName);
102+
103+
final var timestamp = Instant.now();
104+
105+
// Index 5 docs in datastream
106+
var results = createDocuments(
107+
indexName,
108+
document(timestamp, "vm-dev01", "cpu-load", 0), // will be updated
109+
document(timestamp.plusSeconds(2), "vm-dev01", "cpu-load", 1), // will be deleted
110+
document(timestamp, "vm-dev02", "cpu-load", 2),
111+
document(timestamp.plusSeconds(2), "vm-dev03", "cpu-load", 3),
112+
document(timestamp.plusSeconds(3), "vm-dev03", "cpu-load", 4)
113+
);
114+
115+
// Verify documents
116+
assertThat(results[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
117+
assertThat(results[0].getVersion(), equalTo(1L));
118+
119+
assertThat(results[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
120+
assertThat(results[1].getVersion(), equalTo(1L));
121+
122+
assertThat(results[2].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
123+
assertThat(results[2].getVersion(), equalTo(1L));
124+
125+
assertThat(results[3].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
126+
assertThat(results[3].getVersion(), equalTo(1L));
127+
128+
assertThat(results[4].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
129+
assertThat(results[4].getVersion(), equalTo(1L));
130+
131+
final var docIndex = results[1].getIndex();
132+
final var docId = results[1].getId();
133+
134+
enum Operation {
135+
FLUSH,
136+
REFRESH,
137+
NONE
138+
}
139+
switch (randomFrom(Operation.values())) {
140+
case FLUSH:
141+
flush(indexName);
142+
break;
143+
case REFRESH:
144+
refresh(indexName);
145+
break;
146+
case NONE:
147+
default:
148+
break;
149+
}
150+
151+
// Get by synthetic _id
152+
// Note: before synthetic _id this would have required postings on disks
153+
var getResponse = client().prepareGet(docIndex, docId).setFetchSource(true).execute().actionGet();
154+
assertThat(getResponse.isExists(), equalTo(true));
155+
assertThat(getResponse.getVersion(), equalTo(1L));
156+
var source = asInstanceOf(Map.class, getResponse.getSourceAsMap().get("metric"));
157+
assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(1));
158+
159+
// Update by synthetic _id
160+
// Note: it doesn't work, is that expected? Is is blocked by IndexRouting.ExtractFromSource.updateShard
161+
var exception = expectThrows(IllegalArgumentException.class, () -> {
162+
var doc = document(timestamp, "vm-dev01", "cpu-load", 10); // update
163+
client().prepareUpdate(docIndex, docId).setDoc(doc).get();
164+
});
165+
assertThat(
166+
exception.getMessage(),
167+
containsString("update is not supported because the destination index [" + docIndex + "] is in time_series mode")
168+
);
169+
170+
// Delete by synthetic _id
171+
var deleteResponse = client().prepareDelete(docIndex, docId).get();
172+
assertThat(deleteResponse.getId(), equalTo(docId));
173+
assertThat(deleteResponse.getResult(), equalTo(DocWriteResponse.Result.DELETED));
174+
assertThat(deleteResponse.getVersion(), equalTo(2L));
175+
176+
// Index more docs
177+
// TODO Randomize this to have segments only composed of deleted docs
178+
createDocuments(
179+
indexName,
180+
document(timestamp.plusSeconds(4), "vm-dev03", "cpu-load", 5),
181+
document(timestamp.plusSeconds(5), "vm-dev03", "cpu-load", 6)
182+
);
183+
184+
flushAndRefresh(indexName);
185+
186+
// Check that synthetic _id field has no postings on disk
187+
var diskUsage = diskUsage(docIndex);
188+
var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
189+
assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
190+
191+
// TODO Search datastream and count hits
192+
}
193+
194+
private static XContentBuilder document(Instant timestamp, String hostName, String metricField, Integer metricValue)
195+
throws IOException {
196+
var source = XContentFactory.jsonBuilder();
197+
source.startObject();
198+
{
199+
source.field("@timestamp", DATE_FORMATTER.format(timestamp));
200+
source.field("hostname", hostName);
201+
source.startObject("metric");
202+
{
203+
source.field("field", metricField);
204+
source.field("value", metricValue);
205+
206+
}
207+
source.endObject();
208+
}
209+
source.endObject();
210+
return source;
211+
}
212+
213+
private static BulkItemResponse[] createDocuments(String indexName, XContentBuilder... docs) throws IOException {
214+
assertThat(docs, notNullValue());
215+
final var client = client();
216+
var bulkRequest = client.prepareBulk();
217+
for (var doc : docs) {
218+
bulkRequest.add(client.prepareIndex(indexName).setOpType(DocWriteRequest.OpType.CREATE).setSource(doc));
219+
}
220+
var bulkResponse = bulkRequest.get();
221+
assertNoFailures(bulkResponse);
222+
return bulkResponse.getItems();
223+
}
224+
225+
private static void putDataStreamTemplate(Random random, String indexPattern) throws IOException {
226+
final var settings = indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.getName())
227+
.put(IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.getKey(), false)
228+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
229+
.put(IndexSettings.USE_SYNTHETIC_ID.getKey(), true);
230+
231+
final var mappings = """
232+
{
233+
"_doc": {
234+
"properties": {
235+
"@timestamp": {
236+
"type": "date"
237+
},
238+
"hostname": {
239+
"type": "keyword",
240+
"time_series_dimension": true
241+
},
242+
"metric": {
243+
"properties": {
244+
"field": {
245+
"type": "keyword",
246+
"time_series_dimension": true
247+
},
248+
"value": {
249+
"type": "integer",
250+
"time_series_metric": "counter"
251+
}
252+
}
253+
}
254+
}
255+
}
256+
}""";
257+
258+
var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request(getTestClass().getName().toLowerCase(Locale.ROOT))
259+
.indexTemplate(
260+
ComposableIndexTemplate.builder()
261+
.indexPatterns(List.of(indexPattern))
262+
.template(new Template(settings.build(), new CompressedXContent(mappings), null))
263+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
264+
.build()
265+
);
266+
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest).actionGet());
267+
}
268+
269+
private static IndexDiskUsageStats diskUsage(String indexName) {
270+
var diskUsageResponse = client().execute(
271+
TransportAnalyzeIndexDiskUsageAction.TYPE,
272+
new AnalyzeIndexDiskUsageRequest(new String[] { indexName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, false)
273+
).actionGet();
274+
275+
var indexDiskUsageStats = AnalyzeIndexDiskUsageTestUtils.getIndexStats(diskUsageResponse, indexName);
276+
assertNotNull(indexDiskUsageStats);
277+
return indexDiskUsageStats;
278+
}
279+
}

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
246246
if (IndexSettings.DOC_VALUES_SKIPPER) {
247247
settings.add(IndexSettings.USE_DOC_VALUES_SKIPPER);
248248
}
249+
if (IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG) {
250+
settings.add(IndexSettings.USE_SYNTHETIC_ID);
251+
}
249252
settings.add(IndexSettings.INDEX_MAPPING_EXCLUDE_SOURCE_VECTORS_SETTING);
250253
BUILT_IN_INDEX_SETTINGS = Collections.unmodifiableSet(settings);
251254
};

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,44 @@ public boolean isES87TSDBCodecEnabled() {
676676
Property.Final
677677
);
678678

679+
public static final boolean TSDB_SYNTHETIC_ID_FEATURE_FLAG = new FeatureFlag("tsdb_synthetic_id").isEnabled();
680+
public static final Setting<Boolean> USE_SYNTHETIC_ID = Setting.boolSetting(
681+
"index.mapping.use_synthetic_id",
682+
false,
683+
new Setting.Validator<>() {
684+
@Override
685+
public void validate(Boolean value) {}
686+
687+
@Override
688+
public void validate(Boolean enabled, Map<Setting<?>, Object> settings) {
689+
if (enabled) {
690+
// Verify if index mode is TIME_SERIES
691+
var indexMode = (IndexMode) settings.get(MODE);
692+
if (indexMode != IndexMode.TIME_SERIES) {
693+
throw new IllegalArgumentException(
694+
String.format(
695+
Locale.ROOT,
696+
"The setting [%s] is only permitted when [%s] is set to [%s]. Current mode: [%s].",
697+
USE_SYNTHETIC_ID.getKey(),
698+
MODE.getKey(),
699+
IndexMode.TIME_SERIES.name(),
700+
indexMode.name()
701+
)
702+
);
703+
}
704+
}
705+
}
706+
707+
@Override
708+
public Iterator<Setting<?>> settings() {
709+
List<Setting<?>> list = List.of(MODE);
710+
return list.iterator();
711+
}
712+
},
713+
Property.IndexScope,
714+
Property.Final
715+
);
716+
679717
/**
680718
* The {@link IndexMode "mode"} of the index.
681719
*/
@@ -937,6 +975,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
937975
private final boolean recoverySourceEnabled;
938976
private final boolean recoverySourceSyntheticEnabled;
939977
private final boolean useDocValuesSkipper;
978+
private final boolean tsdbSyntheticId;
940979

941980
/**
942981
* The maximum number of refresh listeners allows on this shard.
@@ -1123,6 +1162,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11231162
&& scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);
11241163
useDocValuesSkipper = DOC_VALUES_SKIPPER && scopedSettings.get(USE_DOC_VALUES_SKIPPER);
11251164
seqNoIndexOptions = scopedSettings.get(SEQ_NO_INDEX_OPTIONS_SETTING);
1165+
tsdbSyntheticId = TSDB_SYNTHETIC_ID_FEATURE_FLAG && scopedSettings.get(USE_SYNTHETIC_ID);
1166+
assert tsdbSyntheticId == false || mode == IndexMode.TIME_SERIES : mode;
11261167
if (recoverySourceSyntheticEnabled) {
11271168
if (DiscoveryNode.isStateless(settings)) {
11281169
throw new IllegalArgumentException("synthetic recovery source is only allowed in stateful");
@@ -1855,6 +1896,13 @@ public boolean useDocValuesSkipper() {
18551896
return useDocValuesSkipper;
18561897
}
18571898

1899+
/**
1900+
* @return Whether the index is a time-series index that use synthetic ids.
1901+
*/
1902+
public boolean useTsdbSyntheticId() {
1903+
return tsdbSyntheticId;
1904+
}
1905+
18581906
/**
18591907
* The bounds for {@code @timestamp} on this index or
18601908
* {@code null} if there are no bounds.

server/src/main/java/org/elasticsearch/index/codec/CodecService.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.common.util.BigArrays;
1717
import org.elasticsearch.common.util.FeatureFlag;
1818
import org.elasticsearch.core.Nullable;
19+
import org.elasticsearch.index.IndexMode;
20+
import org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdCodec;
1921
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;
2022
import org.elasticsearch.index.mapper.MapperService;
2123

@@ -65,12 +67,20 @@ public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays)
6567
for (String codec : Codec.availableCodecs()) {
6668
codecs.put(codec, Codec.forName(codec));
6769
}
70+
final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTsdbSyntheticId();
71+
assert useTsdbSyntheticId == false || mapperService.getIndexSettings().getMode() == IndexMode.TIME_SERIES;
72+
6873
this.codecs = codecs.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {
69-
var codec = e.getValue();
70-
if (codec instanceof DeduplicateFieldInfosCodec) {
71-
return codec;
74+
Codec codec;
75+
if (e.getValue() instanceof DeduplicateFieldInfosCodec dedupCodec) {
76+
codec = dedupCodec;
77+
} else {
78+
codec = new DeduplicateFieldInfosCodec(e.getValue().getName(), e.getValue());
79+
}
80+
if (useTsdbSyntheticId && codec instanceof TSDBSyntheticIdCodec == false) {
81+
codec = new TSDBSyntheticIdCodec(codec.getName(), codec);
7282
}
73-
return new DeduplicateFieldInfosCodec(codec.getName(), codec);
83+
return codec;
7484
}));
7585
}
7686

0 commit comments

Comments
 (0)