@@ -113,7 +113,7 @@ type Controller struct {
113
113
queue workqueue.TypedRateLimitingInterface [string ]
114
114
115
115
// Orphan deleted pods that still have a Job tracking finalizer to be removed
116
- orphanQueue workqueue.TypedRateLimitingInterface [string ]
116
+ orphanQueue workqueue.TypedRateLimitingInterface [orphanPodKey ]
117
117
118
118
broadcaster record.EventBroadcaster
119
119
recorder record.EventRecorder
@@ -143,6 +143,23 @@ type syncJobCtx struct {
143
143
ready int32
144
144
}
145
145
146
+ type orphanPodKeyKind int
147
+
148
+ const (
149
+ // OrphanPodKeyKindName marks a key whose value is the name of a single pod.
150
+ OrphanPodKeyKindName orphanPodKeyKind = iota
151
+ // OrphanPodKeyKindSelector marks a key whose value is a label selector string matching the pods to process.
152
+ OrphanPodKeyKindSelector
153
+ )
154
+
155
+ type orphanPodKey struct {
156
+ // kind is OrphanPodKeyKindName or OrphanPodKeyKindSelector and decides how value is interpreted.
157
+ kind orphanPodKeyKind
158
+ namespace string
159
+ // value is a pod name for OrphanPodKeyKindName, or a label selector string for OrphanPodKeyKindSelector.
160
+ value string
161
+ }
162
+
146
163
// NewController creates a new Job controller that keeps the relevant pods
147
164
// in sync with their corresponding Job objects.
148
165
func NewController (ctx context.Context , podInformer coreinformers.PodInformer , jobInformer batchinformers.JobInformer , kubeClient clientset.Interface ) (* Controller , error ) {
@@ -162,7 +179,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
162
179
expectations : controller .NewControllerExpectations (),
163
180
finalizerExpectations : newUIDTrackingExpectations (),
164
181
queue : workqueue .NewTypedRateLimitingQueueWithConfig (workqueue .NewTypedItemExponentialFailureRateLimiter [string ](DefaultJobApiBackOff , MaxJobApiBackOff ), workqueue.TypedRateLimitingQueueConfig [string ]{Name : "job" , Clock : clock }),
165
- orphanQueue : workqueue .NewTypedRateLimitingQueueWithConfig (workqueue .NewTypedItemExponentialFailureRateLimiter [string ](DefaultJobApiBackOff , MaxJobApiBackOff ), workqueue.TypedRateLimitingQueueConfig [string ]{Name : "job_orphan_pod" , Clock : clock }),
182
+ orphanQueue : workqueue .NewTypedRateLimitingQueueWithConfig (workqueue .NewTypedItemExponentialFailureRateLimiter [orphanPodKey ](DefaultJobApiBackOff , MaxJobApiBackOff ), workqueue.TypedRateLimitingQueueConfig [orphanPodKey ]{Name : "job_orphan_pod" , Clock : clock }),
166
183
broadcaster : eventBroadcaster ,
167
184
recorder : eventBroadcaster .NewRecorder (scheme .Scheme , v1.EventSource {Component : "job-controller" }),
168
185
clock : clock ,
@@ -513,7 +530,17 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
513
530
return
514
531
}
515
532
}
516
- jm .cleanupPodFinalizers (jobObj )
533
+ selector , err := metav1 .LabelSelectorAsSelector (jobObj .Spec .Selector )
534
+ if err != nil {
535
+ utilruntime .HandleError (fmt .Errorf ("job %s/%s has invalid label selector: %w" , jobObj .Namespace , jobObj .Name , err ))
536
+ return
537
+ }
538
+ orphanPodKey := orphanPodKey {
539
+ kind : OrphanPodKeyKindSelector ,
540
+ namespace : jobObj .Namespace ,
541
+ value : selector .String (),
542
+ }
543
+ jm .orphanQueue .Add (orphanPodKey )
517
544
}
518
545
519
546
// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
@@ -563,12 +590,12 @@ func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}
563
590
}
564
591
565
592
func (jm * Controller ) enqueueOrphanPod (obj * v1.Pod ) {
566
- key , err := controller . KeyFunc ( obj )
567
- if err != nil {
568
- utilruntime . HandleError ( fmt . Errorf ( "couldn't get key for object %+v: %v" , obj , err ))
569
- return
593
+ orphanPodKey := orphanPodKey {
594
+ kind : OrphanPodKeyKindName ,
595
+ namespace : obj . Namespace ,
596
+ value : obj . Name ,
570
597
}
571
- jm .orphanQueue .Add (key )
598
+ jm .orphanQueue .Add (orphanPodKey )
572
599
}
573
600
574
601
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -620,37 +647,70 @@ func (jm *Controller) processNextOrphanPod(ctx context.Context) bool {
620
647
}
621
648
622
649
// syncOrphanPod removes the tracking finalizer from an orphan pod if found.
623
- func (jm * Controller ) syncOrphanPod (ctx context.Context , key string ) error {
650
+ func (jm * Controller ) syncOrphanPod (ctx context.Context , key orphanPodKey ) error {
624
651
startTime := jm .clock .Now ()
625
652
logger := klog .FromContext (ctx )
626
653
defer func () {
627
654
logger .V (4 ).Info ("Finished syncing orphan pod" , "pod" , key , "elapsed" , jm .clock .Since (startTime ))
628
655
}()
629
656
630
- ns , name , err := cache .SplitMetaNamespaceKey (key )
657
+ switch key .kind {
658
+ case OrphanPodKeyKindName :
659
+ pod , err := jm .podStore .Pods (key .namespace ).Get (key .value )
660
+ if err != nil {
661
+ if apierrors .IsNotFound (err ) {
662
+ logger .V (4 ).Info ("Orphan pod has been deleted" , "pod" , klog .KRef (key .namespace , key .value ))
663
+ return nil
664
+ }
665
+ return err
666
+ }
667
+ return jm .handleSingleOrphanPod (ctx , pod )
668
+ case OrphanPodKeyKindSelector :
669
+ logger .V (8 ).Info ("syncing all pods matching the label selector" , "namespace" , key .namespace , "labelSelector" , key .value )
670
+ return jm .syncOrphanPodsBySelector (ctx , key .namespace , key .value )
671
+ default :
672
+ return fmt .Errorf ("unknown key type: %d" , key .kind )
673
+ }
674
+ }
675
+
676
+ // syncOrphanPodsBySelector fetches and processes all pods matching the given label selector.
677
+ func (jm * Controller ) syncOrphanPodsBySelector (ctx context.Context , namespace string , labelSelector string ) error {
678
+ logger := klog .FromContext (ctx )
679
+ selector , err := labels .Parse (labelSelector )
631
680
if err != nil {
632
- return err
681
+ return fmt . Errorf ( "invalid label selector: %w" , err )
633
682
}
634
683
635
- sharedPod , err := jm .podStore .Pods (ns ).Get (name )
684
+ // Fetch all pods that match the label selector.
685
+ // relatively expensive operation but it is called only from the orphan reconciler
686
+ pods , err := jm .podStore .Pods (namespace ).List (selector )
636
687
if err != nil {
637
- if apierrors .IsNotFound (err ) {
638
- logger .V (4 ).Info ("Orphan pod has been deleted" , "pod" , key )
639
- return nil
640
- }
641
688
return err
642
689
}
690
+ for _ , pod := range pods {
691
+ if err := jm .handleSingleOrphanPod (ctx , pod ); err != nil {
692
+ logger .Error (err , "syncing orphan pod failed" , "pod" , klog .KObj (pod ))
693
+ }
694
+ }
695
+ return nil
696
+ }
697
+
698
+ // handleSingleOrphanPod processes a single orphan pod.
699
+ func (jm * Controller ) handleSingleOrphanPod (ctx context.Context , sharedPod * v1.Pod ) error {
700
+ logger := klog .FromContext (ctx )
701
+ ns := sharedPod .Namespace
702
+ name := sharedPod .Name
643
703
// Make sure the pod is still orphaned.
644
704
if controllerRef := metav1 .GetControllerOf (sharedPod ); controllerRef != nil {
645
705
if controllerRef .Kind != controllerKind .Kind || controllerRef .APIVersion != batch .SchemeGroupVersion .String () {
646
706
// The pod is controlled by an owner that is not a batch/v1 Job. Do not remove finalizer.
647
707
return nil
648
708
}
649
- job := jm .resolveControllerRef (sharedPod . Namespace , controllerRef )
709
+ job := jm .resolveControllerRef (ns , controllerRef )
650
710
if job != nil {
651
711
// Skip cleanup of finalizers for pods owned by a job managed by an external controller
652
712
if controllerName := managedByExternalController (job ); controllerName != nil {
653
- logger .V (2 ).Info ("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller" , "key " , key , "podUID" , sharedPod .UID , "jobUID" , job .UID , "controllerName" , controllerName )
713
+ logger .V (2 ).Info ("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller" , "namespace " , ns , "name" , name , "podUID" , sharedPod .UID , "jobUID" , job .UID , "controllerName" , controllerName )
654
714
return nil
655
715
}
656
716
}
0 commit comments