Skip to content

Commit b02c86e

Browse files
committed
adding PutDataStreamSettingsAction
1 parent 19a7573 commit b02c86e

File tree

8 files changed

+804
-14
lines changed

8 files changed

+804
-14
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
1818
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
1919
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
20+
import org.elasticsearch.action.datastreams.PutDataStreamSettingsAction;
2021
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
2122
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
2223
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction;
3940
import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction;
4041
import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
42+
import org.elasticsearch.datastreams.action.PutDataStreamSettingsTransportAction;
4143
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
4244
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
4345
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
@@ -71,6 +73,7 @@
7173
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
7274
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
7375
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
76+
import org.elasticsearch.datastreams.rest.RestPutDataStreamSettingsAction;
7477
import org.elasticsearch.features.NodeFeature;
7578
import org.elasticsearch.health.HealthIndicatorService;
7679
import org.elasticsearch.index.IndexSettingProvider;
@@ -228,6 +231,7 @@ public List<ActionHandler> getActions() {
228231
actions.add(new ActionHandler(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class));
229232
actions.add(new ActionHandler(DeleteDataStreamAction.INSTANCE, DeleteDataStreamTransportAction.class));
230233
actions.add(new ActionHandler(GetDataStreamAction.INSTANCE, TransportGetDataStreamsAction.class));
234+
actions.add(new ActionHandler(PutDataStreamSettingsAction.INSTANCE, PutDataStreamSettingsTransportAction.class));
231235
actions.add(new ActionHandler(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class));
232236
actions.add(new ActionHandler(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class));
233237
actions.add(new ActionHandler(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class));
@@ -265,6 +269,7 @@ public List<RestHandler> getRestHandlers(
265269
handlers.add(new RestCreateDataStreamAction());
266270
handlers.add(new RestDeleteDataStreamAction());
267271
handlers.add(new RestGetDataStreamsAction());
272+
handlers.add(new RestPutDataStreamSettingsAction());
268273
handlers.add(new RestDataStreamsStatsAction());
269274
handlers.add(new RestMigrateToDataStreamAction());
270275
handlers.add(new RestPromoteDataStreamAction());
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.datastreams.action;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
15+
import org.elasticsearch.action.datastreams.PutDataStreamSettingsAction;
16+
import org.elasticsearch.action.support.ActionFilters;
17+
import org.elasticsearch.action.support.CountDownActionListener;
18+
import org.elasticsearch.action.support.IndicesOptions;
19+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
20+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.block.ClusterBlockException;
23+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
24+
import org.elasticsearch.cluster.metadata.DataStream;
25+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
26+
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
27+
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
28+
import org.elasticsearch.cluster.project.ProjectResolver;
29+
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.util.concurrent.EsExecutors;
32+
import org.elasticsearch.core.Strings;
33+
import org.elasticsearch.core.TimeValue;
34+
import org.elasticsearch.index.Index;
35+
import org.elasticsearch.indices.SystemIndices;
36+
import org.elasticsearch.injection.guice.Inject;
37+
import org.elasticsearch.tasks.Task;
38+
import org.elasticsearch.threadpool.ThreadPool;
39+
import org.elasticsearch.transport.TransportService;
40+
41+
import java.util.ArrayList;
42+
import java.util.HashMap;
43+
import java.util.HashSet;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.Set;
47+
48+
public class PutDataStreamSettingsTransportAction extends TransportMasterNodeAction<
49+
PutDataStreamSettingsAction.Request,
50+
PutDataStreamSettingsAction.Response> {
51+
private static final Logger logger = LogManager.getLogger(PutDataStreamSettingsTransportAction.class);
52+
private static final Set<String> APPLY_TO_BACKING_INDICES = Set.of("index.lifecycle.name");
53+
private static final Set<String> APPLY_TO_DATA_STREAM_ONLY = Set.of("index.number_of_shards");
54+
private final MetadataDataStreamsService metadataDataStreamsService;
55+
private final MetadataUpdateSettingsService updateSettingsService;
56+
private final IndexNameExpressionResolver indexNameExpressionResolver;
57+
private final SystemIndices systemIndices;
58+
private final ProjectResolver projectResolver;
59+
60+
@Inject
61+
public PutDataStreamSettingsTransportAction(
62+
TransportService transportService,
63+
ClusterService clusterService,
64+
ThreadPool threadPool,
65+
ActionFilters actionFilters,
66+
ProjectResolver projectResolver,
67+
MetadataDataStreamsService metadataDataStreamsService,
68+
MetadataUpdateSettingsService updateSettingsService,
69+
IndexNameExpressionResolver indexNameExpressionResolver,
70+
SystemIndices systemIndices
71+
) {
72+
super(
73+
PutDataStreamSettingsAction.NAME,
74+
transportService,
75+
clusterService,
76+
threadPool,
77+
actionFilters,
78+
PutDataStreamSettingsAction.Request::new,
79+
PutDataStreamSettingsAction.Response::new,
80+
EsExecutors.DIRECT_EXECUTOR_SERVICE
81+
);
82+
this.projectResolver = projectResolver;
83+
this.metadataDataStreamsService = metadataDataStreamsService;
84+
this.updateSettingsService = updateSettingsService;
85+
this.indexNameExpressionResolver = indexNameExpressionResolver;
86+
this.systemIndices = systemIndices;
87+
}
88+
89+
@Override
90+
protected void masterOperation(
91+
Task task,
92+
PutDataStreamSettingsAction.Request request,
93+
ClusterState state,
94+
ActionListener<PutDataStreamSettingsAction.Response> listener
95+
) throws Exception {
96+
String dataStreamNamePattern = request.getName();
97+
List<String> dataStreamNames = indexNameExpressionResolver.dataStreamNames(
98+
clusterService.state(),
99+
IndicesOptions.DEFAULT,
100+
dataStreamNamePattern
101+
);
102+
List<PutDataStreamSettingsAction.DataStreamSettingsResponse> dataStreamSettingsResponse = new ArrayList<>();
103+
CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() {
104+
@Override
105+
public void onResponse(Void unused) {
106+
listener.onResponse(new PutDataStreamSettingsAction.Response(dataStreamSettingsResponse));
107+
}
108+
109+
@Override
110+
public void onFailure(Exception e) {
111+
listener.onFailure(e); // TODO ?
112+
}
113+
});
114+
countDownListener.onResponse(null);
115+
for (String dataStreamName : dataStreamNames) {
116+
updateSingleDataStream(
117+
dataStreamName,
118+
request.getSettings(),
119+
request.masterNodeTimeout(),
120+
request.ackTimeout(),
121+
new ActionListener<>() {
122+
@Override
123+
public void onResponse(PutDataStreamSettingsAction.DataStreamSettingsResponse dataStreamResponse) {
124+
dataStreamSettingsResponse.add(dataStreamResponse);
125+
countDownListener.onResponse(null);
126+
}
127+
128+
@Override
129+
public void onFailure(Exception e) {
130+
dataStreamSettingsResponse.add(
131+
new PutDataStreamSettingsAction.DataStreamSettingsResponse(
132+
dataStreamName,
133+
false,
134+
e.getMessage(),
135+
Settings.EMPTY,
136+
Settings.EMPTY,
137+
PutDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
138+
)
139+
);
140+
countDownListener.onResponse(null);
141+
}
142+
}
143+
);
144+
}
145+
}
146+
147+
private void updateSingleDataStream(
148+
String dataStreamName,
149+
Settings settingsOverrides,
150+
TimeValue masterNodeTimeout,
151+
TimeValue ackTimeout,
152+
ActionListener<PutDataStreamSettingsAction.DataStreamSettingsResponse> listener
153+
) {
154+
Set<String> settingsToReject = new HashSet<>();
155+
for (String settingName : settingsOverrides.keySet()) {
156+
if (APPLY_TO_BACKING_INDICES.contains(settingName) == false && APPLY_TO_DATA_STREAM_ONLY.contains(settingName) == false) {
157+
settingsToReject.add(settingName);
158+
}
159+
}
160+
if (settingsToReject.isEmpty() == false) {
161+
listener.onResponse(
162+
new PutDataStreamSettingsAction.DataStreamSettingsResponse(
163+
dataStreamName,
164+
false,
165+
Strings.format("Cannot set the following settings on a data stream: [%s]", String.join(",", settingsToReject)),
166+
Settings.EMPTY,
167+
Settings.EMPTY,
168+
PutDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
169+
)
170+
);
171+
return;
172+
}
173+
174+
if (systemIndices.isSystemDataStream(dataStreamName)) {
175+
listener.onResponse(
176+
new PutDataStreamSettingsAction.DataStreamSettingsResponse(
177+
dataStreamName,
178+
false,
179+
"Cannot update a system data stream",
180+
Settings.EMPTY,
181+
Settings.EMPTY,
182+
PutDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
183+
)
184+
);
185+
return;
186+
}
187+
metadataDataStreamsService.updateSettings(
188+
projectResolver,
189+
masterNodeTimeout,
190+
ackTimeout,
191+
dataStreamName,
192+
settingsOverrides,
193+
new ActionListener<>() {
194+
@Override
195+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
196+
if (acknowledgedResponse.isAcknowledged()) {
197+
updateSettingsOnIndices(dataStreamName, settingsOverrides, masterNodeTimeout, ackTimeout, listener);
198+
} else {
199+
listener.onResponse(
200+
new PutDataStreamSettingsAction.DataStreamSettingsResponse(
201+
dataStreamName,
202+
false,
203+
"Updating settings not accepted for unknown reasons",
204+
Settings.EMPTY,
205+
Settings.EMPTY,
206+
PutDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
207+
)
208+
);
209+
}
210+
}
211+
212+
@Override
213+
public void onFailure(Exception e) {
214+
listener.onFailure(e);
215+
}
216+
}
217+
);
218+
}
219+
220+
private void updateSettingsOnIndices(
221+
String dataStreamName,
222+
Settings requestSettings,
223+
TimeValue masterNodeTimeout,
224+
TimeValue ackTimeout,
225+
ActionListener<PutDataStreamSettingsAction.DataStreamSettingsResponse> listener
226+
) {
227+
Map<String, Object> settingsToApply = new HashMap<>();
228+
List<String> appliedToDataStreamOnly = new ArrayList<>();
229+
List<String> appliedToDataStreamAndBackingIndices = new ArrayList<>();
230+
for (String settingName : requestSettings.keySet()) {
231+
if (APPLY_TO_BACKING_INDICES.contains(settingName)) {
232+
settingsToApply.put(settingName, requestSettings.get(settingName));
233+
appliedToDataStreamAndBackingIndices.add(settingName);
234+
} else if (APPLY_TO_DATA_STREAM_ONLY.contains(settingName)) {
235+
appliedToDataStreamOnly.add(settingName);
236+
}
237+
}
238+
final List<Index> concreteIndices = clusterService.state()
239+
.projectState(projectResolver.getProjectId())
240+
.metadata()
241+
.dataStreams()
242+
.get(dataStreamName)
243+
.getIndices();
244+
final List<PutDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> indexSettingErrors = new ArrayList<>();
245+
246+
CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() {
247+
// Called when all indices for all settings are complete
248+
@Override
249+
public void onResponse(Void unused) {
250+
DataStream dataStream = clusterService.state().projectState(projectResolver.getProjectId()).metadata().dataStreams().get(dataStreamName);
251+
listener.onResponse(
252+
new PutDataStreamSettingsAction.DataStreamSettingsResponse(
253+
dataStreamName,
254+
true,
255+
null,
256+
dataStream.getSettings(),
257+
dataStream.getEffectiveSettings(clusterService.state().projectState(projectResolver.getProjectId()).metadata()),
258+
new PutDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult(
259+
appliedToDataStreamOnly,
260+
appliedToDataStreamAndBackingIndices,
261+
indexSettingErrors
262+
)
263+
)
264+
);
265+
}
266+
267+
@Override
268+
public void onFailure(Exception e) {
269+
listener.onFailure(e);
270+
}
271+
});
272+
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
273+
Settings applyToIndexSettings = Settings.builder().loadFromMap(settingsToApply).build();
274+
for (Index index : concreteIndices) {
275+
updateSettingsOnSingleIndex(index, applyToIndexSettings, masterNodeTimeout, ackTimeout, new ActionListener<>() {
276+
@Override
277+
public void onResponse(PutDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) {
278+
if (indexSettingError != null) {
279+
indexSettingErrors.add(indexSettingError);
280+
}
281+
indexCountDownListener.onResponse(null);
282+
}
283+
284+
@Override
285+
public void onFailure(Exception e) {
286+
indexCountDownListener.onFailure(e);
287+
}
288+
});
289+
}
290+
}
291+
292+
private void updateSettingsOnSingleIndex(
293+
Index index,
294+
Settings requestSettings,
295+
TimeValue masterNodeTimeout,
296+
TimeValue ackTimeout,
297+
ActionListener<PutDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> listener
298+
) {
299+
if (requestSettings.isEmpty()) {
300+
listener.onResponse(null);
301+
} else {
302+
updateSettingsService.updateSettings(
303+
new UpdateSettingsClusterStateUpdateRequest(
304+
projectResolver.getProjectId(),
305+
masterNodeTimeout,
306+
ackTimeout,
307+
requestSettings,
308+
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
309+
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
310+
index
311+
),
312+
new ActionListener<>() {
313+
@Override
314+
public void onResponse(AcknowledgedResponse response) {
315+
PutDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
316+
if (response.isAcknowledged() == false) {
317+
error = new PutDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
318+
index.getName(),
319+
"Updating settings not acknowledged for unknown reason"
320+
);
321+
} else {
322+
error = null;
323+
}
324+
listener.onResponse(error);
325+
}
326+
327+
@Override
328+
public void onFailure(Exception e) {
329+
listener.onResponse(
330+
new PutDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage())
331+
);
332+
}
333+
}
334+
);
335+
}
336+
337+
}
338+
339+
@Override
340+
protected ClusterBlockException checkBlock(PutDataStreamSettingsAction.Request request, ClusterState state) {
341+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
342+
}
343+
}

0 commit comments

Comments
 (0)