diff --git a/docs/changelog/122731.yaml b/docs/changelog/122731.yaml new file mode 100644 index 0000000000000..afde587a9538b --- /dev/null +++ b/docs/changelog/122731.yaml @@ -0,0 +1,5 @@ +pr: 122731 +summary: Fork post-snapshot-delete cleanup off master thread +area: Snapshot/Restore +type: bug +issues: [] diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml index 5a60f76f6da2c..84e0d5b3e524a 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml @@ -68,3 +68,10 @@ setup: wait_for_completion: false - match: { acknowledged: true } + + # now create another snapshot just to ensure that the async delete finishes before the test cleanup runs: + - do: + snapshot.create: + repository: test_repo_create_1 + snapshot: barrier_snapshot + wait_for_completion: true diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index c318ebf78dd96..562f752b82220 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -508,6 +508,12 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj .orElseThrow() .queue(); + // There is one task in the queue for computing and forking the cleanup work. + assertThat(queueLength.getAsInt(), equalTo(1)); + + safeAwait(barrier); // unblock the barrier thread and let it process the queue + safeAwait(barrier); // wait for the queue to be processed + // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 2c429954f5f49..e89998dc919e6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -29,6 +29,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -377,6 +378,7 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots( * @return map of index to index metadata blob id to delete */ public Map> indexMetaDataToRemoveAfterRemovingSnapshots(Collection snapshotIds) { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); Iterator indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); final Set allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet() .stream() diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 11386eba10196..e3ea37c6a4b20 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1134,14 +1134,20 @@ private void runWithUniqueShardMetadataNaming(ActionListener rep ); }) - .andThen((l, newRepositoryData) -> { - l.onResponse(newRepositoryData); - // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion - try (var refs = new RefCountingRunnable(onCompletion)) { - cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); - cleanupUnlinkedShardLevelBlobs(refs.acquireListener()); + .andThen( + // writeIndexGen finishes on master-service thread so must fork here. + snapshotExecutor, + threadPool.getThreadContext(), + (l, newRepositoryData) -> { + l.onResponse(newRepositoryData); + // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot + // deletion + try (var refs = new RefCountingRunnable(onCompletion)) { + cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); + cleanupUnlinkedShardLevelBlobs(refs.acquireListener()); + } } - }) + ) .addListener(repositoryDataUpdateListener); }