Skip to content

Commit 65f87b5

Browse files
authored
Merge pull request kubernetes#72259 from bsalamat/fix_nominated_node
Fix a race in setting nominated node and the scheduling cycle after it.
2 parents 57c8024 + b75672c commit 65f87b5

File tree

7 files changed

+376
-112
lines changed

7 files changed

+376
-112
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
362362
// worth the complexity, especially because we generally expect to have a very
363363
// small number of nominated pods per node.
364364
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
365-
pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
365+
pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
366366

367367
if len(pods) == 0 {
368368
return nil
@@ -509,7 +509,7 @@ func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
509509
// This may happen only in tests.
510510
return false, meta, nodeInfo
511511
}
512-
nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
512+
nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
513513
if nominatedPods == nil || len(nominatedPods) == 0 {
514514
return false, meta, nodeInfo
515515
}

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 112 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,14 @@ type SchedulingQueue interface {
6464
MoveAllToActiveQueue()
6565
AssignedPodAdded(pod *v1.Pod)
6666
AssignedPodUpdated(pod *v1.Pod)
67-
WaitingPodsForNode(nodeName string) []*v1.Pod
67+
NominatedPodsForNode(nodeName string) []*v1.Pod
6868
PendingPods() []*v1.Pod
6969
// Close closes the SchedulingQueue so that the goroutine which is
7070
// waiting to pop items can exit gracefully.
7171
Close()
72+
// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
73+
// updates it if it already exists.
74+
UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
7275
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
7376
DeleteNominatedPodIfExists(pod *v1.Pod)
7477
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
@@ -152,9 +155,9 @@ func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
152155
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
153156
func (f *FIFO) MoveAllToActiveQueue() {}
154157

155-
// WaitingPodsForNode returns pods that are nominated to run on the given node,
158+
// NominatedPodsForNode returns pods that are nominated to run on the given node,
156159
// but FIFO does not support it.
157-
func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
160+
func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod {
158161
return nil
159162
}
160163

@@ -166,6 +169,9 @@ func (f *FIFO) Close() {
166169
// DeleteNominatedPodIfExists does nothing in FIFO.
167170
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
168171

172+
// UpdateNominatedPodForNode does nothing in FIFO.
173+
func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {}
174+
169175
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
170176
func (f *FIFO) NumUnschedulablePods() int {
171177
return 0
@@ -204,10 +210,9 @@ type PriorityQueue struct {
204210
podBackoffQ *util.Heap
205211
// unschedulableQ holds pods that have been tried and determined unschedulable.
206212
unschedulableQ *UnschedulablePodsMap
207-
// nominatedPods is a map keyed by a node name and the value is a list of
208-
// pods which are nominated to run on the node. These are pods which can be in
209-
// the activeQ or unschedulableQ.
210-
nominatedPods map[string][]*v1.Pod
213+
// nominatedPods is a structures that stores pods which are nominated to run
214+
// on nodes.
215+
nominatedPods *nominatedPodMap
211216
// receivedMoveRequest is set to true whenever we receive a request to move a
212217
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
213218
// a pod from the activeQ. It indicates if we received a move request when a
@@ -257,7 +262,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
257262
podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
258263
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
259264
unschedulableQ: newUnschedulablePodsMap(),
260-
nominatedPods: map[string][]*v1.Pod{},
265+
nominatedPods: newNominatedPodMap(),
261266
}
262267
pq.cond.L = &pq.lock
263268
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
@@ -272,49 +277,6 @@ func (p *PriorityQueue) run() {
272277
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
273278
}
274279

275-
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
276-
// already exist in the map. Adding an existing pod is not going to update the pod.
277-
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
278-
nnn := NominatedNodeName(pod)
279-
if len(nnn) <= 0 {
280-
return
281-
}
282-
for _, np := range p.nominatedPods[nnn] {
283-
if np.UID == pod.UID {
284-
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
285-
return
286-
}
287-
}
288-
p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
289-
}
290-
291-
// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
292-
// NOTE: this function assumes lock has been acquired in caller.
293-
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
294-
nnn := NominatedNodeName(pod)
295-
if len(nnn) <= 0 {
296-
return
297-
}
298-
for i, np := range p.nominatedPods[nnn] {
299-
if np.UID != pod.UID {
300-
continue
301-
}
302-
p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
303-
if len(p.nominatedPods[nnn]) == 0 {
304-
delete(p.nominatedPods, nnn)
305-
}
306-
break
307-
}
308-
}
309-
310-
// updateNominatedPod updates a pod in the nominatedPods.
311-
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
312-
// Even if the nominated node name of the Pod is not changed, we must delete and add it again
313-
// to ensure that its pointer is updated.
314-
p.deleteNominatedPodIfExists(oldPod)
315-
p.addNominatedPodIfNeeded(newPod)
316-
}
317-
318280
// Add adds a pod to the active queue. It should be called only when a new pod
319281
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
320282
func (p *PriorityQueue) Add(pod *v1.Pod) error {
@@ -326,14 +288,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
326288
}
327289
if p.unschedulableQ.get(pod) != nil {
328290
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
329-
p.deleteNominatedPodIfExists(pod)
330291
p.unschedulableQ.delete(pod)
331292
}
332293
// Delete pod from backoffQ if it is backing off
333294
if err := p.podBackoffQ.Delete(pod); err == nil {
334295
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
335296
}
336-
p.addNominatedPodIfNeeded(pod)
297+
p.nominatedPods.add(pod, "")
337298
p.cond.Broadcast()
338299

