Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public void onFailure(Exception e) {
updateSingleDataStream(
dataStreamName,
request.getSettings(),
request.isDryRun(),
request.masterNodeTimeout(),
request.ackTimeout(),
new ActionListener<>() {
Expand Down Expand Up @@ -154,6 +155,7 @@ public void onFailure(Exception e) {
private void updateSingleDataStream(
String dataStreamName,
Settings settingsOverrides,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
Expand Down Expand Up @@ -198,11 +200,12 @@ private void updateSingleDataStream(
ackTimeout,
dataStreamName,
settingsOverrides,
dryRun,
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
updateSettingsOnIndices(dataStreamName, settingsOverrides, masterNodeTimeout, ackTimeout, listener);
public void onResponse(DataStream dataStream) {
if (dataStream != null) {
updateSettingsOnIndices(dataStream, settingsOverrides, dryRun, masterNodeTimeout, ackTimeout, listener);
} else {
listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
Expand All @@ -226,8 +229,9 @@ public void onFailure(Exception e) {
}

private void updateSettingsOnIndices(
String dataStreamName,
DataStream dataStream,
Settings requestSettings,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
Expand All @@ -243,26 +247,16 @@ private void updateSettingsOnIndices(
appliedToDataStreamOnly.add(settingName);
}
}
final List<Index> concreteIndices = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName)
.getIndices();
final List<Index> concreteIndices = dataStream.getIndices();
final List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> indexSettingErrors = new ArrayList<>();

CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() {
// Called when all indices for all settings are complete
@Override
public void onResponse(Void unused) {
DataStream dataStream = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName);
listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
dataStream.getName(),
true,
null,
settingsFilter.filter(dataStream.getSettings()),
Expand All @@ -286,7 +280,7 @@ public void onFailure(Exception e) {
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
Settings applyToIndexSettings = builder().loadFromMap(settingsToApply).build();
for (Index index : concreteIndices) {
updateSettingsOnSingleIndex(index, applyToIndexSettings, masterNodeTimeout, ackTimeout, new ActionListener<>() {
updateSettingsOnSingleIndex(index, applyToIndexSettings, dryRun, masterNodeTimeout, ackTimeout, new ActionListener<>() {
@Override
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) {
if (indexSettingError != null) {
Expand All @@ -306,6 +300,7 @@ public void onFailure(Exception e) {
private void updateSettingsOnSingleIndex(
Index index,
Settings requestSettings,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> listener
Expand All @@ -326,39 +321,50 @@ private void updateSettingsOnSingleIndex(
);
return;
}
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
requestSettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
if (response.isAcknowledged() == false) {
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
index.getName(),
"Updating settings not acknowledged for unknown reason"
);
} else {
error = null;
if (dryRun) {
/*
* This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
* are not blocked, but there is no value in going behond this. So just respond to the listener and move on.
*/
listener.onResponse(null);
} else {
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
requestSettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
if (response.isAcknowledged() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're getting quite deeply nested at this point. Is it worth pulling out some of these blocks into methods / inner classes for readability?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I switched everything over to ActionListeners helper methods rather than creating my own ActionListeners. I think it improves readability a bit. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :-) Yeah I think that's a little easier on the eyes 👍🏻

error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
index.getName(),
"Updating settings not acknowledged for unknown reason"
);
} else {
error = null;
}
listener.onResponse(error);
}
listener.onResponse(error);
}

@Override
public void onFailure(Exception e) {
listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage())
);
@Override
public void onFailure(Exception e) {
listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
index.getName(),
e.getMessage()
)
);
}
}
}
);
);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try (XContentParser parser = request.contentParser()) {
settings = Settings.fromXContent(parser);
}
boolean dryRun = request.paramAsBoolean("dry_run", false);
UpdateDataStreamSettingsAction.Request putDataStreamRequest = new UpdateDataStreamSettingsAction.Request(
settings,
dryRun,
RestUtils.getMasterNodeTimeout(request),
RestUtils.getAckTimeout(request)
).indices(Strings.splitStringByCommaToArray(request.param("name")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,103 @@ setup:
- match: { data_streams.0.name: my-data-stream-1 }
- match: { data_streams.0.applied_to_data_stream: false }
- match: { data_streams.0.error: "Cannot set the following settings on a data stream: [index.fake_setting]" }

---
"Test dry run":
- requires:
cluster_features: [ "logs_stream" ]
reason: requires setting 'logs_stream' to get or set data stream settings
- do:
allowed_warnings:
- "index template [my-template] has index patterns [my-data-stream-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [ my-data-stream-* ]
data_stream: { }
template:
settings:
number_of_replicas: 0
lifecycle.name: my-policy

- do:
indices.create_data_stream:
name: my-data-stream-1

- do:
cluster.health:
index: "my-data-stream-1"
wait_for_status: green

- do:
indices.get_data_stream_settings:
name: my-data-stream-1
- match: { data_streams.0.name: my-data-stream-1 }
- match: { data_streams.0.settings: {} }
- match: { data_streams.0.effective_settings.index.number_of_shards: null }
- match: { data_streams.0.effective_settings.index.number_of_replicas: "0" }
- match: { data_streams.0.effective_settings.index.lifecycle.name: "my-policy" }

- do:
indices.get_data_stream:
name: my-data-stream-1
- match: { data_streams.0.name: my-data-stream-1 }
- match: { data_streams.0.settings: {} }
- match: { data_streams.0.effective_settings: null }

- do:
indices.put_data_stream_settings:
name: my-data-stream-1
dry_run: true
body:
index:
number_of_shards: 2
lifecycle.name: my-new-policy
- match: { data_streams.0.name: my-data-stream-1 }
- match: { data_streams.0.applied_to_data_stream: true }
- match: { data_streams.0.index_settings_results.applied_to_data_stream_only: [index.number_of_shards]}
- match: { data_streams.0.index_settings_results.applied_to_data_stream_and_backing_indices: [index.lifecycle.name] }
- match: { data_streams.0.settings.index.number_of_shards: "2" }
- match: { data_streams.0.settings.index.lifecycle.name: "my-new-policy" }
- match: { data_streams.0.effective_settings.index.number_of_shards: "2" }
- match: { data_streams.0.effective_settings.index.number_of_replicas: "0" }
- match: { data_streams.0.effective_settings.index.lifecycle.name: "my-new-policy" }

- do:
indices.rollover:
alias: "my-data-stream-1"

- do:
cluster.health:
index: "my-data-stream-1"
wait_for_status: green

- do:
indices.get_data_stream_settings:
name: my-data-stream-1
- match: { data_streams.0.name: my-data-stream-1 }
- match: { data_streams.0.settings: {} }
- match: { data_streams.0.effective_settings.index.number_of_shards: null }
- match: { data_streams.0.effective_settings.index.number_of_replicas: "0" }
- match: { data_streams.0.effective_settings.index.lifecycle.name: "my-policy" }

- do:
indices.get_data_stream:
name: my-data-stream-1
- match: { data_streams.0.name: my-data-stream-1 }
- match: { data_streams.0.settings: {} }
- match: { data_streams.0.effective_settings: null }

- do:
indices.get_data_stream:
name: my-data-stream-1
- set: { data_streams.0.indices.0.index_name: idx0name }
- set: { data_streams.0.indices.1.index_name: idx1name }

- do:
indices.get_settings:
index: my-data-stream-1
- match: { .$idx0name.settings.index.number_of_shards: "1" }
- match: { .$idx0name.settings.index.lifecycle.name: "my-policy" }
- match: { .$idx1name.settings.index.number_of_shards: "1" }
- match: { .$idx1name.settings.index.lifecycle.name: "my-policy" }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
]
},
"params":{
"dry_run":{
"type":"boolean",
"description":"Perform a dry run but do not actually change any settings",
"default":false
},
"timeout":{
"type":"time",
"description":"Specify timeout for acknowledging the cluster state update"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME = def(9_077_0_00);
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED = def(9_078_0_00);
public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_079_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_080_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.datastreams;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
Expand Down Expand Up @@ -47,10 +48,16 @@ public UpdateDataStreamSettingsAction() {
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
private final Settings settings;
private String[] dataStreamNames = Strings.EMPTY_ARRAY;
private final boolean dryRun;

public Request(Settings settings, TimeValue masterNodeTimeout, TimeValue ackTimeout) {
this(settings, false, masterNodeTimeout, ackTimeout);
}

public Request(Settings settings, boolean dryRun, TimeValue masterNodeTimeout, TimeValue ackTimeout) {
super(masterNodeTimeout, ackTimeout);
this.settings = settings;
this.dryRun = dryRun;
}

@Override
Expand All @@ -63,6 +70,10 @@ public Settings getSettings() {
return settings;
}

public boolean isDryRun() {
return dryRun;
}

@Override
public boolean includeDataStreams() {
return true;
Expand All @@ -72,13 +83,21 @@ public Request(StreamInput in) throws IOException {
super(in);
this.dataStreamNames = in.readStringArray();
this.settings = Settings.readSettingsFromStream(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be TransportVersions.SETTINGS_IN_DATA_STREAMS_DRY_RUN?

this.dryRun = in.readBoolean();
} else {
this.dryRun = false;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(dataStreamNames);
settings.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS_DRY_RUN)) {
out.writeBoolean(dryRun);
}
}

@Override
Expand All @@ -103,13 +122,14 @@ public boolean equals(Object o) {
Request request = (Request) o;
return Arrays.equals(dataStreamNames, request.dataStreamNames)
&& settings.equals(request.settings)
&& dryRun == request.dryRun
&& Objects.equals(masterNodeTimeout(), request.masterNodeTimeout())
&& Objects.equals(ackTimeout(), request.ackTimeout());
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(dataStreamNames), settings, masterNodeTimeout(), ackTimeout());
return Objects.hash(Arrays.hashCode(dataStreamNames), settings, dryRun, masterNodeTimeout(), ackTimeout());
}

}
Expand Down
Loading