Skip to content

Commit f9fdaa3

Browse files
committed
Adding dry_run mode for setting data stream settings
1 parent b335c1a commit f9fdaa3

File tree

5 files changed

+153
-79
lines changed

5 files changed

+153
-79
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamSettingsAction.java

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public void onFailure(Exception e) {
123123
updateSingleDataStream(
124124
dataStreamName,
125125
request.getSettings(),
126+
request.isDryRun(),
126127
request.masterNodeTimeout(),
127128
request.ackTimeout(),
128129
new ActionListener<>() {
@@ -154,6 +155,7 @@ public void onFailure(Exception e) {
154155
private void updateSingleDataStream(
155156
String dataStreamName,
156157
Settings settingsOverrides,
158+
boolean dryRun,
157159
TimeValue masterNodeTimeout,
158160
TimeValue ackTimeout,
159161
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
@@ -198,11 +200,12 @@ private void updateSingleDataStream(
198200
ackTimeout,
199201
dataStreamName,
200202
settingsOverrides,
203+
dryRun,
201204
new ActionListener<>() {
202205
@Override
203-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
204-
if (acknowledgedResponse.isAcknowledged()) {
205-
updateSettingsOnIndices(dataStreamName, settingsOverrides, masterNodeTimeout, ackTimeout, listener);
206+
public void onResponse(DataStream dataStream) {
207+
if (dataStream != null) {
208+
updateSettingsOnIndices(dataStream, settingsOverrides, dryRun, masterNodeTimeout, ackTimeout, listener);
206209
} else {
207210
listener.onResponse(
208211
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
@@ -226,8 +229,9 @@ public void onFailure(Exception e) {
226229
}
227230

228231
private void updateSettingsOnIndices(
229-
String dataStreamName,
232+
DataStream dataStream,
230233
Settings requestSettings,
234+
boolean dryRun,
231235
TimeValue masterNodeTimeout,
232236
TimeValue ackTimeout,
233237
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
@@ -243,26 +247,16 @@ private void updateSettingsOnIndices(
243247
appliedToDataStreamOnly.add(settingName);
244248
}
245249
}
246-
final List<Index> concreteIndices = clusterService.state()
247-
.projectState(projectResolver.getProjectId())
248-
.metadata()
249-
.dataStreams()
250-
.get(dataStreamName)
251-
.getIndices();
250+
final List<Index> concreteIndices = dataStream.getIndices();
252251
final List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> indexSettingErrors = new ArrayList<>();
253252

254253
CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() {
255254
// Called when all indices for all settings are complete
256255
@Override
257256
public void onResponse(Void unused) {
258-
DataStream dataStream = clusterService.state()
259-
.projectState(projectResolver.getProjectId())
260-
.metadata()
261-
.dataStreams()
262-
.get(dataStreamName);
263257
listener.onResponse(
264258
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
265-
dataStreamName,
259+
dataStream.getName(),
266260
true,
267261
null,
268262
settingsFilter.filter(dataStream.getSettings()),
@@ -286,7 +280,7 @@ public void onFailure(Exception e) {
286280
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
287281
Settings applyToIndexSettings = builder().loadFromMap(settingsToApply).build();
288282
for (Index index : concreteIndices) {
289-
updateSettingsOnSingleIndex(index, applyToIndexSettings, masterNodeTimeout, ackTimeout, new ActionListener<>() {
283+
updateSettingsOnSingleIndex(index, applyToIndexSettings, dryRun, masterNodeTimeout, ackTimeout, new ActionListener<>() {
290284
@Override
291285
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) {
292286
if (indexSettingError != null) {
@@ -306,6 +300,7 @@ public void onFailure(Exception e) {
306300
private void updateSettingsOnSingleIndex(
307301
Index index,
308302
Settings requestSettings,
303+
boolean dryRun,
309304
TimeValue masterNodeTimeout,
310305
TimeValue ackTimeout,
311306
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> listener
@@ -326,39 +321,46 @@ private void updateSettingsOnSingleIndex(
326321
);
327322
return;
328323
}
329-
updateSettingsService.updateSettings(
330-
new UpdateSettingsClusterStateUpdateRequest(
331-
projectResolver.getProjectId(),
332-
masterNodeTimeout,
333-
ackTimeout,
334-
requestSettings,
335-
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
336-
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
337-
index
338-
),
339-
new ActionListener<>() {
340-
@Override
341-
public void onResponse(AcknowledgedResponse response) {
342-
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
343-
if (response.isAcknowledged() == false) {
344-
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
345-
index.getName(),
346-
"Updating settings not acknowledged for unknown reason"
347-
);
348-
} else {
349-
error = null;
324+
if (dryRun) {
325+
listener.onResponse(null);
326+
} else {
327+
updateSettingsService.updateSettings(
328+
new UpdateSettingsClusterStateUpdateRequest(
329+
projectResolver.getProjectId(),
330+
masterNodeTimeout,
331+
ackTimeout,
332+
requestSettings,
333+
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
334+
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
335+
index
336+
),
337+
new ActionListener<>() {
338+
@Override
339+
public void onResponse(AcknowledgedResponse response) {
340+
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
341+
if (response.isAcknowledged() == false) {
342+
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
343+
index.getName(),
344+
"Updating settings not acknowledged for unknown reason"
345+
);
346+
} else {
347+
error = null;
348+
}
349+
listener.onResponse(error);
350350
}
351-
listener.onResponse(error);
352-
}
353351

354-
@Override
355-
public void onFailure(Exception e) {
356-
listener.onResponse(
357-
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage())
358-
);
352+
@Override
353+
public void onFailure(Exception e) {
354+
listener.onResponse(
355+
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
356+
index.getName(),
357+
e.getMessage()
358+
)
359+
);
360+
}
359361
}
360-
}
361-
);
362+
);
363+
}
362364
}
363365

364366
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestUpdateDataStreamSettingsAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4545
try (XContentParser parser = request.contentParser()) {
4646
settings = Settings.fromXContent(parser);
4747
}
48+
boolean dryRun = request.paramAsBoolean("dry_run", false);
4849
UpdateDataStreamSettingsAction.Request putDataStreamRequest = new UpdateDataStreamSettingsAction.Request(
4950
settings,
51+
dryRun,
5052
RestUtils.getMasterNodeTimeout(request),
5153
RestUtils.getAckTimeout(request)
5254
).indices(Strings.splitStringByCommaToArray(request.param("name")));

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ static TransportVersion def(int id) {
260260
public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);
261261
public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME = def(9_077_0_00);
262262
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED = def(9_078_0_00);
263+
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_079_0_00);
263264

264265
/*
265266
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsAction.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.datastreams;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.action.ActionResponse;
1314
import org.elasticsearch.action.ActionType;
1415
import org.elasticsearch.action.IndicesRequest;
@@ -47,10 +48,16 @@ public UpdateDataStreamSettingsAction() {
4748
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
4849
private final Settings settings;
4950
private String[] dataStreamNames = Strings.EMPTY_ARRAY;
51+
private final boolean dryRun;
5052

5153
public Request(Settings settings, TimeValue masterNodeTimeout, TimeValue ackTimeout) {
54+
this(settings, false, masterNodeTimeout, ackTimeout);
55+
}
56+
57+
public Request(Settings settings, boolean dryRun, TimeValue masterNodeTimeout, TimeValue ackTimeout) {
5258
super(masterNodeTimeout, ackTimeout);
5359
this.settings = settings;
60+
this.dryRun = dryRun;
5461
}
5562

5663
@Override
@@ -63,6 +70,10 @@ public Settings getSettings() {
6370
return settings;
6471
}
6572

73+
public boolean isDryRun() {
74+
return dryRun;
75+
}
76+
6677
@Override
6778
public boolean includeDataStreams() {
6879
return true;
@@ -72,13 +83,21 @@ public Request(StreamInput in) throws IOException {
7283
super(in);
7384
this.dataStreamNames = in.readStringArray();
7485
this.settings = Settings.readSettingsFromStream(in);
86+
if (in.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) {
87+
this.dryRun = in.readBoolean();
88+
} else {
89+
this.dryRun = false;
90+
}
7591
}
7692

7793
@Override
7894
public void writeTo(StreamOutput out) throws IOException {
7995
super.writeTo(out);
8096
out.writeStringArray(dataStreamNames);
8197
settings.writeTo(out);
98+
if (out.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS_DRY_RUN)) {
99+
out.writeBoolean(dryRun);
100+
}
82101
}
83102

84103
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

Lines changed: 82 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12+
import org.elasticsearch.ElasticsearchException;
1213
import org.elasticsearch.ResourceNotFoundException;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
@@ -144,32 +145,15 @@ public Tuple<ClusterState, ClusterStateAckListener> executeTask(
144145
UpdateSettingsTask updateSettingsTask,
145146
ClusterState clusterState
146147
) throws Exception {
147-
148-
ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateSettingsTask.projectId);
149-
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata);
150-
Map<String, DataStream> dataStreamMap = projectMetadata.dataStreams();
151-
DataStream dataStream = dataStreamMap.get(updateSettingsTask.dataStreamName);
152-
Settings existingSettings = dataStream.getSettings();
153-
154-
Template.Builder templateBuilder = Template.builder();
155-
Settings.Builder mergedSettingsBuilder = Settings.builder().put(existingSettings).put(updateSettingsTask.settingsOverrides);
156-
Settings mergedSettings = mergedSettingsBuilder.build();
157-
158-
final ComposableIndexTemplate template = lookupTemplateForDataStream(updateSettingsTask.dataStreamName, projectMetadata);
159-
ComposableIndexTemplate mergedTemplate = template.mergeSettings(mergedSettings);
160-
MetadataIndexTemplateService.validateTemplate(
161-
mergedTemplate.template().settings(),
162-
mergedTemplate.template().mappings(),
163-
indicesService
148+
return new Tuple<>(
149+
createClusterStateForUpdatedDataStreamSettings(
150+
updateSettingsTask.projectId,
151+
updateSettingsTask.dataStreamName,
152+
updateSettingsTask.settingsOverrides,
153+
clusterState
154+
),
155+
updateSettingsTask
164156
);
165-
166-
templateBuilder.settings(mergedSettingsBuilder);
167-
DataStream.Builder dataStreamBuilder = dataStream.copy().setSettings(mergedSettings);
168-
projectMetadataBuilder.removeDataStream(updateSettingsTask.dataStreamName);
169-
projectMetadataBuilder.put(dataStreamBuilder.build());
170-
ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build();
171-
172-
return new Tuple<>(updatedClusterState, updateSettingsTask);
173157
}
174158
};
175159
this.updateSettingsTaskQueue = clusterService.createTaskQueue(
@@ -420,13 +404,64 @@ public void updateSettings(
420404
TimeValue ackTimeout,
421405
String dataStreamName,
422406
Settings settingsOverrides,
423-
ActionListener<AcknowledgedResponse> listener
407+
boolean dryRun,
408+
ActionListener<DataStream> listener
424409
) {
425-
updateSettingsTaskQueue.submitTask(
426-
"updating settings on data stream",
427-
new UpdateSettingsTask(projectId, dataStreamName, settingsOverrides, ackTimeout, listener),
428-
masterNodeTimeout
410+
if (dryRun) {
411+
try {
412+
ClusterState newState = createClusterStateForUpdatedDataStreamSettings(
413+
projectId,
414+
dataStreamName,
415+
settingsOverrides,
416+
clusterService.state()
417+
);
418+
listener.onResponse(newState.projectState(projectId).metadata().dataStreams().get(dataStreamName));
419+
} catch (Exception e) {
420+
listener.onFailure(e);
421+
}
422+
} else {
423+
UpdateSettingsTask updateSettingsTask = new UpdateSettingsTask(
424+
projectId,
425+
dataStreamName,
426+
settingsOverrides,
427+
clusterService,
428+
ackTimeout,
429+
listener
430+
);
431+
updateSettingsTaskQueue.submitTask("updating settings on data stream", updateSettingsTask, masterNodeTimeout);
432+
}
433+
}
434+
435+
private ClusterState createClusterStateForUpdatedDataStreamSettings(
436+
ProjectId projectId,
437+
String dataStreamName,
438+
Settings settingsOverrides,
439+
ClusterState clusterState
440+
) throws Exception {
441+
442+
ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId);
443+
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata);
444+
Map<String, DataStream> dataStreamMap = projectMetadata.dataStreams();
445+
DataStream dataStream = dataStreamMap.get(dataStreamName);
446+
Settings existingSettings = dataStream.getSettings();
447+
448+
Template.Builder templateBuilder = Template.builder();
449+
Settings.Builder mergedSettingsBuilder = Settings.builder().put(existingSettings).put(settingsOverrides);
450+
Settings mergedSettings = mergedSettingsBuilder.build();
451+
452+
final ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, projectMetadata);
453+
ComposableIndexTemplate mergedTemplate = template.mergeSettings(mergedSettings);
454+
MetadataIndexTemplateService.validateTemplate(
455+
mergedTemplate.template().settings(),
456+
mergedTemplate.template().mappings(),
457+
indicesService
429458
);
459+
460+
templateBuilder.settings(mergedSettingsBuilder);
461+
DataStream.Builder dataStreamBuilder = dataStream.copy().setSettings(mergedSettings);
462+
projectMetadataBuilder.removeDataStream(dataStreamName);
463+
projectMetadataBuilder.put(dataStreamBuilder.build());
464+
return ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build();
430465
}
431466

432467
private static void addBackingIndex(
@@ -683,10 +718,25 @@ static class UpdateSettingsTask extends AckedBatchedClusterStateUpdateTask {
683718
ProjectId projectId,
684719
String dataStreamName,
685720
Settings settingsOverrides,
721+
ClusterService clusterService,
686722
TimeValue ackTimeout,
687-
ActionListener<AcknowledgedResponse> listener
723+
ActionListener<DataStream> listener
688724
) {
689-
super(ackTimeout, listener);
725+
super(ackTimeout, new ActionListener<>() {
726+
@Override
727+
public void onResponse(AcknowledgedResponse response) {
728+
if (response.isAcknowledged()) {
729+
listener.onResponse(clusterService.state().projectState(projectId).metadata().dataStreams().get(dataStreamName));
730+
} else {
731+
listener.onFailure(new ElasticsearchException("Updating settings not accepted for unknown reasons"));
732+
}
733+
}
734+
735+
@Override
736+
public void onFailure(Exception e) {
737+
listener.onFailure(e);
738+
}
739+
});
690740
this.projectId = projectId;
691741
this.dataStreamName = dataStreamName;
692742
this.settingsOverrides = settingsOverrides;

0 commit comments

Comments
 (0)