1313import org .elasticsearch .action .ActionListener ;
1414import org .elasticsearch .action .admin .cluster .snapshots .create .CreateSnapshotRequest ;
1515import org .elasticsearch .action .admin .cluster .snapshots .create .CreateSnapshotResponse ;
16+ import org .elasticsearch .action .admin .cluster .snapshots .get .GetSnapshotsRequest ;
17+ import org .elasticsearch .action .admin .cluster .snapshots .get .GetSnapshotsResponse ;
18+ import org .elasticsearch .action .admin .cluster .snapshots .get .TransportGetSnapshotsAction ;
19+ import org .elasticsearch .action .support .master .AcknowledgedRequest ;
20+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
1621import org .elasticsearch .client .internal .Client ;
22+ import org .elasticsearch .cluster .AckedBatchedClusterStateUpdateTask ;
23+ import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
1724import org .elasticsearch .cluster .ClusterState ;
25+ import org .elasticsearch .cluster .ClusterStateTaskExecutor ;
1826import org .elasticsearch .cluster .ClusterStateUpdateTask ;
27+ import org .elasticsearch .cluster .ProjectState ;
28+ import org .elasticsearch .cluster .SimpleBatchedExecutor ;
1929import org .elasticsearch .cluster .SnapshotsInProgress ;
2030import org .elasticsearch .cluster .metadata .ProjectId ;
2131import org .elasticsearch .cluster .metadata .ProjectMetadata ;
2232import org .elasticsearch .cluster .service .ClusterService ;
33+ import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
34+ import org .elasticsearch .common .Priority ;
2335import org .elasticsearch .common .Strings ;
2436import org .elasticsearch .common .scheduler .SchedulerEngine ;
2537import org .elasticsearch .core .FixForMultiProject ;
2638import org .elasticsearch .core .SuppressForbidden ;
2739import org .elasticsearch .core .TimeValue ;
40+ import org .elasticsearch .core .Tuple ;
2841import org .elasticsearch .snapshots .RegisteredPolicySnapshots ;
2942import org .elasticsearch .snapshots .RegisteredPolicySnapshots .PolicySnapshot ;
3043import org .elasticsearch .snapshots .SnapshotException ;
3548import org .elasticsearch .xpack .core .ilm .LifecyclePolicySecurityClient ;
3649import org .elasticsearch .xpack .core .slm .SnapshotInvocationRecord ;
3750import org .elasticsearch .xpack .core .slm .SnapshotLifecycleMetadata ;
51+ import org .elasticsearch .xpack .core .slm .SnapshotLifecyclePolicy ;
3852import org .elasticsearch .xpack .core .slm .SnapshotLifecyclePolicyMetadata ;
3953import org .elasticsearch .xpack .core .slm .SnapshotLifecycleStats ;
4054import org .elasticsearch .xpack .slm .history .SnapshotHistoryItem ;
5367
5468import static org .elasticsearch .core .Strings .format ;
5569import static org .elasticsearch .xpack .core .ilm .LifecycleOperationMetadata .currentSLMMode ;
70+ import static org .elasticsearch .xpack .slm .SnapshotLifecycleService .getJobId ;
71+ import static org .elasticsearch .xpack .slm .SnapshotLifecycleService .getPolicyId ;
5672
5773public class SnapshotLifecycleTask implements SchedulerEngine .Listener {
5874
@@ -62,6 +78,7 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
6278 private final Client client ;
6379 private final ClusterService clusterService ;
6480 private final SnapshotHistoryStore historyStore ;
81+ private final MasterServiceTaskQueue <UpdatePolicyStatsTask > updatePolicyStatsQueue ;
6582
6683 public SnapshotLifecycleTask (
6784 final ProjectId projectId ,
@@ -73,13 +90,128 @@ public SnapshotLifecycleTask(
7390 this .client = client ;
7491 this .clusterService = clusterService ;
7592 this .historyStore = historyStore ;
93+
94+ ClusterStateTaskExecutor <UpdatePolicyStatsTask > executor = new SimpleBatchedExecutor <>() {
95+ @ Override
96+ public Tuple <ClusterState , Object > executeTask (UpdatePolicyStatsTask updatePolicyStatsTask , ClusterState clusterState ) throws Exception {
97+ // TODO
98+ return null ;
99+ }
100+
101+ @ Override
102+ public void taskSucceeded (UpdatePolicyStatsTask updatePolicyStatsTask , Object o ) {
103+ // TODO
104+ }
105+ };
106+ this .updatePolicyStatsQueue = clusterService .createTaskQueue ("slm-update-policy-stats" , Priority .HIGH , executor );
107+ }
108+
109+ static class UpdatePolicyStatsTask extends ClusterStateUpdateTask {
110+
111+ @ Override
112+ public ClusterState execute (ClusterState currentState ) throws Exception {
113+ return null ;
114+ }
115+
116+ @ Override
117+ public void onFailure (Exception e ) {
118+ // TODO
119+ }
120+ }
121+
122+ private static List <String > getStaleRegisteredSnapshotIds (ProjectState projectState , String policyId ) {
123+ Set <SnapshotId > runningSnapshots = currentlyRunningSnapshots (projectState .cluster ());
124+
125+ RegisteredPolicySnapshots registeredSnapshots = projectState .metadata ()
126+ .custom (RegisteredPolicySnapshots .TYPE , RegisteredPolicySnapshots .EMPTY );
127+
128+ List <String > staleRegisterSnapshotIds = registeredSnapshots .getSnapshots ().stream ()
129+ // look for snapshots of this SLM policy, leave the rest to the policy that owns it
130+ .filter (policySnapshot -> policySnapshot .getPolicy ().equals (policyId ))
131+ // look for snapshots that are no longer running
132+ .filter (policySnapshot -> runningSnapshots .contains (policySnapshot .getSnapshotId ()) == false )
133+ .map (policySnapshot -> policySnapshot .getSnapshotId ().getName ())
134+ .toList ();
135+
136+ return staleRegisterSnapshotIds ;
76137 }
77138
78139 @ Override
79140 public void triggered (SchedulerEngine .Event event ) {
80141 logger .debug ("snapshot lifecycle policy task triggered from job [{}]" , event .jobName ());
81- ProjectMetadata projectMetadata = clusterService .state ().getMetadata ().getProject (projectId );
82- final Optional <String > snapshotName = maybeTakeSnapshot (projectMetadata , event .jobName (), client , clusterService , historyStore );
142+ ProjectState projectState = clusterService .state ().projectState (projectId );
143+ ProjectMetadata metadata = projectState .metadata ();
144+ String policyId = getPolicyId (event .jobName ());
145+
146+ List <String > snapshotsToCleanup = getStaleRegisteredSnapshotIds (projectState , policyId );
147+ if (snapshotsToCleanup .isEmpty () == false ) {
148+ var policyMetadata = getSnapPolicyMetadata (metadata , event .jobName ());
149+ if (policyMetadata .isEmpty ()) {
150+ logger .warn ("snapshot lifecycle policy for job [{}] no longer exists" , event .jobName ());
151+ return ;
152+ }
153+ SnapshotLifecyclePolicy policy = policyMetadata .get ().getPolicy ();
154+
155+ GetSnapshotsRequest getSnapshotsRequest = new GetSnapshotsRequest (
156+ TimeValue .MAX_VALUE ,
157+ new String [] { policy .getRepository () },
158+ snapshotsToCleanup .toArray (new String [0 ])
159+ );
160+
161+ GetSnapshotsResponse getSnapshotsResponse = client .admin ().cluster ()
162+ .execute (TransportGetSnapshotsAction .TYPE , getSnapshotsRequest ).actionGet ();
163+
164+
165+ // cluster update task
166+ // verify
167+ int countSnapshotFailure = 0 ;
168+ int countSnapshotSuccess = 0 ;
169+ SnapshotInfo lastSuccess = null ;
170+ SnapshotInfo lastFailure = null ;
171+ for (SnapshotInfo snapshotInfo : getSnapshotsResponse .getSnapshots ()) {
172+ if (snapshotInfo .state () == null || snapshotInfo .state ().completed () == false ) {
173+ // skip unknown state and non-completed snapshots
174+ continue ;
175+ }
176+ if (snapshotInfo .failedShards () == 0 ) {
177+ countSnapshotSuccess ++;
178+ if (lastSuccess == null || snapshotInfo .startTime () > lastSuccess .startTime ()) {
179+ lastSuccess = snapshotInfo ;
180+ }
181+ } else {
182+ countSnapshotFailure ++;
183+ if (lastFailure == null || snapshotInfo .startTime () > lastFailure .startTime ()) {
184+ lastFailure = snapshotInfo ;
185+ }
186+ }
187+ }
188+
189+
190+ // client.admin().cluster().getSnapshots(getSnapshotsRequest, new ActionListener<>() {
191+ //
192+ // @Override
193+ // public void onResponse(GetSnapshotsResponse response) {
194+ // int countSnapshotFailed = 0;
195+ // int countSnapshotSuccessful = 0;
196+ // for (SnapshotInfo snapshot : response.getSnapshots()) {
197+ // boolean success = snapshot.failedShards() == 0;
198+ // if (success) {
199+ // countSnapshotSuccessful++;
200+ // } else {
201+ // countSnapshotFailed++;
202+ // }
203+ // }
204+ //
205+ // }
206+ //
207+ // @Override
208+ // public void onFailure(Exception e) {
209+ //
210+ // }
211+ // });
212+ }
213+
214+ final Optional <String > snapshotName = maybeTakeSnapshot (metadata , event .jobName (), client , clusterService , historyStore );
83215
84216 // Would be cleaner if we could use Optional#ifPresentOrElse
85217 snapshotName .ifPresent (
@@ -219,7 +351,7 @@ static Optional<SnapshotLifecyclePolicyMetadata> getSnapPolicyMetadata(final Pro
219351 .flatMap (
220352 configMap -> configMap .values ()
221353 .stream ()
222- .filter (policyMeta -> jobId .equals (SnapshotLifecycleService . getJobId (policyMeta )))
354+ .filter (policyMeta -> jobId .equals (getJobId (policyMeta )))
223355 .findFirst ()
224356 );
225357 }
0 commit comments