Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126344.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126344
summary: Experiment setting settings on data stream
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.PostDataStreamAction;
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
Expand All @@ -37,6 +38,7 @@
import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction;
import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction;
import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction;
import org.elasticsearch.datastreams.action.PostDataStreamTransportAction;
import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
Expand Down Expand Up @@ -70,6 +72,7 @@
import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction;
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
import org.elasticsearch.datastreams.rest.RestPostDataStreamAction;
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.health.HealthIndicatorService;
Expand Down Expand Up @@ -226,6 +229,7 @@ public Collection<?> createComponents(PluginServices services) {
public List<ActionHandler> getActions() {
List<ActionHandler> actions = new ArrayList<>();
actions.add(new ActionHandler(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class));
actions.add(new ActionHandler(PostDataStreamAction.INSTANCE, PostDataStreamTransportAction.class));
actions.add(new ActionHandler(DeleteDataStreamAction.INSTANCE, DeleteDataStreamTransportAction.class));
actions.add(new ActionHandler(GetDataStreamAction.INSTANCE, TransportGetDataStreamsAction.class));
actions.add(new ActionHandler(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class));
Expand Down Expand Up @@ -263,6 +267,7 @@ public List<RestHandler> getRestHandlers(
});
List<RestHandler> handlers = new ArrayList<>();
handlers.add(new RestCreateDataStreamAction());
handlers.add(new RestPostDataStreamAction());
handlers.add(new RestDeleteDataStreamAction());
handlers.add(new RestGetDataStreamsAction());
handlers.add(new RestDataStreamsStatsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.datastreams.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.datastreams.PostDataStreamAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PostDataStreamTransportAction extends TransportMasterNodeAction<PostDataStreamAction.Request, PostDataStreamAction.Response> {
private static final Logger logger = LogManager.getLogger(PostDataStreamTransportAction.class);
private static final Set<String> APPLY_TO_BACKING_INDICES = Set.of("index.number_of_replicas", "index.lifecycle.name");
private final MetadataDataStreamsService metadataDataStreamsService;
private final MetadataUpdateSettingsService updateSettingsService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final SystemIndices systemIndices;
private final ProjectResolver projectResolver;

@Inject
public PostDataStreamTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
ProjectResolver projectResolver,
MetadataDataStreamsService metadataDataStreamsService,
MetadataUpdateSettingsService updateSettingsService,
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices
) {
super(
PostDataStreamAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
PostDataStreamAction.Request::new,
PostDataStreamAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.projectResolver = projectResolver;
this.metadataDataStreamsService = metadataDataStreamsService;
this.updateSettingsService = updateSettingsService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.systemIndices = systemIndices;
}

@Override
protected void masterOperation(
Task task,
PostDataStreamAction.Request request,
ClusterState state,
ActionListener<PostDataStreamAction.Response> listener
) throws Exception {
String dataStreamNamePattern = request.getName();
List<String> dataStreamNames = indexNameExpressionResolver.dataStreamNames(
clusterService.state(),
IndicesOptions.DEFAULT,
dataStreamNamePattern
);
List<PostDataStreamAction.DataStreamResponse> dataStreamResponses = new ArrayList<>();
CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size(), new ActionListener<>() {
@Override
public void onResponse(Void unused) {
listener.onResponse(new PostDataStreamAction.Response(dataStreamResponses));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e); // TODO ?
}
});
// TODO there's a much better way
for (String dataStreamName : dataStreamNames) {
updateSingleDataStream(
dataStreamName,
request.getTemplateOverrides(),
request.masterNodeTimeout(),
request.ackTimeout(),
new ActionListener<>() {
@Override
public void onResponse(PostDataStreamAction.DataStreamResponse dataStreamResponse) {
dataStreamResponses.add(dataStreamResponse);
countDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
// TODO this is not actually correct at all:
dataStreamResponses.add(
new PostDataStreamAction.DataStreamResponse(dataStreamName, false, e.getMessage(), null, List.of())
);
countDownListener.onResponse(null);
}
}
);
}
}

private void updateSingleDataStream(
String dataStreamName,
ComposableIndexTemplate templateOverrides,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<PostDataStreamAction.DataStreamResponse> listener
) {
if (systemIndices.isSystemDataStream(dataStreamName)) {
listener.onResponse(
new PostDataStreamAction.DataStreamResponse(dataStreamName, false, "Cannot update a system data stream", null, List.of())
);
return;
}
metadataDataStreamsService.updateTemplateOverrides(
projectResolver,
masterNodeTimeout,
ackTimeout,
dataStreamName,
templateOverrides,
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
updateSettingsOnIndices(dataStreamName, templateOverrides, masterNodeTimeout, ackTimeout, listener);
} else {
listener.onResponse(
new PostDataStreamAction.DataStreamResponse(
dataStreamName,
false,
"Updating template overrides not accepted for unknown reasons",
null,
List.of()
)
);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}

private void updateSettingsOnIndices(
String dataStreamName,
ComposableIndexTemplate templateOverrides,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<PostDataStreamAction.DataStreamResponse> listener
) {
final List<Index> concreteIndices = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName)
.getIndices();
final Settings requestSettings = templateOverrides.template().settings();
final List<PostDataStreamAction.DataStreamResponse.IndexSettingResult> indexSettingResults = new ArrayList<>();
CountDownActionListener settingCountDownListener = new CountDownActionListener(requestSettings.size() + 1, new ActionListener<>() {
// Called once all settings are completed for all indices
@Override
public void onResponse(Void unused) {
ComposableIndexTemplate effectiveIndexTemplate = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName)
.getEffectiveIndexTemplate(clusterService.state().projectState(projectResolver.getProjectId()).metadata());
listener.onResponse(
new PostDataStreamAction.DataStreamResponse(dataStreamName, true, null, effectiveIndexTemplate, indexSettingResults)
);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
settingCountDownListener.onResponse(null); // handles the case when there were zero settings
for (String setting : requestSettings.keySet()) {
updateSingleSettingOnIndices(
setting,
requestSettings.get(setting),
concreteIndices,
masterNodeTimeout,
ackTimeout,
new ActionListener<PostDataStreamAction.DataStreamResponse.IndexSettingResult>() {
@Override
public void onResponse(PostDataStreamAction.DataStreamResponse.IndexSettingResult indexSettingResult) {
indexSettingResults.add(indexSettingResult);
settingCountDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
settingCountDownListener.onFailure(e);
}
}
);
}
}

private void updateSingleSettingOnIndices(
String setting,
Object settingValue,
List<Index> concreteIndices,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<PostDataStreamAction.DataStreamResponse.IndexSettingResult> listener
) {
if (APPLY_TO_BACKING_INDICES.contains(setting)) {
final List<PostDataStreamAction.DataStreamResponse.IndexSettingError> errors = new ArrayList<>();
CountDownActionListener indexCountDownListener = new CountDownActionListener(
concreteIndices.size() + 1,
new ActionListener<>() {
// Called when all indices for a single setting are complete
@Override
public void onResponse(Void unused) {
listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, true, errors));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
for (Index index : concreteIndices) {
updateSingleSettingForSingleIndex(setting, settingValue, index, masterNodeTimeout, ackTimeout, new ActionListener<>() {
// Called when a single setting for a single index is complete
@Override
public void onResponse(PostDataStreamAction.DataStreamResponse.IndexSettingError response) {
if (response != null) {
errors.add(response);
}
indexCountDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
indexCountDownListener.onFailure(e);
}
});
}
} else {
// This is not a setting that we will apply to backing indices
listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, false, List.of()));
}
}

private void updateSingleSettingForSingleIndex(
String settingName,
Object settingValue,
Index index,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<PostDataStreamAction.DataStreamResponse.IndexSettingError> listener
) {
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
Settings.builder().loadFromMap(Map.of(settingName, settingValue)).build(),
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
PostDataStreamAction.DataStreamResponse.IndexSettingError error;
if (response.isAcknowledged() == false) {
error = new PostDataStreamAction.DataStreamResponse.IndexSettingError(
index.getName(),
"Updating setting not acknowledged for unknown reason"
);
} else {
error = null;
}
listener.onResponse(error);
}

@Override
public void onFailure(Exception e) {
listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingError(index.getName(), e.getMessage()));
}
}
);
}

@Override
protected ClusterBlockException checkBlock(PostDataStreamAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
Loading
Loading