 	MaxJobBackOff = 360 * time.Second
 )
 
-type JobController struct {
+// Controller ensures that all Job objects have corresponding pods to
+// run their configured workload.
+type Controller struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
@@ -90,7 +92,9 @@ type JobController struct {
 	recorder record.EventRecorder
 }
 
-func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
+// NewController creates a new Job controller that keeps the relevant pods
+// in sync with their corresponding Job objects.
+func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(klog.Infof)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@@ -99,7 +103,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
 	}
 
-	jm := &JobController{
+	jm := &Controller{
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
@@ -137,7 +141,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 }
 
 // Run the main goroutine responsible for watching and syncing jobs.
-func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
+func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer jm.queue.ShutDown()
 
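(Not part of the diff: a minimal sketch of how a caller might wire up the renamed NewController and Run, assuming the standard client-go shared informer factory and that this package lives at k8s.io/kubernetes/pkg/controller/job. The kubeconfig path, resync period, and worker count are illustrative, not taken from this change.)

// Sketch only: wiring the renamed Controller into shared informers.
package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/kubernetes/pkg/controller/job"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // illustrative path
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	jm := job.NewController(factory.Core().V1().Pods(), factory.Batch().V1().Jobs(), client)

	stopCh := make(chan struct{})
	factory.Start(stopCh) // start the informers that feed the controller's listers
	jm.Run(5, stopCh)     // blocks; five sync workers chosen arbitrarily
}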
@@ -156,7 +160,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 }
 
 // getPodJobs returns a list of Jobs that potentially match a Pod.
-func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
+func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job {
 	jobs, err := jm.jobLister.GetPodJobs(pod)
 	if err != nil {
 		return nil
@@ -176,7 +180,7 @@ func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
 // resolveControllerRef returns the controller referenced by a ControllerRef,
 // or nil if the ControllerRef could not be resolved to a matching controller
 // of the correct Kind.
-func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
+func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
 	// We can't look up by UID, so look up by Name and then verify UID.
 	// Don't even try to look up by Name if it's the wrong Kind.
 	if controllerRef.Kind != controllerKind.Kind {
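(Not part of the diff: the rest of resolveControllerRef falls outside this hunk. The name-then-UID check its comment describes typically looks like the hypothetical helper below; the lister type is assumed to match the jobLister used in getPodJobs above.)

package jobrefsketch // hypothetical package, illustration only

import (
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	batchv1listers "k8s.io/client-go/listers/batch/v1"
)

// resolveJobRef resolves an owner reference by Name and then verifies the UID,
// so a newer Job that reuses the same name is never returned.
func resolveJobRef(jobLister batchv1listers.JobLister, namespace string, ref *metav1.OwnerReference) *batchv1.Job {
	job, err := jobLister.Jobs(namespace).Get(ref.Name)
	if err != nil {
		return nil
	}
	if job.UID != ref.UID {
		// The reference points at an older incarnation of this name.
		return nil
	}
	return job
}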
@@ -195,7 +199,7 @@ func (jm *JobController) resolveControllerRef(namespace string, controllerRef *m
 }
 
 // When a pod is created, enqueue the controller that manages it and update its expectations.
-func (jm *JobController) addPod(obj interface{}) {
+func (jm *Controller) addPod(obj interface{}) {
 	pod := obj.(*v1.Pod)
 	if pod.DeletionTimestamp != nil {
 		// on a restart of the controller manager, it's possible a new pod shows up in a state that
@@ -231,7 +235,7 @@ func (jm *JobController) addPod(obj interface{}) {
 // When a pod is updated, figure out what job/s manage it and wake them up.
 // If the labels of the pod have changed we need to awaken both the old
 // and new job. old and cur must be *v1.Pod types.
-func (jm *JobController) updatePod(old, cur interface{}) {
+func (jm *Controller) updatePod(old, cur interface{}) {
 	curPod := cur.(*v1.Pod)
 	oldPod := old.(*v1.Pod)
 	if curPod.ResourceVersion == oldPod.ResourceVersion {
@@ -283,7 +287,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 
 // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
 // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
-func (jm *JobController) deletePod(obj interface{}) {
+func (jm *Controller) deletePod(obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 
 	// When a delete is dropped, the relist will notice a pod in the store not
@@ -320,7 +324,7 @@ func (jm *JobController) deletePod(obj interface{}) {
 	jm.enqueueController(job, true)
 }
 
-func (jm *JobController) updateJob(old, cur interface{}) {
+func (jm *Controller) updateJob(old, cur interface{}) {
 	oldJob := old.(*batch.Job)
 	curJob := cur.(*batch.Job)
 
@@ -352,7 +356,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
 // immediate tells the controller to update the status right away, and should
 // happen ONLY when there was a successful pod run.
-func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
+func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
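(Not part of the diff: the delayed enqueue implied by the immediate flag is not visible in this hunk. The sketch below shows one way a backoff-capped enqueue can be written with the client-go workqueue API; the 10s base delay and the failures counter are assumptions, only the 360s cap mirrors MaxJobBackOff above.)

package backoffsketch // hypothetical package, illustration only

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

const maxJobBackOff = 360 * time.Second // mirrors MaxJobBackOff at the top of the file

// enqueueWithBackoff re-queues a key immediately or after an exponentially
// growing, capped delay.
func enqueueWithBackoff(queue workqueue.DelayingInterface, key string, immediate bool, failures uint) {
	backoff := time.Duration(0)
	if !immediate {
		backoff = time.Duration(1<<failures) * 10 * time.Second // assumed 10s base
		if backoff <= 0 || backoff > maxJobBackOff {
			backoff = maxJobBackOff // cap, and guard against shift overflow
		}
	}
	queue.AddAfter(key, backoff)
}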
@@ -375,12 +379,12 @@ func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (jm *JobController) worker() {
+func (jm *Controller) worker() {
 	for jm.processNextWorkItem() {
 	}
 }
 
-func (jm *JobController) processNextWorkItem() bool {
+func (jm *Controller) processNextWorkItem() bool {
 	key, quit := jm.queue.Get()
 	if quit {
 		return false
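(Not part of the diff: the remainder of processNextWorkItem is outside this hunk. The sketch below shows the canonical client-go workqueue loop that the worker comment describes, with Forget on success and a rate-limited retry on error; the queue and syncHandler names mirror this file's comments, the rest is illustrative.)

package workersketch // hypothetical package, illustration only

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

type exampleController struct {
	queue       workqueue.RateLimitingInterface
	syncHandler func(key string) (forget bool, err error)
}

// processNextWorkItem dequeues one key, syncs it, and either forgets it or
// re-queues it with rate limiting.
func (c *exampleController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key) // mark the key as no longer in flight

	forget, err := c.syncHandler(key.(string))
	if err == nil {
		if forget {
			c.queue.Forget(key) // clear rate-limiter history on success
		}
		return true
	}

	utilruntime.HandleError(fmt.Errorf("error syncing job %q: %v", key, err))
	c.queue.AddRateLimited(key) // retry later with backoff

	return true
}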
@@ -404,7 +408,7 @@ func (jm *JobController) processNextWorkItem() bool {
 // getPodsForJob returns the set of pods that this Job should manage.
 // It also reconciles ControllerRef by adopting/orphaning.
 // Note that the returned Pods are pointers into the cache.
-func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
+func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
 	if err != nil {
 		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@@ -434,7 +438,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobController) syncJob(key string) (bool, error) {
+func (jm *Controller) syncJob(key string) (bool, error) {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
@@ -601,7 +605,7 @@ func (jm *JobController) syncJob(key string) (bool, error) {
 	return forget, manageJobErr
 }
 
-func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
+func (jm *Controller) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
 	// TODO: below code should be replaced with pod termination resulting in
 	// pod failures, rather than killing pods. Unfortunately no such solution
 	// exists ATM. There's an open discussion in the topic in
@@ -683,7 +687,7 @@ func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
 // manageJob is the core method responsible for managing the number of running
 // pods according to what is specified in the job.Spec.
 // Does NOT modify <activePods>.
-func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
+func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
 	var activeLock sync.Mutex
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
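(Not part of the diff: as a simplified illustration of the bookkeeping manageJob's comment describes, the controller compares how many pods are active with how many it still wants, given parallelism, completions, and the succeeded count. The helper below is hypothetical; the real method also batches creations, records expectations, and propagates errors.)

// podDelta is a hypothetical helper, illustration only. A positive result
// means pods should be created; a negative result means active pods should
// be deleted.
func podDelta(active, succeeded, parallelism int32, completions *int32) int32 {
	wantActive := parallelism
	if completions != nil {
		// Never run more pods than are still needed to reach .spec.completions.
		if remaining := *completions - succeeded; remaining < wantActive {
			wantActive = remaining
		}
		if wantActive < 0 {
			wantActive = 0
		}
	}
	return wantActive - active
}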
@@ -822,7 +826,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
 	return active, nil
 }
 
-func (jm *JobController) updateJobStatus(job *batch.Job) error {
+func (jm *Controller) updateJobStatus(job *batch.Job) error {
 	jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
 	var err error
 	for i := 0; i <= statusUpdateRetries; i = i + 1 {
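(Not part of the diff: the body of the retry loop is outside this hunk. A conflict-tolerant status update of the kind statusUpdateRetries suggests usually re-reads the Job and retries UpdateStatus, roughly as sketched below. The retry count is a parameter here, and the context-free Get/UpdateStatus signatures assume the older client-go this file appears to use; newer versions take a context.Context first.)

package statussketch // hypothetical package, illustration only

import (
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1"
)

// updateStatusWithRetries copies a freshly computed status onto the latest
// version of the Job and retries on update errors.
func updateStatusWithRetries(jobClient batchv1client.JobInterface, job *batchv1.Job, retries int) error {
	var err error
	for i := 0; i <= retries; i++ {
		var latest *batchv1.Job
		latest, err = jobClient.Get(job.Name, metav1.GetOptions{})
		if err != nil {
			break // could not re-read the Job; give up and report the error
		}
		latest.Status = job.Status
		if _, err = jobClient.UpdateStatus(latest); err == nil {
			break // update accepted
		}
	}
	return err
}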