1313import org .elasticsearch .cluster .ClusterState ;
1414import org .elasticsearch .cluster .ClusterStateListener ;
1515import org .elasticsearch .cluster .ClusterStateUpdateTask ;
16+ import org .elasticsearch .cluster .metadata .ProjectId ;
17+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
1618import org .elasticsearch .cluster .metadata .RepositoriesMetadata ;
1719import org .elasticsearch .cluster .service .ClusterService ;
1820import org .elasticsearch .common .scheduler .SchedulerEngine ;
1921import org .elasticsearch .common .settings .Settings ;
2022import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
23+ import org .elasticsearch .core .FixForMultiProject ;
2124import org .elasticsearch .core .SuppressForbidden ;
2225import org .elasticsearch .core .TimeValue ;
2326import org .elasticsearch .xpack .core .ilm .LifecycleSettings ;
3134import java .time .Clock ;
3235import java .util .Map ;
3336import java .util .Set ;
37+ import java .util .concurrent .ConcurrentMap ;
3438import java .util .concurrent .atomic .AtomicBoolean ;
35- import java .util .function .Supplier ;
39+ import java .util .function .Function ;
3640import java .util .regex .Pattern ;
3741import java .util .stream .Collectors ;
3842
@@ -47,22 +51,23 @@ public class SnapshotLifecycleService implements Closeable, ClusterStateListener
4751 private static final Logger logger = LogManager .getLogger (SnapshotLifecycleService .class );
4852 private static final String JOB_PATTERN_SUFFIX = "-\\ d+$" ;
4953
50- private final SchedulerEngine scheduler ;
5154 private final ClusterService clusterService ;
52- private final SnapshotLifecycleTask snapshotTask ;
53- private final Map < String , SchedulerEngine . Job > scheduledTasks = ConcurrentCollections . newConcurrentMap () ;
54- private final AtomicBoolean running = new AtomicBoolean ( true ) ;
55+ private final Function < ProjectId , SnapshotLifecycleTask > taskProvider ;
56+ private final Settings settings ;
57+ private final Clock clock ;
5558 private volatile boolean isMaster = false ;
59+ private final ConcurrentMap <ProjectId , SnapshotLifecycleProjectState > projects = ConcurrentCollections .newConcurrentMap ();
5660
/**
 * Creates the service. Nothing is scheduled here: the arguments are only stored, and a
 * scheduler plus snapshot task are built per project on demand by
 * {@code getOrCreateProjectState}.
 *
 * @param settings       settings handed to every per-project {@link SchedulerEngine}
 * @param taskProvider   produces the {@link SnapshotLifecycleTask} for a given project
 * @param clusterService the cluster service this instance keeps a reference to
 * @param clock          clock handed to every per-project {@link SchedulerEngine}
 */
