Skip to content

Commit 04bc70c

Browse files
authored
Introduce CRUD APIs for data stream options (elastic#113945) (elastic#114718)
In this PR we introduce two endpoint PUT and GET to manage the data stream options and consequently the failure store configuration on the data stream level. This means that we can manage the failure store of existing data streams. The APIs look like: ``` # Enable/disable PUT _data_stream/my-data-stream/_options { "failure_store": { "enabled": true } } # Remove existing configuration DELETE _data_stream/my-data-stream/_options # Retrieve GET _data_stream/my-data-stream/_options { "failure_store": { "enabled": true } } ``` Future work: - Document the new APIs - Convert `DataStreamOptionsIT.java` to a yaml test.
1 parent 599ac16 commit 04bc70c

File tree

16 files changed

+1253
-1
lines changed

16 files changed

+1253
-1
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams;
11+
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.Response;
14+
import org.junit.Before;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.is;
22+
import static org.hamcrest.Matchers.nullValue;
23+
24+
/**
25+
* This should be a yaml test, but in order to write one we would need to expose the new APIs in the rest-api-spec.
26+
* We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the new APIs here.
27+
* Please convert this to a yaml test when the feature flag is removed.
28+
*/
29+
public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
30+
31+
private static final String DATA_STREAM_NAME = "failure-data-stream";
32+
33+
@SuppressWarnings("unchecked")
34+
@Before
35+
public void setup() throws IOException {
36+
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template");
37+
putComposableIndexTemplateRequest.setJsonEntity("""
38+
{
39+
"index_patterns": ["failure-data-stream"],
40+
"template": {
41+
"settings": {
42+
"number_of_replicas": 0
43+
}
44+
},
45+
"data_stream": {
46+
"failure_store": true
47+
}
48+
}
49+
""");
50+
assertOK(client().performRequest(putComposableIndexTemplateRequest));
51+
52+
assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
53+
// Initialize the failure store.
54+
assertOK(client().performRequest(new Request("POST", DATA_STREAM_NAME + "/_rollover?target_failure_store")));
55+
ensureGreen(DATA_STREAM_NAME);
56+
57+
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
58+
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
59+
assertThat(dataStreams.size(), is(1));
60+
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
61+
assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
62+
List<String> backingIndices = getIndices(dataStream);
63+
assertThat(backingIndices.size(), is(1));
64+
List<String> failureStore = getFailureStore(dataStream);
65+
assertThat(failureStore.size(), is(1));
66+
}
67+
68+
public void testEnableDisableFailureStore() throws IOException {
69+
{
70+
assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));
71+
assertFailureStore(false, 1);
72+
assertDataStreamOptions(null);
73+
}
74+
{
75+
Request enableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
76+
enableRequest.setJsonEntity("""
77+
{
78+
"failure_store": {
79+
"enabled": true
80+
}
81+
}""");
82+
assertAcknowledged(client().performRequest(enableRequest));
83+
assertFailureStore(true, 1);
84+
assertDataStreamOptions(true);
85+
}
86+
87+
{
88+
Request disableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
89+
disableRequest.setJsonEntity("""
90+
{
91+
"failure_store": {
92+
"enabled": false
93+
}
94+
}""");
95+
assertAcknowledged(client().performRequest(disableRequest));
96+
assertFailureStore(false, 1);
97+
assertDataStreamOptions(false);
98+
}
99+
}
100+
101+
@SuppressWarnings("unchecked")
102+
private void assertFailureStore(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
103+
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
104+
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
105+
assertThat(dataStreams.size(), is(1));
106+
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
107+
assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
108+
assertThat(dataStream.containsKey("failure_store"), is(true));
109+
// Ensure the failure store is set to the provided value
110+
assertThat(((Map<String, Object>) dataStream.get("failure_store")).get("enabled"), equalTo(failureStoreEnabled));
111+
// And the failure indices preserved
112+
List<String> failureStore = getFailureStore(dataStream);
113+
assertThat(failureStore.size(), is(failureStoreSize));
114+
}
115+
116+
@SuppressWarnings("unchecked")
117+
private void assertDataStreamOptions(Boolean failureStoreEnabled) throws IOException {
118+
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME + "/_options"));
119+
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
120+
assertThat(dataStreams.size(), is(1));
121+
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
122+
assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
123+
Map<String, Map<String, Object>> options = (Map<String, Map<String, Object>>) dataStream.get("options");
124+
if (failureStoreEnabled == null) {
125+
assertThat(options, nullValue());
126+
} else {
127+
assertThat(options.containsKey("failure_store"), is(true));
128+
assertThat(options.get("failure_store").get("enabled"), equalTo(failureStoreEnabled));
129+
}
130+
}
131+
132+
@SuppressWarnings("unchecked")
133+
private List<String> getFailureStore(Map<String, Object> response) {
134+
var failureStore = (Map<String, Object>) response.get("failure_store");
135+
return getIndices(failureStore);
136+
137+
}
138+
139+
@SuppressWarnings("unchecked")
140+
private List<String> getIndices(Map<String, Object> response) {
141+
List<Map<String, String>> indices = (List<Map<String, String>>) response.get("indices");
142+
return indices.stream().map(index -> index.get("index_name")).toList();
143+
}
144+
}

