5 changes: 5 additions & 0 deletions docs/changelog/122731.yaml
@@ -0,0 +1,5 @@
pr: 122731
summary: Fork post-snapshot-delete cleanup off master thread
area: Snapshot/Restore
type: bug
issues: []
@@ -68,3 +68,10 @@ setup:
wait_for_completion: false

- match: { acknowledged: true }

# now create another snapshot just to ensure that the async delete finishes before the test cleanup runs:
  - do:
      snapshot.create:
        repository: test_repo_create_1
        snapshot: barrier_snapshot
        wait_for_completion: true
Contributor

Nice, I had a look at this as well when the original problems popped up. I investigated adding a

  - do:
      tasks.list:
        actions: cluster:admin/snapshot/delete
        wait_for_completion: true

to the end of the test, but looking at the code, the task completing didn't necessarily mean the delete had occurred?

Contributor @mhl-b (Feb 18, 2025)

I think it does. My understanding is that the deletion request registers a listener in snapshotDeletionListeners, the deletion proceeds, the cluster state is updated when the deletion is complete, and the listeners in snapshotDeletionListeners are notified on that cluster state update. I guess it comes down to the repository implementation: if the delete calls are synchronous this is enough, otherwise we would need to poll state...

Contributor (Author)

In the ?wait_for_completion=false case we don't register the listener with snapshotDeletionListeners:

  if (newDelete == null || request.waitForCompletion() == false) {
      listener.onResponse(null);
  } else {
      addDeleteListener(newDelete.uuid(), listener);
  }

@@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj
.orElseThrow()
.queue();

// There is one task in the queue for computing and forking the cleanup work.
assertThat(queueLength.getAsInt(), equalTo(1));

safeAwait(barrier); // unblock the barrier thread and let it process the queue
safeAwait(barrier); // wait for the queue to be processed

// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this
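For readers unfamiliar with the throttling behaviour the test comment above relies on, here is a minimal sketch of the idea: cap how many tasks are submitted to the executor at once and let each completing task pull the next one from the queue. This is a simplified illustration, not Elasticsearch's actual throttled runner; the class and method names below are invented for the example.

  import java.util.Queue;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import java.util.concurrent.Executor;
  import java.util.concurrent.atomic.AtomicInteger;

  // Sketch: never submit more than maxRunning tasks to the executor at once;
  // every completing task tries to pull the next queued task.
  class SimpleThrottledRunner {
      private final Executor executor;
      private final int maxRunning;
      private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
      private final AtomicInteger running = new AtomicInteger();

      SimpleThrottledRunner(Executor executor, int maxRunning) {
          this.executor = executor;
          this.maxRunning = maxRunning;
      }

      void enqueue(Runnable task) {
          queue.add(task);
          maybeSubmit();
      }

      private void maybeSubmit() {
          while (true) {
              int current = running.get();
              if (current >= maxRunning) {
                  return; // at the limit; a completing task will pick up the rest
              }
              if (running.compareAndSet(current, current + 1) == false) {
                  continue; // raced with another caller, retry
              }
              Runnable next = queue.poll();
              if (next == null) {
                  running.decrementAndGet();
                  if (queue.isEmpty()) {
                      return; // nothing left to run
                  }
                  continue; // a task arrived concurrently, try again
              }
              executor.execute(() -> {
                  try {
                      next.run();
                  } finally {
                      running.decrementAndGet();
                      maybeSubmit(); // keep the pipeline full without flooding the pool
                  }
              });
          }
      }
  }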
@@ -29,6 +29,7 @@
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

@@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots(
* @return map of index to index metadata blob id to delete
*/
public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
.stream()
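The assertion added above is an assert-only guard: it checks that this potentially expensive computation runs on a SNAPSHOT-pool thread rather than, say, the master service thread, and it costs nothing when assertions are disabled in production. A rough sketch of how such a check can be written, assuming the common convention that the pool name appears in the thread name; the helper below is invented for illustration and is not the real ThreadPool implementation.

  // Illustration only: the method returns true so it can sit inside an `assert`
  // statement and only trips when assertions are enabled and the current thread
  // does not belong to the expected pool.
  final class ThreadPoolAssertions {
      private ThreadPoolAssertions() {}

      static boolean currentThreadPoolIs(String poolName) {
          final String threadName = Thread.currentThread().getName();
          assert threadName.contains("[" + poolName + "]")
              : "expected a thread from the [" + poolName + "] pool but was on [" + threadName + "]";
          return true;
      }
  }

  // Usage in a method that must never run on the master/cluster-state thread:
  //     assert ThreadPoolAssertions.currentThreadPoolIs("snapshot");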
@@ -1134,14 +1134,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener<RepositoryData> rep
);
})

.<RepositoryData>andThen((l, newRepositoryData) -> {
l.onResponse(newRepositoryData);
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
try (var refs = new RefCountingRunnable(onCompletion)) {
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
.<RepositoryData>andThen(
// writeIndexGen finishes on master-service thread so must fork here.
snapshotExecutor,
threadPool.getThreadContext(),
(l, newRepositoryData) -> {
l.onResponse(newRepositoryData);
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
// deletion
try (var refs = new RefCountingRunnable(onCompletion)) {
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
}
}
})
)

.addListener(repositoryDataUpdateListener);
}
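The comment in the new code ("writeIndexGen finishes on master-service thread so must fork here") captures the essence of the fix: without an explicit executor, a listener's continuation runs on whichever thread completed the previous step, which in this chain is the master-service thread. The following self-contained illustration shows the same principle using plain java.util.concurrent rather than the Elasticsearch listener API; all names are invented for the example.

  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class ForkContinuationExample {
      public static void main(String[] args) throws Exception {
          ExecutorService masterService = Executors.newSingleThreadExecutor(r -> new Thread(r, "master-service"));
          ExecutorService snapshotPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "snapshot-pool"));

          CompletableFuture<String> updatedRepositoryData = new CompletableFuture<>();

          // thenAccept (no executor) would run the callback on "master-service", i.e. the
          // thread that calls complete(); thenAcceptAsync forks it onto the snapshot pool,
          // keeping the slow cleanup work off the coordinating thread.
          CompletableFuture<Void> cleanupDone = updatedRepositoryData.thenAcceptAsync(
              repositoryData -> System.out.println(
                  "cleanup for " + repositoryData + " running on " + Thread.currentThread().getName()
              ),
              snapshotPool
          );

          // Simulate the cluster-state update completing on the master-service thread.
          masterService.submit(() -> updatedRepositoryData.complete("repo-generation-42"));

          cleanupDone.join(); // wait for the forked cleanup before shutting down
          masterService.shutdown();
          snapshotPool.shutdown();
      }
  }

Running this prints that the cleanup executed on a snapshot-pool thread rather than on master-service, which is the effect the forked andThen overload achieves in the change above.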