@@ -117,7 +117,7 @@ public void onFailure(Exception e) {
117117 }
118118 }
119119
120- private static List <String > getStaleRegisteredSnapshotIds (ProjectState projectState , String policyId ) {
120+ static List <String > findStaleRegisteredSnapshotIds (ProjectState projectState , String policyId ) {
121121 Set <SnapshotId > runningSnapshots = currentlyRunningSnapshots (projectState .cluster ());
122122
123123 RegisteredPolicySnapshots registeredSnapshots = projectState .metadata ()
@@ -225,13 +225,19 @@ public void triggered(SchedulerEngine.Event event) {
225225 }
226226 }
227227
228- private static void getStaleRegisteredSnapshotInfo (
228+ /**
229+ * Find SLM registered snapshots that are no longer running, and fetch their snapshot info. These snapshots are considered stale
230+ * because they should have been removed from the registered set when they completed, but they were not, likely due to failure in
231+ * the previous SLM completion handling. These stale snapshots should be cleaned up and their stats recorded in the SLM
232+ * cluster state based on their snapshot info.
233+ */
234+ private static void findStaleRegisteredSnapshotInfo (
229235 final ProjectState projectState ,
230236 final String policyId ,
231237 final Client client ,
232238 final ActionListener <List <SnapshotInfo >> listener
233239 ) {
234- var staleSnapshotIds = getStaleRegisteredSnapshotIds (projectState , policyId );
240+ var staleSnapshotIds = findStaleRegisteredSnapshotIds (projectState , policyId );
235241
236242 if (staleSnapshotIds .isEmpty () == false ) {
237243 var policyMetadata = getSnapPolicyMetadataById (projectState .metadata (), policyId );
@@ -241,14 +247,15 @@ private static void getStaleRegisteredSnapshotInfo(
241247 }
242248 SnapshotLifecyclePolicy policy = policyMetadata .get ().getPolicy ();
243249
244- GetSnapshotsRequest getSnapshotsRequest = new GetSnapshotsRequest (
245- TimeValue .MAX_VALUE ,
250+ GetSnapshotsRequest request = new GetSnapshotsRequest (
251+ TimeValue .MAX_VALUE , // do not time out internal request in case of slow master node
246252 new String []{policy .getRepository ()},
247253 staleSnapshotIds .toArray (new String [0 ])
248254 );
255+ request .ignoreUnavailable (true );
249256
250257 client .admin ().cluster ()
251- .execute (TransportGetSnapshotsAction .TYPE , getSnapshotsRequest , ActionListener .wrap (
258+ .execute (TransportGetSnapshotsAction .TYPE , request , ActionListener .wrap (
252259 response -> listener .onResponse (response .getSnapshots ()),
253260 listener ::onFailure )
254261 );
@@ -320,7 +327,7 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
320327
321328 // retrieve the current project state after snapshot is completed, since snapshotting can take a while
322329 ProjectState currentProjectState = clusterService .state ().projectState (projectId );
323- getStaleRegisteredSnapshotInfo (currentProjectState , policyId , client , new ActionListener <>() {
330+ findStaleRegisteredSnapshotInfo (currentProjectState , policyId , client , new ActionListener <>() {
324331 @ Override
325332 public void onResponse (List <SnapshotInfo > snapshotInfo ) {
326333 submitUnbatchedTask (
@@ -448,7 +455,7 @@ public void onFailure(Exception e) {
448455
449456 // retrieve the current project state after snapshot is completed, since snapshotting can take a while
450457 ProjectState currentProjectState = clusterService .state ().projectState (projectId );
451- getStaleRegisteredSnapshotInfo (currentProjectState , policyId , client , new ActionListener <>() {
458+ findStaleRegisteredSnapshotInfo (currentProjectState , policyId , client , new ActionListener <>() {
452459 @ Override
453460 public void onResponse (List <SnapshotInfo > snapshotInfo ) {
454461 submitUnbatchedTask (
@@ -567,6 +574,11 @@ static SnapshotInvocationRecord buildFailedSnapshotRecord(SnapshotId snapshot) {
567574 );
568575 }
569576
577+ static boolean isSnapshotSuccessful (SnapshotInfo snapshotInfo ) { // true only when the state is known, reports completed(), and no shards failed
578+ return snapshotInfo .state () != null && snapshotInfo .state ().completed () && snapshotInfo .failedShards () == 0 ; // a null (unknown) state is treated as not successful
579+ }
580+
581+
570582 /**
571583 * A cluster state update task to write the result of a snapshot job to the cluster metadata for the associated policy.
572584 */
@@ -578,15 +590,16 @@ static class WriteJobStatus extends ClusterStateUpdateTask {
578590 private final long snapshotStartTime ;
579591 private final long snapshotFinishTime ;
580592 private final Optional <Exception > exception ;
581- private final List <SnapshotInfo > staleSnapshotInfo ;
593+ // preloaded snapshot info for registered snapshots that are no longer running
594+ private final List <SnapshotInfo > registeredSnapshotInfo ;
582595
583596 private WriteJobStatus (
584597 ProjectId projectId ,
585598 String policyName ,
586599 SnapshotId snapshotId ,
587600 long snapshotStartTime ,
588601 long snapshotFinishTime ,
589- List <SnapshotInfo > staleSnapshotInfo ,
602+ List <SnapshotInfo > registeredSnapshotInfo ,
590603 Optional <Exception > exception
591604 ) {
592605 this .projectId = projectId ;
@@ -595,7 +608,7 @@ private WriteJobStatus(
595608 this .exception = exception ;
596609 this .snapshotStartTime = snapshotStartTime ;
597610 this .snapshotFinishTime = snapshotFinishTime ;
598- this .staleSnapshotInfo = staleSnapshotInfo ;
611+ this .registeredSnapshotInfo = registeredSnapshotInfo ;
599612 }
600613
601614 static WriteJobStatus success (
@@ -635,7 +648,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
635648 }
636649
637650 Map <SnapshotId , SnapshotInfo > snapshotInfoById =
638- staleSnapshotInfo .stream ().collect (Collectors .toMap (SnapshotInfo ::snapshotId , Function .identity ()));
651+ registeredSnapshotInfo .stream ().collect (Collectors .toMap (SnapshotInfo ::snapshotId , Function .identity ()));
639652
640653 final SnapshotLifecyclePolicyMetadata .Builder newPolicyMetadata = SnapshotLifecyclePolicyMetadata .builder (policyMetadata );
641654 SnapshotLifecycleStats newStats = snapMeta .getStats ();
@@ -651,50 +664,41 @@ public ClusterState execute(ClusterState currentState) throws Exception {
651664 final Set <SnapshotId > runningSnapshots = currentlyRunningSnapshots (currentState );
652665 final List <PolicySnapshot > newRegistered = new ArrayList <>();
653666
654- // By the time this task is executed, it is likely that the cluster state has changed and there could be more/less
655- // registered snapshots than previously looked up with snapshot info. So we need to re-check the registered set and
656- // TODO: calculate stats for stale registered snapshots, should we assume failure if snapshot info is not found, so that
657- // the size of registered snaps won't grow indefinitely in the worst case?
658-
659- // calculate stats for stale registered snapshots
660- // int countSnapshotFailure = 0;
661- // int countSnapshotSuccess = 0;
662- // SnapshotInfo lastSuccess = null;
663- // SnapshotInfo lastFailure = null;
664- // for (SnapshotInfo snapshotInfo : staleSnapshotInfo) {
665- // if (snapshotInfo.state() == null || snapshotInfo.state().completed() == false) {
666- // // skip unknown state and non-completed snapshots
667- // continue;
668- // }
669- // if (snapshotInfo.failedShards() == 0) {
670- // countSnapshotSuccess++;
671- // if (lastSuccess == null || snapshotInfo.startTime() > lastSuccess.startTime()) {
672- // lastSuccess = snapshotInfo;
673- // }
674- // } else {
675- // countSnapshotFailure++;
676- // if (lastFailure == null || snapshotInfo.startTime() > lastFailure.startTime()) {
677- // lastFailure = snapshotInfo;
678- // }
679- // }
680- // }
681-
667+ // go through the registered set to find stale snapshots and calculate stats
682668 for (PolicySnapshot snapshot : registeredSnapshots .getSnapshots ()) {
683- if (snapshot .getSnapshotId ().equals (snapshotId ) == false ) {
684- if (snapshot .getPolicy ().equals (policyName )) {
685- if (runningSnapshots .contains (snapshot .getSnapshotId ())) {
686- // Snapshot is for this policy and is still running so keep it in registered set
687- newRegistered .add (snapshot );
669+ SnapshotId snapshotId = snapshot .getSnapshotId ();
670+ if (snapshot .getPolicy ().equals (policyName ) == false || runningSnapshots .contains (snapshotId )) {
671+ // the snapshot is for another policy, or is still running, so keep it in the registered set
672+ newRegistered .add (snapshot );
673+ } else {
674+ // the snapshot was completed and should be removed from registered snapshots, update state accordingly
675+ SnapshotInfo staleSnapshotInfo = snapshotInfoById .get (snapshotId );
676+ if (staleSnapshotInfo != null ) {
677+ if (isSnapshotSuccessful (staleSnapshotInfo )) {
678+ newStats = newStats .withTakenIncremented (policyName );
679+ newPolicyMetadata .setLastSuccess (new SnapshotInvocationRecord (
680+ staleSnapshotInfo .snapshotId ().getName (),
681+ staleSnapshotInfo .startTime (),
682+ staleSnapshotInfo .endTime (),
683+ null
684+ ));
685+ newPolicyMetadata .setInvocationsSinceLastSuccess (0L );
688686 } else {
689- // Snapshot is for this policy but is not running so infer failure, update stats accordingly,
690- // and remove from registered set
691687 newStats = newStats .withFailedIncremented (policyName );
692- newPolicyMetadata .incrementInvocationsSinceLastSuccess ()
693- .setLastFailure (buildFailedSnapshotRecord (snapshot .getSnapshotId ()));
688+ newPolicyMetadata .setLastFailure (new SnapshotInvocationRecord (
689+ staleSnapshotInfo .snapshotId ().getName (),
690+ staleSnapshotInfo .startTime (),
691+ staleSnapshotInfo .endTime (),
692+ null
693+ ));
694+ newPolicyMetadata .incrementInvocationsSinceLastSuccess ();
694695 }
695- } else if (snapLifecycles .containsKey (snapshot .getPolicy ())) {
696- // Snapshot is for another policy so keep in the registered set and that policy deal with it
697- newRegistered .add (snapshot );
696+ } else {
697+ // either the snapshot no longer exists in the repo or its info could not be retrieved; assume failure and clean it up
698+ // so it is not stuck in the registered set forever
699+ newPolicyMetadata .incrementInvocationsSinceLastSuccess ()
700+ .setLastFailure (buildFailedSnapshotRecord (snapshotId ));
701+ newStats = newStats .withFailedIncremented (policyName );
698702 }
699703 }
700704 }
@@ -726,6 +730,58 @@ public ClusterState execute(ClusterState currentState) throws Exception {
726730 builder -> builder .putCustom (SnapshotLifecycleMetadata .TYPE , lifecycleMetadata )
727731 .putCustom (RegisteredPolicySnapshots .TYPE , new RegisteredPolicySnapshots (newRegistered ))
728732 );
733+
734+
735+ // original code
736+ //
737+ // for (PolicySnapshot snapshot : registeredSnapshots.getSnapshots()) {
738+ // if (snapshot.getSnapshotId().equals(snapshotId) == false) {
739+ // if (snapshot.getPolicy().equals(policyName)) {
740+ // if (runningSnapshots.contains(snapshot.getSnapshotId())) {
741+ // // Snapshot is for this policy and is still running so keep it in registered set
742+ // newRegistered.add(snapshot);
743+ // } else {
744+ // // Snapshot is for this policy but is not running so infer failure, update stats accordingly,
745+ // // and remove from registered set
746+ // newStats = newStats.withFailedIncremented(policyName);
747+ // newPolicyMetadata.incrementInvocationsSinceLastSuccess()
748+ // .setLastFailure(buildFailedSnapshotRecord(snapshot.getSnapshotId()));
749+ // }
750+ // } else if (snapLifecycles.containsKey(snapshot.getPolicy())) {
751+ // // Snapshot is for another policy so keep in the registered set and that policy deal with it
752+ // newRegistered.add(snapshot);
753+ // }
754+ // }
755+ // }
756+ //
757+ // // Add stats from the just completed snapshot execution
758+ // if (exception.isPresent()) {
759+ // newStats = newStats.withFailedIncremented(policyName);
760+ // newPolicyMetadata.setLastFailure(
761+ // new SnapshotInvocationRecord(
762+ // snapshotId.getName(),
763+ // null,
764+ // snapshotFinishTime,
765+ // exception.map(SnapshotLifecycleTask::exceptionToString).orElse(null)
766+ // )
767+ // );
768+ // newPolicyMetadata.incrementInvocationsSinceLastSuccess();
769+ // } else {
770+ // newStats = newStats.withTakenIncremented(policyName);
771+ // newPolicyMetadata.setLastSuccess(
772+ // new SnapshotInvocationRecord(snapshotId.getName(), snapshotStartTime, snapshotFinishTime, null)
773+ // );
774+ // newPolicyMetadata.setInvocationsSinceLastSuccess(0L);
775+ // }
776+ //
777+ // snapLifecycles.put(policyName, newPolicyMetadata.build());
778+ // SnapshotLifecycleMetadata lifecycleMetadata = new SnapshotLifecycleMetadata
779+ // (snapLifecycles, currentSLMMode(project), newStats);
780+ // return currentState.copyAndUpdateProject(
781+ // project.id(),
782+ // builder -> builder.putCustom(SnapshotLifecycleMetadata.TYPE, lifecycleMetadata)
783+ // .putCustom(RegisteredPolicySnapshots.TYPE, new RegisteredPolicySnapshots(newRegistered))
784+ // );
729785 }
730786
731787 @ Override
@@ -738,5 +794,21 @@ public void onFailure(Exception e) {
738794 e
739795 );
740796 }
797+
798+ // private static SnapshotInvocationRecord getNewerInvocation(@Nullable SnapshotInvocationRecord invocation,
799+ // SnapshotInfo snapshotInfo) {
800+ // if (invocation != null && invocation.getSnapshotStartTimestamp() != null &&
801+ // invocation.getSnapshotStartTimestamp() >= snapshotInfo.startTime()) {
802+ // return invocation;
803+ // } else {
804+ // return new SnapshotInvocationRecord(
805+ // snapshotInfo.snapshotId().getName(),
806+ // snapshotInfo.startTime(),
807+ // snapshotInfo.endTime(),
808+ // null
809+ // );
810+ // }
811+ // }
812+
741813 }
742814}
0 commit comments