7171import org .elasticsearch .common .unit .ByteSizeValue ;
7272import org .elasticsearch .common .util .CollectionUtils ;
7373import org .elasticsearch .common .util .Maps ;
74+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
7475import org .elasticsearch .common .util .concurrent .EsExecutors ;
76+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
7577import org .elasticsearch .common .util .concurrent .ListenableFuture ;
7678import org .elasticsearch .core .Nullable ;
7779import org .elasticsearch .core .Predicates ;
@@ -1399,9 +1401,34 @@ private void leaveRepoLoop(String repository) {
13991401 }
14001402
14011403 private void finalizeSnapshotEntry (Snapshot snapshot , Metadata metadata , RepositoryData repositoryData ) {
1402- assert currentlyFinalizing .contains (snapshot .getRepository ());
1403- assert repositoryOperations .assertNotQueued (snapshot );
1404- try {
1404+ threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (new SnapshotFinalization (snapshot , metadata , repositoryData ));
1405+ }
1406+
1407+ /**
1408+ * Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a
1409+ * {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners
1410+ * and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.)
1411+ */
1412+ // This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and
1413+ // BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify
1414+ private class SnapshotFinalization extends AbstractRunnable {
1415+
1416+ private final Snapshot snapshot ;
1417+ private final Metadata metadata ;
1418+ private final RepositoryData repositoryData ;
1419+
1420+ SnapshotFinalization (Snapshot snapshot , Metadata metadata , RepositoryData repositoryData ) {
1421+ this .snapshot = snapshot ;
1422+ this .metadata = metadata ;
1423+ this .repositoryData = repositoryData ;
1424+ }
1425+
1426+ @ Override
1427+ protected void doRun () {
1428+ assert ThreadPool .assertCurrentThreadPool (ThreadPool .Names .SNAPSHOT );
1429+ assert currentlyFinalizing .contains (snapshot .getRepository ());
1430+ assert repositoryOperations .assertNotQueued (snapshot );
1431+
14051432 SnapshotsInProgress .Entry entry = SnapshotsInProgress .get (clusterService .state ()).snapshot (snapshot );
14061433 final String failure = entry .failure ();
14071434 logger .trace ("[{}] finalizing snapshot in repository, state: [{}], failure[{}]" , snapshot , entry .state (), failure );
@@ -1429,7 +1456,9 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
14291456 final ListenableFuture <Metadata > metadataListener = new ListenableFuture <>();
14301457 final Repository repo = repositoriesService .repository (snapshot .getRepository ());
14311458 if (entry .isClone ()) {
1432- threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (ActionRunnable .supply (metadataListener , () -> {
1459+ // This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions.
1460+ // TODO simplify this.
1461+ ActionListener .completeWith (metadataListener , () -> {
14331462 final Metadata existing = repo .getSnapshotGlobalMetadata (entry .source ());
14341463 final Metadata .Builder metaBuilder = Metadata .builder (existing );
14351464 final Set <Index > existingIndices = new HashSet <>();
@@ -1451,11 +1480,12 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
14511480 );
14521481 metaBuilder .dataStreams (dataStreamsToCopy , dataStreamAliasesToCopy );
14531482 return metaBuilder .build ();
1454- })) ;
1483+ });
14551484 } else {
14561485 metadataListener .onResponse (metadata );
14571486 }
14581487 metadataListener .addListener (ActionListener .wrap (meta -> {
1488+ assert ThreadPool .assertCurrentThreadPool (ThreadPool .Names .SNAPSHOT );
14591489 final Metadata metaForSnapshot = metadataForSnapshot (entry , meta );
14601490
14611491 final Map <String , SnapshotInfo .IndexSnapshotDetails > indexSnapshotDetails = Maps .newMapWithExpectedSize (
@@ -1555,7 +1585,20 @@ public void onFailure(Exception e) {
15551585 shardGenerations
15561586 )
15571587 ));
1558- } catch (Exception e ) {
1588+ }
1589+
1590+ @ Override
1591+ public void onRejection (Exception e ) {
1592+ if (e instanceof EsRejectedExecutionException esre && esre .isExecutorShutdown ()) {
1593+ logger .debug ("failing finalization of {} due to shutdown" , snapshot );
1594+ handleFinalizationFailure (e , snapshot , repositoryData , ShardGenerations .EMPTY );
1595+ } else {
1596+ onFailure (e );
1597+ }
1598+ }
1599+
1600+ @ Override
1601+ public void onFailure (Exception e ) {
15591602 logger .error (Strings .format ("unexpected failure finalizing %s" , snapshot ), e );
15601603 assert false : new AssertionError ("unexpected failure finalizing " + snapshot , e );
15611604 handleFinalizationFailure (e , snapshot , repositoryData , ShardGenerations .EMPTY );
0 commit comments