@@ -55,10 +55,21 @@ const (
55
55
56
56
// podConditionUpdater updates the condition of a pod based on the passed
57
57
// PodCondition
58
+ // TODO (ahmad-diaa): Remove type and replace it with scheduler methods
58
59
type podConditionUpdater interface {
59
60
update (pod * v1.Pod , podCondition * v1.PodCondition ) error
60
61
}
61
62
63
+ // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
64
+ // field of the preemptor pod.
65
+ // TODO (ahmad-diaa): Remove type and replace it with scheduler methods
66
+ type podPreemptor interface {
67
+ getUpdatedPod (pod * v1.Pod ) (* v1.Pod , error )
68
+ deletePod (pod * v1.Pod ) error
69
+ setNominatedNodeName (pod * v1.Pod , nominatedNode string ) error
70
+ removeNominatedNodeName (pod * v1.Pod ) error
71
+ }
72
+
62
73
// Scheduler watches for new unscheduled pods. It attempts to find
63
74
// nodes that they fit on and writes bindings back to the api server.
64
75
type Scheduler struct {
@@ -74,7 +85,7 @@ type Scheduler struct {
74
85
podConditionUpdater podConditionUpdater
75
86
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
76
87
// the preemptor pod.
77
- PodPreemptor factory. PodPreemptor
88
+ podPreemptor podPreemptor
78
89
// Framework runs scheduler plugins at configured extension points.
79
90
Framework framework.Framework
80
91
@@ -344,6 +355,8 @@ func New(client clientset.Interface,
344
355
// Create the scheduler.
345
356
sched := NewFromConfig (config )
346
357
sched .podConditionUpdater = & podConditionUpdaterImpl {client }
358
+ sched .podPreemptor = & podPreemptorImpl {client }
359
+
347
360
AddAllEventHandlers (sched , options .schedulerName , informerFactory , podInformer )
348
361
return sched , nil
349
362
}
@@ -391,7 +404,6 @@ func NewFromConfig(config *factory.Config) *Scheduler {
391
404
SchedulerCache : config .SchedulerCache ,
392
405
Algorithm : config .Algorithm ,
393
406
GetBinder : config .GetBinder ,
394
- PodPreemptor : config .PodPreemptor ,
395
407
Framework : config .Framework ,
396
408
NextPod : config .NextPod ,
397
409
WaitForCacheSync : config .WaitForCacheSync ,
@@ -434,7 +446,7 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err
434
446
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
435
447
// It returns the node name and an error if any.
436
448
func (sched * Scheduler ) preempt (state * framework.CycleState , fwk framework.Framework , preemptor * v1.Pod , scheduleErr error ) (string , error ) {
437
- preemptor , err := sched .PodPreemptor . GetUpdatedPod (preemptor )
449
+ preemptor , err := sched .podPreemptor . getUpdatedPod (preemptor )
438
450
if err != nil {
439
451
klog .Errorf ("Error getting the updated preemptor pod object: %v" , err )
440
452
return "" , err
@@ -454,15 +466,15 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
454
466
sched .SchedulingQueue .UpdateNominatedPodForNode (preemptor , nodeName )
455
467
456
468
// Make a call to update nominated node name of the pod on the API server.
457
- err = sched .PodPreemptor . SetNominatedNodeName (preemptor , nodeName )
469
+ err = sched .podPreemptor . setNominatedNodeName (preemptor , nodeName )
458
470
if err != nil {
459
471
klog .Errorf ("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v" , preemptor .Namespace , preemptor .Name , err )
460
472
sched .SchedulingQueue .DeleteNominatedPodIfExists (preemptor )
461
473
return "" , err
462
474
}
463
475
464
476
for _ , victim := range victims {
465
- if err := sched .PodPreemptor . DeletePod (victim ); err != nil {
477
+ if err := sched .podPreemptor . deletePod (victim ); err != nil {
466
478
klog .Errorf ("Error preempting pod %v/%v: %v" , victim .Namespace , victim .Name , err )
467
479
return "" , err
468
480
}
@@ -481,7 +493,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
481
493
// function of generic_scheduler.go returns the pod itself for removal of
482
494
// the 'NominatedPod' field.
483
495
for _ , p := range nominatedPodsToClear {
484
- rErr := sched .PodPreemptor . RemoveNominatedNodeName (p )
496
+ rErr := sched .podPreemptor . removeNominatedNodeName (p )
485
497
if rErr != nil {
486
498
klog .Errorf ("Cannot remove 'NominatedPod' field of pod: %v" , rErr )
487
499
// We do not return as this error is not critical.
@@ -756,6 +768,32 @@ func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition
756
768
return nil
757
769
}
758
770
771
+ type podPreemptorImpl struct {
772
+ Client clientset.Interface
773
+ }
774
+
775
+ func (p * podPreemptorImpl ) getUpdatedPod (pod * v1.Pod ) (* v1.Pod , error ) {
776
+ return p .Client .CoreV1 ().Pods (pod .Namespace ).Get (pod .Name , metav1.GetOptions {})
777
+ }
778
+
779
+ func (p * podPreemptorImpl ) deletePod (pod * v1.Pod ) error {
780
+ return p .Client .CoreV1 ().Pods (pod .Namespace ).Delete (pod .Name , & metav1.DeleteOptions {})
781
+ }
782
+
783
+ func (p * podPreemptorImpl ) setNominatedNodeName (pod * v1.Pod , nominatedNodeName string ) error {
784
+ podCopy := pod .DeepCopy ()
785
+ podCopy .Status .NominatedNodeName = nominatedNodeName
786
+ _ , err := p .Client .CoreV1 ().Pods (pod .Namespace ).UpdateStatus (podCopy )
787
+ return err
788
+ }
789
+
790
+ func (p * podPreemptorImpl ) removeNominatedNodeName (pod * v1.Pod ) error {
791
+ if len (pod .Status .NominatedNodeName ) == 0 {
792
+ return nil
793
+ }
794
+ return p .setNominatedNodeName (pod , "" )
795
+ }
796
+
759
797
// nodeResourceString returns a string representation of node resources.
760
798
func nodeResourceString (n * v1.Node ) string {
761
799
if n == nil {
0 commit comments