5 changes: 5 additions & 0 deletions docs/changelog/128650.yaml
@@ -0,0 +1,5 @@
pr: 128650
summary: Always update `shardGenerations` for previous in-progress snapshots
area: Snapshot/Restore
type: enhancement
issues: []
server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
@@ -19,15 +19,17 @@
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Context for finalizing a snapshot.
*/
public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> {

private final ShardGenerations updatedShardGenerations;
private final UpdatedShardGenerations updatedShardGenerations;

/**
* Obsolete shard generations map computed from the cluster state update that this finalization executed in
@@ -46,7 +48,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
private final Runnable onDone;

/**
* @param updatedShardGenerations updated shard generations
* @param updatedShardGenerations updated shard generations for both live and deleted indices
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
@@ -57,7 +59,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
* once all cleanup operations after snapshot completion have executed
*/
public FinalizeSnapshotContext(
ShardGenerations updatedShardGenerations,
UpdatedShardGenerations updatedShardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
@@ -78,7 +80,7 @@ public long repositoryStateId() {
return repositoryStateId;
}

public ShardGenerations updatedShardGenerations() {
public UpdatedShardGenerations updatedShardGenerations() {
return updatedShardGenerations;
}

@@ -106,8 +108,21 @@ public ClusterState updatedClusterState(ClusterState state) {
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations);
// Now that the updated cluster state may have changed in-progress shard snapshots' shard generations to the latest shard
// generation, let's mark any now unreferenced shard generations as obsolete and ready to be deleted.

final Collection<IndexId> deletedIndices = updatedShardGenerations.deletedIndices.indices();
obsoleteGenerations.set(
SnapshotsInProgress.get(updatedState).obsoleteGenerations(snapshotInfo.repository(), SnapshotsInProgress.get(state))
SnapshotsInProgress.get(updatedState)
.obsoleteGenerations(snapshotInfo.repository(), SnapshotsInProgress.get(state))
.entrySet()
.stream()
// We want to keep both the old and the new generations for deleted indices, so we filter their entries out of the
// obsolete set here to avoid deleting them. We need the old generations because they are what gets recorded in the RepositoryData.
Contributor:
Hmm this seems suspicious. We should be updating the shard generations for existing shards in RepositoryData and discarding the previous values, even if the index isn't included in the snapshot. AIUI the tripping assertion you mentioned in an earlier comment related to generations for shards that were totally absent from RepositoryData. We should drop those shards entirely from RepositoryData, but update any ones that do exist.

Member Author:
Ah yes, you are absolutely right. We should conditionally update the shard generations in RepositoryData for the deleted indices. The issue with my previous attempt was that the update was "unconditional", which triggered the assertion. It's now updated as suggested. Thanks a lot!

// We also need the new generations because a future finalization may build upon them. They may end up not being used
// at all when the current batch of in-progress snapshots completes and no new index of the same name is created.
// That is also OK: it just means we have some redundant shard generations in the repository, and they will be deleted
// when a snapshot deletion runs.
.filter(e -> deletedIndices.contains(e.getKey().index()) == false)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue))
);
return updatedState;
}
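To make the filter above concrete, here is a stand-alone sketch of the same decision over plain collections. The string-keyed maps, the helper class, and the "indexName/shardId" key format are simplified stand-ins made up for this example, not the real SnapshotsInProgress or ShardGenerations types:

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ObsoleteGenerationsFilterExample {
    // Keys are "indexName/shardId" stand-ins; values are the old generations that would otherwise be deleted.
    static Map<String, String> deletableGenerations(Map<String, String> obsolete, Set<String> deletedIndexNames) {
        return obsolete.entrySet().stream()
            // Entries for deleted indices are excluded, so both their old and new generations survive
            // until a later snapshot deletion cleans them up.
            .filter(e -> deletedIndexNames.contains(e.getKey().split("/")[0]) == false)
            .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}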
@@ -120,4 +135,22 @@ public void onDone() {
public void onResponse(RepositoryData repositoryData) {
delegate.onResponse(repositoryData);
}

/**
* A record used to track the new shard generations that have been written for each shard in a snapshot.
* An index may be deleted after the shard generation is written but before the snapshot is finalized.
* In this case, its shard generation is tracked in {@link #deletedIndices}. Otherwise, it is tracked in
* {@link #liveIndices}.
*/
public record UpdatedShardGenerations(ShardGenerations liveIndices, ShardGenerations deletedIndices) {
Contributor:
Yes, I think this will work (but I will add some other comments elsewhere).

public static final UpdatedShardGenerations EMPTY = new UpdatedShardGenerations(ShardGenerations.EMPTY, ShardGenerations.EMPTY);

public UpdatedShardGenerations(ShardGenerations updated) {
Contributor:
This constructor is only used in tests; IMO it'd be better to just be explicit and pass in ShardGenerations.EMPTY as the second parameter rather than take the risk that some future caller uses this constructor when they should be accounting for deleted indices too.

Member Author:
Yep, see 1852486.

this(updated, ShardGenerations.EMPTY);
}

public boolean hasShardGen(RepositoryShardId repositoryShardId) {
return liveIndices.hasShardGen(repositoryShardId) || deletedIndices.hasShardGen(repositoryShardId);
}
}
}
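For illustration, a minimal sketch of how a caller might assemble and query the new record, mirroring the builder usage seen in the test further down this diff; the helper class, parameter names, and the choice of shard 0 are assumptions made up for this example:

import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardGenerations;

class UpdatedShardGenerationsExample {
    // Builds the record for one live index and one index deleted mid-snapshot, then checks a shard.
    static boolean exampleUsage(IndexId liveIndex, ShardGeneration liveGen,
                                IndexId deletedIndex, ShardGeneration deletedGen,
                                RepositoryShardId shardToCheck) {
        ShardGenerations live = ShardGenerations.builder().put(liveIndex, 0, liveGen).build();
        ShardGenerations deleted = ShardGenerations.builder().put(deletedIndex, 0, deletedGen).build();
        UpdatedShardGenerations updated = new UpdatedShardGenerations(live, deleted);
        // hasShardGen consults both sides, so a previous in-progress snapshot can still pick up the
        // latest generation even when the owning index was deleted before finalization.
        return updated.hasShardGen(shardToCheck);
    }
}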
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1749,7 +1749,7 @@ int sizeInBytes() {
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations().liveIndices();
Contributor:
Nit: maybe inline this, it's only used in one place, and we now need to care about which ShardGenerations we're talking about, so the variable name is ambiguous.

Member Author:
Good point. See 1071d95.

final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
: "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -89,6 +89,7 @@
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -469,7 +470,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry clone
endingSnapshots.add(targetSnapshot);
initializingClones.remove(targetSnapshot);
logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY);
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, UpdatedShardGenerations.EMPTY);
};

// 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
@@ -748,21 +749,23 @@ private static void validate(final String repositoryName, final String snapshotN
}
}

private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
private static UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
ShardGenerations.Builder builder = ShardGenerations.builder();
ShardGenerations.Builder deletedBuilder = ShardGenerations.builder();
if (snapshot.isClone()) {
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value));
} else {
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> {
final Index index = snapshot.indexByName(key.indexName());
if (metadata.findIndex(index).isEmpty()) {
assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial.";
deletedBuilder.put(key.index(), key.shardId(), value);
return;
}
builder.put(key.index(), key.shardId(), value);
});
}
return builder.build();
return new UpdatedShardGenerations(builder.build(), deletedBuilder.build());
Contributor:
deletedBuilder.build() is mostly going to be ShardGenerations.EMPTY; can we special-case the empty collection in ShardGenerations.Builder to skip a bunch of unnecessary allocations?

Member Author:
Makes sense. Special-cased in both Builder#build and the initialization of the variable here: 81f88ba

}
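The special-casing from 81f88ba is not part of this diff; the following is only a sketch of the idea it describes, assuming the builder keeps its entries in an internal map. The field name `generations` and the helper `buildGenerationsMap()` are illustrative, not the actual ShardGenerations.Builder internals:

// Inside a ShardGenerations.Builder-like class.
public ShardGenerations build() {
    if (generations.isEmpty()) {
        // Skip allocating a fresh ShardGenerations (and its nested maps) for the common empty case.
        return ShardGenerations.EMPTY;
    }
    return new ShardGenerations(buildGenerationsMap());
}

On the caller side, the same idea applies to deletedBuilder above: in the common case where no index was deleted mid-snapshot, the deleted side collapses to ShardGenerations.EMPTY without extra allocations.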

private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
Expand Down Expand Up @@ -1360,7 +1363,7 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
snapshot,
new SnapshotException(snapshot, entry.failure()),
null,
ShardGenerations.EMPTY
UpdatedShardGenerations.EMPTY
);
}
return;
@@ -1454,8 +1457,9 @@ protected void doRun() {
SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
final String failure = entry.failure();
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
final List<String> finalIndices = shardGenerations.indices().stream().map(IndexId::getName).toList();
final var updatedShardGenerations = buildGenerations(entry, metadata);
final ShardGenerations updatedShardGensForLiveIndices = updatedShardGenerations.liveIndices();
final List<String> finalIndices = updatedShardGensForLiveIndices.indices().stream().map(IndexId::getName).toList();
final Set<String> indexNames = new HashSet<>(finalIndices);
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shardStatus : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
@@ -1552,7 +1556,7 @@ protected void doRun() {
entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(),
failure,
threadPool.absoluteTimeInMillis(),
entry.partial() ? shardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
entry.partial() ? updatedShardGensForLiveIndices.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
shardFailures,
entry.includeGlobalState(),
entry.userMetadata(),
@@ -1562,7 +1566,7 @@
final ListenableFuture<List<ActionListener<SnapshotInfo>>> snapshotListeners = new ListenableFuture<>();
repo.finalizeSnapshot(
new FinalizeSnapshotContext(
shardGenerations,
updatedShardGenerations,
repositoryData.getGenId(),
metaForSnapshot,
snapshotInfo,
@@ -1579,7 +1583,7 @@
snapshot,
repositoryData,
// we might have written the new root blob before failing here, so we must use the updated shardGenerations
shardGenerations
updatedShardGenerations
)
),
() -> snapshotListeners.addListener(new ActionListener<>() {
@@ -1604,7 +1608,7 @@ public void onFailure(Exception e) {
repositoryData,
// a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
// use the updated shardGenerations for all pending shard snapshots
shardGenerations
updatedShardGenerations
)
));
}
@@ -1613,7 +1617,7 @@
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
logger.debug("failing finalization of {} due to shutdown", snapshot);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
} else {
onFailure(e);
}
@@ -1623,7 +1627,7 @@ public void onRejection(Exception e) {
public void onFailure(Exception e) {
logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
}
}

@@ -1679,7 +1683,7 @@ private void handleFinalizationFailure(
Exception e,
Snapshot snapshot,
RepositoryData repositoryData,
ShardGenerations shardGenerations
UpdatedShardGenerations updatedShardGenerations
) {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
@@ -1693,7 +1697,7 @@
failAllListenersOnMasterFailOver(e);
} else {
logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e);
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations);
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, updatedShardGenerations);
}
}

@@ -1817,7 +1821,7 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
public static ClusterState stateWithoutSnapshot(
ClusterState state,
Snapshot snapshot,
UpdatedShardGenerations updatedShardGenerations
) {
final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state);
ClusterState result = state;
int indexOfEntry = -1;
@@ -1883,7 +1891,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
if (shardState.state() != ShardState.SUCCESS
|| previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false
|| shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
|| updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
continue;
}
updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -1902,7 +1910,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
if (shardState.state() == ShardState.SUCCESS
&& previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey())
&& shardGenerations.hasShardGen(finishedShardEntry.getKey())) {
&& updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) {
updatedShardAssignments = maybeAddUpdatedAssignment(
updatedShardAssignments,
shardState,
@@ -1992,14 +2000,14 @@ private void removeFailedSnapshotFromClusterState(
Snapshot snapshot,
Exception failure,
@Nullable RepositoryData repositoryData,
ShardGenerations shardGenerations
UpdatedShardGenerations updatedShardGenerations
) {
assert failure != null : "Failure must be supplied";
submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, updatedShardGenerations);
assert updatedState == currentState || endingSnapshots.contains(snapshot)
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
@@ -36,6 +36,7 @@
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
@@ -170,16 +171,16 @@ public void testSnapshotWithConflictingName() throws Exception {
repository.getMetadata().name(),
new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2")
);
final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build();
final var snapshotShardGenerations = new UpdatedShardGenerations(ShardGenerations.builder().put(indexId, 0, shardGen).build());
final RepositoryData ignoredRepositoryData = safeAwait(
listener -> repository.finalizeSnapshot(
new FinalizeSnapshotContext(
shardGenerations,
snapshotShardGenerations,
RepositoryData.EMPTY_REPO_GEN,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
new SnapshotInfo(
snapshot,
shardGenerations.indices().stream().map(IndexId::getName).toList(),
snapshotShardGenerations.liveIndices().indices().stream().map(IndexId::getName).toList(),
Collections.emptyList(),
Collections.emptyList(),
null,