Skip to content

Commit e303808

Browse files
committed
Move scheduling queue's nominator to a separate file
1 parent 33815db commit e303808

File tree

3 files changed

+196
-172
lines changed

3 files changed

+196
-172
lines changed
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queue
18+
19+
import (
20+
"slices"
21+
"sync"
22+
23+
v1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
listersv1 "k8s.io/client-go/listers/core/v1"
27+
"k8s.io/klog/v2"
28+
"k8s.io/kubernetes/pkg/scheduler/framework"
29+
)
30+
31+
// nominator is a structure that stores pods nominated to run on nodes.
32+
// It exists because nominatedNodeName of pod objects stored in the structure
33+
// may be different than what scheduler has here. We should be able to find pods
34+
// by their UID and update/delete them.
35+
type nominator struct {
36+
// nLock synchronizes all operations related to nominator.
37+
// It should not be used anywhere else.
38+
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
39+
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
40+
// otherwise the nominator could end up in deadlock.
41+
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
42+
nLock sync.RWMutex
43+
44+
// podLister is used to verify if the given pod is alive.
45+
podLister listersv1.PodLister
46+
// nominatedPods is a map keyed by a node name and the value is a list of
47+
// pods which are nominated to run on the node. These are pods which can be in
48+
// the activeQ or unschedulablePods.
49+
nominatedPods map[string][]PodRef
50+
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
51+
// nominated.
52+
nominatedPodToNode map[types.UID]string
53+
}
54+
55+
func newPodNominator(podLister listersv1.PodLister) *nominator {
56+
return &nominator{
57+
podLister: podLister,
58+
nominatedPods: make(map[string][]PodRef),
59+
nominatedPodToNode: make(map[types.UID]string),
60+
}
61+
}
62+
63+
// AddNominatedPod adds a pod to the nominated pods of the given node.
64+
// This is called during the preemption process after a node is nominated to run
65+
// the pod. We update the structure before sending a request to update the pod
66+
// object to avoid races with the following scheduling cycles.
67+
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
68+
npm.nLock.Lock()
69+
npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
70+
npm.nLock.Unlock()
71+
}
72+
73+
func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
74+
// Always delete the pod if it already exists, to ensure we never store more than
75+
// one instance of the pod.
76+
npm.deleteUnlocked(pi.Pod)
77+
78+
var nodeName string
79+
if nominatingInfo.Mode() == framework.ModeOverride {
80+
nodeName = nominatingInfo.NominatedNodeName
81+
} else if nominatingInfo.Mode() == framework.ModeNoop {
82+
if pi.Pod.Status.NominatedNodeName == "" {
83+
return
84+
}
85+
nodeName = pi.Pod.Status.NominatedNodeName
86+
}
87+
88+
if npm.podLister != nil {
89+
// If the pod was removed or if it was already scheduled, don't nominate it.
90+
updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
91+
if err != nil {
92+
logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
93+
return
94+
}
95+
if updatedPod.Spec.NodeName != "" {
96+
logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
97+
return
98+
}
99+
}
100+
101+
npm.nominatedPodToNode[pi.Pod.UID] = nodeName
102+
for _, np := range npm.nominatedPods[nodeName] {
103+
if np.UID == pi.Pod.UID {
104+
logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
105+
return
106+
}
107+
}
108+
npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
109+
}
110+
111+
// UpdateNominatedPod updates the <oldPod> with <newPod>.
112+
func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
113+
npm.nLock.Lock()
114+
defer npm.nLock.Unlock()
115+
// In some cases, an Update event with no "NominatedNode" present is received right
116+
// after a node("NominatedNode") is reserved for this pod in memory.
117+
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
118+
var nominatingInfo *framework.NominatingInfo
119+
// We won't fall into below `if` block if the Update event represents:
120+
// (1) NominatedNode info is added
121+
// (2) NominatedNode info is updated
122+
// (3) NominatedNode info is removed
123+
if nominatedNodeName(oldPod) == "" && nominatedNodeName(newPodInfo.Pod) == "" {
124+
if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
125+
// This is the only case we should continue reserving the NominatedNode
126+
nominatingInfo = &framework.NominatingInfo{
127+
NominatingMode: framework.ModeOverride,
128+
NominatedNodeName: nnn,
129+
}
130+
}
131+
}
132+
// We update irrespective of the nominatedNodeName changed or not, to ensure
133+
// that pod pointer is updated.
134+
npm.deleteUnlocked(oldPod)
135+
npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
136+
}
137+
138+
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
139+
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
140+
npm.nLock.Lock()
141+
npm.deleteUnlocked(pod)
142+
npm.nLock.Unlock()
143+
}
144+
145+
func (npm *nominator) deleteUnlocked(p *v1.Pod) {
146+
nnn, ok := npm.nominatedPodToNode[p.UID]
147+
if !ok {
148+
return
149+
}
150+
for i, np := range npm.nominatedPods[nnn] {
151+
if np.UID == p.UID {
152+
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
153+
if len(npm.nominatedPods[nnn]) == 0 {
154+
delete(npm.nominatedPods, nnn)
155+
}
156+
break
157+
}
158+
}
159+
delete(npm.nominatedPodToNode, p.UID)
160+
}
161+
162+
func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
163+
npm.nLock.RLock()
164+
defer npm.nLock.RUnlock()
165+
return slices.Clone(npm.nominatedPods[nodeName])
166+
}
167+
168+
// nominatedNodeName returns nominated node name of a Pod.
169+
func nominatedNodeName(pod *v1.Pod) string {
170+
return pod.Status.NominatedNodeName
171+
}
172+
173+
type PodRef struct {
174+
Name string
175+
Namespace string
176+
UID types.UID
177+
}
178+
179+
func PodToRef(pod *v1.Pod) PodRef {
180+
return PodRef{
181+
Name: pod.Name,
182+
Namespace: pod.Namespace,
183+
UID: pod.UID,
184+
}
185+
}
186+
187+
func (np PodRef) ToPod() *v1.Pod {
188+
return &v1.Pod{
189+
ObjectMeta: metav1.ObjectMeta{
190+
Name: np.Name,
191+
Namespace: np.Namespace,
192+
UID: np.UID,
193+
},
194+
}
195+
}

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 0 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,10 @@ import (
3131
"fmt"
3232
"math/rand"
3333
"reflect"
34-
"slices"
3534
"sync"
3635
"time"
3736

3837
v1 "k8s.io/api/core/v1"
39-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4038
"k8s.io/apimachinery/pkg/types"
4139
"k8s.io/apimachinery/pkg/util/sets"
4240
"k8s.io/apimachinery/pkg/util/wait"
@@ -137,11 +135,6 @@ func NewSchedulingQueue(
137135
return NewPriorityQueue(lessFn, informerFactory, opts...)
138136
}
139137

140-
// NominatedNodeName returns nominated node name of a Pod.
141-
func NominatedNodeName(pod *v1.Pod) string {
142-
return pod.Status.NominatedNodeName
143-
}
144-
145138
// PriorityQueue implements a scheduling queue.
146139
// The head of PriorityQueue is the highest priority pending pod. This structure
147140
// has two sub queues and a additional data structure, namely: activeQ,
@@ -1212,29 +1205,6 @@ func (p *PriorityQueue) Close() {
12121205
p.activeQ.broadcast()
12131206
}
12141207

1215-
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
1216-
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
1217-
npm.nLock.Lock()
1218-
npm.delete(pod)
1219-
npm.nLock.Unlock()
1220-
}
1221-
1222-
// AddNominatedPod adds a pod to the nominated pods of the given node.
1223-
// This is called during the preemption process after a node is nominated to run
1224-
// the pod. We update the structure before sending a request to update the pod
1225-
// object to avoid races with the following scheduling cycles.
1226-
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
1227-
npm.nLock.Lock()
1228-
npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
1229-
npm.nLock.Unlock()
1230-
}
1231-
1232-
func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
1233-
npm.nLock.RLock()
1234-
defer npm.nLock.RUnlock()
1235-
return slices.Clone(npm.nominatedPods[nodeName])
1236-
}
1237-
12381208
// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
12391209
// but they are waiting for other pods to be removed from the node.
12401210
// CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario.
@@ -1362,147 +1332,6 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRec
13621332
}
13631333
}
13641334

