Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@ protected void masterOperation(
for (String name : dataStreamNames) {
systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
}
metadataDataStreamsService.removeDataStreamOptions(dataStreamNames, request.ackTimeout(), request.masterNodeTimeout(), listener);
metadataDataStreamsService.removeDataStreamOptions(
state.projectId(),
dataStreamNames,
request.ackTimeout(),
request.masterNodeTimeout(),
listener
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected void masterOperation(
systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
}
metadataDataStreamsService.setDataStreamOptions(
state.projectId(),
dataStreamNames,
request.getOptions(),
request.ackTimeout(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,15 @@ public Tuple<ClusterState, ClusterStateAckListener> executeTask(
ClusterState clusterState
) {
return new Tuple<>(
updateDataStreamOptions(clusterState, modifyOptionsTask.getDataStreamNames(), modifyOptionsTask.getOptions()),
ClusterState.builder(clusterState)
.putProjectMetadata(
updateDataStreamOptions(
clusterState.projectState(modifyOptionsTask.projectId).metadata(),
modifyOptionsTask.getDataStreamNames(),
modifyOptionsTask.getOptions()
)
)
.build(),
modifyOptionsTask
);
}
Expand Down Expand Up @@ -195,6 +203,7 @@ public void removeLifecycle(
* Submits the task to set the provided data stream options to the requested data streams.
*/
public void setDataStreamOptions(
final ProjectId projectId,
final List<String> dataStreamNames,
DataStreamOptions options,
TimeValue ackTimeout,
Expand All @@ -203,7 +212,7 @@ public void setDataStreamOptions(
) {
updateOptionsTaskQueue.submitTask(
"set-data-stream-options",
new UpdateOptionsTask(dataStreamNames, options, ackTimeout, listener),
new UpdateOptionsTask(projectId, dataStreamNames, options, ackTimeout, listener),
masterTimeout
);
}
Expand All @@ -212,14 +221,15 @@ public void setDataStreamOptions(
* Submits the task to remove the data stream options from the requested data streams.
*/
public void removeDataStreamOptions(
ProjectId projectId,
List<String> dataStreamNames,
TimeValue ackTimeout,
TimeValue masterTimeout,
ActionListener<AcknowledgedResponse> listener
) {
updateOptionsTaskQueue.submitTask(
"delete-data-stream-options",
new UpdateOptionsTask(dataStreamNames, null, ackTimeout, listener),
new UpdateOptionsTask(projectId, dataStreamNames, null, ackTimeout, listener),
masterTimeout
);
}
Expand Down Expand Up @@ -308,18 +318,17 @@ ProjectMetadata updateDataLifecycle(ProjectMetadata project, List<String> dataSt
* Creates an updated cluster state in which the requested data streams have the data stream options provided.
* Visible for testing.
*/
ClusterState updateDataStreamOptions(
ClusterState currentState,
ProjectMetadata updateDataStreamOptions(
ProjectMetadata project,
List<String> dataStreamNames,
@Nullable DataStreamOptions dataStreamOptions
) {
Metadata metadata = currentState.metadata();
Metadata.Builder builder = Metadata.builder(metadata);
ProjectMetadata.Builder builder = ProjectMetadata.builder(project);
for (var dataStreamName : dataStreamNames) {
var dataStream = validateDataStream(metadata.getProject(), dataStreamName);
var dataStream = validateDataStream(project, dataStreamName);
builder.put(dataStream.copy().setDataStreamOptions(dataStreamOptions).build());
}
return ClusterState.builder(currentState).metadata(builder.build()).build();
return builder.build();
}

/**
Expand Down Expand Up @@ -525,21 +534,27 @@ public DataStreamLifecycle getDataLifecycle() {
* A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
*/
static class UpdateOptionsTask extends AckedBatchedClusterStateUpdateTask {

ProjectId projectId;
private final List<String> dataStreamNames;
private final DataStreamOptions options;

UpdateOptionsTask(
ProjectId projectId,
List<String> dataStreamNames,
@Nullable DataStreamOptions options,
TimeValue ackTimeout,
ActionListener<AcknowledgedResponse> listener
) {
super(ackTimeout, listener);
this.projectId = projectId;
this.dataStreamNames = dataStreamNames;
this.options = options;
}

public ProjectId getProjectId() {
return projectId;
}

public List<String> getDataStreamNames() {
return dataStreamNames;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,34 +446,39 @@ public void testUpdateLifecycle() {
}

public void testUpdateDataStreamOptions() {
final var projectId = randomProjectIdOrDefault();
String dataStream = randomAlphaOfLength(5);
// we want the data stream options to be non-empty, so we can see the removal in action
DataStreamOptions dataStreamOptions = randomValueOtherThan(
DataStreamOptions.EMPTY,
DataStreamOptionsTests::randomDataStreamOptions
);
ClusterState before = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStream, 2)), List.of());
ProjectMetadata before = DataStreamTestHelper.getClusterStateWithDataStreams(
projectId,
List.of(new Tuple<>(dataStream, 2)),
List.of()
).metadata().getProject(projectId);
MetadataDataStreamsService service = new MetadataDataStreamsService(
mock(ClusterService.class),
mock(IndicesService.class),
DataStreamGlobalRetentionSettings.create(ClusterSettings.createBuiltInClusterSettings())
);

// Ensure no data stream options are stored
DataStream updatedDataStream = before.metadata().getProject().dataStreams().get(dataStream);
DataStream updatedDataStream = before.dataStreams().get(dataStream);
assertNotNull(updatedDataStream);
assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY));

// Set non-empty data stream options
ClusterState after = service.updateDataStreamOptions(before, List.of(dataStream), dataStreamOptions);
updatedDataStream = after.metadata().getProject().dataStreams().get(dataStream);
ProjectMetadata after = service.updateDataStreamOptions(before, List.of(dataStream), dataStreamOptions);
updatedDataStream = after.dataStreams().get(dataStream);
assertNotNull(updatedDataStream);
assertThat(updatedDataStream.getDataStreamOptions(), equalTo(dataStreamOptions));
before = after;

// Remove data stream options
after = service.updateDataStreamOptions(before, List.of(dataStream), null);
updatedDataStream = after.metadata().getProject().dataStreams().get(dataStream);
updatedDataStream = after.dataStreams().get(dataStream);
assertNotNull(updatedDataStream);
assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ tasks.named("yamlRestTest").configure {
'^cat.snapshots/*/*',
'^cluster.desired_balance/10_basic/*',
'^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API
'^data_stream/230_data_stream_options/*', // updating data stream options does not yet support multi project
'^health/10_basic/*',
'^health/40_diagnosis/*',
'^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security
Expand Down