Skip to content

Commit 2185064

Browse files
authored
Merge pull request kubernetes#90356 from Huang-Wei/prefactor-preemption-nompods-interface
Extract logic related with scheduler nominatedPods to an interface
2 parents 1e2ddd1 + bd18403 commit 2185064

File tree

9 files changed

+128
-78
lines changed

9 files changed

+128
-78
lines changed

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
979979
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
980980
t.Fatal(err)
981981
}
982-
scheduler.schedulingQueue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
982+
scheduler.schedulingQueue.AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
983983

984984
_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod)
985985

pkg/scheduler/factory.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,21 @@ type Configurator struct {
9999
frameworkCapturer FrameworkCapturer
100100
}
101101

102-
func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
102+
func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile, opts ...framework.Option) (framework.Framework, error) {
103103
if c.frameworkCapturer != nil {
104104
c.frameworkCapturer(p)
105105
}
106-
return framework.NewFramework(
107-
c.registry,
108-
p.Plugins,
109-
p.PluginConfig,
106+
opts = append([]framework.Option{
110107
framework.WithClientSet(c.client),
111108
framework.WithInformerFactory(c.informerFactory),
112109
framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
113110
framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
111+
}, opts...)
112+
return framework.NewFramework(
113+
c.registry,
114+
p.Plugins,
115+
p.PluginConfig,
116+
opts...,
114117
)
115118
}
116119

@@ -159,7 +162,10 @@ func (c *Configurator) create() (*Scheduler, error) {
159162
}
160163
}
161164

162-
profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)
165+
// The nominator will be passed all the way to framework instantiation.
166+
nominator := internalqueue.NewPodNominator()
167+
profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory,
168+
framework.WithPodNominator(nominator))
163169
if err != nil {
164170
return nil, fmt.Errorf("initializing profiles: %v", err)
165171
}
@@ -172,6 +178,7 @@ func (c *Configurator) create() (*Scheduler, error) {
172178
lessFn,
173179
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
174180
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
181+
internalqueue.WithPodNominator(nominator),
175182
)
176183

177184
// Setup cache debugger.

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type framework struct {
8383

8484
metricsRecorder *metricsRecorder
8585

86+
preemptHandle PreemptHandle
87+
8688
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
8789
// after the first failure.
8890
runAllFilters bool
@@ -121,6 +123,7 @@ type frameworkOptions struct {
121123
snapshotSharedLister SharedLister
122124
metricsRecorder *metricsRecorder
123125
volumeBinder scheduling.SchedulerVolumeBinder
126+
podNominator PodNominator
124127
runAllFilters bool
125128
}
126129

@@ -170,6 +173,13 @@ func WithVolumeBinder(binder scheduling.SchedulerVolumeBinder) Option {
170173
}
171174
}
172175

