Skip to content

Commit b141a99

Browse files
authored
Merge pull request kubernetes#85820 from ahg-g/ahg-queue
Start and stop the scheduling queue consistently
2 parents 205570e + f388534 commit b141a99

File tree

8 files changed

+54
-52
lines changed

8 files changed

+54
-52
lines changed

pkg/scheduler/core/extender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
539539
for _, name := range test.nodes {
540540
cache.AddNode(createNode(name))
541541
}
542-
queue := internalqueue.NewSchedulingQueue(nil, nil)
542+
queue := internalqueue.NewSchedulingQueue(nil)
543543
scheduler := NewGenericScheduler(
544544
cache,
545545
queue,

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ func TestGenericScheduler(t *testing.T) {
662662
}
663663
scheduler := NewGenericScheduler(
664664
cache,
665-
internalqueue.NewSchedulingQueue(nil, nil),
665+
internalqueue.NewSchedulingQueue(nil),
666666
test.predicates,
667667
predMetaProducer,
668668
test.prioritizers,
@@ -702,7 +702,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
702702

703703
s := NewGenericScheduler(
704704
cache,
705-
internalqueue.NewSchedulingQueue(nil, nil),
705+
internalqueue.NewSchedulingQueue(nil),
706706
predicates,
707707
algorithmpredicates.EmptyMetadataProducer,
708708
nil,
@@ -819,7 +819,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
819819
cache.AddNode(n)
820820
}
821821

822-
queue := internalqueue.NewSchedulingQueue(nil, nil)
822+
queue := internalqueue.NewSchedulingQueue(nil)
823823
scheduler := NewGenericScheduler(
824824
cache,
825825
queue,
@@ -1412,7 +1412,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
14121412
filterPlugin.failedNodeReturnCodeMap = filterFailedNodeReturnCodeMap
14131413
scheduler := NewGenericScheduler(
14141414
nil,
1415-
internalqueue.NewSchedulingQueue(nil, nil),
1415+
internalqueue.NewSchedulingQueue(nil),
14161416
test.predicates,
14171417
factory.GetPredicateMetadata,
14181418
nil,
@@ -2160,7 +2160,7 @@ func TestPreempt(t *testing.T) {
21602160
}
21612161
scheduler := NewGenericScheduler(
21622162
cache,
2163-
internalqueue.NewSchedulingQueue(nil, nil),
2163+
internalqueue.NewSchedulingQueue(nil),
21642164
map[string]algorithmpredicates.FitPredicate{"matches": predicate},
21652165
predMetaProducer,
21662166
[]priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},

pkg/scheduler/factory.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
266266
}
267267

268268
podQueue := internalqueue.NewSchedulingQueue(
269-
c.StopEverything,
270269
framework,
271270
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
272271
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
@@ -281,11 +280,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
281280
)
282281
debugger.ListenForSignal(c.StopEverything)
283282

284-
go func() {
285-
<-c.StopEverything
286-
podQueue.Close()
287-
}()
288-
289283
algo := core.NewGenericScheduler(
290284
c.schedulerCache,
291285
podQueue,

pkg/scheduler/factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func TestDefaultErrorFunc(t *testing.T) {
319319
defer close(stopCh)
320320

321321
timestamp := time.Now()
322-
queue := internalqueue.NewPriorityQueue(nil, nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
322+
queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
323323
schedulerCache := internalcache.New(30*time.Second, stopCh)
324324
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache)
325325

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,13 @@ type SchedulingQueue interface {
9797
DeleteNominatedPodIfExists(pod *v1.Pod)
9898
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
9999
NumUnschedulablePods() int
100+
// Run starts the goroutines managing the queue.
101+
Run()
100102
}
101103

102104
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
103-
func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework, opts ...Option) SchedulingQueue {
104-
return NewPriorityQueue(stop, fwk, opts...)
105+
func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
106+
return NewPriorityQueue(fwk, opts...)
105107
}
106108

107109
// NominatedNodeName returns nominated node name of a Pod.
@@ -117,7 +119,7 @@ func NominatedNodeName(pod *v1.Pod) string {
117119
// is called unschedulableQ. The third queue holds pods that are moved from
118120
// unschedulable queues and will be moved to active queue when backoff are completed.
119121
type PriorityQueue struct {
120-
stop <-chan struct{}
122+
stop chan struct{}
121123
clock util.Clock
122124
// podBackoff tracks backoff for pods attempting to be rescheduled
123125
podBackoff *PodBackoffMap
@@ -209,7 +211,6 @@ func activeQComp(podInfo1, podInfo2 interface{}) bool {
209211

210212
// NewPriorityQueue creates a PriorityQueue object.
211213
func NewPriorityQueue(
212-
stop <-chan struct{},
213214
fwk framework.Framework,
214215
opts ...Option,
215216
) *PriorityQueue {
@@ -232,7 +233,7 @@ func NewPriorityQueue(
232233

233234
pq := &PriorityQueue{
234235
clock: options.clock,
235-
stop: stop,
236+
stop: make(chan struct{}),
236237
podBackoff: NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration),
237238
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
238239
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
@@ -242,13 +243,11 @@ func NewPriorityQueue(
242243
pq.cond.L = &pq.lock
243244
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
244245

245-
pq.run()
246-
247246
return pq
248247
}
249248

250-
// run starts the goroutine to pump from podBackoffQ to activeQ
251-
func (p *PriorityQueue) run() {
249+
// Run starts the goroutine to pump from podBackoffQ to activeQ
250+
func (p *PriorityQueue) Run() {
252251
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
253252
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
254253
}
@@ -636,6 +635,7 @@ func (p *PriorityQueue) PendingPods() []*v1.Pod {
636635
func (p *PriorityQueue) Close() {
637636
p.lock.Lock()
638637
defer p.lock.Unlock()
638+
close(p.stop)
639639
p.closed = true
640640
p.cond.Broadcast()
641641
}

pkg/scheduler/internal/queue/scheduling_queue_test.go

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
129129
}
130130

131131
func TestPriorityQueue_Add(t *testing.T) {
132-
q := NewPriorityQueue(nil, nil)
132+
q := createAndRunPriorityQueue(nil)
133133
if err := q.Add(&medPriorityPod); err != nil {
134134
t.Errorf("add failed: %v", err)
135135
}
@@ -259,7 +259,7 @@ func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister {
259259
}
260260

261261
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
262-
q := NewPriorityQueue(nil, &fakeFramework{})
262+
q := createAndRunPriorityQueue(&fakeFramework{})
263263
if err := q.Add(&medPriorityPod); err != nil {
264264
t.Errorf("add failed: %v", err)
265265
}
@@ -275,7 +275,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
275275
}
276276

277277
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
278-
q := NewPriorityQueue(nil, nil)
278+
q := createAndRunPriorityQueue(nil)
279279
q.Add(&highPriNominatedPod)
280280
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
281281
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
@@ -307,7 +307,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
307307
// Pods in and before current scheduling cycle will be put back to activeQueue
308308
// if we were trying to schedule them when we received move request.
309309
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
310-
q := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(time.Now())))
310+
q := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(time.Now())))
311311
totalNum := 10
312312
expectedPods := make([]v1.Pod, 0, totalNum)
313313
for i := 0; i < totalNum; i++ {
@@ -374,7 +374,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
374374
}
375375

376376
func TestPriorityQueue_Pop(t *testing.T) {
377-
q := NewPriorityQueue(nil, nil)
377+
q := createAndRunPriorityQueue(nil)
378378
wg := sync.WaitGroup{}
379379
wg.Add(1)
380380
go func() {
@@ -391,7 +391,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
391391
}
392392

393393
func TestPriorityQueue_Update(t *testing.T) {
394-
q := NewPriorityQueue(nil, nil)
394+
q := createAndRunPriorityQueue(nil)
395395
q.Update(nil, &highPriorityPod)
396396
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
397397
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
@@ -427,7 +427,7 @@ func TestPriorityQueue_Update(t *testing.T) {
427427
}
428428

429429
func TestPriorityQueue_Delete(t *testing.T) {
430-
q := NewPriorityQueue(nil, nil)
430+
q := createAndRunPriorityQueue(nil)
431431
q.Update(&highPriorityPod, &highPriNominatedPod)
432432
q.Add(&unschedulablePod)
433433
if err := q.Delete(&highPriNominatedPod); err != nil {
@@ -451,7 +451,7 @@ func TestPriorityQueue_Delete(t *testing.T) {
451451
}
452452

453453
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
454-
q := NewPriorityQueue(nil, nil)
454+
q := createAndRunPriorityQueue(nil)
455455
q.Add(&medPriorityPod)
456456
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
457457
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
@@ -497,7 +497,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
497497
Spec: v1.PodSpec{NodeName: "machine1"},
498498
}
499499

500-
q := NewPriorityQueue(nil, nil)
500+
q := createAndRunPriorityQueue(nil)
501501
q.Add(&medPriorityPod)
502502
// Add a couple of pods to the unschedulableQ.
503503
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
@@ -518,7 +518,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
518518
}
519519

520520
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
521-
q := NewPriorityQueue(nil, nil)
521+
q := createAndRunPriorityQueue(nil)
522522
q.Add(&medPriorityPod)
523523
q.Add(&unschedulablePod)
524524
q.Add(&highPriorityPod)
@@ -543,7 +543,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
543543
return pendingSet
544544
}
545545

546-
q := NewPriorityQueue(nil, nil)
546+
q := createAndRunPriorityQueue(nil)
547547
q.Add(&medPriorityPod)
548548
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
549549
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
@@ -559,7 +559,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
559559
}
560560

561561
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
562-
q := NewPriorityQueue(nil, nil)
562+
q := createAndRunPriorityQueue(nil)
563563
if err := q.Add(&medPriorityPod); err != nil {
564564
t.Errorf("add failed: %v", err)
565565
}
@@ -628,8 +628,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
628628
}
629629

630630
func TestPriorityQueue_NewWithOptions(t *testing.T) {
631-
q := NewPriorityQueue(
632-
nil,
631+
q := createAndRunPriorityQueue(
633632
nil,
634633
WithPodInitialBackoffDuration(2*time.Second),
635634
WithPodMaxBackoffDuration(20*time.Second),
@@ -802,7 +801,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
802801
}{
803802
{
804803
name: "PriorityQueue close",
805-
q: NewPriorityQueue(nil, nil),
804+
q: createAndRunPriorityQueue(nil),
806805
expectedErr: fmt.Errorf(queueClosed),
807806
},
808807
}
@@ -831,7 +830,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
831830
// ensures that an unschedulable pod does not block head of the queue when there
832831
// are frequent events that move pods to the active queue.
833832
func TestRecentlyTriedPodsGoBack(t *testing.T) {
834-
q := NewPriorityQueue(nil, nil)
833+
q := createAndRunPriorityQueue(nil)
835834
// Add a few pods to priority queue.
836835
for i := 0; i < 5; i++ {
837836
p := v1.Pod{
@@ -885,7 +884,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
885884
// This behavior ensures that an unschedulable pod does not block head of the queue when there
886885
// are frequent events that move pods to the active queue.
887886
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
888-
q := NewPriorityQueue(nil, nil)
887+
q := createAndRunPriorityQueue(nil)
889888

890889
// Add an unschedulable pod to a priority queue.
891890
// This makes a situation that the pod was tried to schedule
@@ -976,7 +975,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
976975
// TestHighPriorityBackoff tests that a high priority pod does not block
977976
// other pods if it is unschedulable
978977
func TestHighPriorityBackoff(t *testing.T) {
979-
q := NewPriorityQueue(nil, nil)
978+
q := createAndRunPriorityQueue(nil)
980979

981980
midPod := v1.Pod{
982981
ObjectMeta: metav1.ObjectMeta{
@@ -1039,7 +1038,7 @@ func TestHighPriorityBackoff(t *testing.T) {
10391038
// TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to
10401039
// activeQ after one minutes if it is in unschedulableQ
10411040
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
1042-
q := NewPriorityQueue(nil, nil)
1041+
q := createAndRunPriorityQueue(nil)
10431042
midPod := v1.Pod{
10441043
ObjectMeta: metav1.ObjectMeta{
10451044
Name: "test-midpod",
@@ -1236,7 +1235,7 @@ func TestPodTimestamp(t *testing.T) {
12361235

12371236
for _, test := range tests {
12381237
t.Run(test.name, func(t *testing.T) {
1239-
queue := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(timestamp)))
1238+
queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
12401239
var podInfoList []*framework.PodInfo
12411240

12421241
for i, op := range test.operations {
@@ -1403,7 +1402,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
14031402
for _, test := range tests {
14041403
t.Run(test.name, func(t *testing.T) {
14051404
resetMetrics()
1406-
queue := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(timestamp)))
1405+
queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
14071406
for i, op := range test.operations {
14081407
for _, pInfo := range test.operands[i] {
14091408
op(queue, pInfo)
@@ -1432,7 +1431,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
14321431
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
14331432
// Add -> Pop.
14341433
c := clock.NewFakeClock(timestamp)
1435-
queue := NewPriorityQueue(nil, nil, WithClock(c))
1434+
queue := createAndRunPriorityQueue(nil, WithClock(c))
14361435
queue.Add(pod)
14371436
pInfo, err := queue.Pop()
14381437
if err != nil {
@@ -1443,7 +1442,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
14431442
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
14441443
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
14451444
c = clock.NewFakeClock(timestamp)
1446-
queue = NewPriorityQueue(nil, nil, WithClock(c))
1445+
queue = createAndRunPriorityQueue(nil, WithClock(c))
14471446
queue.Add(pod)
14481447
pInfo, err = queue.Pop()
14491448
if err != nil {
@@ -1463,7 +1462,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
14631462
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
14641463
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
14651464
c = clock.NewFakeClock(timestamp)
1466-
queue = NewPriorityQueue(nil, nil, WithClock(c))
1465+
queue = createAndRunPriorityQueue(nil, WithClock(c))
14671466
queue.Add(pod)
14681467
pInfo, err = queue.Pop()
14691468
if err != nil {
@@ -1561,9 +1560,9 @@ func TestIncomingPodsMetrics(t *testing.T) {
15611560
for _, test := range tests {
15621561
t.Run(test.name, func(t *testing.T) {
15631562
metrics.SchedulerQueueIncomingPods.Reset()
1564-
stop := make(chan struct{})
1565-
close(stop) // Stop the periodic flush
1566-
queue := NewPriorityQueue(stop, nil, WithClock(clock.NewFakeClock(timestamp)))
1563+
queue := NewPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
1564+
queue.Close()
1565+
queue.Run()
15671566
for _, op := range test.operations {
15681567
for _, pInfo := range pInfos {
15691568
op(queue, pInfo)
@@ -1586,3 +1585,9 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Po
15861585
t.Errorf("[%s] Pod initial schedule attempt timestamp unexpected, got %v, want %v", name, pInfo.InitialAttemptTimestamp, wantInitialAttemptTs)
15871586
}
15881587
}
1588+
1589+
func createAndRunPriorityQueue(fwk framework.Framework, opts ...Option) *PriorityQueue {
1590+
q := NewPriorityQueue(fwk, opts...)
1591+
q.Run()
1592+
return q
1593+
}

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,8 +420,9 @@ func (sched *Scheduler) Run(ctx context.Context) {
420420
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
421421
return
422422
}
423-
423+
sched.SchedulingQueue.Run()
424424
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
425+
sched.SchedulingQueue.Close()
425426
}
426427

427428
// recordFailedSchedulingEvent records an event for the pod that indicates the

0 commit comments

Comments
 (0)