339300
return nil
@@ -357,7 +318,7 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
357318
if err != nil {
358319
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
359320
} else {
360-
p.addNominatedPodIfNeeded(pod)
321+
p.nominatedPods.add(pod, "")
361322
p.cond.Broadcast()
362323
}
363324
return err
@@ -420,7 +381,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
420381
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
421382
p.backoffPod(pod)
422383
p.unschedulableQ.addOrUpdate(pod)
423-
p.addNominatedPodIfNeeded(pod)
384+
p.nominatedPods.add(pod, "")
424385
return nil
425386
}
426387

@@ -430,14 +391,14 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
430391
if err != nil {
431392
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
432393
} else {
433-
p.addNominatedPodIfNeeded(pod)
394+
p.nominatedPods.add(pod, "")
434395
}
435396
return err
436397
}
437398

438399
err := p.activeQ.Add(pod)
439400
if err == nil {
440-
p.addNominatedPodIfNeeded(pod)
401+
p.nominatedPods.add(pod, "")
441402
p.cond.Broadcast()
442403
}
443404
return err
@@ -523,14 +484,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
523484
if oldPod != nil {
524485
// If the pod is already in the active queue, just update it there.
525486
if _, exists, _ := p.activeQ.Get(oldPod); exists {
526-
p.updateNominatedPod(oldPod, newPod)
487+
p.nominatedPods.update(oldPod, newPod)
527488
err := p.activeQ.Update(newPod)
528489
return err
529490
}
530491

531492
// If the pod is in the backoff queue, update it there.
532493
if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
533-
p.updateNominatedPod(oldPod, newPod)
494+
p.nominatedPods.update(oldPod, newPod)
534495
p.podBackoffQ.Delete(newPod)
535496
err := p.activeQ.Add(newPod)
536497
if err == nil {
@@ -542,7 +503,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
542503

543504
// If the pod is in the unschedulable queue, updating it may make it schedulable.
544505
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
545-
p.updateNominatedPod(oldPod, newPod)
506+
p.nominatedPods.update(oldPod, newPod)
546507
if isPodUpdated(oldPod, newPod) {
547508
// If the pod is updated reset backoff
548509
p.clearPodBackoff(newPod)
@@ -560,7 +521,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
560521
// If pod is not in any of the two queue, we put it in the active queue.
561522
err := p.activeQ.Add(newPod)
562523
if err == nil {
563-
p.addNominatedPodIfNeeded(newPod)
524+
p.nominatedPods.add(newPod, "")
564525
p.cond.Broadcast()
565526
}
566527
return err
@@ -571,7 +532,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
571532
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
572533
p.lock.Lock()
573534
defer p.lock.Unlock()
574-
p.deleteNominatedPodIfExists(pod)
535+
p.nominatedPods.delete(pod)
575536
err := p.activeQ.Delete(pod)
576537
if err != nil { // The item was probably not found in the activeQ.
577538
p.clearPodBackoff(pod)
@@ -663,16 +624,13 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
663624
return podsToMove
664625
}
665626

666-
// WaitingPodsForNode returns pods that are nominated to run on the given node,
627+
// NominatedPodsForNode returns pods that are nominated to run on the given node,
667628
// but they are waiting for other pods to be removed from the node before they
668629
// can be actually scheduled.
669-
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
630+
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
670631
p.lock.RLock()
671632
defer p.lock.RUnlock()
672-
if list, ok := p.nominatedPods[nodeName]; ok {
673-
return list
674-
}
675-
return nil
633+
return p.nominatedPods.podsForNode(nodeName)
676634
}
677635

678636
// PendingPods returns all the pending pods in the queue. This function is
@@ -702,10 +660,20 @@ func (p *PriorityQueue) Close() {
702660
p.cond.Broadcast()
703661
}
704662

705-
// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
663+
// DeleteNominatedPodIfExists deletes pod nominatedPods.
706664
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
707665
p.lock.Lock()
708-
p.deleteNominatedPodIfExists(pod)
666+
p.nominatedPods.delete(pod)
667+
p.lock.Unlock()
668+
}
669+
670+
// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
671+
// This is called during the preemption process after a node is nominated to run
672+
// the pod. We update the structure before sending a request to update the pod
673+
// object to avoid races with the following scheduling cycles.
674+
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
675+
p.lock.Lock()
676+
p.nominatedPods.add(pod, nodeName)
709677
p.lock.Unlock()
710678
}
711679

