Skip to content

Commit 5c2e28e

Browse files
authored
Add tests for migration between source modes in logsdb data stream (#115283)
1 parent 19a35a4 commit 5c2e28e

File tree

3 files changed

+301
-11
lines changed

3 files changed

+301
-11
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.logsdb;
9+
10+
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.Response;
12+
import org.elasticsearch.client.RestClient;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.xcontent.XContentHelper;
16+
import org.elasticsearch.index.query.QueryBuilders;
17+
import org.elasticsearch.search.builder.SearchSourceBuilder;
18+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
19+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
20+
import org.elasticsearch.xcontent.XContentBuilder;
21+
import org.elasticsearch.xcontent.XContentFactory;
22+
import org.elasticsearch.xcontent.XContentType;
23+
import org.hamcrest.Matchers;
24+
import org.junit.Before;
25+
import org.junit.ClassRule;
26+
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.Comparator;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.stream.Stream;
33+
34+
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.greaterThan;
36+
import static org.hamcrest.Matchers.is;
37+
38+
public class LogsDbSourceModeMigrationIT extends LogsIndexModeRestTestIT {
39+
public static final String INDEX_TEMPLATE = """
40+
{
41+
"index_patterns": ["my-logs-*-*"],
42+
"priority": 100,
43+
"data_stream": {},
44+
"composed_of": [
45+
"my-logs-mapping",
46+
"my-logs-original-source",
47+
"my-logs-migrated-source"
48+
],
49+
"ignore_missing_component_templates": ["my-logs-original-source", "my-logs-migrated-source"]
50+
}
51+
""";
52+
53+
public static final String MAPPING_COMPONENT_TEMPLATE = """
54+
{
55+
"template": {
56+
"settings": {
57+
"index": {
58+
"mode": "logsdb"
59+
}
60+
},
61+
"mappings": {
62+
"properties": {
63+
"@timestamp": {
64+
"type": "date",
65+
"format": "epoch_millis"
66+
},
67+
"message": {
68+
"type": "text"
69+
},
70+
"method": {
71+
"type": "keyword"
72+
},
73+
"hits": {
74+
"type": "long"
75+
}
76+
}
77+
}
78+
}
79+
}""";
80+
81+
public static final String STORED_SOURCE_COMPONENT_TEMPLATE = """
82+
{
83+
"template": {
84+
"settings": {
85+
"index": {
86+
"mapping.source.mode": "stored"
87+
}
88+
}
89+
}
90+
}""";
91+
92+
public static final String SYNTHETIC_SOURCE_COMPONENT_TEMPLATE = """
93+
{
94+
"template": {
95+
"settings": {
96+
"index": {
97+
"mapping.source.mode": "synthetic"
98+
}
99+
}
100+
}
101+
}""";
102+
103+
@ClassRule()
104+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
105+
.distribution(DistributionType.DEFAULT)
106+
.module("constant-keyword")
107+
.module("data-streams")
108+
.module("mapper-extras")
109+
.module("x-pack-aggregate-metric")
110+
.module("x-pack-stack")
111+
.setting("xpack.security.enabled", "false")
112+
.setting("xpack.otel_data.registry.enabled", "false")
113+
.setting("xpack.license.self_generated.type", "trial")
114+
.setting("cluster.logsdb.enabled", "true")
115+
.setting("stack.templates.enabled", "false")
116+
.build();
117+
118+
@Override
119+
protected String getTestRestCluster() {
120+
return cluster.getHttpAddresses();
121+
}
122+
123+
@Before
124+
public void setup() {
125+
client = client();
126+
}
127+
128+
private RestClient client;
129+
130+
public void testSwitchFromStoredToSyntheticSource() throws IOException {
131+
assertOK(putComponentTemplate(client, "my-logs-mapping", MAPPING_COMPONENT_TEMPLATE));
132+
assertOK(putComponentTemplate(client, "my-logs-original-source", STORED_SOURCE_COMPONENT_TEMPLATE));
133+
134+
assertOK(putTemplate(client, "my-logs", INDEX_TEMPLATE));
135+
assertOK(createDataStream(client, "my-logs-ds-test"));
136+
137+
var initialSourceMode = (String) getSetting(
138+
client,
139+
getDataStreamBackingIndex(client, "my-logs-ds-test", 0),
140+
"index.mapping.source.mode"
141+
);
142+
assertThat(initialSourceMode, equalTo("stored"));
143+
var initialIndexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "my-logs-ds-test", 0), "index.mode");
144+
assertThat(initialIndexMode, equalTo("logsdb"));
145+
146+
var indexedWithStoredSource = new ArrayList<XContentBuilder>();
147+
var indexedWithSyntheticSource = new ArrayList<XContentBuilder>();
148+
for (int i = 0; i < 10; i++) {
149+
indexedWithStoredSource.add(generateDoc());
150+
indexedWithSyntheticSource.add(generateDoc());
151+
}
152+
153+
Response storedSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithStoredSource, 0);
154+
assertOK(storedSourceBulkResponse);
155+
assertThat(entityAsMap(storedSourceBulkResponse).get("errors"), Matchers.equalTo(false));
156+
157+
assertOK(putComponentTemplate(client, "my-logs-migrated-source", SYNTHETIC_SOURCE_COMPONENT_TEMPLATE));
158+
var rolloverResponse = rolloverDataStream(client, "my-logs-ds-test");
159+
assertOK(rolloverResponse);
160+
assertThat(entityAsMap(rolloverResponse).get("rolled_over"), is(true));
161+
162+
var finalSourceMode = (String) getSetting(
163+
client,
164+
getDataStreamBackingIndex(client, "my-logs-ds-test", 1),
165+
"index.mapping.source.mode"
166+
);
167+
assertThat(finalSourceMode, equalTo("synthetic"));
168+
169+
Response syntheticSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithSyntheticSource, 10);
170+
assertOK(syntheticSourceBulkResponse);
171+
assertThat(entityAsMap(syntheticSourceBulkResponse).get("errors"), Matchers.equalTo(false));
172+
173+
var allDocs = Stream.concat(indexedWithStoredSource.stream(), indexedWithSyntheticSource.stream()).toList();
174+
175+
var sourceList = search(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(allDocs.size()), "my-logs-ds-test");
176+
assertThat(sourceList.size(), equalTo(allDocs.size()));
177+
178+
for (int i = 0; i < sourceList.size(); i++) {
179+
var expected = XContentHelper.convertToMap(BytesReference.bytes(allDocs.get(i)), false, XContentType.JSON).v2();
180+
assertThat(sourceList.get(i), equalTo(expected));
181+
}
182+
}
183+
184+
public void testSwitchFromSyntheticToStoredSource() throws IOException {
185+
assertOK(putComponentTemplate(client, "my-logs-mapping", MAPPING_COMPONENT_TEMPLATE));
186+
assertOK(putComponentTemplate(client, "my-logs-original-source", SYNTHETIC_SOURCE_COMPONENT_TEMPLATE));
187+
188+
assertOK(putTemplate(client, "my-logs", INDEX_TEMPLATE));
189+
assertOK(createDataStream(client, "my-logs-ds-test"));
190+
191+
var initialSourceMode = (String) getSetting(
192+
client,
193+
getDataStreamBackingIndex(client, "my-logs-ds-test", 0),
194+
"index.mapping.source.mode"
195+
);
196+
assertThat(initialSourceMode, equalTo("synthetic"));
197+
var initialIndexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "my-logs-ds-test", 0), "index.mode");
198+
assertThat(initialIndexMode, equalTo("logsdb"));
199+
200+
var indexedWithSyntheticSource = new ArrayList<XContentBuilder>();
201+
var indexedWithStoredSource = new ArrayList<XContentBuilder>();
202+
for (int i = 0; i < 10; i++) {
203+
indexedWithSyntheticSource.add(generateDoc());
204+
indexedWithStoredSource.add(generateDoc());
205+
}
206+
207+
Response syntheticSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithSyntheticSource, 0);
208+
assertOK(syntheticSourceBulkResponse);
209+
assertThat(entityAsMap(syntheticSourceBulkResponse).get("errors"), Matchers.equalTo(false));
210+
211+
assertOK(putComponentTemplate(client, "my-logs-migrated-source", STORED_SOURCE_COMPONENT_TEMPLATE));
212+
var rolloverResponse = rolloverDataStream(client, "my-logs-ds-test");
213+
assertOK(rolloverResponse);
214+
assertThat(entityAsMap(rolloverResponse).get("rolled_over"), is(true));
215+
216+
var finalSourceMode = (String) getSetting(
217+
client,
218+
getDataStreamBackingIndex(client, "my-logs-ds-test", 1),
219+
"index.mapping.source.mode"
220+
);
221+
assertThat(finalSourceMode, equalTo("stored"));
222+
223+
Response storedSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithStoredSource, 10);
224+
assertOK(storedSourceBulkResponse);
225+
assertThat(entityAsMap(storedSourceBulkResponse).get("errors"), Matchers.equalTo(false));
226+
227+
var allDocs = Stream.concat(indexedWithSyntheticSource.stream(), indexedWithStoredSource.stream()).toList();
228+
229+
var sourceList = search(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(allDocs.size()), "my-logs-ds-test");
230+
assertThat(sourceList.size(), equalTo(allDocs.size()));
231+
232+
for (int i = 0; i < sourceList.size(); i++) {
233+
var expected = XContentHelper.convertToMap(BytesReference.bytes(allDocs.get(i)), false, XContentType.JSON).v2();
234+
assertThat(sourceList.get(i), equalTo(expected));
235+
}
236+
}
237+
238+
private static Response bulkIndex(RestClient client, String dataStreamName, List<XContentBuilder> documents, int startId)
239+
throws IOException {
240+
var sb = new StringBuilder();
241+
int id = startId;
242+
for (var document : documents) {
243+
sb.append(Strings.format("{ \"create\": { \"_id\" : \"%d\" } }", id)).append("\n");
244+
sb.append(Strings.toString(document)).append("\n");
245+
id++;
246+
}
247+
248+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
249+
bulkRequest.setJsonEntity(sb.toString());
250+
bulkRequest.addParameter("refresh", "true");
251+
return client.performRequest(bulkRequest);
252+
}
253+
254+
@SuppressWarnings("unchecked")
255+
private List<Map<String, Object>> search(SearchSourceBuilder search, String dataStreamName) throws IOException {
256+
var request = new Request("GET", "/" + dataStreamName + "/_search");
257+
request.setJsonEntity(Strings.toString(search));
258+
var searchResponse = client.performRequest(request);
259+
assertOK(searchResponse);
260+
261+
Map<String, Object> searchResponseMap = XContentHelper.convertToMap(
262+
XContentType.JSON.xContent(),
263+
searchResponse.getEntity().getContent(),
264+
false
265+
);
266+
var hitsMap = (Map<String, Object>) searchResponseMap.get("hits");
267+
268+
var hitsList = (List<Map<String, Object>>) hitsMap.get("hits");
269+
assertThat(hitsList.size(), greaterThan(0));
270+
271+
return hitsList.stream()
272+
.sorted(Comparator.comparingInt((Map<String, Object> hit) -> Integer.parseInt((String) hit.get("_id"))))
273+
.map(hit -> (Map<String, Object>) hit.get("_source"))
274+
.toList();
275+
}
276+
277+
private static XContentBuilder generateDoc() throws IOException {
278+
var doc = XContentFactory.jsonBuilder();
279+
doc.startObject();
280+
{
281+
doc.field("@timestamp", Long.toString(randomMillisUpToYear9999()));
282+
doc.field("message", randomAlphaOfLengthBetween(20, 50));
283+
doc.field("method", randomAlphaOfLength(3));
284+
doc.field("hits", randomLong());
285+
}
286+
doc.endObject();
287+
288+
return doc;
289+
}
290+
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeCustomSettingsIT.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.logsdb;
99

