From a33eb117f0806528d1fda3b9fba3de25d23a6c38 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 10 Feb 2025 23:49:36 -0800 Subject: [PATCH] Revert #122047 (#122227) (cherry picked from commit 3ec1d12c99b690d129f919764fb4babafae4049d) --- docs/changelog/122047.yaml | 5 ----- .../snapshots/RepositoriesIT.java | 6 ------ .../repositories/RepositoryData.java | 2 -- .../blobstore/BlobStoreRepository.java | 20 +++++++------------ 4 files changed, 7 insertions(+), 26 deletions(-) delete mode 100644 docs/changelog/122047.yaml diff --git a/docs/changelog/122047.yaml b/docs/changelog/122047.yaml deleted file mode 100644 index 8bd711e3b1b29..0000000000000 --- a/docs/changelog/122047.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 122047 -summary: Fork post-snapshot-delete cleanup off master thread -area: Snapshot/Restore -type: bug -issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index 562f752b82220..c318ebf78dd96 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -508,12 +508,6 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj .orElseThrow() .queue(); - // There is one task in the queue for computing and forking the cleanup work. - assertThat(queueLength.getAsInt(), equalTo(1)); - - safeAwait(barrier); // unblock the barrier thread and let it process the queue - safeAwait(barrier); // wait for the queue to be processed - // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index e89998dc919e6..2c429954f5f49 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -29,7 +29,6 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotsService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -378,7 +377,6 @@ private static boolean isIndexToUpdateAfterRemovingSnapshots( * @return map of index to index metadata blob id to delete */ public Map> indexMetaDataToRemoveAfterRemovingSnapshots(Collection snapshotIds) { - assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); Iterator indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); final Set allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet() .stream() diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index e3ea37c6a4b20..11386eba10196 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1134,20 +1134,14 @@ private void runWithUniqueShardMetadataNaming(ActionListener rep ); }) - .andThen( - // writeIndexGen finishes on master-service thread so must fork here. - snapshotExecutor, - threadPool.getThreadContext(), - (l, newRepositoryData) -> { - l.onResponse(newRepositoryData); - // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot - // deletion - try (var refs = new RefCountingRunnable(onCompletion)) { - cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); - cleanupUnlinkedShardLevelBlobs(refs.acquireListener()); - } + .andThen((l, newRepositoryData) -> { + l.onResponse(newRepositoryData); + // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion + try (var refs = new RefCountingRunnable(onCompletion)) { + cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); + cleanupUnlinkedShardLevelBlobs(refs.acquireListener()); } - ) + }) .addListener(repositoryDataUpdateListener); }