-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Update shardGenerations for all indices on snapshot finalization #128650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
50aeb99
08ed60a
d0a7ab5
efdeaed
ce88b2c
b478b95
164d2f6
a75c850
46b8fd6
86fd758
8378949
9ffed9b
a055214
1071d95
81f88ba
1852486
6d148e5
a257d39
145ff85
11b9e52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 128650 | ||
| summary: Always update `shardGenerations` for previous in-progress snapshots | ||
| area: Snapshot/Restore | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ | |
| */ | ||
| public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> { | ||
|
|
||
| private final ShardGenerations updatedShardGenerations; | ||
| private final UpdatedShardGenerations updatedShardGenerations; | ||
|
|
||
| /** | ||
| * Obsolete shard generations map computed from the cluster state update that this finalization executed in | ||
|
|
@@ -46,7 +46,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo | |
| private final Runnable onDone; | ||
|
|
||
| /** | ||
| * @param updatedShardGenerations updated shard generations | ||
| * @param updatedShardGenerations updated shard generations for both live and deleted indices | ||
| * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began | ||
| * @param clusterMetadata cluster metadata | ||
| * @param snapshotInfo SnapshotInfo instance to write for this snapshot | ||
|
|
@@ -57,7 +57,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo | |
| * once all cleanup operations after snapshot completion have executed | ||
| */ | ||
| public FinalizeSnapshotContext( | ||
| ShardGenerations updatedShardGenerations, | ||
| UpdatedShardGenerations updatedShardGenerations, | ||
| long repositoryStateId, | ||
| Metadata clusterMetadata, | ||
| SnapshotInfo snapshotInfo, | ||
|
|
@@ -78,7 +78,7 @@ public long repositoryStateId() { | |
| return repositoryStateId; | ||
| } | ||
|
|
||
| public ShardGenerations updatedShardGenerations() { | ||
| public UpdatedShardGenerations updatedShardGenerations() { | ||
| return updatedShardGenerations; | ||
| } | ||
|
|
||
|
|
@@ -120,4 +120,24 @@ public void onDone() { | |
| public void onResponse(RepositoryData repositoryData) { | ||
| delegate.onResponse(repositoryData); | ||
| } | ||
|
|
||
| /** | ||
| * A record used to track the new shard generations that have been written for each shard in a snapshot. | ||
| * An index may be deleted after the shard generation is written but before the snapshot is finalized. | ||
| * In this case, its shard generation is tracked in {@link #deletedIndices} because it's still a valid | ||
| * shard generation blob that exists in the repository and may be used by subsequent snapshots, even though | ||
| * the index will not be included in the snapshot being finalized. Otherwise, it is tracked in | ||
| * {@link #liveIndices}. | ||
| */ | ||
| public record UpdatedShardGenerations(ShardGenerations liveIndices, ShardGenerations deletedIndices) { | ||
| public static final UpdatedShardGenerations EMPTY = new UpdatedShardGenerations(ShardGenerations.EMPTY, ShardGenerations.EMPTY); | ||
|
|
||
| public UpdatedShardGenerations(ShardGenerations updated) { | ||
|
||
| this(updated, ShardGenerations.EMPTY); | ||
| } | ||
|
|
||
| public boolean hasShardGen(RepositoryShardId repositoryShardId) { | ||
| return liveIndices.hasShardGen(repositoryShardId) || deletedIndices.hasShardGen(repositoryShardId); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |||||
| import org.elasticsearch.index.IndexVersions; | ||||||
| import org.elasticsearch.logging.LogManager; | ||||||
| import org.elasticsearch.logging.Logger; | ||||||
| import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; | ||||||
| import org.elasticsearch.snapshots.SnapshotId; | ||||||
| import org.elasticsearch.snapshots.SnapshotInfo; | ||||||
| import org.elasticsearch.snapshots.SnapshotState; | ||||||
|
|
@@ -405,16 +406,16 @@ public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapsh | |||||
| * | ||||||
| * @param snapshotId Id of the new snapshot | ||||||
| * @param details Details of the new snapshot | ||||||
| * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new | ||||||
| * generations indexed by the shard id they correspond to must be supplied. | ||||||
| * @param updatedShardGenerations Updated shard generations in the new snapshot, including both indices that are included | ||||||
| * in the given snapshot and those that were deleted while finalizing. | ||||||
| * @param indexMetaBlobs Map of index metadata blob uuids | ||||||
| * @param newIdentifiers Map of new index metadata blob uuids keyed by the identifiers of the | ||||||
| * {@link IndexMetadata} in them | ||||||
| */ | ||||||
| public RepositoryData addSnapshot( | ||||||
| final SnapshotId snapshotId, | ||||||
| final SnapshotDetails details, | ||||||
| final ShardGenerations shardGenerations, | ||||||
| final UpdatedShardGenerations updatedShardGenerations, | ||||||
| @Nullable final Map<IndexId, String> indexMetaBlobs, | ||||||
| @Nullable final Map<String, String> newIdentifiers | ||||||
| ) { | ||||||
|
|
@@ -424,6 +425,7 @@ public RepositoryData addSnapshot( | |||||
| // the new master, so we make the operation idempotent | ||||||
| return this; | ||||||
| } | ||||||
| final var shardGenerations = updatedShardGenerations.liveIndices(); | ||||||
|
||||||
| final var shardGenerations = updatedShardGenerations.liveIndices(); | |
| final var liveIndexIds = updatedShardGenerations.liveIndices().indices(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, fair call: 6d148e5
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,8 @@ | |
| import java.util.regex.Pattern; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; | ||
|
|
||
| /** | ||
| * Represents the current {@link ShardGeneration} for each shard in a repository. | ||
| */ | ||
|
|
@@ -231,6 +233,14 @@ public Builder putAll(ShardGenerations shardGenerations) { | |
| return this; | ||
| } | ||
|
|
||
| public Builder update(UpdatedShardGenerations updatedShardGenerations) { | ||
| putAll(updatedShardGenerations.liveIndices()); | ||
| // For deleted indices, we only update the generations if they are present in the existing generations, i.e. | ||
| // they are referenced by other snapshots. | ||
| updateIfPresent(updatedShardGenerations.deletedIndices()); | ||
| return this; | ||
| } | ||
|
|
||
| public Builder put(IndexId indexId, int shardId, SnapshotsInProgress.ShardSnapshotStatus status) { | ||
| // only track generations for successful shard status values | ||
| return put(indexId, shardId, status.state().failed() ? null : status.generation()); | ||
|
|
@@ -244,6 +254,20 @@ public Builder put(IndexId indexId, int shardId, ShardGeneration generation) { | |
| return this; | ||
| } | ||
|
|
||
| private Builder updateIfPresent(ShardGenerations shardGenerations) { | ||
| shardGenerations.shardGenerations.forEach((indexId, gens) -> { | ||
| if (generations.containsKey(indexId)) { | ||
|
||
| for (int i = 0; i < gens.size(); i++) { | ||
| final ShardGeneration gen = gens.get(i); | ||
| if (gen != null) { | ||
| generations.get(indexId).put(i, gen); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| return this; | ||
|
||
| } | ||
|
|
||
| private boolean noDuplicateIndicesWithSameName(IndexId newId) { | ||
| for (IndexId id : generations.keySet()) { | ||
| if (id.getName().equals(newId.getName()) && id.equals(newId) == false) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1749,7 +1749,7 @@ int sizeInBytes() { | |
| public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { | ||
| assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); | ||
| final long repositoryStateId = finalizeSnapshotContext.repositoryStateId(); | ||
| final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations(); | ||
| final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations().liveIndices(); | ||
|
||
| final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); | ||
| assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN | ||
| : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; | ||
|
|
@@ -1867,7 +1867,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new | |
| existingRepositoryData.addSnapshot( | ||
| snapshotId, | ||
| snapshotDetails, | ||
| shardGenerations, | ||
| finalizeSnapshotContext.updatedShardGenerations(), | ||
| metadataWriteResult.indexMetas(), | ||
| metadataWriteResult.indexMetaIdentifiers() | ||
| ), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think this will work (but I will add some other comments elsewhere).