Skip to content

Commit 33815db

Browse files
committed
Move NominatedPodsForNode to scheduling queue directly
1 parent b8dcc2c commit 33815db

File tree

7 files changed

+70
-58
lines changed

7 files changed

+70
-58
lines changed

pkg/scheduler/extender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
337337
test.registerPlugins, "",
338338
runtime.WithClientSet(client),
339339
runtime.WithInformerFactory(informerFactory),
340-
runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
340+
runtime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
341341
runtime.WithLogger(logger),
342342
)
343343
if err != nil {

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func TestPostFilter(t *testing.T) {
380380
frameworkruntime.WithClientSet(cs),
381381
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
382382
frameworkruntime.WithInformerFactory(informerFactory),
383-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
383+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
384384
frameworkruntime.WithExtenders(extenders),
385385
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
386386
frameworkruntime.WithLogger(logger),
@@ -1117,7 +1117,7 @@ func TestDryRunPreemption(t *testing.T) {
11171117
fwk, err := tf.NewFramework(
11181118
ctx,
11191119
registeredPlugins, "",
1120-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
1120+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
11211121
frameworkruntime.WithSnapshotSharedLister(snapshot),
11221122
frameworkruntime.WithInformerFactory(informerFactory),
11231123
frameworkruntime.WithParallelism(parallelism),
@@ -1376,7 +1376,7 @@ func TestSelectBestCandidate(t *testing.T) {
13761376
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
13771377
},
13781378
"",
1379-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
1379+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
13801380
frameworkruntime.WithSnapshotSharedLister(snapshot),
13811381
frameworkruntime.WithLogger(logger),
13821382
)
@@ -1760,7 +1760,7 @@ func TestPreempt(t *testing.T) {
17601760
frameworkruntime.WithClientSet(client),
17611761
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
17621762
frameworkruntime.WithExtenders(extenders),
1763-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
1763+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
17641764
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
17651765
frameworkruntime.WithInformerFactory(informerFactory),
17661766
frameworkruntime.WithWaitingPods(waitingPods),

pkg/scheduler/framework/preemption/preemption_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func TestDryRunPreemption(t *testing.T) {
208208
fwk, err := tf.NewFramework(
209209
ctx,
210210
registeredPlugins, "",
211-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
211+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
212212
frameworkruntime.WithInformerFactory(informerFactory),
213213
frameworkruntime.WithParallelism(parallelism),
214214
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),
@@ -313,7 +313,7 @@ func TestSelectCandidate(t *testing.T) {
313313
ctx,
314314
registeredPlugins,
315315
"",
316-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
316+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
317317
frameworkruntime.WithSnapshotSharedLister(snapshot),
318318
frameworkruntime.WithLogger(logger),
319319
)

pkg/scheduler/framework/runtime/framework_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"k8s.io/apimachinery/pkg/runtime"
3232
"k8s.io/apimachinery/pkg/types"
3333
"k8s.io/apimachinery/pkg/util/sets"
34+
"k8s.io/client-go/informers"
35+
clientsetfake "k8s.io/client-go/kubernetes/fake"
3436
"k8s.io/component-base/metrics/testutil"
3537
"k8s.io/klog/v2/ktesting"
3638
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -2385,7 +2387,20 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
23852387
)
23862388
}
23872389

2388-
podNominator := internalqueue.NewTestPodNominator(nil)
2390+
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
2391+
podInformer := informerFactory.Core().V1().Pods().Informer()
2392+
err := podInformer.GetStore().Add(tt.pod)
2393+
if err != nil {
2394+
t.Fatalf("Error adding pod to podInformer: %s", err)
2395+
}
2396+
if tt.nominatedPod != nil {
2397+
err = podInformer.GetStore().Add(tt.nominatedPod)
2398+
if err != nil {
2399+
t.Fatalf("Error adding nominated pod to podInformer: %s", err)
2400+
}
2401+
}
2402+
2403+
podNominator := internalqueue.NewSchedulingQueue(nil, informerFactory)
23892404
if tt.nominatedPod != nil {
23902405
podNominator.AddNominatedPod(
23912406
logger,

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func NewPriorityQueue(
349349
}
350350
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
351351
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
352-
pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo)
352+
pq.nominator = newPodNominator(options.podLister)
353353

354354
return pq
355355
}
@@ -1203,18 +1203,6 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
12031203
return &framework.PodInfo{Pod: pod}
12041204
}
12051205

1206-
func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo {
1207-
p.lock.RLock()
1208-
defer p.lock.RUnlock()
1209-
p.activeQ.getLock().RLock()
1210-
defer p.activeQ.getLock().RUnlock()
1211-
pods := make([]*framework.PodInfo, len(nominatedPods))
1212-
for i, np := range nominatedPods {
1213-
pods[i] = p.nominatedPodToInfo(np).DeepCopy()
1214-
}
1215-
return pods
1216-
}
1217-
12181206
// Close closes the priority queue.
12191207
func (p *PriorityQueue) Close() {
12201208
p.lock.Lock()
@@ -1241,15 +1229,27 @@ func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo,
12411229
npm.nLock.Unlock()
12421230
}
12431231

1232+
func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
1233+
npm.nLock.RLock()
1234+
defer npm.nLock.RUnlock()
1235+
return slices.Clone(npm.nominatedPods[nodeName])
1236+
}
1237+
12441238
// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
12451239
// but they are waiting for other pods to be removed from the node.
1246-
// CAUTION: Make sure you don't call this function while taking any lock in any scenario.
1247-
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
1248-
npm.nLock.RLock()
1249-
nominatedPods := slices.Clone(npm.nominatedPods[nodeName])
1250-
npm.nLock.RUnlock()
1251-
// Note that nominatedPodsToInfo takes SchedulingQueue.lock inside.
1252-
return npm.nominatedPodsToInfo(nominatedPods)
1240+
// CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario.
1241+
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
1242+
p.lock.RLock()
1243+
defer p.lock.RUnlock()
1244+
nominatedPods := p.nominator.nominatedPodsForNode(nodeName)
1245+
1246+
p.activeQ.getLock().RLock()
1247+
defer p.activeQ.getLock().RUnlock()
1248+
pods := make([]*framework.PodInfo, len(nominatedPods))
1249+
for i, np := range nominatedPods {
1250+
pods[i] = p.nominatedPodToInfo(np).DeepCopy()
1251+
}
1252+
return pods
12531253
}
12541254

