Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package org.elasticsearch.xpack.slm;

import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.ReservedStateHandlerProvider;

import java.util.Collection;
Expand All @@ -27,7 +27,7 @@ public ReservedLifecycleStateHandlerProvider(SnapshotLifecycle plugin) {
}

@Override
public Collection<ReservedClusterStateHandler<?>> clusterHandlers() {
return plugin.reservedClusterStateHandlers();
public Collection<ReservedProjectStateHandler<?>> projectHandlers() {
return plugin.reservedProjectStateHandlers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.HealthPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.snapshots.RegisteredPolicySnapshots;
Expand Down Expand Up @@ -231,7 +231,7 @@ public List<ActionHandler> getActions() {
return actions;
}

List<ReservedClusterStateHandler<?>> reservedClusterStateHandlers() {
List<ReservedProjectStateHandler<?>> reservedProjectStateHandlers() {
return List.of(new ReservedSnapshotAction());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,8 @@ private SnapshotLifecycleProjectState getOrCreateProjectState(ProjectId projectI
* Validates that the {@code repository} exists as a registered snapshot repository
* @throws IllegalArgumentException if the repository does not exist
*/
@Deprecated(forRemoval = true)
public static void validateRepositoryExists(final String repository, final ClusterState state) {
if (RepositoriesMetadata.get(state).repository(repository) == null) {
public static void validateRepositoryExists(final ProjectMetadata projectMetadata, final String repository) {
if (RepositoriesMetadata.get(projectMetadata).repository(repository) == null) {
throw new IllegalArgumentException("no such repository [" + repository + "]");
}
}
Expand All @@ -274,6 +273,7 @@ public static void validateRepositoryExists(final String repository, final Clust
* (see {@link LifecycleSettings#SLM_MINIMUM_INTERVAL_SETTING})
* @throws IllegalArgumentException if the interval is less than the minimum
*/
@FixForMultiProject(description = "Replace with project settings")
public static void validateMinimumInterval(final SnapshotLifecyclePolicy lifecycle, final ClusterState state) {
TimeValue minimum = LifecycleSettings.SLM_MINIMUM_INTERVAL_SETTING.get(state.metadata().settings());
TimeValue next = lifecycle.calculateNextInterval(Clock.systemUTC());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
Expand All @@ -36,7 +39,7 @@
* Internally it uses {@link TransportPutSnapshotLifecycleAction} and
* {@link TransportDeleteSnapshotLifecycleAction} to add, update and delete ILM policies.
*/
public class ReservedSnapshotAction implements ReservedClusterStateHandler<List<SnapshotLifecyclePolicy>> {
public class ReservedSnapshotAction implements ReservedProjectStateHandler<List<SnapshotLifecyclePolicy>> {

public static final String NAME = "slm";

Expand All @@ -45,7 +48,7 @@ public String name() {
return NAME;
}

private Collection<PutSnapshotLifecycleAction.Request> prepare(List<SnapshotLifecyclePolicy> policies, ClusterState state) {
private Collection<PutSnapshotLifecycleAction.Request> prepare(List<SnapshotLifecyclePolicy> policies, ProjectState projectState) {
List<PutSnapshotLifecycleAction.Request> result = new ArrayList<>();

List<Exception> exceptions = new ArrayList<>();
Expand All @@ -59,8 +62,8 @@ private Collection<PutSnapshotLifecycleAction.Request> prepare(List<SnapshotLife
);
try {
validate(request);
SnapshotLifecycleService.validateRepositoryExists(request.getLifecycle().getRepository(), state);
SnapshotLifecycleService.validateMinimumInterval(request.getLifecycle(), state);
SnapshotLifecycleService.validateRepositoryExists(projectState.metadata(), request.getLifecycle().getRepository());
SnapshotLifecycleService.validateMinimumInterval(request.getLifecycle(), projectState.cluster());
result.add(request);
} catch (Exception e) {
exceptions.add(e);
Expand All @@ -77,14 +80,14 @@ private Collection<PutSnapshotLifecycleAction.Request> prepare(List<SnapshotLife
}

@Override
public TransformState transform(List<SnapshotLifecyclePolicy> source, TransformState prevState) throws Exception {
var requests = prepare(source, prevState.state());
public TransformState transform(ProjectId projectId, List<SnapshotLifecyclePolicy> source, TransformState prevState) {
var requests = prepare(source, prevState.state().projectState(projectId));

ClusterState state = prevState.state();

for (var request : requests) {
TransportPutSnapshotLifecycleAction.UpdateSnapshotPolicyTask task =
new TransportPutSnapshotLifecycleAction.UpdateSnapshotPolicyTask(request);
new TransportPutSnapshotLifecycleAction.UpdateSnapshotPolicyTask(projectId, request);

state = task.execute(state);
}
Expand All @@ -96,6 +99,7 @@ public TransformState transform(List<SnapshotLifecyclePolicy> source, TransformS

for (var policyToDelete : toDelete) {
var task = new TransportDeleteSnapshotLifecycleAction.DeleteSnapshotPolicyTask(
projectId,
new DeleteSnapshotLifecycleAction.Request(
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeProjectAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.SuppressForbidden;
Expand All @@ -34,16 +37,16 @@
import java.util.Set;
import java.util.stream.Collectors;

public class TransportDeleteSnapshotLifecycleAction extends TransportMasterNodeAction<
DeleteSnapshotLifecycleAction.Request,
AcknowledgedResponse> {
public class TransportDeleteSnapshotLifecycleAction extends AcknowledgedTransportMasterNodeProjectAction<
DeleteSnapshotLifecycleAction.Request> {

@Inject
public TransportDeleteSnapshotLifecycleAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
DeleteSnapshotLifecycleAction.NAME,
Expand All @@ -52,7 +55,7 @@ public TransportDeleteSnapshotLifecycleAction(
threadPool,
actionFilters,
DeleteSnapshotLifecycleAction.Request::new,
AcknowledgedResponse::readFrom,
projectResolver,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
Expand All @@ -61,32 +64,42 @@ public TransportDeleteSnapshotLifecycleAction(
protected void masterOperation(
Task task,
DeleteSnapshotLifecycleAction.Request request,
ClusterState state,
ProjectState projectState,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
submitUnbatchedTask("delete-snapshot-lifecycle-" + request.getLifecycleId(), new DeleteSnapshotPolicyTask(request, listener));
) {
submitUnbatchedTask(
"delete-snapshot-lifecycle-" + request.getLifecycleId(),
new DeleteSnapshotPolicyTask(projectState.projectId(), request, listener)
);

}

/**
* Extracted extension of {@link AckedClusterStateUpdateTask} with only the execute method
* implementation, so that the execute() transformation can be reused for {@link ReservedSnapshotAction}
*/
public static class DeleteSnapshotPolicyTask extends AckedClusterStateUpdateTask {
private final ProjectId projectId;
private final DeleteSnapshotLifecycleAction.Request request;

DeleteSnapshotPolicyTask(DeleteSnapshotLifecycleAction.Request request, ActionListener<AcknowledgedResponse> listener) {
DeleteSnapshotPolicyTask(
ProjectId projectId,
DeleteSnapshotLifecycleAction.Request request,
ActionListener<AcknowledgedResponse> listener
) {
super(request, listener);
this.projectId = projectId;
this.request = request;
}

@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getProject();
final var project = currentState.metadata().getProject(projectId);
SnapshotLifecycleMetadata snapMeta = project.custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta == null) {
throw new ResourceNotFoundException("snapshot lifecycle policy not found: {}", request.getLifecycleId());
}
var currentMode = LifecycleOperationMetadata.currentSLMMode(currentState);
var currentMode = LifecycleOperationMetadata.currentSLMMode(project);
// Check that the policy exists in the first place
snapMeta.getSnapshotConfigurations()
.entrySet()
Expand Down Expand Up @@ -117,8 +130,8 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
}

@Override
protected ClusterBlockException checkBlock(DeleteSnapshotLifecycleAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected ClusterBlockException checkBlock(DeleteSnapshotLifecycleAction.Request request, ProjectState projectState) {
return projectState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeProjectAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
Expand All @@ -42,9 +45,7 @@
import java.util.Optional;
import java.util.Set;

public class TransportPutSnapshotLifecycleAction extends TransportMasterNodeAction<
PutSnapshotLifecycleAction.Request,
AcknowledgedResponse> {
public class TransportPutSnapshotLifecycleAction extends AcknowledgedTransportMasterNodeProjectAction<PutSnapshotLifecycleAction.Request> {

private static final Logger logger = LogManager.getLogger(TransportPutSnapshotLifecycleAction.class);

Expand All @@ -53,7 +54,8 @@ public TransportPutSnapshotLifecycleAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
PutSnapshotLifecycleAction.NAME,
Expand All @@ -62,7 +64,7 @@ public TransportPutSnapshotLifecycleAction(
threadPool,
actionFilters,
PutSnapshotLifecycleAction.Request::new,
AcknowledgedResponse::readFrom,
projectResolver,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
Expand All @@ -71,22 +73,25 @@ public TransportPutSnapshotLifecycleAction(
protected void masterOperation(
final Task task,
final PutSnapshotLifecycleAction.Request request,
final ClusterState state,
final ProjectState projectState,
final ActionListener<AcknowledgedResponse> listener
) {
SnapshotLifecycleService.validateRepositoryExists(request.getLifecycle().getRepository(), state);
SnapshotLifecycleService.validateMinimumInterval(request.getLifecycle(), state);
SnapshotLifecycleService.validateRepositoryExists(projectState.metadata(), request.getLifecycle().getRepository());
SnapshotLifecycleService.validateMinimumInterval(request.getLifecycle(), projectState.cluster());

// headers from the thread context stored by the AuthenticationService to be shared between the
// REST layer and the Transport layer here must be accessed within this thread and not in the
// cluster state thread in the ClusterStateUpdateTask below since that thread does not share the
// same context, and therefore does not have access to the appropriate security headers.
final Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(threadPool.getThreadContext(), state);
final Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
projectState.cluster()
);
LifecyclePolicy.validatePolicyName(request.getLifecycleId());

submitUnbatchedTask(
"put-snapshot-lifecycle-" + request.getLifecycleId(),
new UpdateSnapshotPolicyTask(request, listener, filteredHeaders)
new UpdateSnapshotPolicyTask(projectState.projectId(), request, listener, filteredHeaders)
);
}

Expand All @@ -95,15 +100,18 @@ protected void masterOperation(
* implementation, so that the execute() transformation can be reused for {@link ReservedSnapshotAction}
*/
public static class UpdateSnapshotPolicyTask extends AckedClusterStateUpdateTask {
private final ProjectId projectId;
private final PutSnapshotLifecycleAction.Request request;
private final Map<String, String> filteredHeaders;

UpdateSnapshotPolicyTask(
ProjectId projectId,
PutSnapshotLifecycleAction.Request request,
ActionListener<AcknowledgedResponse> listener,
Map<String, String> filteredHeaders
) {
super(request, listener);
this.projectId = projectId;
this.request = request;
this.filteredHeaders = filteredHeaders;
}
Expand All @@ -112,17 +120,18 @@ public static class UpdateSnapshotPolicyTask extends AckedClusterStateUpdateTask
* Used by the {@link ReservedClusterStateHandler} for SLM
* {@link ReservedSnapshotAction}
*/
UpdateSnapshotPolicyTask(PutSnapshotLifecycleAction.Request request) {
UpdateSnapshotPolicyTask(ProjectId projectId, PutSnapshotLifecycleAction.Request request) {
super(request, null);
this.projectId = projectId;
this.request = request;
this.filteredHeaders = Map.of();
}

@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getProject();
final var project = currentState.metadata().getProject(projectId);
SnapshotLifecycleMetadata snapMeta = project.custom(SnapshotLifecycleMetadata.TYPE, SnapshotLifecycleMetadata.EMPTY);
var currentMode = LifecycleOperationMetadata.currentSLMMode(currentState);
var currentMode = LifecycleOperationMetadata.currentSLMMode(project);
final SnapshotLifecyclePolicyMetadata existingPolicyMetadata = snapMeta.getSnapshotConfigurations()
.get(request.getLifecycleId());

Expand Down Expand Up @@ -158,7 +167,7 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
}

@Override
protected ClusterBlockException checkBlock(PutSnapshotLifecycleAction.Request request, ClusterState state) {
protected ClusterBlockException checkBlock(PutSnapshotLifecycleAction.Request request, ProjectState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,30 +80,28 @@ public void testGetJobId() {
}

public void testRepositoryExistenceForExistingRepo() {
ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
ProjectMetadata metadata = ProjectMetadata.builder(projectId).build();

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> SnapshotLifecycleService.validateRepositoryExists("repo", state)
() -> SnapshotLifecycleService.validateRepositoryExists(metadata, "repo")
);

assertThat(e.getMessage(), containsString("no such repository [repo]"));

RepositoryMetadata repo = new RepositoryMetadata("repo", "fs", Settings.EMPTY);
RepositoriesMetadata repoMeta = new RepositoriesMetadata(Collections.singletonList(repo));
ClusterState stateWithRepo = ClusterState.builder(state)
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, repoMeta))
.build();
ProjectMetadata metadataWithRepo = ProjectMetadata.builder(projectId).putCustom(RepositoriesMetadata.TYPE, repoMeta).build();

SnapshotLifecycleService.validateRepositoryExists("repo", stateWithRepo);
SnapshotLifecycleService.validateRepositoryExists(metadataWithRepo, "repo");
}

public void testRepositoryExistenceForMissingRepo() {
ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
ProjectMetadata metadata = ProjectMetadata.builder(projectId).build();

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> SnapshotLifecycleService.validateRepositoryExists("repo", state)
() -> SnapshotLifecycleService.validateRepositoryExists(metadata, "repo")
);

assertThat(e.getMessage(), containsString("no such repository [repo]"));
Expand Down
Loading