public SnapshotLifecycleService(
    Settings settings,
    Function<ProjectId, SnapshotLifecycleTask> taskProvider,
    ClusterService clusterService,
    Clock clock
) {
    this.clusterService = clusterService;
    this.taskProvider = taskProvider;
    this.settings = settings;
    this.clock = clock;
}
6772
6873 /**
@@ -73,36 +78,40 @@ public void init() {
7378 }
7479
7580 @ Override
81+ @ FixForMultiProject (description = "Project deletion should cancel the scheduled SLM tasks" )
7682 public void clusterChanged (final ClusterChangedEvent event ) {
7783 // Instead of using a LocalNodeMasterListener to track master changes, this service will
7884 // track them here to avoid conditions where master listener events run after other
7985 // listeners that depend on what happened in the master listener
8086 final boolean prevIsMaster = this .isMaster ;
81- if (prevIsMaster != event .localNodeMaster ()) {
82- this .isMaster = event .localNodeMaster ();
83- if (this .isMaster ) {
84- scheduler .register (snapshotTask );
85- } else {
86- scheduler .unregister (snapshotTask );
87- cancelSnapshotJobs ();
88- }
89- }
87+ final boolean masterChanged = prevIsMaster != event .localNodeMaster ();
88+ this .isMaster = event .localNodeMaster ();
9089
91- if (this .isMaster ) {
92- final ClusterState state = event .state ();
93-
94- if (slmStoppedOrStopping (state )) {
95- if (scheduler .scheduledJobIds ().size () > 0 ) {
96- cancelSnapshotJobs ();
97- }
98- if (slmStopping (state )) {
99- submitUnbatchedTask ("slm_operation_mode_update[stopped]" , OperationModeUpdateTask .slmMode (OperationMode .STOPPED ));
90+ for (ProjectMetadata metadata : event .state ().metadata ().projects ().values ()) {
91+ SnapshotLifecycleProjectState project = getOrCreateProjectState (metadata .id ());
92+ if (masterChanged ) {
93+ if (this .isMaster ) {
94+ project .scheduler .register (project .snapshotTask );
95+ } else {
96+ project .scheduler .unregister (project .snapshotTask );
97+ cancelSnapshotJobs (project );
10098 }
101- return ;
10299 }
103100
104- scheduleSnapshotJobs (state );
105- cleanupDeletedPolicies (state );
101+ if (this .isMaster ) {
102+ if (slmStoppedOrStopping (metadata )) {
103+ if (project .scheduler .scheduledJobIds ().isEmpty () == false ) {
104+ cancelSnapshotJobs (project );
105+ }
106+ if (slmStopping (metadata )) {
107+ submitUnbatchedTask ("slm_operation_mode_update[stopped]" , OperationModeUpdateTask .slmMode (OperationMode .STOPPED ));
108+ }
109+ continue ;
110+ }
111+
112+ scheduleSnapshotJobs (project , metadata );
113+ cleanupDeletedPolicies (project , metadata );
114+ }
106115 }
107116 }
108117
@@ -112,48 +121,61 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
112121 }
113122
114123 // Only used for testing
115- SchedulerEngine getScheduler () {
116- return this .scheduler ;
124+ SchedulerEngine getScheduler (ProjectId projectId ) {
125+ SnapshotLifecycleProjectState project = projects .get (projectId );
126+ return project == null ? null : project .scheduler ;
117127 }
118128
119129 /**
120130 * Returns true if SLM is in the stopping or stopped state
121131 */
132+ @ Deprecated (forRemoval = true )
122133 static boolean slmStoppedOrStopping (ClusterState state ) {
123134 OperationMode mode = currentSLMMode (state );
124135 return OperationMode .STOPPING == mode || OperationMode .STOPPED == mode ;
125136 }
126137
138+ /**
139+ * Returns true if SLM is in the stopping or stopped state
140+ */
141+ static boolean slmStoppedOrStopping (ProjectMetadata metadata ) {
142+ OperationMode mode = currentSLMMode (metadata );
143+ return OperationMode .STOPPING == mode || OperationMode .STOPPED == mode ;
144+ }
145+
127146 /**
128147 * Returns true if SLM is in the stopping state
129148 */
130- static boolean slmStopping (ClusterState state ) {
131- OperationMode mode = currentSLMMode (state );
149+ static boolean slmStopping (ProjectMetadata metadata ) {
150+ OperationMode mode = currentSLMMode (metadata );
132151 return OperationMode .STOPPING == mode ;
133152 }
134153
135154 /**
136155 * Schedule all non-scheduled snapshot jobs contained in the cluster state
137156 */
138- public void scheduleSnapshotJobs (final ClusterState state ) {
139- SnapshotLifecycleMetadata snapMeta = state . metadata (). getProject () .custom (SnapshotLifecycleMetadata .TYPE );
157+ private void scheduleSnapshotJobs (final SnapshotLifecycleProjectState project , final ProjectMetadata metadata ) {
158+ SnapshotLifecycleMetadata snapMeta = metadata .custom (SnapshotLifecycleMetadata .TYPE );
140159 if (snapMeta != null ) {
141- snapMeta .getSnapshotConfigurations ().values ().forEach (this :: maybeScheduleSnapshot );
160+ snapMeta .getSnapshotConfigurations ().values ().forEach (( config ) -> maybeScheduleSnapshot ( project , config ) );
142161 }
143162 }
144163
145- public void cleanupDeletedPolicies (final ClusterState state ) {
146- SnapshotLifecycleMetadata snapMeta = state . metadata (). getProject () .custom (SnapshotLifecycleMetadata .TYPE );
164+ private void cleanupDeletedPolicies (final SnapshotLifecycleProjectState project , final ProjectMetadata metadata ) {
165+ SnapshotLifecycleMetadata snapMeta = metadata .custom (SnapshotLifecycleMetadata .TYPE );
147166 if (snapMeta != null ) {
148- // Retrieve all of the expected policy job ids from the policies in the metadata
167+ // Retrieve all the expected policy job ids from the policies in the metadata
149168 final Set <String > policyJobIds = snapMeta .getSnapshotConfigurations ()
150169 .values ()
151170 .stream ()
152171 .map (SnapshotLifecycleService ::getJobId )
153172 .collect (Collectors .toSet ());
154173
155174 // Cancel all jobs that are *NOT* in the scheduled tasks map
156- scheduledTasks .keySet ().stream ().filter (jobId -> policyJobIds .contains (jobId ) == false ).forEach (this ::cancelScheduledSnapshot );
175+ project .scheduledTasks .keySet ()
176+ .stream ()
177+ .filter (jobId -> policyJobIds .contains (jobId ) == false )
178+ .forEach (jobId -> cancelScheduledSnapshot (project , jobId ));
157179 }
158180 }
159181
@@ -162,16 +184,16 @@ public void cleanupDeletedPolicies(final ClusterState state) {
162184 * to see if any previous versions of the policy were scheduled, and if so, cancels those. If
163185 * the same version of a policy has already been scheduled it does not overwrite the job.
164186 */
165- public void maybeScheduleSnapshot (final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy ) {
166- if (this .running .get () == false ) {
187+ void maybeScheduleSnapshot (final SnapshotLifecycleProjectState project , final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy ) {
188+ if (project .running .get () == false ) {
167189 return ;
168190 }
169191
170192 final String jobId = getJobId (snapshotLifecyclePolicy );
171193 final Pattern existingJobPattern = Pattern .compile (snapshotLifecyclePolicy .getPolicy ().getId () + JOB_PATTERN_SUFFIX );
172194
173195 // Find and cancel any existing jobs for this policy
174- final boolean existingJobsFoundAndCancelled = scheduledTasks .keySet ()
196+ final boolean existingJobsFoundAndCancelled = project . scheduledTasks .keySet ()
175197 .stream ()
176198 // Find all jobs matching the `jobid-\d+` pattern
177199 .filter (jId -> existingJobPattern .matcher (jId ).matches ())
@@ -180,8 +202,8 @@ public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshot
180202 .map (existingJobId -> {
181203 // Cancel existing job so the new one can be scheduled
182204 logger .debug ("removing existing snapshot lifecycle job [{}] as it has been updated" , existingJobId );
183- scheduledTasks .remove (existingJobId );
184- boolean existed = scheduler .remove (existingJobId );
205+ project . scheduledTasks .remove (existingJobId );
206+ boolean existed = project . scheduler .remove (existingJobId );
185207 assert existed : "expected job for " + existingJobId + " to exist in scheduler" ;
186208 return existed ;
187209 })
@@ -190,15 +212,15 @@ public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshot
190212 // Now atomically schedule the new job and add it to the scheduled tasks map. If the jobId
191213 // is identical to an existing job (meaning the version has not changed) then this does
192214 // not reschedule it.
193- scheduledTasks .computeIfAbsent (jobId , id -> {
215+ project . scheduledTasks .computeIfAbsent (jobId , id -> {
194216 if (existingJobsFoundAndCancelled ) {
195217 logger .info ("rescheduling updated snapshot lifecycle job [{}]" , jobId );
196218 } else {
197219 logger .info ("scheduling snapshot lifecycle job [{}]" , jobId );
198220 }
199221
200222 final SchedulerEngine .Job job = snapshotLifecyclePolicy .buildSchedulerJob (jobId );
201- scheduler .add (job );
223+ project . scheduler .add (job );
202224 return job ;
203225 });
204226 }
@@ -213,25 +235,34 @@ public static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
213235 /**
214236 * Cancel all scheduled snapshot jobs
215237 */
216- public void cancelSnapshotJobs () {
238+ private void cancelSnapshotJobs (SnapshotLifecycleProjectState project ) {
217239 logger .trace ("cancelling all snapshot lifecycle jobs" );
218- scheduler .scheduledJobIds ().forEach (scheduler ::remove );
219- scheduledTasks .clear ();
240+ project . scheduler .scheduledJobIds ().forEach (project . scheduler ::remove );
241+ project . scheduledTasks .clear ();
220242 }
221243
222244 /**
223245 * Cancel the given policy job id (from {@link #getJobId(SnapshotLifecyclePolicyMetadata)}
224246 */
225- public void cancelScheduledSnapshot (final String lifecycleJobId ) {
226- logger .debug ("cancelling snapshot lifecycle job [{}] as it no longer exists" , lifecycleJobId );
227- scheduledTasks .remove (lifecycleJobId );
228- scheduler .remove (lifecycleJobId );
247+ private void cancelScheduledSnapshot (final SnapshotLifecycleProjectState project , final String lifecycleJobId ) {
248+ logger .debug ("cancelling project [{}] snapshot lifecycle job [{}] as it no longer exists" , project .projectId , lifecycleJobId );
249+ project .scheduledTasks .remove (lifecycleJobId );
250+ project .scheduler .remove (lifecycleJobId );
251+ }
252+
253+ private SnapshotLifecycleProjectState getOrCreateProjectState (ProjectId projectId ) {
254+ // initialize the project state if it does not exist (i.e. new project)
255+ return projects .computeIfAbsent (
256+ projectId ,
257+ id -> new SnapshotLifecycleProjectState (projectId , new SchedulerEngine (settings , clock ), taskProvider .apply (projectId ))
258+ );
229259 }
230260
231261 /**
232262 * Validates that the {@code repository} exists as a registered snapshot repository
233263 * @throws IllegalArgumentException if the repository does not exist
234264 */
265+ @ Deprecated (forRemoval = true )
235266 public static void validateRepositoryExists (final String repository , final ClusterState state ) {
236267 if (RepositoriesMetadata .get (state ).repository (repository ) == null ) {
237268 throw new IllegalArgumentException ("no such repository [" + repository + "]" );
@@ -260,8 +291,27 @@ public static void validateMinimumInterval(final SnapshotLifecyclePolicy lifecyc
260291
261292 @ Override
262293 public void close () {
263- if (this .running .compareAndSet (true , false )) {
264- this .scheduler .stop ();
294+ projects .values ().forEach (project -> {
295+ if (project .running .compareAndSet (true , false )) {
296+ project .scheduler .stop ();
297+ }
298+ });
299+ }
300+
301+ /**
302+ * Internal wrapper class to hold the state of a project's snapshot lifecycle
303+ */
304+ static class SnapshotLifecycleProjectState {
305+ final ProjectId projectId ;
306+ final SchedulerEngine scheduler ;
307+ final SnapshotLifecycleTask snapshotTask ;
308+ final Map <String , SchedulerEngine .Job > scheduledTasks = ConcurrentCollections .newConcurrentMap ();
309+ final AtomicBoolean running = new AtomicBoolean (true );
310+
311+ SnapshotLifecycleProjectState (ProjectId projectId , SchedulerEngine scheduler , SnapshotLifecycleTask snapshotTask ) {
312+ this .projectId = projectId ;
313+ this .scheduler = scheduler ;
314+ this .snapshotTask = snapshotTask ;
265315 }
266316 }
267317}
0 commit comments