Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;

import java.util.Map;
Expand All @@ -25,7 +25,7 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep {
@Override
public final void performAction(
IndexMetadata indexMetadata,
ClusterState currentClusterState,
ProjectState currentState,
ClusterStateObserver observer,
ActionListener<Void> listener
) {
Expand All @@ -36,8 +36,8 @@ public final void performAction(
return;
}

innerPerformAction(followerIndex, currentClusterState, listener);
innerPerformAction(followerIndex, currentState, listener);
}

abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Void> listener);
abstract void innerPerformAction(String followerIndex, ProjectState currentState, ActionListener<Void> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.Nullable;

/**
* Performs an action which must be performed asynchronously because it may take time to complete.
Expand All @@ -24,17 +26,23 @@ public AsyncActionStep(StepKey key, StepKey nextStepKey, Client client) {
this.client = client;
}

protected Client getClient() {
// For testing only
@Nullable
Client getClient() {
return client;
}

protected Client getClient(ProjectId projectId) {
return client.projectClient(projectId);
}

public boolean indexSurvives() {
return true;
}

public abstract void performAction(
IndexMetadata indexMetadata,
ClusterState currentClusterState,
ProjectState currentState,
ClusterStateObserver observer,
ActionListener<Void> listener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.TriConsumer;

import java.util.Objects;
import java.util.function.BiConsumer;

/**
* This step changes its {@link #getNextStepKey()} depending on the
Expand All @@ -26,14 +27,14 @@ public class AsyncBranchingStep extends AsyncActionStep {

private final StepKey nextStepKeyOnFalse;
private final StepKey nextStepKeyOnTrue;
private final BiConsumer<IndexMetadata, ActionListener<Boolean>> asyncPredicate;
private final TriConsumer<ProjectId, IndexMetadata, ActionListener<Boolean>> asyncPredicate;
private final SetOnce<Boolean> predicateValue;

public AsyncBranchingStep(
StepKey key,
StepKey nextStepKeyOnFalse,
StepKey nextStepKeyOnTrue,
BiConsumer<IndexMetadata, ActionListener<Boolean>> asyncPredicate,
TriConsumer<ProjectId, IndexMetadata, ActionListener<Boolean>> asyncPredicate,
Client client
) {
// super.nextStepKey is set to null since it is not used by this step
Expand All @@ -52,11 +53,11 @@ public boolean isRetryable() {
@Override
public void performAction(
IndexMetadata indexMetadata,
ClusterState currentClusterState,
ProjectState currentState,
ClusterStateObserver observer,
ActionListener<Void> listener
) {
asyncPredicate.accept(indexMetadata, listener.safeMap(value -> {
asyncPredicate.apply(currentState.projectId(), indexMetadata, listener.safeMap(value -> {
predicateValue.set(value);
return null;
}));
Expand Down Expand Up @@ -87,7 +88,7 @@ final StepKey getNextStepKeyOnTrue() {
/**
* @return the next step if {@code predicate} is true
*/
final BiConsumer<IndexMetadata, ActionListener<Boolean>> getAsyncPredicate() {
final TriConsumer<ProjectId, IndexMetadata, ActionListener<Boolean>> getAsyncPredicate() {
return asyncPredicate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand All @@ -39,44 +42,48 @@ public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Clie
@Override
public final void performAction(
IndexMetadata indexMetadata,
ClusterState currentClusterState,
ProjectState currentState,
ClusterStateObserver observer,
ActionListener<Void> listener
) {
// Wrap the original listener to handle exceptions caused by ongoing snapshots
SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(
currentState.projectId(),
indexMetadata.getIndex(),
listener,
observer,
currentClusterState.nodes().getLocalNode()
currentState.cluster().nodes().getLocalNode()
);
performDuringNoSnapshot(indexMetadata, currentClusterState, snapshotExceptionListener);
performDuringNoSnapshot(indexMetadata, currentState.metadata(), snapshotExceptionListener);
}

/**
* Method to be performed during which no snapshots for the index are already underway.
*/
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener);
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener);

/**
* SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular
* action failed due to a {@code SnapshotInProgressException}. If it did, then it registers a
* ClusterStateObserver listener waiting for the next time the snapshot is not running,
* re-running the step's {@link #performAction(IndexMetadata, ClusterState, ClusterStateObserver, ActionListener)}
* re-running the step's {@link #performAction(IndexMetadata, ProjectState, ClusterStateObserver, ActionListener)}
* method when the snapshot is no longer running.
*/
class SnapshotExceptionListener implements ActionListener<Void> {
private final ProjectId projectId;
private final Index index;
private final ActionListener<Void> originalListener;
private final ClusterStateObserver observer;
private final DiscoveryNode localNode;

SnapshotExceptionListener(
ProjectId projectId,
Index index,
ActionListener<Void> originalListener,
ClusterStateObserver observer,
DiscoveryNode localNode
) {
this.projectId = projectId;
this.index = index;
this.originalListener = originalListener;
this.observer = observer;
Expand Down Expand Up @@ -106,13 +113,14 @@ public void onNewClusterState(ClusterState state) {
}
try {
logger.debug("[{}] retrying ILM step after snapshot has completed", indexName);
IndexMetadata idxMeta = state.metadata().getProject().index(index);
final var projectState = state.projectState(projectId);
IndexMetadata idxMeta = projectState.metadata().index(index);
if (idxMeta == null) {
// The index has since been deleted, mission accomplished!
originalListener.onResponse(null);
} else {
// Re-invoke the performAction method with the new state
performAction(idxMeta, state, observer, originalListener);
performAction(idxMeta, projectState, observer, originalListener);
}
} catch (Exception e) {
originalListener.onFailure(e);
Expand All @@ -133,7 +141,7 @@ public void onTimeout(TimeValue timeout) {
// ILM actions should only run on master, lets bail on failover
return true;
}
if (state.metadata().getProject().index(index) == null) {
if (state.metadata().getProject(projectId).index(index) == null) {
// The index has since been deleted, mission accomplished!
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
Expand All @@ -36,11 +36,11 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
final String shrunkenIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(shrunkenIndexSource) == false) {
// the current managed index is a shrunk index
if (currentClusterState.metadata().getProject().index(shrunkenIndexSource) == null) {
if (currentProject.index(shrunkenIndexSource) == null) {
// if the source index does not exist, we'll skip deleting the
// (managed) shrunk index as that will cause data loss
String policyName = indexMetadata.getLifecyclePolicyName();
Expand All @@ -64,7 +64,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
listener.onResponse(null);
return;
}
getClient().admin()
getClient(currentProject.id()).admin()
.indices()
.delete(new DeleteIndexRequest(shrinkIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.RepositoryMissingException;
Expand All @@ -33,7 +33,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
final String indexName = indexMetadata.getIndex().getName();

LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
Expand All @@ -48,7 +48,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
listener.onResponse(null);
return;
}
getClient().admin()
getClient(currentProject.id()).admin()
.cluster()
.prepareDeleteSnapshot(TimeValue.MAX_VALUE, repositoryName, snapshotName)
.execute(new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.core.TimeValue;

import java.util.Map;
Expand All @@ -32,7 +32,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
String followerIndex = indexMetadata.getIndex().getName();
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
Expand All @@ -42,7 +42,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl

if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().indices().close(closeIndexRequest, listener.delegateFailureAndWrap((l, r) -> {
getClient(currentProject.id()).admin().indices().close(closeIndexRequest, listener.delegateFailureAndWrap((l, r) -> {
if (r.isAcknowledged() == false) {
throw new ElasticsearchException("close index request failed to be acknowledged");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;

Expand All @@ -30,18 +30,20 @@ public class CloseIndexStep extends AsyncActionStep {
@Override
public void performAction(
IndexMetadata indexMetadata,
ClusterState currentClusterState,
ProjectState currentState,
ClusterStateObserver observer,
ActionListener<Void> listener
) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().indices().close(request, listener.delegateFailureAndWrap((l, closeIndexResponse) -> {
if (closeIndexResponse.isAcknowledged() == false) {
throw new ElasticsearchException("close index request failed to be acknowledged");
}
l.onResponse(null);
}));
getClient(currentState.projectId()).admin()
.indices()
.close(request, listener.delegateFailureAndWrap((l, closeIndexResponse) -> {
if (closeIndexResponse.isAcknowledged() == false) {
throw new ElasticsearchException("close index request failed to be acknowledged");
}
l.onResponse(null);
}));
} else {
listener.onResponse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -49,8 +50,8 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
createSnapshot(indexMetadata, new ActionListener<>() {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
createSnapshot(currentProject.id(), indexMetadata, new ActionListener<>() {
@Override
public void onResponse(Boolean complete) {
// based on the result of action we'll decide what the next step will be
Expand All @@ -77,7 +78,7 @@ public void onFailure(Exception e) {
});
}

void createSnapshot(IndexMetadata indexMetadata, ActionListener<Boolean> listener) {
void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionListener<Boolean> listener) {
final String indexName = indexMetadata.getIndex().getName();

final LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
Expand Down Expand Up @@ -107,7 +108,7 @@ void createSnapshot(IndexMetadata indexMetadata, ActionListener<Boolean> listene
request.waitForCompletion(true);
request.includeGlobalState(false);

getClient().admin().cluster().createSnapshot(request, listener.map(response -> {
getClient(projectId).admin().cluster().createSnapshot(request, listener.map(response -> {
logger.debug(
"create snapshot response for policy [{}] and index [{}] is: {}",
policyName,
Expand Down
Loading