Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.HeaderWarning;
Expand All @@ -41,14 +42,16 @@ public class TransportPromoteDataStreamAction extends AcknowledgedTransportMaste
private static final Logger logger = LogManager.getLogger(TransportPromoteDataStreamAction.class);

private final SystemIndices systemIndices;
private final ProjectResolver projectResolver;

@Inject
public TransportPromoteDataStreamAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
SystemIndices systemIndices
SystemIndices systemIndices,
ProjectResolver projectResolver
) {
super(
PromoteDataStreamAction.NAME,
Expand All @@ -60,6 +63,7 @@ public TransportPromoteDataStreamAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.systemIndices = systemIndices;
this.projectResolver = projectResolver;
}

@Override
Expand All @@ -70,6 +74,7 @@ protected void masterOperation(
ActionListener<AcknowledgedResponse> listener
) throws Exception {
systemIndices.validateDataStreamAccess(request.getName(), threadPool.getThreadContext());
final var projectId = projectResolver.getProjectId();
submitUnbatchedTask(
"promote-data-stream [" + request.getName() + "]",
new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) {
Expand All @@ -81,7 +86,9 @@ public void onFailure(Exception e) {

@Override
public ClusterState execute(ClusterState currentState) {
return promoteDataStream(currentState, request);
final var currentProject = currentState.metadata().getProject(projectId);
final var updatedProject = promoteDataStream(currentProject, request);
return ClusterState.builder(currentState).putProjectMetadata(updatedProject).build();
}

@Override
Expand All @@ -97,27 +104,23 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
clusterService.submitUnbatchedStateUpdateTask(source, task);
}

static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) {
DataStream dataStream = currentState.getMetadata().getProject().dataStreams().get(request.getName());
static ProjectMetadata promoteDataStream(ProjectMetadata project, PromoteDataStreamAction.Request request) {
DataStream dataStream = project.dataStreams().get(request.getName());

if (dataStream == null) {
throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist");
}

warnIfTemplateMissingForDatastream(dataStream, currentState);
warnIfTemplateMissingForDatastream(dataStream, project);

DataStream promotedDataStream = dataStream.promoteDataStream();
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
metadata.put(promotedDataStream);
return ClusterState.builder(currentState).metadata(metadata).build();
return ProjectMetadata.builder(project).put(promotedDataStream).build();
}

private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterState currentState) {
private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ProjectMetadata project) {
var datastreamName = dataStream.getName();

var matchingIndex = currentState.metadata()
.getProject()
.templatesV2()
var matchingIndex = project.templatesV2()
.values()
.stream()
.filter(cit -> cit.getDataStreamTemplate() != null)
Expand Down