2121import org .elasticsearch .cluster .SnapshotsInProgress ;
2222import org .elasticsearch .cluster .SnapshotsInProgress .ShardSnapshotStatus ;
2323import org .elasticsearch .cluster .SnapshotsInProgress .ShardState ;
24+ import org .elasticsearch .cluster .metadata .NodesShutdownMetadata ;
25+ import org .elasticsearch .cluster .metadata .SingleNodeShutdownMetadata ;
2426import org .elasticsearch .cluster .node .DiscoveryNode ;
2527import org .elasticsearch .cluster .service .ClusterService ;
2628import org .elasticsearch .common .Strings ;
@@ -82,6 +84,8 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl
8284
8385 private final ThreadPool threadPool ;
8486
87+ private final SnapshotShutdownProgressTracker snapshotShutdownProgressTracker ;
88+
8589 private final Map <Snapshot , Map <ShardId , IndexShardSnapshotStatus >> shardSnapshots = new HashMap <>();
8690
8791 // A map of snapshots to the shardIds that we already reported to the master as failed
@@ -102,6 +106,11 @@ public SnapshotShardsService(
102106 this .transportService = transportService ;
103107 this .clusterService = clusterService ;
104108 this .threadPool = transportService .getThreadPool ();
109+ this .snapshotShutdownProgressTracker = new SnapshotShutdownProgressTracker (
110+ () -> clusterService .state ().nodes ().getLocalNodeId (),
111+ clusterService .getClusterSettings (),
112+ threadPool
113+ );
105114 this .remoteFailedRequestDeduplicator = new ResultDeduplicator <>(threadPool .getThreadContext ());
106115 if (DiscoveryNode .canContainData (settings )) {
107116 // this is only useful on the nodes that can hold data
@@ -130,11 +139,38 @@ protected void doClose() {
130139 @ Override
131140 public void clusterChanged (ClusterChangedEvent event ) {
132141 try {
142+ final var localNodeId = clusterService .localNode ().getId ();
143+
144+ // Track when this node enters and leaves shutdown mode because we pause shard snapshots for shutdown.
145+ // The snapshotShutdownProgressTracker will report (via logging) on the progress shard snapshots make
146+ // towards either completing (successfully or otherwise) or pausing.
147+ NodesShutdownMetadata currentShutdownMetadata = event .state ().metadata ().custom (NodesShutdownMetadata .TYPE );
148+ NodesShutdownMetadata previousShutdownMetadata = event .previousState ().metadata ().custom (NodesShutdownMetadata .TYPE );
149+ SingleNodeShutdownMetadata currentLocalNodeShutdownMetadata = currentShutdownMetadata != null
150+ ? currentShutdownMetadata .get (localNodeId )
151+ : null ;
152+ SingleNodeShutdownMetadata previousLocalNodeShutdownMetadata = previousShutdownMetadata != null
153+ ? previousShutdownMetadata .get (localNodeId )
154+ : null ;
155+
156+ boolean isLocalNodeAddingShutdown = false ;
157+ if (isPausingProgressTrackedShutdown (previousLocalNodeShutdownMetadata ) == false
158+ && isPausingProgressTrackedShutdown (currentLocalNodeShutdownMetadata )) {
159+ snapshotShutdownProgressTracker .onClusterStateAddShutdown ();
160+ isLocalNodeAddingShutdown = true ;
161+ } else if (isPausingProgressTrackedShutdown (previousLocalNodeShutdownMetadata )
162+ && isPausingProgressTrackedShutdown (currentLocalNodeShutdownMetadata ) == false ) {
163+ snapshotShutdownProgressTracker .onClusterStateRemoveShutdown ();
164+ }
165+
133166 final var currentSnapshots = SnapshotsInProgress .get (event .state ());
167+
134168 if (SnapshotsInProgress .get (event .previousState ()).equals (currentSnapshots ) == false ) {
135- final var localNodeId = clusterService .localNode ().getId ();
136169 synchronized (shardSnapshots ) {
170+ // Cancel any snapshots that have been removed from the cluster state.
137171 cancelRemoved (currentSnapshots );
172+
173+ // Update running snapshots or start any snapshots that are set to run.
138174 for (final var oneRepoSnapshotsInProgress : currentSnapshots .entriesByRepo ()) {
139175 for (final var snapshotsInProgressEntry : oneRepoSnapshotsInProgress ) {
140176 handleUpdatedSnapshotsInProgressEntry (
@@ -147,6 +183,11 @@ public void clusterChanged(ClusterChangedEvent event) {
147183 }
148184 }
149185
186+ if (isLocalNodeAddingShutdown ) {
187+ // Any active snapshots would have been signalled to pause in the previous code block.
188+ snapshotShutdownProgressTracker .onClusterStatePausingSetForAllShardSnapshots ();
189+ }
190+
150191 String previousMasterNodeId = event .previousState ().nodes ().getMasterNodeId ();
151192 String currentMasterNodeId = event .state ().nodes ().getMasterNodeId ();
152193 if (currentMasterNodeId != null && currentMasterNodeId .equals (previousMasterNodeId ) == false ) {
@@ -164,6 +205,17 @@ public void clusterChanged(ClusterChangedEvent event) {
164205 }
165206 }
166207
208+ /**
209+ * Determines whether we want to track this kind of shutdown for snapshot pausing progress.
210+ * We want tracking is shutdown metadata is set, and not type RESTART.
211+ * Note that the Shutdown API is idempotent and the type of shutdown may change to / from RESTART to / from some other type of interest.
212+ *
213+ * @return true if snapshots will be paused during this type of local node shutdown.
214+ */
215+ private static boolean isPausingProgressTrackedShutdown (@ Nullable SingleNodeShutdownMetadata localNodeShutdownMetadata ) {
216+ return localNodeShutdownMetadata != null && localNodeShutdownMetadata .getType () != SingleNodeShutdownMetadata .Type .RESTART ;
217+ }
218+
167219 @ Override
168220 public void beforeIndexShardClosed (ShardId shardId , @ Nullable IndexShard indexShard , Settings indexSettings ) {
169221 // abort any snapshots occurring on the soon-to-be closed shard
@@ -231,6 +283,9 @@ private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) {
231283 }
232284 }
233285
286+ /**
287+ * Starts new snapshots and pauses or aborts active shard snapshot based on the updated {@link SnapshotsInProgress} entry.
288+ */
234289 private void handleUpdatedSnapshotsInProgressEntry (String localNodeId , boolean removingLocalNode , SnapshotsInProgress .Entry entry ) {
235290 if (entry .isClone ()) {
236291 // This is a snapshot clone, it will be executed on the current master
@@ -364,8 +419,7 @@ private Runnable newShardSnapshotTask(
364419 final IndexVersion entryVersion ,
365420 final long entryStartTime
366421 ) {
367- // separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry
368- return () -> snapshot (shardId , snapshot , indexId , snapshotStatus , entryVersion , entryStartTime , new ActionListener <>() {
422+ ActionListener <ShardSnapshotResult > snapshotResultListener = new ActionListener <>() {
369423 @ Override
370424 public void onResponse (ShardSnapshotResult shardSnapshotResult ) {
371425 final ShardGeneration newGeneration = shardSnapshotResult .getGeneration ();
@@ -405,7 +459,15 @@ public void onFailure(Exception e) {
405459 final var shardState = snapshotStatus .moveToUnsuccessful (nextStage , failure , threadPool .absoluteTimeInMillis ());
406460 notifyUnsuccessfulSnapshotShard (snapshot , shardId , shardState , failure , snapshotStatus .generation ());
407461 }
462+ };
463+
464+ snapshotShutdownProgressTracker .incNumberOfShardSnapshotsInProgress (shardId , snapshot );
465+ var decTrackerRunsBeforeResultListener = ActionListener .runAfter (snapshotResultListener , () -> {
466+ snapshotShutdownProgressTracker .decNumberOfShardSnapshotsInProgress (shardId , snapshot , snapshotStatus );
408467 });
468+
469+ // separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry
470+ return () -> snapshot (shardId , snapshot , indexId , snapshotStatus , entryVersion , entryStartTime , decTrackerRunsBeforeResultListener );
409471 }
410472
411473 // package private for testing
@@ -665,19 +727,25 @@ private void notifyUnsuccessfulSnapshotShard(
665727
666728 /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
667729 private void sendSnapshotShardUpdate (final Snapshot snapshot , final ShardId shardId , final ShardSnapshotStatus status ) {
730+ ActionListener <Void > updateResultListener = new ActionListener <>() {
731+ @ Override
732+ public void onResponse (Void aVoid ) {
733+ logger .trace ("[{}][{}] updated snapshot state to [{}]" , shardId , snapshot , status );
734+ }
735+
736+ @ Override
737+ public void onFailure (Exception e ) {
738+ logger .warn (() -> format ("[%s][%s] failed to update snapshot state to [%s]" , shardId , snapshot , status ), e );
739+ }
740+ };
741+ snapshotShutdownProgressTracker .trackRequestSentToMaster (snapshot , shardId );
742+ var releaseTrackerRequestRunsBeforeResultListener = ActionListener .runBefore (updateResultListener , () -> {
743+ snapshotShutdownProgressTracker .releaseRequestSentToMaster (snapshot , shardId );
744+ });
745+
668746 remoteFailedRequestDeduplicator .executeOnce (
669747 new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status ),
670- new ActionListener <>() {
671- @ Override
672- public void onResponse (Void aVoid ) {
673- logger .trace ("[{}][{}] updated snapshot state to [{}]" , shardId , snapshot , status );
674- }
675-
676- @ Override
677- public void onFailure (Exception e ) {
678- logger .warn (() -> format ("[%s][%s] failed to update snapshot state to [%s]" , shardId , snapshot , status ), e );
679- }
680- },
748+ releaseTrackerRequestRunsBeforeResultListener ,
681749 (req , reqListener ) -> transportService .sendRequest (
682750 transportService .getLocalNode (),
683751 SnapshotsService .UPDATE_SNAPSHOT_STATUS_ACTION_NAME ,
0 commit comments