Skip to content

Commit 014ab7d

Browse files
committed
Merge branch 'main' into exception_type_header
2 parents 3a93fcd + b9360a4 commit 014ab7d

File tree

27 files changed

+1555
-58
lines changed

27 files changed

+1555
-58
lines changed

docs/reference/search-connectors/es-connectors-mongodb.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,6 @@ The full host in this example will look like this:
249249

250250
A bug introduced in **8.12.0** causes the Connectors docker image to error out if run using MongoDB as its source. The command line will output the error `cannot import name 'coroutine' from 'asyncio'`. *** This issue is fixed in versions *8.12.2** and **8.13.0**. ** This bug does not affect Elastic managed connectors.
251251

252-
See [Known issues](/release-notes/known-issues.md) for any issues affecting all connectors.
253-
254252
#### UUIDs are not correctly deserialized, causing problems with ingesting documents into Elasticsearch
255253

256254
MongoDB has special handling of UUID type: there is a legacy and a modern approach. You can read [official docs](https://pymongo.readthedocs.io/en/stable/examples/uuid.html) about the details.
@@ -267,6 +265,8 @@ If you’re using a legacy UUID representation, you should adjust the connection
267265

268266
You can find a full explanation in the [official docs](https://pymongo.readthedocs.io/en/stable/examples/uuid.html#configuring-a-uuid-representation).
269267

268+
See [Known issues](/release-notes/known-issues.md) for any issues affecting all connectors.
269+
270270
### Troubleshooting [es-connectors-mongodb-client-troubleshooting]
271271

272272
See [Troubleshooting](/reference/search-connectors/es-connectors-troubleshooting.md).

modules/data-streams/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ tasks.withType(StandaloneRestIntegTestTask).configureEach {
3030
usesDefaultDistribution("to be triaged")
3131
}
3232

33+
tasks.named("internalClusterTest").configure {
34+
systemProperty 'es.logs_stream_feature_flag_enabled', 'true'
35+
}
36+
3337
if (buildParams.inFipsJvm){
3438
// These fail in CI but only when run as part of checkPart2 and not individually.
3539
// Tracked in :
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
14+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
15+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
16+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
17+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
18+
import org.elasticsearch.action.bulk.BulkRequest;
19+
import org.elasticsearch.action.bulk.BulkResponse;
20+
import org.elasticsearch.action.bulk.TransportBulkAction;
21+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
22+
import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
23+
import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
24+
import org.elasticsearch.action.index.IndexRequest;
25+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
26+
import org.elasticsearch.cluster.metadata.DataStream;
27+
import org.elasticsearch.cluster.metadata.MappingMetadata;
28+
import org.elasticsearch.cluster.metadata.Template;
29+
import org.elasticsearch.common.compress.CompressedXContent;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.xcontent.XContentHelper;
32+
import org.elasticsearch.core.TimeValue;
33+
import org.elasticsearch.plugins.Plugin;
34+
import org.elasticsearch.test.ESIntegTestCase;
35+
import org.elasticsearch.xcontent.XContentType;
36+
37+
import java.io.IOException;
38+
import java.util.Collection;
39+
import java.util.List;
40+
import java.util.Locale;
41+
import java.util.Map;
42+
43+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
44+
import static org.hamcrest.Matchers.equalTo;
45+
46+
public class TransportUpdateDataStreamMappingsActionIT extends ESIntegTestCase {
47+
48+
@Override
49+
protected Collection<Class<? extends Plugin>> nodePlugins() {
50+
return List.of(DataStreamsPlugin.class);
51+
}
52+
53+
public void testGetAndUpdateMappings() throws IOException {
54+
String dataStreamName = "my-data-stream-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
55+
createDataStream(dataStreamName);
56+
57+
Map<String, Object> originalMappings = Map.of(
58+
"dynamic",
59+
"strict",
60+
"properties",
61+
Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "text"))
62+
);
63+
Map<String, Object> mappingOverrides = Map.of(
64+
"properties",
65+
Map.of("foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text"))
66+
);
67+
Map<String, Object> expectedEffectiveMappings = Map.of(
68+
"dynamic",
69+
"strict",
70+
"properties",
71+
Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text"))
72+
);
73+
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
74+
updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, true);
75+
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
76+
updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, false);
77+
assertExpectedMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings);
78+
79+
// Now make sure that the backing index still has the original mappings:
80+
Map<String, Object> originalIndexMappings = Map.of(
81+
"dynamic",
82+
"strict",
83+
"_data_stream_timestamp",
84+
Map.of("enabled", true),
85+
"properties",
86+
Map.of("@timestamp", Map.of("type", "date"), "foo1", Map.of("type", "text"), "foo2", Map.of("type", "text"))
87+
);
88+
assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().getFirst().getName(), originalIndexMappings);
89+
90+
// Do a rollover, and then make sure that the updated mappnigs are on the new write index:
91+
assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
92+
Map<String, Object> updatedIndexMappings = Map.of(
93+
"dynamic",
94+
"strict",
95+
"_data_stream_timestamp",
96+
Map.of("enabled", true),
97+
"properties",
98+
Map.of(
99+
"@timestamp",
100+
Map.of("type", "date"),
101+
"foo1",
102+
Map.of("type", "text"),
103+
"foo2",
104+
Map.of("type", "keyword"),
105+
"foo3",
106+
Map.of("type", "text")
107+
)
108+
);
109+
assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().get(1).getName(), updatedIndexMappings);
110+
111+
// Now undo the mapping overrides, and expect the original mapping to be in effect:
112+
updateMappings(dataStreamName, Map.of(), originalMappings, false);
113+
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
114+
assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
115+
assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().get(2).getName(), originalIndexMappings);
116+
}
117+
118+
private void createDataStream(String dataStreamName) throws IOException {
119+
String mappingString = """
120+
{
121+
"_doc":{
122+
"dynamic":"strict",
123+
"properties":{
124+
"foo1":{
125+
"type":"text"
126+
},
127+
"foo2":{
128+
"type":"text"
129+
}
130+
}
131+
}
132+
}
133+
""";
134+
CompressedXContent mapping = CompressedXContent.fromJSON(mappingString);
135+
Template template = new Template(Settings.EMPTY, mapping, null);
136+
ComposableIndexTemplate.DataStreamTemplate dataStreamTemplate = new ComposableIndexTemplate.DataStreamTemplate();
137+
ComposableIndexTemplate composableIndexTemplate = ComposableIndexTemplate.builder()
138+
.indexPatterns(List.of("my-data-stream-*"))
139+
.dataStreamTemplate(dataStreamTemplate)
140+
.template(template)
141+
.build();
142+
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request("test");
143+
request.indexTemplate(composableIndexTemplate);
144+
145+
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
146+
147+
BulkRequest bulkRequest = new BulkRequest();
148+
bulkRequest.add(new IndexRequest(dataStreamName).source("""
149+
{
150+
"@timestamp": "2024-08-27",
151+
"foo1": "baz"
152+
}
153+
""", XContentType.JSON).id(randomUUID()));
154+
bulkRequest.add(new IndexRequest(dataStreamName).source("""
155+
{
156+
"@timestamp": "2024-08-27",
157+
"foo3": "baz"
158+
}
159+
""", XContentType.JSON).id(randomUUID()));
160+
BulkResponse response = client().execute(new ActionType<BulkResponse>(TransportBulkAction.NAME), bulkRequest).actionGet();
161+
assertThat(response.getItems().length, equalTo(2));
162+
}
163+
164+
private void assertExpectedMappings(
165+
String dataStreamName,
166+
Map<String, Object> expectedMappingOverrides,
167+
Map<String, Object> expectedEffectiveMappings
168+
) {
169+
GetDataStreamMappingsAction.Response getMappingsResponse = client().execute(
170+
new ActionType<GetDataStreamMappingsAction.Response>(GetDataStreamMappingsAction.NAME),
171+
new GetDataStreamMappingsAction.Request(TimeValue.THIRTY_SECONDS).indices(dataStreamName)
172+
).actionGet();
173+
List<GetDataStreamMappingsAction.DataStreamMappingsResponse> responses = getMappingsResponse.getDataStreamMappingsResponses();
174+
assertThat(responses.size(), equalTo(1));
175+
GetDataStreamMappingsAction.DataStreamMappingsResponse mappingsResponse = responses.getFirst();
176+
assertThat(mappingsResponse.dataStreamName(), equalTo(dataStreamName));
177+
assertThat(
178+
XContentHelper.convertToMap(mappingsResponse.mappings().uncompressed(), true, XContentType.JSON).v2(),
179+
equalTo(expectedMappingOverrides)
180+
);
181+
assertThat(
182+
XContentHelper.convertToMap(mappingsResponse.effectiveMappings().uncompressed(), true, XContentType.JSON).v2(),
183+
equalTo(expectedEffectiveMappings)
184+
);
185+
186+
DataStream dataStream = getDataStream(dataStreamName);
187+
assertThat(
188+
XContentHelper.convertToMap(dataStream.getMappings().uncompressed(), true, XContentType.JSON).v2(),
189+
equalTo(expectedMappingOverrides)
190+
);
191+
}
192+
193+
private void assertExpectedMappingsOnIndex(String indexName, Map<String, Object> expectedMappings) {
194+
GetMappingsResponse mappingsResponse = client().execute(
195+
new ActionType<GetMappingsResponse>(GetMappingsAction.NAME),
196+
new GetMappingsRequest(TimeValue.THIRTY_SECONDS).indices(indexName)
197+
).actionGet();
198+
Map<String, MappingMetadata> mappings = mappingsResponse.mappings();
199+
assertThat(mappings.size(), equalTo(1));
200+
assertThat(mappings.values().iterator().next().sourceAsMap(), equalTo(expectedMappings));
201+
}
202+
203+
private void updateMappings(
204+
String dataStreamName,
205+
Map<String, Object> mappingOverrides,
206+
Map<String, Object> expectedEffectiveMappings,
207+
boolean dryRun
208+
) throws IOException {
209+
CompressedXContent mappingOverride = new CompressedXContent(mappingOverrides);
210+
UpdateDataStreamMappingsAction.Response putMappingsResponse = client().execute(
211+
new ActionType<UpdateDataStreamMappingsAction.Response>(UpdateDataStreamMappingsAction.NAME),
212+
new UpdateDataStreamMappingsAction.Request(mappingOverride, dryRun, TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).indices(
213+
dataStreamName
214+
)
215+
).actionGet();
216+
assertThat(putMappingsResponse.getDataStreamMappingsResponses().size(), equalTo(1));
217+
UpdateDataStreamMappingsAction.DataStreamMappingsResponse firstPutMappingsResponse = putMappingsResponse
218+
.getDataStreamMappingsResponses()
219+
.getFirst();
220+
assertThat(firstPutMappingsResponse.dataStreamName(), equalTo(dataStreamName));
221+
assertThat(
222+
XContentHelper.convertToMap(firstPutMappingsResponse.mappings().uncompressed(), true, XContentType.JSON).v2(),
223+
equalTo(mappingOverrides)
224+
);
225+
assertThat(
226+
XContentHelper.convertToMap(firstPutMappingsResponse.effectiveMappings().uncompressed(), true, XContentType.JSON).v2(),
227+
equalTo(expectedEffectiveMappings)
228+
);
229+
}
230+
231+
private DataStream getDataStream(String dataStreamName) {
232+
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
233+
TEST_REQUEST_TIMEOUT,
234+
new String[] { dataStreamName }
235+
);
236+
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
237+
.actionGet();
238+
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
239+
return getDataStreamResponse.getDataStreams().get(0).getDataStream();
240+
}
241+
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
import org.elasticsearch.action.datastreams.DataStreamsStatsAction;
1515
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
1616
import org.elasticsearch.action.datastreams.GetDataStreamAction;
17+
import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
1718
import org.elasticsearch.action.datastreams.GetDataStreamSettingsAction;
1819
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
1920
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2021
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
2122
import org.elasticsearch.action.datastreams.PutDataStreamOptionsAction;
23+
import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
2224
import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction;
2325
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
2426
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
2527
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
2628
import org.elasticsearch.client.internal.OriginSettingClient;
29+
import org.elasticsearch.cluster.metadata.DataStream;
2730
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2831
import org.elasticsearch.cluster.node.DiscoveryNodes;
2932
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -37,11 +40,13 @@
3740
import org.elasticsearch.datastreams.action.TransportCreateDataStreamAction;
3841
import org.elasticsearch.datastreams.action.TransportDataStreamsStatsAction;
3942
import org.elasticsearch.datastreams.action.TransportDeleteDataStreamAction;
43+
import org.elasticsearch.datastreams.action.TransportGetDataStreamMappingsAction;
4044
import org.elasticsearch.datastreams.action.TransportGetDataStreamSettingsAction;
4145
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
4246
import org.elasticsearch.datastreams.action.TransportMigrateToDataStreamAction;
4347
import org.elasticsearch.datastreams.action.TransportModifyDataStreamsAction;
4448
import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction;
49+
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction;
4550
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction;
4651
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
4752
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
@@ -246,6 +251,10 @@ public List<ActionHandler> getActions() {
246251
actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
247252
actions.add(new ActionHandler(GetDataStreamSettingsAction.INSTANCE, TransportGetDataStreamSettingsAction.class));
248253
actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class));
254+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
255+
actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class));
256+
actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class));
257+
}
249258
return actions;
250259
}
251260

0 commit comments

Comments
 (0)