@@ -18,14 +18,13 @@ package cronjob
18
18
19
19
/*
20
20
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
21
- expecting a large volume of jobs or scheduledJobs . (We are favoring correctness
21
+ expecting a large volume of jobs or cronJobs . (We are favoring correctness
22
22
over scalability. If we find a single controller thread is too slow because
23
23
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
24
24
If we find the load on the API server is too high, we can use a watch and
25
25
UndeltaStore.)
26
26
27
- Just periodically list jobs and SJs, and then reconcile them.
28
-
27
+ Just periodically list jobs and cronJobs, and then reconcile them.
29
28
*/
30
29
31
30
import (
@@ -63,7 +62,7 @@ var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob")
63
62
type Controller struct {
64
63
kubeClient clientset.Interface
65
64
jobControl jobControlInterface
66
- sjControl sjControlInterface
65
+ cjControl cjControlInterface
67
66
podControl podControlInterface
68
67
recorder record.EventRecorder
69
68
}
@@ -83,7 +82,7 @@ func NewController(kubeClient clientset.Interface) (*Controller, error) {
83
82
jm := & Controller {
84
83
kubeClient : kubeClient ,
85
84
jobControl : realJobControl {KubeClient : kubeClient },
86
- sjControl : & realSJControl {KubeClient : kubeClient },
85
+ cjControl : & realCJControl {KubeClient : kubeClient },
87
86
podControl : & realPodControl {KubeClient : kubeClient },
88
87
recorder : eventBroadcaster .NewRecorder (scheme .Scheme , v1.EventSource {Component : "cronjob-controller" }),
89
88
}
@@ -131,15 +130,15 @@ func (jm *Controller) syncAll() {
131
130
return jm .kubeClient .BatchV1beta1 ().CronJobs (metav1 .NamespaceAll ).List (context .TODO (), opts )
132
131
}
133
132
134
- jobsBySj := groupJobsByParent (js )
135
- klog .V (4 ).Infof ("Found %d groups" , len (jobsBySj ))
133
+ jobsByCj := groupJobsByParent (js )
134
+ klog .V (4 ).Infof ("Found %d groups" , len (jobsByCj ))
136
135
err = pager .New (pager .SimplePageFunc (cronJobListFunc )).EachListItem (context .Background (), metav1.ListOptions {}, func (object runtime.Object ) error {
137
- sj , ok := object .(* batchv1beta1.CronJob )
136
+ cj , ok := object .(* batchv1beta1.CronJob )
138
137
if ! ok {
139
- return fmt .Errorf ("expected type *batchv1beta1.CronJob, got type %T" , sj )
138
+ return fmt .Errorf ("expected type *batchv1beta1.CronJob, got type %T" , cj )
140
139
}
141
- syncOne (sj , jobsBySj [ sj .UID ], time .Now (), jm .jobControl , jm .sjControl , jm .recorder )
142
- cleanupFinishedJobs (sj , jobsBySj [ sj .UID ], jm .jobControl , jm .sjControl , jm .recorder )
140
+ syncOne (cj , jobsByCj [ cj .UID ], time .Now (), jm .jobControl , jm .cjControl , jm .recorder )
141
+ cleanupFinishedJobs (cj , jobsByCj [ cj .UID ], jm .jobControl , jm .cjControl , jm .recorder )
143
142
return nil
144
143
})
145
144
@@ -150,10 +149,10 @@ func (jm *Controller) syncAll() {
150
149
}
151
150
152
151
// cleanupFinishedJobs cleans up finished jobs created by a CronJob
153
- func cleanupFinishedJobs (sj * batchv1beta1.CronJob , js []batchv1.Job , jc jobControlInterface ,
154
- sjc sjControlInterface , recorder record.EventRecorder ) {
152
+ func cleanupFinishedJobs (cj * batchv1beta1.CronJob , js []batchv1.Job , jc jobControlInterface ,
153
+ cjc cjControlInterface , recorder record.EventRecorder ) {
155
154
// If neither limit is active, there is no need to do anything.
156
- if sj .Spec .FailedJobsHistoryLimit == nil && sj .Spec .SuccessfulJobsHistoryLimit == nil {
155
+ if cj .Spec .FailedJobsHistoryLimit == nil && cj .Spec .SuccessfulJobsHistoryLimit == nil {
157
156
return
158
157
}
159
158
@@ -169,107 +168,107 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobContr
169
168
}
170
169
}
171
170
172
- if sj .Spec .SuccessfulJobsHistoryLimit != nil {
173
- removeOldestJobs (sj ,
171
+ if cj .Spec .SuccessfulJobsHistoryLimit != nil {
172
+ removeOldestJobs (cj ,
174
173
successfulJobs ,
175
174
jc ,
176
- * sj .Spec .SuccessfulJobsHistoryLimit ,
175
+ * cj .Spec .SuccessfulJobsHistoryLimit ,
177
176
recorder )
178
177
}
179
178
180
- if sj .Spec .FailedJobsHistoryLimit != nil {
181
- removeOldestJobs (sj ,
179
+ if cj .Spec .FailedJobsHistoryLimit != nil {
180
+ removeOldestJobs (cj ,
182
181
failedJobs ,
183
182
jc ,
184
- * sj .Spec .FailedJobsHistoryLimit ,
183
+ * cj .Spec .FailedJobsHistoryLimit ,
185
184
recorder )
186
185
}
187
186
188
187
// Update the CronJob, in case jobs were removed from the list.
189
- if _ , err := sjc .UpdateStatus (sj ); err != nil {
190
- nameForLog := fmt .Sprintf ("%s/%s" , sj .Namespace , sj .Name )
191
- klog .Infof ("Unable to update status for %s (rv = %s): %v" , nameForLog , sj .ResourceVersion , err )
188
+ if _ , err := cjc .UpdateStatus (cj ); err != nil {
189
+ nameForLog := fmt .Sprintf ("%s/%s" , cj .Namespace , cj .Name )
190
+ klog .Infof ("Unable to update status for %s (rv = %s): %v" , nameForLog , cj .ResourceVersion , err )
192
191
}
193
192
}
194
193
195
194
// removeOldestJobs removes the oldest jobs from a list of jobs
196
- func removeOldestJobs (sj * batchv1beta1.CronJob , js []batchv1.Job , jc jobControlInterface , maxJobs int32 , recorder record.EventRecorder ) {
195
+ func removeOldestJobs (cj * batchv1beta1.CronJob , js []batchv1.Job , jc jobControlInterface , maxJobs int32 , recorder record.EventRecorder ) {
197
196
numToDelete := len (js ) - int (maxJobs )
198
197
if numToDelete <= 0 {
199
198
return
200
199
}
201
200
202
- nameForLog := fmt .Sprintf ("%s/%s" , sj .Namespace , sj .Name )
201
+ nameForLog := fmt .Sprintf ("%s/%s" , cj .Namespace , cj .Name )
203
202
klog .V (4 ).Infof ("Cleaning up %d/%d jobs from %s" , numToDelete , len (js ), nameForLog )
204
203
205
204
sort .Sort (byJobStartTime (js ))
206
205
for i := 0 ; i < numToDelete ; i ++ {
207
206
klog .V (4 ).Infof ("Removing job %s from %s" , js [i ].Name , nameForLog )
208
- deleteJob (sj , & js [i ], jc , recorder )
207
+ deleteJob (cj , & js [i ], jc , recorder )
209
208
}
210
209
}
211
210
212
211
// syncOne reconciles a CronJob with a list of any Jobs that it created.
213
- // All known jobs created by "sj " should be included in "js".
212
+ // All known jobs created by "cj " should be included in "js".
214
213
// The current time is passed in to facilitate testing.
215
214
// It has no receiver, to facilitate testing.
216
- func syncOne (sj * batchv1beta1.CronJob , js []batchv1.Job , now time.Time , jc jobControlInterface , sjc sjControlInterface , recorder record.EventRecorder ) {
217
- nameForLog := fmt .Sprintf ("%s/%s" , sj .Namespace , sj .Name )
215
+ func syncOne (cj * batchv1beta1.CronJob , js []batchv1.Job , now time.Time , jc jobControlInterface , cjc cjControlInterface , recorder record.EventRecorder ) {
216
+ nameForLog := fmt .Sprintf ("%s/%s" , cj .Namespace , cj .Name )
218
217
219
218
childrenJobs := make (map [types.UID ]bool )
220
219
for _ , j := range js {
221
220
childrenJobs [j .ObjectMeta .UID ] = true
222
- found := inActiveList (* sj , j .ObjectMeta .UID )
221
+ found := inActiveList (* cj , j .ObjectMeta .UID )
223
222
if ! found && ! IsJobFinished (& j ) {
224
- recorder .Eventf (sj , v1 .EventTypeWarning , "UnexpectedJob" , "Saw a job that the controller did not create or forgot: %s" , j .Name )
223
+ recorder .Eventf (cj , v1 .EventTypeWarning , "UnexpectedJob" , "Saw a job that the controller did not create or forgot: %s" , j .Name )
225
224
// We found an unfinished job that has us as the parent, but it is not in our Active list.
226
225
// This could happen if we crashed right after creating the Job and before updating the status,
227
- // or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
226
+ // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
228
227
// a job that they wanted us to adopt.
229
228
230
229
// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
231
230
// stop users from creating jobs if they have permission. It is assumed that if a
232
- // user has permission to create a job within a namespace, then they have permission to make any scheduledJob
231
+ // user has permission to create a job within a namespace, then they have permission to make any cronJob
233
232
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
234
- // TBS: how to update sj .Status.LastScheduleTime if the adopted job is newer than any we knew about?
233
+ // TBS: how to update cj .Status.LastScheduleTime if the adopted job is newer than any we knew about?
235
234
} else if found && IsJobFinished (& j ) {
236
235
_ , status := getFinishedStatus (& j )
237
- deleteFromActiveList (sj , j .ObjectMeta .UID )
238
- recorder .Eventf (sj , v1 .EventTypeNormal , "SawCompletedJob" , "Saw completed job: %s, status: %v" , j .Name , status )
236
+ deleteFromActiveList (cj , j .ObjectMeta .UID )
237
+ recorder .Eventf (cj , v1 .EventTypeNormal , "SawCompletedJob" , "Saw completed job: %s, status: %v" , j .Name , status )
239
238
}
240
239
}
241
240
242
241
// Remove any job reference from the active list if the corresponding job does not exist any more.
243
242
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
244
243
// job running.
245
- for _ , j := range sj .Status .Active {
244
+ for _ , j := range cj .Status .Active {
246
245
if found := childrenJobs [j .UID ]; ! found {
247
- recorder .Eventf (sj , v1 .EventTypeNormal , "MissingJob" , "Active job went missing: %v" , j .Name )
248
- deleteFromActiveList (sj , j .UID )
246
+ recorder .Eventf (cj , v1 .EventTypeNormal , "MissingJob" , "Active job went missing: %v" , j .Name )
247
+ deleteFromActiveList (cj , j .UID )
249
248
}
250
249
}
251
250
252
- updatedSJ , err := sjc .UpdateStatus (sj )
251
+ updatedCJ , err := cjc .UpdateStatus (cj )
253
252
if err != nil {
254
- klog .Errorf ("Unable to update status for %s (rv = %s): %v" , nameForLog , sj .ResourceVersion , err )
253
+ klog .Errorf ("Unable to update status for %s (rv = %s): %v" , nameForLog , cj .ResourceVersion , err )
255
254
return
256
255
}
257
- * sj = * updatedSJ
256
+ * cj = * updatedCJ
258
257
259
- if sj .DeletionTimestamp != nil {
258
+ if cj .DeletionTimestamp != nil {
260
259
// The CronJob is being deleted.
261
260
// Don't do anything other than updating status.
262
261
return
263
262
}
264
263
265
- if sj .Spec .Suspend != nil && * sj .Spec .Suspend {
264
+ if cj .Spec .Suspend != nil && * cj .Spec .Suspend {
266
265
klog .V (4 ).Infof ("Not starting job for %s because it is suspended" , nameForLog )
267
266
return
268
267
}
269
268
270
- times , err := getRecentUnmetScheduleTimes (* sj , now )
269
+ times , err := getRecentUnmetScheduleTimes (* cj , now )
271
270
if err != nil {
272
- recorder .Eventf (sj , v1 .EventTypeWarning , "FailedNeedsStart" , "Cannot determine if job needs to be started: %v" , err )
271
+ recorder .Eventf (cj , v1 .EventTypeWarning , "FailedNeedsStart" , "Cannot determine if job needs to be started: %v" , err )
273
272
klog .Errorf ("Cannot determine if %s needs to be started: %v" , nameForLog , err )
274
273
return
275
274
}
@@ -284,22 +283,22 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
284
283
285
284
scheduledTime := times [len (times )- 1 ]
286
285
tooLate := false
287
- if sj .Spec .StartingDeadlineSeconds != nil {
288
- tooLate = scheduledTime .Add (time .Second * time .Duration (* sj .Spec .StartingDeadlineSeconds )).Before (now )
286
+ if cj .Spec .StartingDeadlineSeconds != nil {
287
+ tooLate = scheduledTime .Add (time .Second * time .Duration (* cj .Spec .StartingDeadlineSeconds )).Before (now )
289
288
}
290
289
if tooLate {
291
290
klog .V (4 ).Infof ("Missed starting window for %s" , nameForLog )
292
- recorder .Eventf (sj , v1 .EventTypeWarning , "MissSchedule" , "Missed scheduled time to start a job: %s" , scheduledTime .Format (time .RFC1123Z ))
291
+ recorder .Eventf (cj , v1 .EventTypeWarning , "MissSchedule" , "Missed scheduled time to start a job: %s" , scheduledTime .Format (time .RFC1123Z ))
293
292
// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
294
293
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
295
- // the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
294
+ // the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
296
295
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
297
296
// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
298
297
// an event the next time we process it, and also so the user looking at the status
299
298
// can see easily that there was a missed execution.
300
299
return
301
300
}
302
- if sj .Spec .ConcurrencyPolicy == batchv1beta1 .ForbidConcurrent && len (sj .Status .Active ) > 0 {
301
+ if cj .Spec .ConcurrencyPolicy == batchv1beta1 .ForbidConcurrent && len (cj .Status .Active ) > 0 {
303
302
// Regardless which source of information we use for the set of active jobs,
304
303
// there is some risk that we won't see an active job when there is one.
305
304
// (because we haven't seen the status update to the CronJob or the created pod).
@@ -312,37 +311,37 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
312
311
klog .V (4 ).Infof ("Not starting job for %s because of prior execution still running and concurrency policy is Forbid" , nameForLog )
313
312
return
314
313
}
315
- if sj .Spec .ConcurrencyPolicy == batchv1beta1 .ReplaceConcurrent {
316
- for _ , j := range sj .Status .Active {
314
+ if cj .Spec .ConcurrencyPolicy == batchv1beta1 .ReplaceConcurrent {
315
+ for _ , j := range cj .Status .Active {
317
316
klog .V (4 ).Infof ("Deleting job %s of %s that was still running at next scheduled start time" , j .Name , nameForLog )
318
317
319
318
job , err := jc .GetJob (j .Namespace , j .Name )
320
319
if err != nil {
321
- recorder .Eventf (sj , v1 .EventTypeWarning , "FailedGet" , "Get job: %v" , err )
320
+ recorder .Eventf (cj , v1 .EventTypeWarning , "FailedGet" , "Get job: %v" , err )
322
321
return
323
322
}
324
- if ! deleteJob (sj , job , jc , recorder ) {
323
+ if ! deleteJob (cj , job , jc , recorder ) {
325
324
return
326
325
}
327
326
}
328
327
}
329
328
330
- jobReq , err := getJobFromTemplate (sj , scheduledTime )
329
+ jobReq , err := getJobFromTemplate (cj , scheduledTime )
331
330
if err != nil {
332
331
klog .Errorf ("Unable to make Job from template in %s: %v" , nameForLog , err )
333
332
return
334
333
}
335
- jobResp , err := jc .CreateJob (sj .Namespace , jobReq )
334
+ jobResp , err := jc .CreateJob (cj .Namespace , jobReq )
336
335
if err != nil {
337
336
// If the namespace is being torn down, we can safely ignore
338
337
// this error since all subsequent creations will fail.
339
338
if ! errors .HasStatusCause (err , v1 .NamespaceTerminatingCause ) {
340
- recorder .Eventf (sj , v1 .EventTypeWarning , "FailedCreate" , "Error creating job: %v" , err )
339
+ recorder .Eventf (cj , v1 .EventTypeWarning , "FailedCreate" , "Error creating job: %v" , err )
341
340
}
342
341
return
343
342
}
344
343
klog .V (4 ).Infof ("Created Job %s for %s" , jobResp .Name , nameForLog )
345
- recorder .Eventf (sj , v1 .EventTypeNormal , "SuccessfulCreate" , "Created job %v" , jobResp .Name )
344
+ recorder .Eventf (cj , v1 .EventTypeNormal , "SuccessfulCreate" , "Created job %v" , jobResp .Name )
346
345
347
346
// ------------------------------------------------------------------ //
348
347
@@ -359,29 +358,29 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
359
358
if err != nil {
360
359
klog .V (2 ).Infof ("Unable to make object reference for job for %s" , nameForLog )
361
360
} else {
362
- sj .Status .Active = append (sj .Status .Active , * ref )
361
+ cj .Status .Active = append (cj .Status .Active , * ref )
363
362
}
364
- sj .Status .LastScheduleTime = & metav1.Time {Time : scheduledTime }
365
- if _ , err := sjc .UpdateStatus (sj ); err != nil {
366
- klog .Infof ("Unable to update status for %s (rv = %s): %v" , nameForLog , sj .ResourceVersion , err )
363
+ cj .Status .LastScheduleTime = & metav1.Time {Time : scheduledTime }
364
+ if _ , err := cjc .UpdateStatus (cj ); err != nil {
365
+ klog .Infof ("Unable to update status for %s (rv = %s): %v" , nameForLog , cj .ResourceVersion , err )
367
366
}
368
367
369
368
return
370
369
}
371
370
372
371
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
373
- func deleteJob (sj * batchv1beta1.CronJob , job * batchv1.Job , jc jobControlInterface , recorder record.EventRecorder ) bool {
374
- nameForLog := fmt .Sprintf ("%s/%s" , sj .Namespace , sj .Name )
372
+ func deleteJob (cj * batchv1beta1.CronJob , job * batchv1.Job , jc jobControlInterface , recorder record.EventRecorder ) bool {
373
+ nameForLog := fmt .Sprintf ("%s/%s" , cj .Namespace , cj .Name )
375
374
376
375
// delete the job itself...
377
376
if err := jc .DeleteJob (job .Namespace , job .Name ); err != nil {
378
- recorder .Eventf (sj , v1 .EventTypeWarning , "FailedDelete" , "Deleted job: %v" , err )
377
+ recorder .Eventf (cj , v1 .EventTypeWarning , "FailedDelete" , "Deleted job: %v" , err )
379
378
klog .Errorf ("Error deleting job %s from %s: %v" , job .Name , nameForLog , err )
380
379
return false
381
380
}
382
381
// ... and its reference from active list
383
- deleteFromActiveList (sj , job .ObjectMeta .UID )
384
- recorder .Eventf (sj , v1 .EventTypeNormal , "SuccessfulDelete" , "Deleted job %v" , job .Name )
382
+ deleteFromActiveList (cj , job .ObjectMeta .UID )
383
+ recorder .Eventf (cj , v1 .EventTypeNormal , "SuccessfulDelete" , "Deleted job %v" , job .Name )
385
384
386
385
return true
387
386
}
0 commit comments