diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportDeleteDataStreamOptionsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportDeleteDataStreamOptionsAction.java index acb7e1e8d4825..56ca98b3abfa2 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportDeleteDataStreamOptionsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportDeleteDataStreamOptionsAction.java @@ -81,7 +81,13 @@ protected void masterOperation( for (String name : dataStreamNames) { systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext()); } - metadataDataStreamsService.removeDataStreamOptions(dataStreamNames, request.ackTimeout(), request.masterNodeTimeout(), listener); + metadataDataStreamsService.removeDataStreamOptions( + state.projectId(), + dataStreamNames, + request.ackTimeout(), + request.masterNodeTimeout(), + listener + ); } @Override diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportPutDataStreamOptionsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportPutDataStreamOptionsAction.java index 01abcef4f3550..249aa36760a25 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportPutDataStreamOptionsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportPutDataStreamOptionsAction.java @@ -81,6 +81,7 @@ protected void masterOperation( systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext()); } metadataDataStreamsService.setDataStreamOptions( + state.projectId(), dataStreamNames, request.getOptions(), request.ackTimeout(), diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index f78ab7e4778de..18895be0d1b2e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -119,7 +119,15 @@ public Tuple executeTask( ClusterState clusterState ) { return new Tuple<>( - updateDataStreamOptions(clusterState, modifyOptionsTask.getDataStreamNames(), modifyOptionsTask.getOptions()), + ClusterState.builder(clusterState) + .putProjectMetadata( + updateDataStreamOptions( + clusterState.projectState(modifyOptionsTask.projectId).metadata(), + modifyOptionsTask.getDataStreamNames(), + modifyOptionsTask.getOptions() + ) + ) + .build(), modifyOptionsTask ); } @@ -195,6 +203,7 @@ public void removeLifecycle( * Submits the task to set the provided data stream options to the requested data streams. */ public void setDataStreamOptions( + final ProjectId projectId, final List dataStreamNames, DataStreamOptions options, TimeValue ackTimeout, @@ -203,7 +212,7 @@ public void setDataStreamOptions( ) { updateOptionsTaskQueue.submitTask( "set-data-stream-options", - new UpdateOptionsTask(dataStreamNames, options, ackTimeout, listener), + new UpdateOptionsTask(projectId, dataStreamNames, options, ackTimeout, listener), masterTimeout ); } @@ -212,6 +221,7 @@ public void setDataStreamOptions( * Submits the task to remove the data stream options from the requested data streams. */ public void removeDataStreamOptions( + ProjectId projectId, List dataStreamNames, TimeValue ackTimeout, TimeValue masterTimeout, @@ -219,7 +229,7 @@ public void removeDataStreamOptions( ) { updateOptionsTaskQueue.submitTask( "delete-data-stream-options", - new UpdateOptionsTask(dataStreamNames, null, ackTimeout, listener), + new UpdateOptionsTask(projectId, dataStreamNames, null, ackTimeout, listener), masterTimeout ); } @@ -308,18 +318,17 @@ ProjectMetadata updateDataLifecycle(ProjectMetadata project, List dataSt * Creates an updated cluster state in which the requested data streams have the data stream options provided. * Visible for testing. */ - ClusterState updateDataStreamOptions( - ClusterState currentState, + ProjectMetadata updateDataStreamOptions( + ProjectMetadata project, List dataStreamNames, @Nullable DataStreamOptions dataStreamOptions ) { - Metadata metadata = currentState.metadata(); - Metadata.Builder builder = Metadata.builder(metadata); + ProjectMetadata.Builder builder = ProjectMetadata.builder(project); for (var dataStreamName : dataStreamNames) { - var dataStream = validateDataStream(metadata.getProject(), dataStreamName); + var dataStream = validateDataStream(project, dataStreamName); builder.put(dataStream.copy().setDataStreamOptions(dataStreamOptions).build()); } - return ClusterState.builder(currentState).metadata(builder.build()).build(); + return builder.build(); } /** @@ -525,21 +534,27 @@ public DataStreamLifecycle getDataLifecycle() { * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion. */ static class UpdateOptionsTask extends AckedBatchedClusterStateUpdateTask { - + ProjectId projectId; private final List dataStreamNames; private final DataStreamOptions options; UpdateOptionsTask( + ProjectId projectId, List dataStreamNames, @Nullable DataStreamOptions options, TimeValue ackTimeout, ActionListener listener ) { super(ackTimeout, listener); + this.projectId = projectId; this.dataStreamNames = dataStreamNames; this.options = options; } + public ProjectId getProjectId() { + return projectId; + } + public List getDataStreamNames() { return dataStreamNames; } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index 1d650175430ed..4f05ff8a70a59 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -446,13 +446,18 @@ public void testUpdateLifecycle() { } public void testUpdateDataStreamOptions() { + final var projectId = randomProjectIdOrDefault(); String dataStream = randomAlphaOfLength(5); // we want the data stream options to be non-empty, so we can see the removal in action DataStreamOptions dataStreamOptions = randomValueOtherThan( DataStreamOptions.EMPTY, DataStreamOptionsTests::randomDataStreamOptions ); - ClusterState before = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStream, 2)), List.of()); + ProjectMetadata before = DataStreamTestHelper.getClusterStateWithDataStreams( + projectId, + List.of(new Tuple<>(dataStream, 2)), + List.of() + ).metadata().getProject(projectId); MetadataDataStreamsService service = new MetadataDataStreamsService( mock(ClusterService.class), mock(IndicesService.class), @@ -460,20 +465,20 @@ public void testUpdateDataStreamOptions() { ); // Ensure no data stream options are stored - DataStream updatedDataStream = before.metadata().getProject().dataStreams().get(dataStream); + DataStream updatedDataStream = before.dataStreams().get(dataStream); assertNotNull(updatedDataStream); assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY)); // Set non-empty data stream options - ClusterState after = service.updateDataStreamOptions(before, List.of(dataStream), dataStreamOptions); - updatedDataStream = after.metadata().getProject().dataStreams().get(dataStream); + ProjectMetadata after = service.updateDataStreamOptions(before, List.of(dataStream), dataStreamOptions); + updatedDataStream = after.dataStreams().get(dataStream); assertNotNull(updatedDataStream); assertThat(updatedDataStream.getDataStreamOptions(), equalTo(dataStreamOptions)); before = after; // Remove data stream options after = service.updateDataStreamOptions(before, List.of(dataStream), null); - updatedDataStream = after.metadata().getProject().dataStreams().get(dataStream); + updatedDataStream = after.dataStreams().get(dataStream); assertNotNull(updatedDataStream); assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY)); } diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 153311c26d9fe..74592a5e3626b 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -39,7 +39,6 @@ tasks.named("yamlRestTest").configure { '^cat.snapshots/*/*', '^cluster.desired_balance/10_basic/*', '^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API - '^data_stream/230_data_stream_options/*', // updating data stream options does not yet support multi project '^health/10_basic/*', '^health/40_diagnosis/*', '^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security