Skip to content

Commit a3f5ea5

Browse files
authored
TSDB ingest performance: combine routing and tsdb hashing (#132566)
Instead of hashing dimensions once during routing and then again during document parsing, this combines the two steps. The tsid is created during routing and then used to create the routing hash. The tsid is then sent to the data nodes, where its presence signals that the tsid no longer needs to be created during document parsing.
1 parent 8349191 commit a3f5ea5

File tree

45 files changed

+1116
-261
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1116
-261
lines changed

modules/data-streams/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
5252
task.skipTest("data_stream/30_auto_create_data_stream/Don't initialize failure store during data stream auto-creation on successful index", "Configuring the failure store via data stream templates is not supported anymore.")
5353

5454
task.skipTest("data_stream/150_tsdb/TSDB failures go to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
55+
// TODO remove these after removing exact _tsid assertions in 8.x
56+
task.skipTest("data_stream/150_tsdb/dynamic templates", "The _tsid has changed in a new index version. This tests verifies the exact _tsid value with is too brittle for compatibility testing.")
57+
task.skipTest("data_stream/150_tsdb/dynamic templates - conflicting aliases", "The _tsid has changed in a new index version. This tests verifies the exact _tsid value with is too brittle for compatibility testing.")
58+
task.skipTest("data_stream/150_tsdb/dynamic templates with nesting", "The _tsid has changed in a new index version. This tests verifies the exact _tsid value with is too brittle for compatibility testing.")
5559

5660
task.skipTest("data_stream/170_modify_data_stream/Modify a data stream's failure store", "Configuring the failure store via data stream templates is not supported anymore.")
5761

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

Lines changed: 214 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
1515
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
1616
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
17+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
18+
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
1719
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1820
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1921
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
@@ -24,6 +26,7 @@
2426
import org.elasticsearch.action.bulk.BulkRequestBuilder;
2527
import org.elasticsearch.action.bulk.BulkResponse;
2628
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
29+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
2730
import org.elasticsearch.action.get.GetRequest;
2831
import org.elasticsearch.action.index.IndexRequest;
2932
import org.elasticsearch.action.search.SearchRequest;
@@ -34,6 +37,7 @@
3437
import org.elasticsearch.cluster.metadata.Template;
3538
import org.elasticsearch.common.Strings;
3639
import org.elasticsearch.common.compress.CompressedXContent;
40+
import org.elasticsearch.common.settings.Setting;
3741
import org.elasticsearch.common.settings.Settings;
3842
import org.elasticsearch.common.time.DateFormatter;
3943
import org.elasticsearch.common.time.FormatNames;
@@ -58,13 +62,17 @@
5862
import java.time.temporal.ChronoUnit;
5963
import java.util.Collection;
6064
import java.util.List;
65+
import java.util.Map;
6166
import java.util.concurrent.CountDownLatch;
6267

68+
import static org.elasticsearch.datastreams.DataStreamIndexSettingsProvider.INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG;
6369
import static org.elasticsearch.test.MapMatcher.assertMap;
6470
import static org.elasticsearch.test.MapMatcher.matchesMap;
6571
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6672
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
73+
import static org.hamcrest.Matchers.containsInAnyOrder;
6774
import static org.hamcrest.Matchers.containsString;
75+
import static org.hamcrest.Matchers.empty;
6876
import static org.hamcrest.Matchers.equalTo;
6977
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
7078
import static org.hamcrest.Matchers.hasSize;
@@ -321,15 +329,18 @@ public void testTsdbTemplatesNoKeywordFieldType() throws Exception {
321329
.indexPatterns(List.of("k8s*"))
322330
.template(
323331
new Template(
324-
Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build(),
332+
Settings.builder()
333+
.put("index.mode", "time_series")
334+
.put("index.routing_path", randomBoolean() ? "metricset" : null)
335+
.build(),
325336
new CompressedXContent(mappingTemplate),
326337
null
327338
)
328339
)
329340
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
330341
.build()
331342
);
332-
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
343+
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));
333344
}
334345