@@ -762,3 +730,77 @@ func newUnschedulablePodsMap() *UnschedulablePodsMap {
762730
keyFunc: util.GetPodFullName,
763731
}
764732
}
733+
734+
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
735+
// It exists because nominatedNodeName of pod objects stored in the structure
736+
// may be different than what scheduler has here. We should be able to find pods
737+
// by their UID and update/delete them.
738+
type nominatedPodMap struct {
739+
// nominatedPods is a map keyed by a node name and the value is a list of
740+
// pods which are nominated to run on the node. These are pods which can be in
741+
// the activeQ or unschedulableQ.
742+
nominatedPods map[string][]*v1.Pod
743+
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
744+
// nominated.
745+
nominatedPodToNode map[ktypes.UID]string
746+
}
747+
748+
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
749+
// always delete the pod if it already exist, to ensure we never store more than
750+
// one instance of the pod.
751+
npm.delete(p)
752+
753+
nnn := nodeName
754+
if len(nnn) == 0 {
755+
nnn = NominatedNodeName(p)
756+
if len(nnn) == 0 {
757+
return
758+
}
759+
}
760+
npm.nominatedPodToNode[p.UID] = nnn
761+
for _, np := range npm.nominatedPods[nnn] {
762+
if np.UID == p.UID {
763+
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
764+
return
765+
}
766+
}
767+
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
768+
}
769+
770+
func (npm *nominatedPodMap) delete(p *v1.Pod) {
771+
nnn, ok := npm.nominatedPodToNode[p.UID]
772+
if !ok {
773+
return
774+
}
775+
for i, np := range npm.nominatedPods[nnn] {
776+
if np.UID == p.UID {
777+
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
778+
if len(npm.nominatedPods[nnn]) == 0 {
779+
delete(npm.nominatedPods, nnn)
780+
}
781+
break
782+
}
783+
}
784+
delete(npm.nominatedPodToNode, p.UID)
785+
}
786+
787+
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
788+
// We update irrespective of the nominatedNodeName changed or not, to ensure
789+
// that pod pointer is updated.
790+
npm.delete(oldPod)
791+
npm.add(newPod, "")
792+
}
793+
794+
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
795+
if list, ok := npm.nominatedPods[nodeName]; ok {
796+
return list
797+
}
798+
return nil
799+
}
800+
801+
func newNominatedPodMap() *nominatedPodMap {
802+
return &nominatedPodMap{
803+
nominatedPods: make(map[string][]*v1.Pod),
804+
nominatedPodToNode: make(map[ktypes.UID]string),
805+
}
806+
}

0 commit comments

Comments
 (0)