176+
// WithPodNominator sets podNominator for the scheduling framework.
177+
func WithPodNominator(nominator PodNominator) Option {
178+
return func(o *frameworkOptions) {
179+
o.podNominator = nominator
180+
}
181+
}
182+
173183
var defaultFrameworkOptions = frameworkOptions{
174184
metricsRecorder: newMetricsRecorder(1000, time.Second),
175185
}
@@ -192,6 +202,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
192202
informerFactory: options.informerFactory,
193203
volumeBinder: options.volumeBinder,
194204
metricsRecorder: options.metricsRecorder,
205+
preemptHandle: options.podNominator,
195206
runAllFilters: options.runAllFilters,
196207
}
197208
if plugins == nil {

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,3 +495,19 @@ type FrameworkHandle interface {
495495
// VolumeBinder returns the volume binder used by scheduler.
496496
VolumeBinder() scheduling.SchedulerVolumeBinder
497497
}
498+
499+
// PreemptHandle incorporates all needed logic to run preemption logic.
500+
type PreemptHandle interface {
501+
PodNominator
502+
}
503+
504+
// PodNominator abstracts operations to maintain nominated Pods.
505+
type PodNominator interface {
506+
// AddNominatedPod adds the given pod to the nominated pod map or
507+
// updates it if it already exists.
508+
AddNominatedPod(pod *v1.Pod, nodeName string)
509+
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.
510+
DeleteNominatedPodIfExists(pod *v1.Pod)
511+
// UpdateNominatedPod updates the <oldPod> with <newPod>.
512+
UpdateNominatedPod(oldPod, newPod *v1.Pod)
513+
}

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ const (
6565
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
6666
// makes it easy to use those data structures as a SchedulingQueue.
6767
type SchedulingQueue interface {
68+
framework.PodNominator
6869
Add(pod *v1.Pod) error
6970
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
7071
// The podSchedulingCycle represents the current scheduling cycle number which can be
@@ -87,11 +88,6 @@ type SchedulingQueue interface {
8788
// Close closes the SchedulingQueue so that the goroutine which is
8889
// waiting to pop items can exit gracefully.
8990
Close()
90-
// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
91-
// updates it if it already exists.
92-
UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
93-
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
94-
DeleteNominatedPodIfExists(pod *v1.Pod)
9591
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
9692
NumUnschedulablePods() int
9793
// Run starts the goroutines managing the queue.
@@ -116,6 +112,9 @@ func NominatedNodeName(pod *v1.Pod) string {
116112
// is called unschedulableQ. The third queue holds pods that are moved from
117113
// unschedulable queues and will be moved to active queue when backoff are completed.
118114
type PriorityQueue struct {
115+
// PodNominator abstracts the operations to maintain nominated Pods.
116+
framework.PodNominator
117+
119118
stop chan struct{}
120119
clock util.Clock
121120

@@ -135,9 +134,6 @@ type PriorityQueue struct {
135134
podBackoffQ *heap.Heap
136135
// unschedulableQ holds pods that have been tried and determined unschedulable.
137136
unschedulableQ *UnschedulablePodsMap
138-
// nominatedPods is a structures that stores pods which are nominated to run
139-
// on nodes.
140-
nominatedPods *nominatedPodMap
141137
// schedulingCycle represents sequence number of scheduling cycle and is incremented
142138
// when a pod is popped.
143139
schedulingCycle int64
@@ -156,6 +152,7 @@ type priorityQueueOptions struct {
156152
clock util.Clock
157153
podInitialBackoffDuration time.Duration
158154
podMaxBackoffDuration time.Duration
155+
podNominator framework.PodNominator
159156
}
160157

161158
// Option configures a PriorityQueue
@@ -168,20 +165,27 @@ func WithClock(clock util.Clock) Option {
168165
}
169166
}
170167

171-
// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue,
168+
// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue.
172169
func WithPodInitialBackoffDuration(duration time.Duration) Option {
173170
return func(o *priorityQueueOptions) {
174171
o.podInitialBackoffDuration = duration
175172
}
176173
}
177174

178-
// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue,
175+
// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue.
179176
func WithPodMaxBackoffDuration(duration time.Duration) Option {
180177
return func(o *priorityQueueOptions) {
181178
o.podMaxBackoffDuration = duration
182179
}
183180
}
184181

182+
// WithPodNominator sets pod nominator for PriorityQueue.
183+
func WithPodNominator(pn framework.PodNominator) Option {
184+
return func(o *priorityQueueOptions) {
185+
o.podNominator = pn
186+
}
187+
}
188+
185189
var defaultPriorityQueueOptions = priorityQueueOptions{
186190
clock: util.RealClock{},
187191
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
@@ -214,14 +218,18 @@ func NewPriorityQueue(
214218
return lessFn(pInfo1, pInfo2)
215219
}
216220

221+
if options.podNominator == nil {
222+
options.podNominator = NewPodNominator()
223+
}
224+
217225
pq := &PriorityQueue{
226+
PodNominator: options.podNominator,
218227
clock: options.clock,
219228
stop: make(chan struct{}),
220229
podInitialBackoffDuration: options.podInitialBackoffDuration,
221230
podMaxBackoffDuration: options.podMaxBackoffDuration,
222231
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
223232
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
224-
nominatedPods: newNominatedPodMap(),
225233
moveRequestCycle: -1,
226234
}
227235
pq.cond.L = &pq.lock
@@ -255,7 +263,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
255263
klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
256264
}
257265
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
258-
p.nominatedPods.add(pod, "")
266+
p.PodNominator.AddNominatedPod(pod, "")
259267
p.cond.Broadcast()
260268

261269
return nil
@@ -316,9 +324,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
316324
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
317325
}
318326

319-
p.nominatedPods.add(pod, "")
327+
p.PodNominator.AddNominatedPod(pod, "")
320328
return nil
321-
322329
}
323330

324331
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
@@ -416,14 +423,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
416423
oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod)
417424
// If the pod is already in the active queue, just update it there.
418425
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
419-
p.nominatedPods.update(oldPod, newPod)
426+
p.PodNominator.UpdateNominatedPod(oldPod, newPod)
420427
err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
421428
return err
422429
}
423430

424431
// If the pod is in the backoff queue, update it there.
425432
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
426-
p.nominatedPods.update(oldPod, newPod)
433+
p.PodNominator.UpdateNominatedPod(oldPod, newPod)
427434
p.podBackoffQ.Delete(oldPodInfo)
428435
err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
429436
if err == nil {
@@ -435,7 +442,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
435442

436443
// If the pod is in the unschedulable queue, updating it may make it schedulable.
437444
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
438-
p.nominatedPods.update(oldPod, newPod)
445+
p.PodNominator.UpdateNominatedPod(oldPod, newPod)
439446
if isPodUpdated(oldPod, newPod) {
440447
p.unschedulableQ.delete(usPodInfo.Pod)
441448
err := p.activeQ.Add(updatePod(usPodInfo, newPod))
@@ -451,7 +458,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
451458
// If pod is not in any of the queues, we put it in the active queue.
452459
err := p.activeQ.Add(p.newQueuedPodInfo(newPod))
453460
if err == nil {
454-
p.nominatedPods.add(newPod, "")
461+
p.PodNominator.AddNominatedPod(newPod, "")
455462
p.cond.Broadcast()
456463
}
457464
return err
@@ -462,7 +469,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
462469
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
463470
p.lock.Lock()
464471
defer p.lock.Unlock()
465-
p.nominatedPods.delete(pod)
472+
p.PodNominator.DeleteNominatedPodIfExists(pod)
466473
err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod))
467474
if err != nil { // The item was probably not found in the activeQ.
468475
p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod))
@@ -553,9 +560,8 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
553560
// but they are waiting for other pods to be removed from the node before they
554561
// can be actually scheduled.
555562
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
556-
p.lock.RLock()
557-
defer p.lock.RUnlock()
558-
return p.nominatedPods.podsForNode(nodeName)
563+
// TODO: make podsForNode() public?
564+
return p.PodNominator.(*nominatedPodMap).podsForNode(nodeName)
559565
}
560566