modules/data-streams/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
1818
exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server;
1919
exports org.elasticsearch.datastreams.lifecycle;
20+
exports org.elasticsearch.datastreams.options.action to org.elasticsearch.server;
2021

2122
provides org.elasticsearch.features.FeatureSpecification with org.elasticsearch.datastreams.DataStreamFeatures;
2223
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
2424
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
2525
import org.elasticsearch.client.internal.OriginSettingClient;
26+
import org.elasticsearch.cluster.metadata.DataStream;
2627
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2728
import org.elasticsearch.cluster.node.DiscoveryNodes;
2829
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -56,6 +57,15 @@
5657
import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction;
5758
import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
5859
import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction;
60+
import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction;
61+
import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
62+
import org.elasticsearch.datastreams.options.action.PutDataStreamOptionsAction;
63+
import org.elasticsearch.datastreams.options.action.TransportDeleteDataStreamOptionsAction;
64+
import org.elasticsearch.datastreams.options.action.TransportGetDataStreamOptionsAction;
65+
import org.elasticsearch.datastreams.options.action.TransportPutDataStreamOptionsAction;
66+
import org.elasticsearch.datastreams.options.rest.RestDeleteDataStreamOptionsAction;
67+
import org.elasticsearch.datastreams.options.rest.RestGetDataStreamOptionsAction;
68+
import org.elasticsearch.datastreams.options.rest.RestPutDataStreamOptionsAction;
5969
import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction;
6070
import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction;
6171
import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction;
@@ -229,6 +239,11 @@ public Collection<?> createComponents(PluginServices services) {
229239
actions.add(new ActionHandler<>(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
230240
actions.add(new ActionHandler<>(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
231241
actions.add(new ActionHandler<>(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
242+
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
243+
actions.add(new ActionHandler<>(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
244+
actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
245+
actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
246+
}
232247
return actions;
233248
}
234249

@@ -261,6 +276,11 @@ public List<RestHandler> getRestHandlers(
261276
handlers.add(new RestDeleteDataStreamLifecycleAction());
262277
handlers.add(new RestExplainDataStreamLifecycleAction());
263278
handlers.add(new RestDataStreamLifecycleStatsAction());
279+
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
280+
handlers.add(new RestGetDataStreamOptionsAction());
281+
handlers.add(new RestPutDataStreamOptionsAction());
282+
handlers.add(new RestDeleteDataStreamOptionsAction());
283+
}
264284
return handlers;
265285
}
266286

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.options.action;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.IndicesRequest;
14+
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.core.TimeValue;
20+
21+
import java.io.IOException;
22+
import java.util.Arrays;
23+
import java.util.Objects;
24+
25+
/**
26+
* Removes the data stream options configuration from the requested data streams.
27+
*/
28+
public class DeleteDataStreamOptionsAction {
29+
30+
public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("indices:admin/data_stream/options/delete");
31+
32+
private DeleteDataStreamOptionsAction() {/* no instances */}
33+
34+
public static final class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
35+
36+
private String[] names;
37+
private IndicesOptions indicesOptions = IndicesOptions.builder()
38+
.concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS)
39+
.wildcardOptions(
40+
IndicesOptions.WildcardOptions.builder().matchOpen(true).matchClosed(true).allowEmptyExpressions(true).resolveAliases(false)
41+
)
42+
.gatekeeperOptions(IndicesOptions.GatekeeperOptions.builder().allowAliasToMultipleIndices(false).allowClosedIndices(true))
43+
.build();
44+
45+
public Request(StreamInput in) throws IOException {
46+
super(in);
47+
this.names = in.readOptionalStringArray();
48+
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
49+
}
50+
51+
@Override
52+
public void writeTo(StreamOutput out) throws IOException {
53+
super.writeTo(out);
54+
out.writeOptionalStringArray(names);
55+
indicesOptions.writeIndicesOptions(out);
56+
}
57+
58+
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names) {
59+
super(masterNodeTimeout, ackTimeout);
60+
this.names = names;
61+
}
62+
63+
public String[] getNames() {
64+
return names;
65+
}
66+
67+
@Override
68+
public String[] indices() {
69+
return names;
70+
}
71+
72+
@Override
73+
public IndicesOptions indicesOptions() {
74+
return indicesOptions;
75+
}
76+
77+
public Request indicesOptions(IndicesOptions indicesOptions) {
78+
this.indicesOptions = indicesOptions;
79+
return this;
80+
}
81+
82+
@Override
83+
public boolean includeDataStreams() {
84+
return true;
85+
}
86+
87+
@Override
88+
public boolean equals(Object o) {
89+
if (this == o) return true;
90+
if (o == null || getClass() != o.getClass()) return false;
91+
Request request = (Request) o;
92+
return Arrays.equals(names, request.names) && Objects.equals(indicesOptions, request.indicesOptions);
93+
}
94+
95+
@Override
96+
public int hashCode() {
97+
int result = Objects.hash(indicesOptions);
98+
result = 31 * result + Arrays.hashCode(names);
99+
return result;
100+
}
101+
102+
@Override
103+
public IndicesRequest indices(String... indices) {
104+
this.names = indices;
105+
return this;
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)