10-
import org.elasticsearch.client.Request;
1110
import org.elasticsearch.client.ResponseException;
1211
import org.elasticsearch.client.RestClient;
1312
import org.elasticsearch.common.settings.Settings;
@@ -496,16 +495,6 @@ public void testIgnoreAboveSetting() throws IOException {
496495
}
497496
}
498497

499-
private static Map<String, Object> getMapping(final RestClient client, final String indexName) throws IOException {
500-
final Request request = new Request("GET", "/" + indexName + "/_mapping");
501-
502-
Map<String, Object> mappings = ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get(
503-
"mappings"
504-
);
505-
506-
return mappings;
507-
}
508-
509498
private Function<Object, Map<String, Object>> subObject(String key) {
510499
return (mapAsObject) -> (Map<String, Object>) ((Map<String, Object>) mapAsObject).get(key);
511500
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeRestTestIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,15 @@ protected static Response putClusterSetting(final RestClient client, final Strin
9898
request.setJsonEntity("{ \"transient\": { \"" + settingName + "\": " + settingValue + " } }");
9999
return client.performRequest(request);
100100
}
101+
102+
@SuppressWarnings("unchecked")
103+
protected static Map<String, Object> getMapping(final RestClient client, final String indexName) throws IOException {
104+
final Request request = new Request("GET", "/" + indexName + "/_mapping");
105+
106+
Map<String, Object> mappings = ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get(
107+
"mappings"
108+
);
109+
110+
return mappings;
111+
}
101112
}

0 commit comments

Comments
 (0)