File: FinalizeSnapshotContext.java
@@ -27,7 +27,7 @@
*/
public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> {

- private final ShardGenerations updatedShardGenerations;
+ private final UpdatedShardGenerations updatedShardGenerations;

/**
* Obsolete shard generations map computed from the cluster state update that this finalization executed in
@@ -46,7 +46,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> {
private final Runnable onDone;

/**
- * @param updatedShardGenerations updated shard generations
+ * @param updatedShardGenerations updated shard generations for both live and deleted indices
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
Expand All @@ -57,7 +57,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
* once all cleanup operations after snapshot completion have executed
*/
public FinalizeSnapshotContext(
- ShardGenerations updatedShardGenerations,
+ UpdatedShardGenerations updatedShardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Expand All @@ -78,7 +78,7 @@ public long repositoryStateId() {
return repositoryStateId;
}

- public ShardGenerations updatedShardGenerations() {
+ public UpdatedShardGenerations updatedShardGenerations() {
return updatedShardGenerations;
}

@@ -120,4 +120,22 @@ public void onDone() {
public void onResponse(RepositoryData repositoryData) {
delegate.onResponse(repositoryData);
}

+ /**
+  * A record used to track the new shard generations that have been written for each shard in a snapshot.
+  * An index may be deleted after the shard generation is written but before the snapshot is finalized.
+  * In this case, its shard generation is tracked in {@link #deletedIndices}. Otherwise, it is tracked in
+  * {@link #liveIndices}.
+  */
+ public record UpdatedShardGenerations(ShardGenerations liveIndices, ShardGenerations deletedIndices) {
Contributor: Yes, I think this will work (but I will add some other comments elsewhere).

+ public static final UpdatedShardGenerations EMPTY = new UpdatedShardGenerations(ShardGenerations.EMPTY, ShardGenerations.EMPTY);

+ public UpdatedShardGenerations(ShardGenerations updated) {
Contributor: This constructor is only used in tests; IMO it'd be better to just be explicit and pass in ShardGenerations.EMPTY as the second parameter rather than take the risk that some future caller uses this constructor when they should be accounting for deleted indices too.

Member Author: Yep, see 1852486.

+ this(updated, ShardGenerations.EMPTY);
+ }

+ public boolean hasShardGen(RepositoryShardId repositoryShardId) {
+     return liveIndices.hasShardGen(repositoryShardId) || deletedIndices.hasShardGen(repositoryShardId);
+ }
+ }
}
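A quick illustration of the record's semantics as described in its javadoc (a hypothetical caller, not code from this change; `indexId` and `shardGen` stand in for a real IndexId and ShardGeneration, as in the test change below):

```java
// Generations for indices that still exist go into liveIndices; passing
// ShardGenerations.EMPTY for deletedIndices assumes nothing was deleted
// while the snapshot ran.
ShardGenerations live = ShardGenerations.builder().put(indexId, 0, shardGen).build();
var updated = new FinalizeSnapshotContext.UpdatedShardGenerations(live, ShardGenerations.EMPTY);

// hasShardGen consults both maps, so it reports a written generation whether
// the owning index is still live or was deleted before finalization.
boolean written = updated.hasShardGen(new RepositoryShardId(indexId, 0));
```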
File: BlobStoreRepository.java
@@ -1749,7 +1749,7 @@ int sizeInBytes() {
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
- final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
+ final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations().liveIndices();
Contributor: Nit: maybe inline this; it's only used in one place, and we now need to care about which ShardGenerations we're talking about, so the variable name is ambiguous.

Member Author: Good point. See 1071d95.
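The single use site would presumably end up reading along these lines (a sketch inferred from the surrounding diff, not taken from commit 1071d95; `indexNames` is a name of our own):

```java
// Hypothetical shape after the suggested inlining: the call site itself spells
// out that only the live-index generations are consumed during finalization.
final List<String> indexNames = finalizeSnapshotContext.updatedShardGenerations()
    .liveIndices()
    .indices()
    .stream()
    .map(IndexId::getName)
    .toList();
```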

final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
: "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
File: SnapshotsService.java
@@ -89,6 +89,7 @@
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
+ import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -469,7 +470,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
endingSnapshots.add(targetSnapshot);
initializingClones.remove(targetSnapshot);
logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
- removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY);
+ removeFailedSnapshotFromClusterState(targetSnapshot, e, null, UpdatedShardGenerations.EMPTY);
};

// 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
@@ -748,21 +749,23 @@ private static void validate(final String repositoryName, final String snapshotName) {
}
}

- private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
+ private static UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
ShardGenerations.Builder builder = ShardGenerations.builder();
+ ShardGenerations.Builder deletedBuilder = ShardGenerations.builder();
if (snapshot.isClone()) {
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value));
} else {
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> {
final Index index = snapshot.indexByName(key.indexName());
if (metadata.findIndex(index).isEmpty()) {
assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial.";
+ deletedBuilder.put(key.index(), key.shardId(), value);
return;
}
builder.put(key.index(), key.shardId(), value);
});
}
- return builder.build();
+ return new UpdatedShardGenerations(builder.build(), deletedBuilder.build());
Contributor: deletedBuilder.build() is mostly going to be ShardGenerations.EMPTY; can we special-case the empty collection in ShardGenerations.Builder to skip a bunch of unnecessary allocations?

Member Author: Makes sense. Special-cased in both Builder#build and the initialization of the variable here: 81f88ba.

}
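For reference, a minimal sketch of the special-casing the reviewer suggests, assuming the builder keeps its entries in an internal map named `generations` (a hypothetical detail; the real ShardGenerations.Builder internals may differ):

```java
public ShardGenerations build() {
    // Nothing was added: return the shared EMPTY instance instead of
    // allocating fresh per-index maps for an empty result.
    if (generations.isEmpty()) {
        return ShardGenerations.EMPTY;
    }
    // ... existing copying/normalization logic for the non-empty case ...
}
```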

private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
@@ -1360,7 +1363,7 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) {
snapshot,
new SnapshotException(snapshot, entry.failure()),
null,
- ShardGenerations.EMPTY
+ UpdatedShardGenerations.EMPTY
);
}
return;
@@ -1454,8 +1457,9 @@ protected void doRun() {
SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
final String failure = entry.failure();
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
- final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
- final List<String> finalIndices = shardGenerations.indices().stream().map(IndexId::getName).toList();
+ final var snapshotShardGenerations = buildGenerations(entry, metadata);
+ final ShardGenerations updatedShardGenerations = snapshotShardGenerations.liveIndices();
+ final List<String> finalIndices = updatedShardGenerations.indices().stream().map(IndexId::getName).toList();
final Set<String> indexNames = new HashSet<>(finalIndices);
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shardStatus : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
@@ -1552,7 +1556,7 @@ protected void doRun() {
entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(),
failure,
threadPool.absoluteTimeInMillis(),
- entry.partial() ? shardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
+ entry.partial() ? updatedShardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
shardFailures,
entry.includeGlobalState(),
entry.userMetadata(),
@@ -1562,7 +1566,7 @@
final ListenableFuture<List<ActionListener<SnapshotInfo>>> snapshotListeners = new ListenableFuture<>();
repo.finalizeSnapshot(
new FinalizeSnapshotContext(
- shardGenerations,
+ snapshotShardGenerations,
repositoryData.getGenId(),
metaForSnapshot,
snapshotInfo,
@@ -1579,7 +1583,7 @@
snapshot,
repositoryData,
// we might have written the new root blob before failing here, so we must use the updated shardGenerations
- shardGenerations
+ snapshotShardGenerations
)
),
() -> snapshotListeners.addListener(new ActionListener<>() {
@@ -1604,7 +1608,7 @@ public void onFailure(Exception e) {
repositoryData,
// a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
// use the updated shardGenerations for all pending shard snapshots
- shardGenerations
+ snapshotShardGenerations
)
));
}
@@ -1613,7 +1617,7 @@ public void onFailure(Exception e) {
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
logger.debug("failing finalization of {} due to shutdown", snapshot);
- handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
+ handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
} else {
onFailure(e);
}
@@ -1623,7 +1627,7 @@ public void onRejection(Exception e) {
public void onFailure(Exception e) {
logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
- handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
+ handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
}
}

@@ -1679,7 +1683,7 @@ private void handleFinalizationFailure(
Exception e,
Snapshot snapshot,
RepositoryData repositoryData,
- ShardGenerations shardGenerations
+ UpdatedShardGenerations updatedShardGenerations
) {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
@@ -1693,7 +1697,7 @@
failAllListenersOnMasterFailOver(e);
} else {
logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e);
- removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations);
+ removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, updatedShardGenerations);
}
}

@@ -1817,7 +1821,7 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> readyDeletions(
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
- public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
+ public static ClusterState stateWithoutSnapshot(
+     ClusterState state,
+     Snapshot snapshot,
+     UpdatedShardGenerations updatedShardGenerations
+ ) {
final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state);
ClusterState result = state;
int indexOfEntry = -1;
@@ -1883,7 +1891,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot,
final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
if (shardState.state() != ShardState.SUCCESS
|| previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false
- || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
+ || updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
continue;
}
updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -1902,7 +1910,7 @@
final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
if (shardState.state() == ShardState.SUCCESS
&& previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey())
- && shardGenerations.hasShardGen(finishedShardEntry.getKey())) {
+ && updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) {
updatedShardAssignments = maybeAddUpdatedAssignment(
updatedShardAssignments,
shardState,
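Both loops above apply the same carry-over rule; pulled out as a standalone predicate, it reads as follows (a paraphrase of the diff with a helper name of our own, not code from this change):

```java
// canPromote is a hypothetical name: a finished shard's result is carried over
// to the next queued snapshot entry only if it succeeded, the next entry tracks
// the same shard, and a new generation was actually written for it. Since
// hasShardGen now covers both live and deleted indices, shards of an index
// deleted mid-snapshot still qualify.
private static boolean canPromote(
    SnapshotsInProgress.ShardSnapshotStatus shardState,
    SnapshotsInProgress.Entry previousEntry,
    RepositoryShardId repositoryShardId,
    FinalizeSnapshotContext.UpdatedShardGenerations updatedShardGenerations
) {
    return shardState.state() == SnapshotsInProgress.ShardState.SUCCESS
        && previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId)
        && updatedShardGenerations.hasShardGen(repositoryShardId);
}
```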
@@ -1992,14 +2000,14 @@ private void removeFailedSnapshotFromClusterState(
Snapshot snapshot,
Exception failure,
@Nullable RepositoryData repositoryData,
- ShardGenerations shardGenerations
+ UpdatedShardGenerations updatedShardGenerations
) {
assert failure != null : "Failure must be supplied";
submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
- final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
+ final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, updatedShardGenerations);
assert updatedState == currentState || endingSnapshots.contains(snapshot)
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
File: FsRepositoryTests.java
@@ -36,6 +36,7 @@
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
+ import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
@@ -170,16 +171,16 @@ public void testSnapshotWithConflictingName() throws Exception {
repository.getMetadata().name(),
new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2")
);
- final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build();
+ final var snapshotShardGenerations = new UpdatedShardGenerations(ShardGenerations.builder().put(indexId, 0, shardGen).build());
final RepositoryData ignoredRepositoryData = safeAwait(
listener -> repository.finalizeSnapshot(
new FinalizeSnapshotContext(
- shardGenerations,
+ snapshotShardGenerations,
RepositoryData.EMPTY_REPO_GEN,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
new SnapshotInfo(
snapshot,
- shardGenerations.indices().stream().map(IndexId::getName).toList(),
+ snapshotShardGenerations.liveIndices().indices().stream().map(IndexId::getName).toList(),
Collections.emptyList(),
Collections.emptyList(),
null,
Expand Down