 	MaxJobBackOff = 360 * time.Second
 )
 
-type JobController struct {
+// Controller is a controller for Jobs.
+type Controller struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
 
@@ -89,7 +90,8 @@ type JobController struct {
 	recorder record.EventRecorder
 }
 
-func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
+// NewController creates a new Controller.
+func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(klog.Infof)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@@ -98,7 +100,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
 	}
 
-	jm := &JobController{
+	jm := &Controller{
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
@@ -136,7 +138,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 }
 
 // Run the main goroutine responsible for watching and syncing jobs.
-func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
+func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer jm.queue.ShutDown()
 
@@ -155,7 +157,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 }
 
 // getPodJobs returns a list of Jobs that potentially match a Pod.
-func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
+func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job {
 	jobs, err := jm.jobLister.GetPodJobs(pod)
 	if err != nil {
 		return nil
@@ -175,7 +177,7 @@ func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
 // resolveControllerRef returns the controller referenced by a ControllerRef,
 // or nil if the ControllerRef could not be resolved to a matching controller
 // of the correct Kind.
-func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
+func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
 	// We can't look up by UID, so look up by Name and then verify UID.
 	// Don't even try to look up by Name if it's the wrong Kind.
 	if controllerRef.Kind != controllerKind.Kind {
@@ -194,7 +196,7 @@ func (jm *JobController) resolveControllerRef(namespace string, controllerRef *m
 }
 
 // When a pod is created, enqueue the controller that manages it and update it's expectations.
-func (jm *JobController) addPod(obj interface{}) {
+func (jm *Controller) addPod(obj interface{}) {
 	pod := obj.(*v1.Pod)
 	if pod.DeletionTimestamp != nil {
 		// on a restart of the controller controller, it's possible a new pod shows up in a state that
@@ -230,7 +232,7 @@ func (jm *JobController) addPod(obj interface{}) {
 // When a pod is updated, figure out what job/s manage it and wake them up.
 // If the labels of the pod have changed we need to awaken both the old
 // and new job. old and cur must be *v1.Pod types.
-func (jm *JobController) updatePod(old, cur interface{}) {
+func (jm *Controller) updatePod(old, cur interface{}) {
 	curPod := cur.(*v1.Pod)
 	oldPod := old.(*v1.Pod)
 	if curPod.ResourceVersion == oldPod.ResourceVersion {
@@ -282,7 +284,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 
 // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
 // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
-func (jm *JobController) deletePod(obj interface{}) {
+func (jm *Controller) deletePod(obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 
 	// When a delete is dropped, the relist will notice a pod in the store not
@@ -319,7 +321,7 @@ func (jm *JobController) deletePod(obj interface{}) {
 	jm.enqueueController(job, true)
 }
 
-func (jm *JobController) updateJob(old, cur interface{}) {
+func (jm *Controller) updateJob(old, cur interface{}) {
 	oldJob := old.(*batch.Job)
 	curJob := cur.(*batch.Job)
 
@@ -351,7 +353,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
 // immediate tells the controller to update the status right away, and should
 // happen ONLY when there was a successful pod run.
-func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
+func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
@@ -374,12 +376,12 @@ func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (jm *JobController) worker() {
+func (jm *Controller) worker() {
 	for jm.processNextWorkItem() {
 	}
 }
 
-func (jm *JobController) processNextWorkItem() bool {
+func (jm *Controller) processNextWorkItem() bool {
 	key, quit := jm.queue.Get()
 	if quit {
 		return false
@@ -403,7 +405,7 @@ func (jm *JobController) processNextWorkItem() bool {
 // getPodsForJob returns the set of pods that this Job should manage.
 // It also reconciles ControllerRef by adopting/orphaning.
 // Note that the returned Pods are pointers into the cache.
-func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
+func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
 	if err != nil {
 		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@@ -433,7 +435,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobController) syncJob(key string) (bool, error) {
+func (jm *Controller) syncJob(key string) (bool, error) {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
@@ -599,7 +601,7 @@ func (jm *JobController) syncJob(key string) (bool, error) {
 	return forget, manageJobErr
 }
 
-func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
+func (jm *Controller) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
 	// TODO: below code should be replaced with pod termination resulting in
 	// pod failures, rather than killing pods. Unfortunately none such solution
 	// exists ATM. There's an open discussion in the topic in
@@ -681,7 +683,7 @@ func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
 // manageJob is the core method responsible for managing the number of running
 // pods according to what is specified in the job.Spec.
 // Does NOT modify <activePods>.
-func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
+func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	var activeLock sync.Mutex
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
@@ -820,7 +822,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
 	return active, nil
 }
 
-func (jm *JobController) updateJobStatus(job *batch.Job) error {
+func (jm *Controller) updateJobStatus(job *batch.Job) error {
 	jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
 	var err error
 	for i := 0; i <= statusUpdateRetries; i = i + 1 {
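For callers, only the names change: code that previously constructed the controller with NewJobController now calls NewController on the same package. The following minimal sketch is not part of this commit; the jobcontroller import alias, the kubeconfig path, the 30-second resync period, and the worker count of 5 are illustrative assumptions. Only NewController and Run are taken from the diff above.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	// Assumed import path for the job controller package shown in the diff.
	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
)

func main() {
	// Build a client from a kubeconfig; the path here is purely illustrative.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Shared informers supply the pod and job informers the constructor expects.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	jm := jobcontroller.NewController(
		factory.Core().V1().Pods(),
		factory.Batch().V1().Jobs(),
		client,
	)

	stopCh := make(chan struct{})
	defer close(stopCh)

	factory.Start(stopCh) // start the underlying informers
	jm.Run(5, stopCh)     // blocks until stopCh is closed; 5 is an arbitrary worker count
}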