335346
public void testInvalidTsdbTemplatesMissingSettings() throws Exception {
@@ -619,6 +630,207 @@ public void testReindexing() throws Exception {
619630
getIndexResponse.getSetting(index2, IndexMetadata.INDEX_ROUTING_PATH.getKey()),
620631
equalTo(getIndexResponse.getSetting(index1, IndexMetadata.INDEX_ROUTING_PATH.getKey()))
621632
);
633+
assertThat(
634+
getIndexResponse.getSetting(index2, IndexMetadata.INDEX_DIMENSIONS.getKey()),
635+
equalTo(getIndexResponse.getSetting(index1, IndexMetadata.INDEX_DIMENSIONS.getKey()))
636+
);
637+
}
638+
639+
public void testAddDimensionToMapping() throws Exception {
640+
String dataStreamName = "my-ds";
641+
var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id");
642+
putTemplateRequest.indexTemplate(
643+
ComposableIndexTemplate.builder()
644+
.indexPatterns(List.of(dataStreamName))
645+
.template(
646+
new Template(
647+
Settings.builder().put("index.mode", "time_series").build(),
648+
new CompressedXContent(MAPPING_TEMPLATE),
649+
null
650+
)
651+
)
652+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
653+
.build()
654+
);
655+
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest));
656+
657+
// create data stream
658+
CreateDataStreamAction.Request createDsRequest = new CreateDataStreamAction.Request(
659+
TEST_REQUEST_TIMEOUT,
660+
TEST_REQUEST_TIMEOUT,
661+
"my-ds"
662+
);
663+
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDsRequest));
664+
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
665+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), equalTo(List.of("metricset")));
666+
} else {
667+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
668+
}
669+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
670+
671+
// put mapping with k8s.pod.name as another time series dimension
672+
var putMappingRequest = new PutMappingRequest(dataStreamName).source("""
673+
{
674+
"properties": {
675+
"k8s.pod.name": {
676+
"type": "keyword",
677+
"time_series_dimension": true
678+
}
679+
}
680+
}
681+
""", XContentType.JSON);
682+
assertAcked(client().execute(TransportPutMappingAction.TYPE, putMappingRequest).actionGet());
683+
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
684+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), containsInAnyOrder("metricset", "k8s.pod.name"));
685+
} else {
686+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
687+
}
688+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
689+
690+
indexWithPodNames(dataStreamName, Instant.now(), Map.of(), "dog", "cat");
691+
}
692+
693+
public void testDynamicStringDimensions() throws Exception {
694+
String dataStreamName = "my-ds";
695+
var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id");
696+
putTemplateRequest.indexTemplate(
697+
ComposableIndexTemplate.builder()
698+
.indexPatterns(List.of(dataStreamName))
699+
.template(new Template(Settings.builder().put("index.mode", "time_series").build(), new CompressedXContent("""
700+
{
701+
"_doc": {
702+
"dynamic_templates": [
703+
{
704+
"labels": {
705+
"match_mapping_type": "string",
706+
"mapping": {
707+
"type": "keyword",
708+
"time_series_dimension": true
709+
}
710+
}
711+
}
712+
],
713+
"properties": {
714+
"@timestamp": {
715+
"type": "date"
716+
},
717+
"metricset": {
718+
"type": "keyword",
719+
"time_series_dimension": true
720+
}
721+
}
722+
}
723+
}"""), null))
724+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
725+
.build()
726+
);
727+
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest));
728+
729+
CreateDataStreamAction.Request createDsRequest = new CreateDataStreamAction.Request(
730+
TEST_REQUEST_TIMEOUT,
731+
TEST_REQUEST_TIMEOUT,
732+
"my-ds"
733+
);
734+
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDsRequest));
735+
736+
// doesn't populate index.dimensions custom metadata because the "labels" dynamic template doesn't have a path_match
737+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
738+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
739+
740+
// index doc
741+
BulkResponse bulkResponse = client().prepareBulk()
742+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
743+
.add(
744+
client().prepareIndex(dataStreamName)
745+
.setOpType(DocWriteRequest.OpType.CREATE)
746+
.setSource(DOC.replace("$time", formatInstant(Instant.now())), XContentType.JSON)
747+
)
748+
.get();
749+
assertThat(bulkResponse.hasFailures(), is(false));
750+
751+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
752+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
753+
}
754+
755+
public void testDynamicDimensions() throws Exception {
756+
String dataStreamName = "my-ds";
757+
var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id");
758+
putTemplateRequest.indexTemplate(
759+
ComposableIndexTemplate.builder()
760+
.indexPatterns(List.of(dataStreamName))
761+
.template(new Template(Settings.builder().put("index.mode", "time_series").build(), new CompressedXContent("""
762+
763+
{
764+
"_doc": {
765+
"dynamic_templates": [
766+
{
767+
"label": {
768+
"mapping": {
769+
"type": "keyword",
770+
"time_series_dimension": true
771+
}
772+
}
773+
}
774+
],
775+
"properties": {
776+
"@timestamp": {
777+
"type": "date"
778+
},
779+
"metricset": {
780+
"type": "keyword",
781+
"time_series_dimension": true
782+
}
783+
}
784+
}
785+
}"""), null))
786+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
787+
.build()
788+
);
789+
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest));
790+
791+
CreateDataStreamAction.Request createDsRequest = new CreateDataStreamAction.Request(
792+
TEST_REQUEST_TIMEOUT,
793+
TEST_REQUEST_TIMEOUT,
794+
"my-ds"
795+
);
796+
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDsRequest));
797+
798+
// doesn't populate index.dimensions because the "label" dynamic template doesn't have a path_match
799+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
800+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
801+
802+
// index doc
803+
indexWithPodNames(dataStreamName, Instant.now(), Map.of("k8s.pod.name", "label"), "dog", "cat");
804+
805+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
806+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
807+
}
808+
809+
private void indexWithPodNames(String dataStreamName, Instant timestamp, Map<String, String> dynamicTemplates, String... podNames) {
810+
// index doc
811+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
812+
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
813+
for (String podName : podNames) {
814+
bulkRequestBuilder.add(
815+
client().prepareIndex(dataStreamName)
816+
.setOpType(DocWriteRequest.OpType.CREATE)
817+
.setSource(DOC.replace("$time", formatInstant(timestamp)).replace("dog", podName), XContentType.JSON)
818+
.request()
819+
.setDynamicTemplates(dynamicTemplates)
820+
);
821+
}
822+
823+
BulkResponse bulkResponse = bulkRequestBuilder.get();
824+
assertThat(bulkResponse.hasFailures(), is(false));
825+
}
826+
827+
private <T> T getSetting(String dataStreamName, Setting<T> setting) {
828+
GetIndexResponse getIndexResponse = safeGet(
829+
indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStreamName))
830+
);
831+
assertThat(getIndexResponse.getIndices().length, equalTo(1));
832+
Settings settings = getIndexResponse.getSettings().get(getIndexResponse.getIndices()[0]);
833+
return setting.get(settings);
622834
}
623835