12551255
func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
@@ -1495,12 +1495,11 @@ func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, new
14951495
npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
14961496
}
14971497

1498-
func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]PodRef) []*framework.PodInfo) *nominator {
1498+
func newPodNominator(podLister listersv1.PodLister) *nominator {
14991499
return &nominator{
1500-
podLister: podLister,
1501-
nominatedPods: make(map[string][]PodRef),
1502-
nominatedPodToNode: make(map[types.UID]string),
1503-
nominatedPodsToInfo: nominatedPodsToInfo,
1500+
podLister: podLister,
1501+
nominatedPods: make(map[string][]PodRef),
1502+
nominatedPodToNode: make(map[types.UID]string),
15041503
}
15051504
}
15061505

pkg/scheduler/internal/queue/testing.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"k8s.io/apimachinery/pkg/runtime"
2323
"k8s.io/client-go/informers"
2424
"k8s.io/client-go/kubernetes/fake"
25-
listersv1 "k8s.io/client-go/listers/core/v1"
2625
"k8s.io/kubernetes/pkg/scheduler/framework"
2726
)
2827

@@ -54,17 +53,3 @@ func NewTestQueueWithInformerFactory(
5453
informerFactory.WaitForCacheSync(ctx.Done())
5554
return pq
5655
}
57-
58-
// NewPodNominator creates a nominator as a backing of framework.PodNominator.
59-
// A podLister is passed in so as to check if the pod exists
60-
// before adding its nominatedNode info.
61-
func NewTestPodNominator(podLister listersv1.PodLister) framework.PodNominator {
62-
nominatedPodsToInfo := func(nominatedPods []PodRef) []*framework.PodInfo {
63-
pods := make([]*framework.PodInfo, len(nominatedPods))
64-
for i, np := range nominatedPods {
65-
pods[i] = &framework.PodInfo{Pod: np.ToPod()}
66-
}
67-
return pods
68-
}
69-
return newPodNominator(podLister, nominatedPodsToInfo)
70-
}

