2929import org .elasticsearch .common .util .Maps ;
3030import org .elasticsearch .common .util .concurrent .ThrottledTaskRunner ;
3131import org .elasticsearch .core .Nullable ;
32+ import org .elasticsearch .core .Releasable ;
3233import org .elasticsearch .index .IndexVersion ;
3334import org .elasticsearch .index .engine .Engine ;
3435import org .elasticsearch .index .seqno .SequenceNumbers ;
5455import org .elasticsearch .transport .TransportService ;
5556
5657import java .io .IOException ;
57- import java .util .ArrayList ;
5858import java .util .HashMap ;
5959import java .util .Iterator ;
6060import java .util .List ;
@@ -87,6 +87,9 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl
8787 // A map of snapshots to the shardIds that we already reported to the master as failed
8888 private final ResultDeduplicator <UpdateIndexShardSnapshotStatusRequest , Void > remoteFailedRequestDeduplicator ;
8989
90+ // Runs the tasks that start each shard snapshot (e.g. acquiring the index commit)
91+ private final ThrottledTaskRunner startShardSnapshotTaskRunner ;
92+
9093 // Runs the tasks that promptly notify shards of aborted snapshots so that resources can be released ASAP
9194 private final ThrottledTaskRunner notifyOnAbortTaskRunner ;
9295
@@ -114,6 +117,11 @@ public SnapshotShardsService(
114117 threadPool .info (ThreadPool .Names .SNAPSHOT ).getMax (),
115118 threadPool .generic ()
116119 );
120+ this .startShardSnapshotTaskRunner = new ThrottledTaskRunner (
121+ "start-shard-snapshots" ,
122+ threadPool .info (ThreadPool .Names .SNAPSHOT ).getMax (),
123+ threadPool .executor (ThreadPool .Names .SNAPSHOT )
124+ );
117125 }
118126
119127 @ Override
@@ -304,7 +312,6 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
304312
305313 final var newSnapshotShards = shardSnapshots .computeIfAbsent (snapshot , s -> new HashMap <>());
306314
307- final List <Runnable > shardSnapshotTasks = new ArrayList <>(shardsToStart .size ());
308315 for (final Map .Entry <ShardId , ShardGeneration > shardEntry : shardsToStart .entrySet ()) {
309316 final ShardId shardId = shardEntry .getKey ();
310317 final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus .newInitializing (shardEntry .getValue ());
@@ -316,10 +323,36 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
316323 : "Found non-null, non-numeric shard generation ["
317324 + snapshotStatus .generation ()
318325 + "] for snapshot with old-format compatibility" ;
319- shardSnapshotTasks .add (newShardSnapshotTask (shardId , snapshot , indexId , snapshotStatus , entry .version (), entry .startTime ()));
326+ final var shardSnapshotTask = newShardSnapshotTask (
327+ shardId ,
328+ snapshot ,
329+ indexId ,
330+ snapshotStatus ,
331+ entry .version (),
332+ entry .startTime ()
333+ );
334+ startShardSnapshotTaskRunner .enqueueTask (new ActionListener <>() {
335+ @ Override
336+ public void onResponse (Releasable releasable ) {
337+ try (releasable ) {
338+ shardSnapshotTask .run ();
339+ }
340+ }
341+
342+ @ Override
343+ public void onFailure (Exception e ) {
344+ final var wrapperException = new IllegalStateException (
345+ "impossible failure starting shard snapshot for " + shardId + " in " + snapshot ,
346+ e
347+ );
348+ logger .error (wrapperException .getMessage (), wrapperException );
349+ assert false : wrapperException ; // impossible
350+ }
351+ });
320352 }
321353
322- threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (() -> shardSnapshotTasks .forEach (Runnable ::run ));
354+ // apply some backpressure by reserving one SNAPSHOT thread for the startup work
355+ startShardSnapshotTaskRunner .runSyncTasksEagerly (threadPool .executor (ThreadPool .Names .SNAPSHOT ));
323356 }
324357
325358 private void pauseShardSnapshotsForNodeRemoval (String localNodeId , SnapshotsInProgress .Entry entry ) {
0 commit comments