624836
static String formatInstant(Instant instant) {

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBPassthroughIndexingIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@
4444
import java.util.List;
4545
import java.util.Map;
4646

47+
import static org.elasticsearch.datastreams.DataStreamIndexSettingsProvider.INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG;
4748
import static org.elasticsearch.test.MapMatcher.assertMap;
4849
import static org.elasticsearch.test.MapMatcher.matchesMap;
4950
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
5051
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
5152
import static org.hamcrest.Matchers.equalTo;
5253
import static org.hamcrest.Matchers.is;
54+
import static org.hamcrest.Matchers.nullValue;
5355

5456
public class TSDBPassthroughIndexingIT extends ESSingleNodeTestCase {
5557

@@ -185,6 +187,11 @@ public void testIndexingGettingAndSearching() throws Exception {
185187
// validate index:
186188
var getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index)).actionGet();
187189
assertThat(getIndexResponse.getSettings().get(index).get("index.routing_path"), equalTo("[attributes.*]"));
190+
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
191+
assertThat(getIndexResponse.getSettings().get(index).get("index.dimensions"), equalTo("[attributes.*]"));
192+
} else {
193+
assertThat(getIndexResponse.getSettings().get(index).get("index.dimensions"), nullValue());
194+
}
188195
// validate mapping
189196
var mapping = getIndexResponse.mappings().get(index).getSourceAsMap();
190197
assertMap(

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DisabledSecurityDataStreamTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.util.concurrent.ThreadContext;
1515
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16+
import org.elasticsearch.test.cluster.FeatureFlag;
1617
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1718
import org.elasticsearch.test.rest.ESRestTestCase;
1819
import org.junit.ClassRule;
@@ -28,6 +29,7 @@ public abstract class DisabledSecurityDataStreamTestCase extends ESRestTestCase
2829
.distribution(DistributionType.DEFAULT)
2930
.setting("xpack.security.enabled", "false")
3031
.setting("xpack.watcher.enabled", "false")
32+
.feature(FeatureFlag.INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG)
3133
.build();
3234

3335
@Override

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ public void testSimulateTsdbDataStreamTemplate() throws Exception {
436436
ObjectPath.evaluate(responseBody, "template.settings.index.routing_path"),
437437
containsInAnyOrder("metricset", "k8s.pod.uid", "pod.labels.*")
438438
);
439+
assertThat(
440+
ObjectPath.evaluate(responseBody, "template.settings.index.dimensions"),
441+
containsInAnyOrder("metricset", "k8s.pod.uid", "pod.labels.*")
442+
);
439443
assertThat(ObjectPath.evaluate(responseBody, "overlapping"), empty());
440444
}
441445

0 commit comments

Comments
 (0)