Skip to content

Commit ff52007

Browse files
authored
Adding rest actions for getting and updating data stream mappings (#130241)
1 parent a239306 commit ff52007

File tree

10 files changed

+328
-27
lines changed

10 files changed

+328
-27
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,13 @@
7575
import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction;
7676
import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction;
7777
import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction;
78+
import org.elasticsearch.datastreams.rest.RestGetDataStreamMappingsAction;
7879
import org.elasticsearch.datastreams.rest.RestGetDataStreamSettingsAction;
7980
import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction;
8081
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
8182
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
8283
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
84+
import org.elasticsearch.datastreams.rest.RestUpdateDataStreamMappingsAction;
8385
import org.elasticsearch.datastreams.rest.RestUpdateDataStreamSettingsAction;
8486
import org.elasticsearch.features.NodeFeature;
8587
import org.elasticsearch.health.HealthIndicatorService;
@@ -292,6 +294,10 @@ public List<RestHandler> getRestHandlers(
292294
handlers.add(new RestDeleteDataStreamOptionsAction());
293295
handlers.add(new RestGetDataStreamSettingsAction());
294296
handlers.add(new RestUpdateDataStreamSettingsAction());
297+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
298+
handlers.add(new RestGetDataStreamMappingsAction());
299+
handlers.add(new RestUpdateDataStreamMappingsAction());
300+
}
295301
return handlers;
296302
}
297303

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.rest;
11+
12+
import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
13+
import org.elasticsearch.client.internal.node.NodeClient;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.rest.BaseRestHandler;
16+
import org.elasticsearch.rest.RestRequest;
17+
import org.elasticsearch.rest.RestUtils;
18+
import org.elasticsearch.rest.Scope;
19+
import org.elasticsearch.rest.ServerlessScope;
20+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
21+
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
22+
23+
import java.io.IOException;
24+
import java.util.List;
25+
26+
import static org.elasticsearch.rest.RestRequest.Method.GET;
27+
28+
@ServerlessScope(Scope.PUBLIC)
29+
public class RestGetDataStreamMappingsAction extends BaseRestHandler {
30+
@Override
31+
public String getName() {
32+
return "get_data_stream_mappings_action";
33+
}
34+
35+
@Override
36+
public List<Route> routes() {
37+
return List.of(new Route(GET, "/_data_stream/{name}/_mappings"));
38+
}
39+
40+
@Override
41+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
42+
GetDataStreamMappingsAction.Request getDataStreamRequest = new GetDataStreamMappingsAction.Request(
43+
RestUtils.getMasterNodeTimeout(request)
44+
).indices(Strings.splitStringByCommaToArray(request.param("name")));
45+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
46+
GetDataStreamMappingsAction.INSTANCE,
47+
getDataStreamRequest,
48+
new RestRefCountedChunkedToXContentListener<>(channel)
49+
);
50+
}
51+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.datastreams.rest;
10+
11+
import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.cluster.metadata.Template;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.compress.CompressedXContent;
16+
import org.elasticsearch.rest.BaseRestHandler;
17+
import org.elasticsearch.rest.RestRequest;
18+
import org.elasticsearch.rest.RestUtils;
19+
import org.elasticsearch.rest.Scope;
20+
import org.elasticsearch.rest.ServerlessScope;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
22+
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
23+
import org.elasticsearch.xcontent.XContentParser;
24+
25+
import java.io.IOException;
26+
import java.util.List;
27+
28+
import static org.elasticsearch.rest.RestRequest.Method.PUT;
29+
30+
@ServerlessScope(Scope.PUBLIC)
31+
public class RestUpdateDataStreamMappingsAction extends BaseRestHandler {
32+
33+
@Override
34+
public String getName() {
35+
return "update_data_stream_mappings_action";
36+
}
37+
38+
@Override
39+
public List<Route> routes() {
40+
return List.of(new Route(PUT, "/_data_stream/{name}/_mappings"));
41+
}
42+
43+
@Override
44+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
45+
CompressedXContent mappings;
46+
try (XContentParser parser = request.contentParser()) {
47+
parser.nextToken(); // advance the parser to the expected location
48+
mappings = Template.parseMappings(parser);
49+
}
50+
boolean dryRun = request.paramAsBoolean("dry_run", false);
51+
52+
UpdateDataStreamMappingsAction.Request updateDataStreamMappingsRequest = new UpdateDataStreamMappingsAction.Request(
53+
mappings,
54+
dryRun,
55+
RestUtils.getMasterNodeTimeout(request),
56+
RestUtils.getAckTimeout(request)
57+
).indices(Strings.splitStringByCommaToArray(request.param("name")));
58+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
59+
UpdateDataStreamMappingsAction.INSTANCE,
60+
updateDataStreamMappingsRequest,
61+
new RestRefCountedChunkedToXContentListener<>(channel)
62+
);
63+
}
64+
}

