Skip to content

Commit 6200e5a

Browse files
authored
Fork post-snapshot-delete cleanup off master thread (#122047) (#122072)
We shouldn't run the post-snapshot-delete cleanup work on the master thread, since it can be quite expensive and need not block subsequent cluster state updates. This commit forks it onto a `SNAPSHOT` thread.
1 parent d22a971 commit 6200e5a

File tree

4 files changed

+26
-7
lines changed

4 files changed

+26
-7
lines changed

docs/changelog/122047.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122047
2+
summary: Fork post-snapshot-delete cleanup off master thread
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj
508508
.orElseThrow()
509509
.queue();
510510

511+
// There is one task in the queue for computing and forking the cleanup work.
512+
assertThat(queueLength.getAsInt(), equalTo(1));
513+
514+
safeAwait(barrier); // unblock the barrier thread and let it process the queue
515+
safeAwait(barrier); // wait for the queue to be processed
516+
511517
// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
512518
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
513519
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.snapshots.SnapshotInfo;
3030
import org.elasticsearch.snapshots.SnapshotState;
3131
import org.elasticsearch.snapshots.SnapshotsService;
32+
import org.elasticsearch.threadpool.ThreadPool;
3233
import org.elasticsearch.xcontent.XContentBuilder;
3334
import org.elasticsearch.xcontent.XContentParser;
3435

@@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots(
377378
* @return map of index to index metadata blob id to delete
378379
*/
379380
public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
381+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
380382
Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
381383
final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
382384
.stream()

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,14 +1129,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener<RepositoryData> rep
11291129
);
11301130
})
11311131

1132-
.<RepositoryData>andThen((l, newRepositoryData) -> {
1133-
l.onResponse(newRepositoryData);
1134-
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1135-
try (var refs = new RefCountingRunnable(onCompletion)) {
1136-
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
1137-
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
1132+
.<RepositoryData>andThen(
1133+
// writeIndexGen finishes on master-service thread so must fork here.
1134+
snapshotExecutor,
1135+
threadPool.getThreadContext(),
1136+
(l, newRepositoryData) -> {
1137+
l.onResponse(newRepositoryData);
1138+
// Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
1139+
// deletion
1140+
try (var refs = new RefCountingRunnable(onCompletion)) {
1141+
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
1142+
cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
1143+
}
11381144
}
1139-
})
1145+
)
11401146

11411147
.addListener(repositoryDataUpdateListener);
11421148
}

0 commit comments

Comments
 (0)