Skip to content

Commit 50aeb99

Browse files
committed
Always update shardGenerations for previous in-progress snapshots
If an index is deleted after a snapshot has written its shardGenerations file but before the snapshot is finalized, we exclude this index from the snapshot because its indexMetadata is no longer available. However, the shardGenerations file is still valid in that it is the latest copy with all necessary information despite it containing an extra snapshot entry. This is OK. Instead of dropping this shardGenerations file, this PR changes to carry it forward so that the next snapshot to finalize, either earlier or later in the list of entries, builts the next shardGenerations based on this one. Resolves: #108907
1 parent 1ab2e6c commit 50aeb99

File tree

7 files changed

+251
-30
lines changed

7 files changed

+251
-30
lines changed

server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*/
2828
public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> {
2929

30-
private final ShardGenerations updatedShardGenerations;
30+
private final UpdatedShardGenerations updatedShardGenerations;
3131

3232
/**
3333
* Obsolete shard generations map computed from the cluster state update that this finalization executed in
@@ -46,7 +46,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
4646
private final Runnable onDone;
4747

4848
/**
49-
* @param updatedShardGenerations updated shard generations
49+
* @param updatedShardGenerations updated shard generations for both live and deleted indices
5050
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
5151
* @param clusterMetadata cluster metadata
5252
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
@@ -57,7 +57,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
5757
* once all cleanup operations after snapshot completion have executed
5858
*/
5959
public FinalizeSnapshotContext(
60-
ShardGenerations updatedShardGenerations,
60+
UpdatedShardGenerations updatedShardGenerations,
6161
long repositoryStateId,
6262
Metadata clusterMetadata,
6363
SnapshotInfo snapshotInfo,
@@ -78,7 +78,7 @@ public long repositoryStateId() {
7878
return repositoryStateId;
7979
}
8080

81-
public ShardGenerations updatedShardGenerations() {
81+
public UpdatedShardGenerations updatedShardGenerations() {
8282
return updatedShardGenerations;
8383
}
8484

@@ -120,4 +120,22 @@ public void onDone() {
120120
public void onResponse(RepositoryData repositoryData) {
121121
delegate.onResponse(repositoryData);
122122
}
123+
124+
/**
125+
* A record used to track the new shard generations that have been written for each shard in a snapshot.
126+
* An index may be deleted after the shard generation is written but before the snapshot is finalized.
127+
* In this case, its shard generation is tracked in {@link #deletedIndices}. Otherwise, it is tracked in
128+
* {@link #liveIndices}.
129+
*/
130+
public record UpdatedShardGenerations(ShardGenerations liveIndices, ShardGenerations deletedIndices) {
131+
public static final UpdatedShardGenerations EMPTY = new UpdatedShardGenerations(ShardGenerations.EMPTY, ShardGenerations.EMPTY);
132+
133+
public UpdatedShardGenerations(ShardGenerations updated) {
134+
this(updated, ShardGenerations.EMPTY);
135+
}
136+
137+
public boolean hasShardGen(RepositoryShardId repositoryShardId) {
138+
return liveIndices.hasShardGen(repositoryShardId) || deletedIndices.hasShardGen(repositoryShardId);
139+
}
140+
}
123141
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1749,7 +1749,7 @@ int sizeInBytes() {
17491749
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
17501750
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
17511751
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
1752-
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
1752+
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations().liveIndices();
17531753
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
17541754
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
17551755
: "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.elasticsearch.indices.SystemDataStreamDescriptor;
9090
import org.elasticsearch.indices.SystemIndices;
9191
import org.elasticsearch.repositories.FinalizeSnapshotContext;
92+
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
9293
import org.elasticsearch.repositories.IndexId;
9394
import org.elasticsearch.repositories.RepositoriesService;
9495
import org.elasticsearch.repositories.Repository;
@@ -469,7 +470,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry clone
469470
endingSnapshots.add(targetSnapshot);
470471
initializingClones.remove(targetSnapshot);
471472
logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
472-
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY);
473+
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, UpdatedShardGenerations.EMPTY);
473474
};
474475

475476
// 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
@@ -748,21 +749,23 @@ private static void validate(final String repositoryName, final String snapshotN
748749
}
749750
}
750751

751-
private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
752+
private static UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
752753
ShardGenerations.Builder builder = ShardGenerations.builder();
754+
ShardGenerations.Builder deletedBuilder = ShardGenerations.builder();
753755
if (snapshot.isClone()) {
754756
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value));
755757
} else {
756758
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> {
757759
final Index index = snapshot.indexByName(key.indexName());
758760
if (metadata.findIndex(index).isEmpty()) {
759761
assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial.";
762+
deletedBuilder.put(key.index(), key.shardId(), value);
760763
return;
761764
}
762765
builder.put(key.index(), key.shardId(), value);
763766
});
764767
}
765-
return builder.build();
768+
return new UpdatedShardGenerations(builder.build(), deletedBuilder.build());
766769
}
767770