modules/data-streams/src/yamlRestTest/java/org/elasticsearch/datastreams/DataStreamsClientYamlTestSuiteIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.common.util.concurrent.ThreadContext;
1616
import org.elasticsearch.test.cluster.ElasticsearchCluster;
17+
import org.elasticsearch.test.cluster.FeatureFlag;
1718
import org.elasticsearch.test.cluster.local.LocalClusterSpecBuilder;
1819
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1920
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
@@ -47,6 +48,7 @@ private static ElasticsearchCluster createCluster() {
4748
.setting("xpack.security.enabled", "true")
4849
.keystore("bootstrap.password", "x-pack-test-password")
4950
.user("x_pack_rest_user", "x-pack-test-password")
51+
.feature(FeatureFlag.LOGS_STREAM)
5052
.systemProperty("es.queryable_built_in_roles_enabled", "false");
5153
if (initTestSeed().nextBoolean()) {
5254
clusterBuilder.setting("xpack.license.self_generated.type", "trial");
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
setup:
2+
- skip:
3+
features: allowed_warnings
4+
5+
---
6+
"Test single data stream":
7+
- requires:
8+
cluster_features: [ "logs_stream" ]
9+
reason: requires setting 'logs_stream' to get or set data stream settings
10+
- do:
11+
allowed_warnings:
12+
- "index template [my-template] has index patterns [my-data-stream-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
13+
indices.put_index_template:
14+
name: my-template
15+
body:
16+
index_patterns: [ my-data-stream-* ]
17+
data_stream: { }
18+
template:
19+
settings:
20+
number_of_replicas: 0
21+
mappings:
22+
properties:
23+
field1:
24+
type: keyword
25+
26+
- do:
27+
indices.create_data_stream:
28+
name: my-data-stream-1
29+
30+
- do:
31+
cluster.health:
32+
index: "my-data-stream-1"
33+
wait_for_status: green
34+
35+
- do:
36+
indices.get_data_stream_mappings:
37+
name: my-data-stream-1
38+
- match: { data_streams.0.name: my-data-stream-1 }
39+
- match: { data_streams.0.mappings: {} }
40+
- length: { data_streams.0.effective_mappings.properties: 1 }
41+
42+
- do:
43+
indices.get_data_stream:
44+
name: my-data-stream-1
45+
- match: { data_streams.0.name: my-data-stream-1 }
46+
- match: { data_streams.0.mappings: {} }
47+
- match: { data_streams.0.effective_mappings: null }
48+
49+
- do:
50+
indices.put_data_stream_mappings:
51+
name: my-data-stream-1
52+
body:
53+
properties:
54+
name:
55+
type: keyword
56+
fields:
57+
english:
58+
type: text
59+
- match: { data_streams.0.name: my-data-stream-1 }
60+
- match: { data_streams.0.applied_to_data_stream: true }
61+
- match: { data_streams.0.mappings.properties.name.type: "keyword" }
62+
- match: { data_streams.0.effective_mappings.properties.name.type: "keyword" }
63+
64+
- do:
65+
indices.rollover:
66+
alias: "my-data-stream-1"
67+
68+
- do:
69+
cluster.health:
70+
index: "my-data-stream-1"
71+
wait_for_status: green
72+
73+
- do:
74+
indices.get_data_stream_mappings:
75+
name: my-data-stream-1
76+
- match: { data_streams.0.name: my-data-stream-1 }
77+
- length: { data_streams.0.effective_mappings.properties: 2 }
78+
- match: { data_streams.0.mappings.properties.name.type: "keyword" }
79+
- match: { data_streams.0.effective_mappings.properties.name.type: "keyword" }
80+
81+
- do:
82+
indices.get_data_stream:
83+
name: my-data-stream-1
84+
- match: { data_streams.0.name: my-data-stream-1 }
85+
- match: { data_streams.0.mappings.properties.name.type: "keyword" }
86+
- match: { data_streams.0.effective_mappings: null }
87+
- set: { data_streams.0.indices.0.index_name: oldIndexName }
88+
- set: { data_streams.0.indices.1.index_name: newIndexName }
89+
90+
- do:
91+
indices.get_mapping:
92+
index: my-data-stream-1
93+
- match: { .$oldIndexName.mappings.properties.name: null }
94+
- match: { .$newIndexName.mappings.properties.name.type: "keyword" }
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"indices.get_data_stream_mappings":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
5+
"description":"Gets a data stream's mappings"
6+
},
7+
"stability":"stable",
8+
"visibility": "feature_flag",
9+
"feature_flag": "logs_stream",
10+
"headers":{
11+
"accept": [ "application/json"]
12+
},
13+
"url":{
14+
"paths":[
15+
{
16+
"path":"/_data_stream/{name}/_mappings",
17+
"methods":[
18+
"GET"
19+
],
20+
"parts":{
21+
"name":{
22+
"type":"string",
23+
"description":"Comma-separated list of data streams or data stream patterns"
24+
}
25+
}
26+
}
27+
]
28+
},
29+
"params":{
30+
"master_timeout":{
31+
"type":"time",
32+
"description":"Period to wait for a connection to the master node"
33+
}
34+
}
35+
}
36+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
{
2+
"indices.put_data_stream_mappings":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
5+
"description":"Updates a data stream's mappings"
6+
},
7+
"stability":"stable",
8+
"visibility": "feature_flag",
9+
"feature_flag": "logs_stream",
10+
"headers":{
11+
"accept": [ "application/json"]
12+
},
13+
"url":{
14+
"paths":[
15+
{
16+
"path":"/_data_stream/{name}/_mappings",
17+
"methods":[
18+
"PUT"
19+
],
20+
"parts":{
21+
"name":{
22+
"type":"string",
23+
"description":"Comma-separated list of data streams or data stream patterns"
24+
}
25+
}
26+
}
27+
]
28+
},
29+
"params":{
30+
"dry_run":{
31+
"type":"boolean",
32+
"description":"Whether this request should only be a dry run rather than actually applying mappings",
33+
"default":false
34+
},
35+
"timeout":{
36+
"type":"time",
37+
"description":"Period to wait for a response"
38+
},
39+
"master_timeout":{
40+
"type":"time",
41+
"description":"Period to wait for a connection to the master node"
42+
}
43+
},
44+
"body":{
45+
"description":"The data stream mappings to be updated",
46+
"required":true
47+
}
48+
}
49+
}

