Commit 4ce63b4

Merge pull request kubernetes#126197 from macsko/move_nominatedpodsfornode_to_scheduling_queue

Move NominatedPodsForNode to scheduling queue directly

2 parents 243fdaf + a7ad94f, commit 4ce63b4

File tree: 9 files changed, 272 additions, 236 deletions
pkg/scheduler/extender_test.go (1 addition, 1 deletion)

@@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
     test.registerPlugins, "",
     runtime.WithClientSet(client),
     runtime.WithInformerFactory(informerFactory),
-    runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    runtime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     runtime.WithLogger(logger),
 )
 if err != nil {

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go (4 additions, 4 deletions)

@@ -380,7 +380,7 @@ func TestPostFilter(t *testing.T) {
     frameworkruntime.WithClientSet(cs),
     frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
     frameworkruntime.WithInformerFactory(informerFactory),
-    frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     frameworkruntime.WithExtenders(extenders),
     frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
     frameworkruntime.WithLogger(logger),

@@ -1117,7 +1117,7 @@ func TestDryRunPreemption(t *testing.T) {
 fwk, err := tf.NewFramework(
     ctx,
     registeredPlugins, "",
-    frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     frameworkruntime.WithSnapshotSharedLister(snapshot),
     frameworkruntime.WithInformerFactory(informerFactory),
     frameworkruntime.WithParallelism(parallelism),

@@ -1376,7 +1376,7 @@ func TestSelectBestCandidate(t *testing.T) {
         tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
     },
     "",
-    frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     frameworkruntime.WithSnapshotSharedLister(snapshot),
     frameworkruntime.WithLogger(logger),
 )

@@ -1760,7 +1760,7 @@ func TestPreempt(t *testing.T) {
     frameworkruntime.WithClientSet(client),
     frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
     frameworkruntime.WithExtenders(extenders),
-    frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
     frameworkruntime.WithInformerFactory(informerFactory),
     frameworkruntime.WithWaitingPods(waitingPods),

pkg/scheduler/framework/preemption/preemption_test.go (2 additions, 2 deletions)

@@ -208,7 +208,7 @@ func TestDryRunPreemption(t *testing.T) {
 fwk, err := tf.NewFramework(
     ctx,
     registeredPlugins, "",
-    frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     frameworkruntime.WithInformerFactory(informerFactory),
     frameworkruntime.WithParallelism(parallelism),
     frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),

@@ -313,7 +313,7 @@ func TestSelectCandidate(t *testing.T) {
     ctx,
     registeredPlugins,
     "",
-    frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+    frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
     frameworkruntime.WithSnapshotSharedLister(snapshot),
     frameworkruntime.WithLogger(logger),
 )

pkg/scheduler/framework/runtime/framework_test.go (16 additions, 1 deletion)

@@ -31,6 +31,8 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/client-go/informers"
+    clientsetfake "k8s.io/client-go/kubernetes/fake"
     "k8s.io/component-base/metrics/testutil"
     "k8s.io/klog/v2/ktesting"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"

@@ -2385,7 +2387,20 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
     )
 }

-podNominator := internalqueue.NewTestPodNominator(nil)
+informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
+podInformer := informerFactory.Core().V1().Pods().Informer()
+err := podInformer.GetStore().Add(tt.pod)
+if err != nil {
+    t.Fatalf("Error adding pod to podInformer: %s", err)
+}
+if tt.nominatedPod != nil {
+    err = podInformer.GetStore().Add(tt.nominatedPod)
+    if err != nil {
+        t.Fatalf("Error adding nominated pod to podInformer: %s", err)
+    }
+}
+
+podNominator := internalqueue.NewSchedulingQueue(nil, informerFactory)
 if tt.nominatedPod != nil {
     podNominator.AddNominatedPod(
         logger,
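
A behavioral detail worth calling out in this hunk: the queue-backed nominator resolves pods through the shared informer's lister before recording a nomination (see addNominatedPodUnlocked in the new file below), which is why the test now seeds the informer store up front. A hedged fragment of that precondition, reusing the hunk's own variables (podInformer, podNominator, tt, t, logger); the node name here is a placeholder for illustration:

    // If the nominated pod is not visible to the lister, AddNominatedPod logs
    // "Pod doesn't exist in podLister" at V(4) and silently skips the nomination.
    if err := podInformer.GetStore().Add(tt.nominatedPod); err != nil {
        t.Fatal(err)
    }
    podNominator.AddNominatedPod(logger,
        &framework.PodInfo{Pod: tt.nominatedPod},
        // "node1" is a placeholder node name used only for this illustration.
        &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"})
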
New file in package queue (195 additions, 0 deletions)

/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
    "slices"
    "sync"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    listersv1 "k8s.io/client-go/listers/core/v1"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// nominator is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominator struct {
    // nLock synchronizes all operations related to nominator.
    // It should not be used anywhere else.
    // Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
    // You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
    // otherwise the nominator could end up in deadlock.
    // Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
    nLock sync.RWMutex

    // podLister is used to verify if the given pod is alive.
    podLister listersv1.PodLister
    // nominatedPods is a map keyed by a node name and the value is a list of
    // pods which are nominated to run on the node. These are pods which can be in
    // the activeQ or unschedulablePods.
    nominatedPods map[string][]podRef
    // nominatedPodToNode is map keyed by a Pod UID to the node name where it is
    // nominated.
    nominatedPodToNode map[types.UID]string
}

func newPodNominator(podLister listersv1.PodLister) *nominator {
    return &nominator{
        podLister:          podLister,
        nominatedPods:      make(map[string][]podRef),
        nominatedPodToNode: make(map[types.UID]string),
    }
}

// AddNominatedPod adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
    npm.nLock.Lock()
    npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
    npm.nLock.Unlock()
}

func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
    // Always delete the pod if it already exists, to ensure we never store more than
    // one instance of the pod.
    npm.deleteUnlocked(pi.Pod)

    var nodeName string
    if nominatingInfo.Mode() == framework.ModeOverride {
        nodeName = nominatingInfo.NominatedNodeName
    } else if nominatingInfo.Mode() == framework.ModeNoop {
        if pi.Pod.Status.NominatedNodeName == "" {
            return
        }
        nodeName = pi.Pod.Status.NominatedNodeName
    }

    if npm.podLister != nil {
        // If the pod was removed or if it was already scheduled, don't nominate it.
        updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
        if err != nil {
            logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
            return
        }
        if updatedPod.Spec.NodeName != "" {
            logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
            return
        }
    }

    npm.nominatedPodToNode[pi.Pod.UID] = nodeName
    for _, np := range npm.nominatedPods[nodeName] {
        if np.uid == pi.Pod.UID {
            logger.V(4).Info("Pod already exists in the nominator", "pod", np.uid)
            return
        }
    }
    npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], podToRef(pi.Pod))
}

// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
    npm.nLock.Lock()
    defer npm.nLock.Unlock()
    // In some cases, an Update event with no "NominatedNode" present is received right
    // after a node("NominatedNode") is reserved for this pod in memory.
    // In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
    var nominatingInfo *framework.NominatingInfo
    // We won't fall into below `if` block if the Update event represents:
    // (1) NominatedNode info is added
    // (2) NominatedNode info is updated
    // (3) NominatedNode info is removed
    if nominatedNodeName(oldPod) == "" && nominatedNodeName(newPodInfo.Pod) == "" {
        if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
            // This is the only case we should continue reserving the NominatedNode
            nominatingInfo = &framework.NominatingInfo{
                NominatingMode:    framework.ModeOverride,
                NominatedNodeName: nnn,
            }
        }
    }
    // We update irrespective of the nominatedNodeName changed or not, to ensure
    // that pod pointer is updated.
    npm.deleteUnlocked(oldPod)
    npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
}

// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
    npm.nLock.Lock()
    npm.deleteUnlocked(pod)
    npm.nLock.Unlock()
}

func (npm *nominator) deleteUnlocked(p *v1.Pod) {
    nnn, ok := npm.nominatedPodToNode[p.UID]
    if !ok {
        return
    }
    for i, np := range npm.nominatedPods[nnn] {
        if np.uid == p.UID {
            npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
            if len(npm.nominatedPods[nnn]) == 0 {
                delete(npm.nominatedPods, nnn)
            }
            break
        }
    }
    delete(npm.nominatedPodToNode, p.UID)
}

func (npm *nominator) nominatedPodsForNode(nodeName string) []podRef {
    npm.nLock.RLock()
    defer npm.nLock.RUnlock()
    return slices.Clone(npm.nominatedPods[nodeName])
}

// nominatedNodeName returns nominated node name of a Pod.
func nominatedNodeName(pod *v1.Pod) string {
    return pod.Status.NominatedNodeName
}

type podRef struct {
    name      string
    namespace string
    uid       types.UID
}

func podToRef(pod *v1.Pod) podRef {
    return podRef{
        name:      pod.Name,
        namespace: pod.Namespace,
        uid:       pod.UID,
    }
}

func (np podRef) toPod() *v1.Pod {
    return &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      np.name,
            Namespace: np.namespace,
            UID:       np.uid,
        },
    }
}
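
For orientation, a small illustrative sequence of how the nominator above behaves. This is a hypothetical, same-package sketch (imports mirror the file above); it is not part of the commit, and the nil pod lister deliberately skips the liveness check:

func exampleNominatorUsage() {
    logger := klog.Background()
    npm := newPodNominator(nil) // nil podLister: the "is the pod still alive" check is skipped
    p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns", UID: types.UID("uid-1")}}

    // ModeOverride records the nomination even though p.Status.NominatedNodeName is still empty.
    npm.AddNominatedPod(logger, &framework.PodInfo{Pod: p},
        &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node-a"})

    refs := npm.nominatedPodsForNode("node-a") // one podRef: {name "p", namespace "ns", uid "uid-1"}
    _ = refs[0].toPod()                        // rebuilds a minimal *v1.Pod carrying only name, namespace, and UID

    npm.DeleteNominatedPodIfExists(p) // removes the entry from both internal maps
}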
