Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122731.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changelog entry for PR #122731.
# Moves post-snapshot-delete cleanup work off the cluster-state (master) thread
# onto the SNAPSHOT threadpool, so master is not blocked by blob cleanup.
pr: 122731
summary: Fork post-snapshot-delete cleanup off master thread
area: Snapshot/Restore
type: bug
# No linked GitHub issues for this change.
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,10 @@ setup:
wait_for_completion: false

- match: { acknowledged: true }

# now create another snapshot just to ensure that the async delete finishes before the test cleanup runs:
- do:
snapshot.create:
repository: test_repo_create_1
snapshot: barrier_snapshot
wait_for_completion: true
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj
.orElseThrow()
.queue();

// There is one task in the queue for computing and forking the cleanup work.
assertThat(queueLength.getAsInt(), equalTo(1));

safeAwait(barrier); // unblock the barrier thread and let it process the queue
safeAwait(barrier); // wait for the queue to be processed

// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

Expand Down Expand Up @@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots(
* @return map of index to index metadata blob id to delete
*/
public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1129,14 +1129,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener<RepositoryData> rep
);
})

.<RepositoryData>andThen((l, newRepositoryData) -> {
l.onResponse(newRepositoryData);
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
try (var refs = new RefCountingRunnable(onCompletion)) {
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
.<RepositoryData>andThen(
// writeIndexGen finishes on master-service thread so must fork here.
snapshotExecutor,
threadPool.getThreadContext(),
(l, newRepositoryData) -> {
l.onResponse(newRepositoryData);
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
// deletion
try (var refs = new RefCountingRunnable(onCompletion)) {
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
}
}
})
)

.addListener(repositoryDataUpdateListener);
}
Expand Down