Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -107,53 +106,46 @@ protected void masterOperation(
request.indices()
);
List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> dataStreamSettingsResponse = new ArrayList<>();
CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
listener.onResponse(new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
CountDownActionListener countDownListener = new CountDownActionListener(
dataStreamNames.size() + 1,
listener.delegateFailure(
(responseActionListener, unused) -> responseActionListener.onResponse(
new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse)
)
)
);
countDownListener.onResponse(null);
for (String dataStreamName : dataStreamNames) {
updateSingleDataStream(
dataStreamName,
request.getSettings(),
request.isDryRun(),
request.masterNodeTimeout(),
request.ackTimeout(),
new ActionListener<>() {
@Override
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamResponse) {
dataStreamSettingsResponse.add(dataStreamResponse);
countDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
dataStreamSettingsResponse.add(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
e.getMessage(),
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
countDownListener.onResponse(null);
}
}
ActionListener.wrap(dataStreamResponse -> {
dataStreamSettingsResponse.add(dataStreamResponse);
countDownListener.onResponse(null);
}, e -> {
dataStreamSettingsResponse.add(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
e.getMessage(),
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
countDownListener.onResponse(null);
})
);
}
}

private void updateSingleDataStream(
String dataStreamName,
Settings settingsOverrides,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
Expand Down Expand Up @@ -198,36 +190,30 @@ private void updateSingleDataStream(
ackTimeout,
dataStreamName,
settingsOverrides,
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
updateSettingsOnIndices(dataStreamName, settingsOverrides, masterNodeTimeout, ackTimeout, listener);
} else {
listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
"Updating settings not accepted for unknown reasons",
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
dryRun,
listener.delegateFailure((dataStreamSettingsResponseActionListener, dataStream) -> {
if (dataStream != null) {
updateSettingsOnIndices(dataStream, settingsOverrides, dryRun, masterNodeTimeout, ackTimeout, listener);
} else {
dataStreamSettingsResponseActionListener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
"Updating settings not accepted for unknown reasons",
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
}
}
})
);
}

private void updateSettingsOnIndices(
String dataStreamName,
DataStream dataStream,
Settings requestSettings,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
Expand All @@ -243,26 +229,15 @@ private void updateSettingsOnIndices(
appliedToDataStreamOnly.add(settingName);
}
}
final List<Index> concreteIndices = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName)
.getIndices();
final List<Index> concreteIndices = dataStream.getIndices();
final List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> indexSettingErrors = new ArrayList<>();

CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() {
// Called when all indices for all settings are complete
@Override
public void onResponse(Void unused) {
DataStream dataStream = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName);
listener.onResponse(
CountDownActionListener indexCountDownListener = new CountDownActionListener(
concreteIndices.size() + 1,
listener.delegateFailure(
(dataStreamSettingsResponseActionListener, unused) -> dataStreamSettingsResponseActionListener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
dataStream.getName(),
true,
null,
settingsFilter.filter(dataStream.getSettings()),
Expand All @@ -275,37 +250,33 @@ public void onResponse(Void unused) {
indexSettingErrors
)
)
);
}
)
)
);

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
Settings applyToIndexSettings = builder().loadFromMap(settingsToApply).build();
for (Index index : concreteIndices) {
updateSettingsOnSingleIndex(index, applyToIndexSettings, masterNodeTimeout, ackTimeout, new ActionListener<>() {
@Override
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) {
updateSettingsOnSingleIndex(
index,
applyToIndexSettings,
dryRun,
masterNodeTimeout,
ackTimeout,
indexCountDownListener.delegateFailure((listener1, indexSettingError) -> {
if (indexSettingError != null) {
indexSettingErrors.add(indexSettingError);
}
indexCountDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
indexCountDownListener.onFailure(e);
}
});
listener1.onResponse(null);
})
);
}
}

private void updateSettingsOnSingleIndex(
Index index,
Settings requestSettings,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> listener
Expand All @@ -326,19 +297,24 @@ private void updateSettingsOnSingleIndex(
);
return;
}
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
requestSettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
if (dryRun) {
/*
* This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
* are not blocked, but there is no value in going beyond this. So just respond to the listener and move on.
*/
listener.onResponse(null);
} else {
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
requestSettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
ActionListener.wrap(response -> {
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
if (response.isAcknowledged() == false) {
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
Expand All @@ -349,16 +325,13 @@ public void onResponse(AcknowledgedResponse response) {
error = null;
}
listener.onResponse(error);
}

@Override
public void onFailure(Exception e) {
listener.onResponse(
},
e -> listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage())
);
}
}
);
)
)
);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try (XContentParser parser = request.contentParser()) {
settings = Settings.fromXContent(parser);
}
boolean dryRun = request.paramAsBoolean("dry_run", false);
UpdateDataStreamSettingsAction.Request putDataStreamRequest = new UpdateDataStreamSettingsAction.Request(
settings,
dryRun,
RestUtils.getMasterNodeTimeout(request),
RestUtils.getAckTimeout(request)
).indices(Strings.splitStringByCommaToArray(request.param("name")));
Expand Down
Loading