1365-
type PodRef struct {
1366-
Name string
1367-
Namespace string
1368-
UID types.UID
1369-
}
1370-
1371-
func PodToRef(pod *v1.Pod) PodRef {
1372-
return PodRef{
1373-
Name: pod.Name,
1374-
Namespace: pod.Namespace,
1375-
UID: pod.UID,
1376-
}
1377-
}
1378-
1379-
func (np PodRef) ToPod() *v1.Pod {
1380-
return &v1.Pod{
1381-
ObjectMeta: metav1.ObjectMeta{
1382-
Name: np.Name,
1383-
Namespace: np.Namespace,
1384-
UID: np.UID,
1385-
},
1386-
}
1387-
}
1388-
1389-
// nominator is a structure that stores pods nominated to run on nodes.
1390-
// It exists because nominatedNodeName of pod objects stored in the structure
1391-
// may be different than what scheduler has here. We should be able to find pods
1392-
// by their UID and update/delete them.
1393-
type nominator struct {
1394-
// nLock synchronizes all operations related to nominator.
1395-
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
1396-
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
1397-
// otherwise the nominator could end up in deadlock.
1398-
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
1399-
nLock sync.RWMutex
1400-
1401-
// podLister is used to verify if the given pod is alive.
1402-
podLister listersv1.PodLister
1403-
// nominatedPods is a map keyed by a node name and the value is a list of
1404-
// pods which are nominated to run on the node. These are pods which can be in
1405-
// the activeQ or unschedulablePods.
1406-
nominatedPods map[string][]PodRef
1407-
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
1408-
// nominated.
1409-
nominatedPodToNode map[types.UID]string
1410-
// nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs.
1411-
// Note: it takes SchedulingQueue.lock inside.
1412-
// Make sure you don't call this function while taking any lock in any scenario.
1413-
nominatedPodsToInfo func([]PodRef) []*framework.PodInfo
1414-
}
1415-
1416-
func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
1417-
// Always delete the pod if it already exists, to ensure we never store more than
1418-
// one instance of the pod.
1419-
npm.delete(pi.Pod)
1420-
1421-
var nodeName string
1422-
if nominatingInfo.Mode() == framework.ModeOverride {
1423-
nodeName = nominatingInfo.NominatedNodeName
1424-
} else if nominatingInfo.Mode() == framework.ModeNoop {
1425-
if pi.Pod.Status.NominatedNodeName == "" {
1426-
return
1427-
}
1428-
nodeName = pi.Pod.Status.NominatedNodeName
1429-
}
1430-
1431-
if npm.podLister != nil {
1432-
// If the pod was removed or if it was already scheduled, don't nominate it.
1433-
updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
1434-
if err != nil {
1435-
logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
1436-
return
1437-
}
1438-
if updatedPod.Spec.NodeName != "" {
1439-
logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
1440-
return
1441-
}
1442-
}
1443-
1444-
npm.nominatedPodToNode[pi.Pod.UID] = nodeName
1445-
for _, np := range npm.nominatedPods[nodeName] {
1446-
if np.UID == pi.Pod.UID {
1447-
logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
1448-
return
1449-
}
1450-
}
1451-
npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
1452-
}
1453-
1454-
func (npm *nominator) delete(p *v1.Pod) {
1455-
nnn, ok := npm.nominatedPodToNode[p.UID]
1456-
if !ok {
1457-
return
1458-
}
1459-
for i, np := range npm.nominatedPods[nnn] {
1460-
if np.UID == p.UID {
1461-
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
1462-
if len(npm.nominatedPods[nnn]) == 0 {
1463-
delete(npm.nominatedPods, nnn)
1464-
}
1465-
break
1466-
}
1467-
}
1468-
delete(npm.nominatedPodToNode, p.UID)
1469-
}
1470-
1471-
// UpdateNominatedPod updates the <oldPod> with <newPod>.
1472-
func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
1473-
npm.nLock.Lock()
1474-
defer npm.nLock.Unlock()
1475-
// In some cases, an Update event with no "NominatedNode" present is received right
1476-
// after a node("NominatedNode") is reserved for this pod in memory.
1477-
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
1478-
var nominatingInfo *framework.NominatingInfo
1479-
// We won't fall into below `if` block if the Update event represents:
1480-
// (1) NominatedNode info is added
1481-
// (2) NominatedNode info is updated
1482-
// (3) NominatedNode info is removed
1483-
if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" {
1484-
if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
1485-
// This is the only case we should continue reserving the NominatedNode
1486-
nominatingInfo = &framework.NominatingInfo{
1487-
NominatingMode: framework.ModeOverride,
1488-
NominatedNodeName: nnn,
1489-
}
1490-
}
1491-
}
1492-
// We update irrespective of the nominatedNodeName changed or not, to ensure
1493-
// that pod pointer is updated.
1494-
npm.delete(oldPod)
1495-
npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
1496-
}
1497-
1498-
func newPodNominator(podLister listersv1.PodLister) *nominator {
1499-
return &nominator{
1500-
podLister: podLister,
1501-
nominatedPods: make(map[string][]PodRef),
1502-
nominatedPodToNode: make(map[types.UID]string),
1503-
}
1504-
}
1505-
15061335
func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string {
15071336
return cache.NewObjectName(pInfo.Pod.Namespace, pInfo.Pod.Name).String()
15081337
}

0 commit comments

Comments
 (0)