561567
// PendingPods returns all the pending pods in the queue. This function is
@@ -585,21 +591,21 @@ func (p *PriorityQueue) Close() {
585591
p.cond.Broadcast()
586592
}
587593

588-
// DeleteNominatedPodIfExists deletes pod nominatedPods.
589-
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
590-
p.lock.Lock()
591-
p.nominatedPods.delete(pod)
592-
p.lock.Unlock()
594+
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
595+
func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) {
596+
npm.Lock()
597+
npm.delete(pod)
598+
npm.Unlock()
593599
}
594600

595-
// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
601+
// AddNominatedPod adds a pod to the nominated pods of the given node.
596602
// This is called during the preemption process after a node is nominated to run
597603
// the pod. We update the structure before sending a request to update the pod
598604
// object to avoid races with the following scheduling cycles.
599-
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
600-
p.lock.Lock()
601-
p.nominatedPods.add(pod, nodeName)
602-
p.lock.Unlock()
605+
func (npm *nominatedPodMap) AddNominatedPod(pod *v1.Pod, nodeName string) {
606+
npm.Lock()
607+
npm.add(pod, nodeName)
608+
npm.Unlock()
603609
}
604610

605611
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
@@ -721,6 +727,8 @@ type nominatedPodMap struct {
721727
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
722728
// nominated.
723729
nominatedPodToNode map[ktypes.UID]string
730+
731+
sync.RWMutex
724732
}
725733

726734
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
@@ -762,7 +770,10 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) {
762770
delete(npm.nominatedPodToNode, p.UID)
763771
}
764772

765-
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
773+
// UpdateNominatedPod updates the <oldPod> with <newPod>.
774+
func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) {
775+
npm.Lock()
776+
defer npm.Unlock()
766777
// In some cases, an Update event with no "NominatedNode" present is received right
767778
// after a node("NominatedNode") is reserved for this pod in memory.
768779
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
@@ -784,13 +795,16 @@ func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
784795
}
785796

786797
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
798+
npm.RLock()
799+
defer npm.RUnlock()
787800
if list, ok := npm.nominatedPods[nodeName]; ok {
788801
return list
789802
}
790803
return nil
791804
}
792805

793-
func newNominatedPodMap() *nominatedPodMap {
806+
// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
807+
func NewPodNominator() framework.PodNominator {
794808
return &nominatedPodMap{
795809
nominatedPods: make(map[string][]*v1.Pod),
796810
nominatedPodToNode: make(map[ktypes.UID]string),

0 commit comments

Comments
 (0)