pkg/scheduler/schedule_one_test.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -793,13 +793,14 @@ func TestSchedulerScheduleOne(t *testing.T) {
793793
t.Fatal(err)
794794
}
795795

796+
informerFactory := informers.NewSharedInformerFactory(client, 0)
796797
sched := &Scheduler{
797798
Cache: cache,
798799
client: client,
799800
NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
800801
return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil
801802
},
802-
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
803+
SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory),
803804
Profiles: profile.Map{testSchedulerName: fwk},
804805
}
805806

@@ -2472,7 +2473,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
24722473
test.registerPlugins, "",
24732474
frameworkruntime.WithSnapshotSharedLister(snapshot),
24742475
frameworkruntime.WithInformerFactory(informerFactory),
2475-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
2476+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
24762477
)
24772478
if err != nil {
24782479
t.Fatal(err)
@@ -2538,7 +2539,7 @@ func TestFindFitAllError(t *testing.T) {
25382539
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
25392540
},
25402541
"",
2541-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
2542+
frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
25422543
)
25432544
if err != nil {
25442545
t.Fatal(err)
@@ -2581,7 +2582,7 @@ func TestFindFitSomeError(t *testing.T) {
25812582
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
25822583
},
25832584
"",
2584-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
2585+
frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
25852586
)
25862587
if err != nil {
25872588
t.Fatal(err)
@@ -2652,10 +2653,18 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
26522653
logger, ctx := ktesting.NewTestContext(t)
26532654
ctx, cancel := context.WithCancel(ctx)
26542655
defer cancel()
2656+
2657+
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
2658+
podInformer := informerFactory.Core().V1().Pods().Informer()
2659+
err := podInformer.GetStore().Add(test.pod)
2660+
if err != nil {
2661+
t.Fatalf("Error adding pod to podInformer: %s", err)
2662+
}
2663+
26552664
fwk, err := tf.NewFramework(
26562665
ctx,
26572666
registerPlugins, "",
2658-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
2667+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
26592668
)
26602669
if err != nil {
26612670
t.Fatal(err)
@@ -2669,6 +2678,10 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
26692678
if err != nil {
26702679
t.Fatal(err)
26712680
}
2681+
err = podInformer.GetStore().Add(podinfo.Pod)
2682+
if err != nil {
2683+
t.Fatalf("Error adding nominated pod to podInformer: %s", err)
2684+
}
26722685
fwk.AddNominatedPod(logger, podinfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
26732686

26742687
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
@@ -2796,7 +2809,7 @@ func TestZeroRequest(t *testing.T) {
27962809
frameworkruntime.WithInformerFactory(informerFactory),
27972810
frameworkruntime.WithSnapshotSharedLister(snapshot),
27982811
frameworkruntime.WithClientSet(client),
2799-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
2812+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
28002813
)
28012814
if err != nil {
28022815
t.Fatalf("error creating framework: %+v", err)
@@ -3199,7 +3212,7 @@ func Test_prioritizeNodes(t *testing.T) {
31993212
frameworkruntime.WithInformerFactory(informerFactory),
32003213
frameworkruntime.WithSnapshotSharedLister(snapshot),
32013214
frameworkruntime.WithClientSet(client),
3202-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
3215+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
32033216
)
32043217
if err != nil {
32053218
t.Fatalf("error creating framework: %+v", err)
@@ -3317,7 +3330,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
33173330
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
33183331
},
33193332
"",
3320-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
3333+
frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
33213334
)
33223335
if err != nil {
33233336
t.Fatal(err)
@@ -3399,7 +3412,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
33993412
ctx,
34003413
registerPlugins, "",
34013414
frameworkruntime.WithClientSet(client),
3402-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
3415+
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
34033416
)
34043417
if err != nil {
34053418
t.Fatal(err)
@@ -3557,7 +3570,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
35573570
frameworkruntime.WithClientSet(client),
35583571
frameworkruntime.WithEventRecorder(recorder),
35593572
frameworkruntime.WithInformerFactory(informerFactory),
3560-
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
3573+
frameworkruntime.WithPodNominator(schedulingQueue),
35613574
frameworkruntime.WithWaitingPods(waitingPods),
35623575
)
35633576

0 commit comments

Comments
 (0)