server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
129129
Map<String, Object> uncompressedEffectiveMappings = XContentHelper.convertToMap(
130130
effectiveMappings.uncompressed(),
131131
true,
132-
builder.contentType()
132+
XContentType.JSON
133133
).v2();
134134
builder.field("effective_mappings");
135135
builder.map(uncompressedEffectiveMappings);

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.elasticsearch.xcontent.ParseField;
5353
import org.elasticsearch.xcontent.ToXContentObject;
5454
import org.elasticsearch.xcontent.XContentBuilder;
55-
import org.elasticsearch.xcontent.XContentFactory;
5655
import org.elasticsearch.xcontent.XContentParser;
5756
import org.elasticsearch.xcontent.XContentParserConfiguration;
5857
import org.elasticsearch.xcontent.XContentType;
@@ -61,7 +60,6 @@
6160
import java.time.Instant;
6261
import java.time.temporal.ChronoUnit;
6362
import java.util.ArrayList;
64-
import java.util.Base64;
6563
import java.util.Comparator;
6664
import java.util.HashMap;
6765
import java.util.List;
@@ -1488,18 +1486,12 @@ public void writeTo(StreamOutput out) throws IOException {
14881486
DATA_STREAM_OPTIONS_FIELD
14891487
);
14901488
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
1491-
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
1492-
XContentParser.Token token = p.currentToken();
1493-
if (token == XContentParser.Token.VALUE_STRING) {
1494-
return new CompressedXContent(Base64.getDecoder().decode(p.text()));
1495-
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
1496-
return new CompressedXContent(p.binaryValue());
1497-
} else if (token == XContentParser.Token.START_OBJECT) {
1498-
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered())));
1499-
} else {
1500-
throw new IllegalArgumentException("Unexpected token: " + token);
1501-
}
1502-
}, MAPPINGS_FIELD, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
1489+
PARSER.declareField(
1490+
ConstructingObjectParser.optionalConstructorArg(),
1491+
(p, c) -> { return Template.parseMappings(p); },
1492+
MAPPINGS_FIELD,
1493+
ObjectParser.ValueType.VALUE_OBJECT_ARRAY
1494+
);
15031495
}
15041496

15051497
public static DataStream fromXContent(XContentParser parser) throws IOException {

0 commit comments

Comments
 (0)