768771
private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
@@ -1360,7 +1363,7 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
13601363
snapshot,
13611364
new SnapshotException(snapshot, entry.failure()),
13621365
null,
1363-
ShardGenerations.EMPTY
1366+
UpdatedShardGenerations.EMPTY
13641367
);
13651368
}
13661369
return;
@@ -1454,8 +1457,9 @@ protected void doRun() {
14541457
SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
14551458
final String failure = entry.failure();
14561459
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
1457-
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
1458-
final List<String> finalIndices = shardGenerations.indices().stream().map(IndexId::getName).toList();
1460+
final var snapshotShardGenerations = buildGenerations(entry, metadata);
1461+
final ShardGenerations updatedShardGenerations = snapshotShardGenerations.liveIndices();
1462+
final List<String> finalIndices = updatedShardGenerations.indices().stream().map(IndexId::getName).toList();
14591463
final Set<String> indexNames = new HashSet<>(finalIndices);
14601464
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
14611465
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shardStatus : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
@@ -1552,7 +1556,7 @@ protected void doRun() {
15521556
entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(),
15531557
failure,
15541558
threadPool.absoluteTimeInMillis(),
1555-
entry.partial() ? shardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
1559+
entry.partial() ? updatedShardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
15561560
shardFailures,
15571561
entry.includeGlobalState(),
15581562
entry.userMetadata(),
@@ -1562,7 +1566,7 @@ protected void doRun() {
15621566
final ListenableFuture<List<ActionListener<SnapshotInfo>>> snapshotListeners = new ListenableFuture<>();
15631567
repo.finalizeSnapshot(
15641568
new FinalizeSnapshotContext(
1565-
shardGenerations,
1569+
snapshotShardGenerations,
15661570
repositoryData.getGenId(),
15671571
metaForSnapshot,
15681572
snapshotInfo,
@@ -1579,7 +1583,7 @@ protected void doRun() {
15791583
snapshot,
15801584
repositoryData,
15811585
// we might have written the new root blob before failing here, so we must use the updated shardGenerations
1582-
shardGenerations
1586+
snapshotShardGenerations
15831587
)
15841588
),
15851589
() -> snapshotListeners.addListener(new ActionListener<>() {
@@ -1604,7 +1608,7 @@ public void onFailure(Exception e) {
16041608
repositoryData,
16051609
// a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
16061610
// use the updated shardGenerations for all pending shard snapshots
1607-
shardGenerations
1611+
snapshotShardGenerations
16081612
)
16091613
));
16101614
}
@@ -1613,7 +1617,7 @@ public void onFailure(Exception e) {
16131617
public void onRejection(Exception e) {
16141618
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
16151619
logger.debug("failing finalization of {} due to shutdown", snapshot);
1616-
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
1620+
handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
16171621
} else {
16181622
onFailure(e);
16191623
}
@@ -1623,7 +1627,7 @@ public void onRejection(Exception e) {
16231627
public void onFailure(Exception e) {
16241628
logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
16251629
assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
1626-
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
1630+
handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
16271631
}
16281632
}
16291633

@@ -1679,7 +1683,7 @@ private void handleFinalizationFailure(
16791683
Exception e,
16801684
Snapshot snapshot,
16811685
RepositoryData repositoryData,
1682-
ShardGenerations shardGenerations
1686+
UpdatedShardGenerations updatedShardGenerations
16831687
) {
16841688
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
16851689
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
@@ -1693,7 +1697,7 @@ private void handleFinalizationFailure(
16931697
failAllListenersOnMasterFailOver(e);
16941698
} else {
16951699
logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e);
1696-
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations);
1700+
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, updatedShardGenerations);
16971701
}
16981702
}
16991703

@@ -1817,7 +1821,11 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
18171821
* @param snapshot snapshot for which to remove the snapshot operation
18181822
* @return updated cluster state
18191823
*/
1820-
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
1824+
public static ClusterState stateWithoutSnapshot(
1825+
ClusterState state,
1826+
Snapshot snapshot,
1827+
UpdatedShardGenerations updatedShardGenerations
1828+
) {
18211829
final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state);
18221830
ClusterState result = state;
18231831
int indexOfEntry = -1;
@@ -1883,7 +1891,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
18831891
final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
18841892
if (shardState.state() != ShardState.SUCCESS
18851893
|| previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false
1886-
|| shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
1894+
|| updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
18871895
continue;
18881896
}
18891897
updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -1902,7 +1910,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
19021910
final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
19031911
if (shardState.state() == ShardState.SUCCESS
19041912
&& previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey())
1905-
&& shardGenerations.hasShardGen(finishedShardEntry.getKey())) {
1913+
&& updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) {
19061914
updatedShardAssignments = maybeAddUpdatedAssignment(
19071915
updatedShardAssignments,
19081916
shardState,
@@ -1992,14 +2000,14 @@ private void removeFailedSnapshotFromClusterState(
19922000
Snapshot snapshot,
19932001
Exception failure,
19942002
@Nullable RepositoryData repositoryData,
1995-
ShardGenerations shardGenerations
2003+
UpdatedShardGenerations updatedShardGenerations
19962004
) {
19972005
assert failure != null : "Failure must be supplied";
19982006
submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() {
19992007

20002008
@Override
20012009
public ClusterState execute(ClusterState currentState) {
2002-
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
2010+
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, updatedShardGenerations);
20032011
assert updatedState == currentState || endingSnapshots.contains(snapshot)
20042012
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
20052013
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.index.store.StoreFileMetadata;
3737
import org.elasticsearch.indices.recovery.RecoverySettings;
3838
import org.elasticsearch.repositories.FinalizeSnapshotContext;
39+
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
3940
import org.elasticsearch.repositories.IndexId;
4041
import org.elasticsearch.repositories.Repository;
4142
import org.elasticsearch.repositories.RepositoryData;
@@ -170,16 +171,16 @@ public void testSnapshotWithConflictingName() throws Exception {
170171
repository.getMetadata().name(),
171172
new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2")
172173
);
173-
final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build();
174+
final var snapshotShardGenerations = new UpdatedShardGenerations(ShardGenerations.builder().put(indexId, 0, shardGen).build());
174175
final RepositoryData ignoredRepositoryData = safeAwait(
175176
listener -> repository.finalizeSnapshot(
176177
new FinalizeSnapshotContext(
177-
shardGenerations,
178+
snapshotShardGenerations,
178179
RepositoryData.EMPTY_REPO_GEN,
179180
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
180181
new SnapshotInfo(
181182
snapshot,
182-
shardGenerations.indices().stream().map(IndexId::getName).toList(),
183+
snapshotShardGenerations.liveIndices().indices().stream().map(IndexId::getName).toList(),
183184
Collections.emptyList(),
184185
Collections.emptyList(),
185186
null,

0 commit comments

Comments
 (0)