Skip to content

Commit dfb763b

Browse files
authored
Merge pull request kubernetes#127016 from sanposhiho/multiple-inflightpods
fix: discard a pod at Pop() when the pod is being scheduled
2 parents e6f7b35 + 6d357d2 commit dfb763b

File tree

2 files changed

+136
-46
lines changed

2 files changed

+136
-46
lines changed

pkg/scheduler/backend/queue/active_queue.go

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -186,6 +186,11 @@ func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error {
186186
func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
187187
aq.lock.Lock()
188188
defer aq.lock.Unlock()
189+
190+
return aq.unlockedPop(logger)
191+
}
192+
193+
func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
189194
for aq.queue.Len() == 0 {
190195
// When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
191196
// When Close() is called, the p.closed is set and the condition is broadcast,
@@ -201,12 +206,22 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
201206
return nil, err
202207
}
203208
pInfo.Attempts++
204-
aq.schedCycle++
205209
// In flight, no concurrent events yet.
206210
if aq.isSchedulingQueueHintEnabled {
211+
// If the pod is already in the map, we shouldn't overwrite its inFlightPods entry; otherwise, it'd lead to a memory leak.
212+
// https://github.com/kubernetes/kubernetes/pull/127016
213+
if _, ok := aq.inFlightPods[pInfo.Pod.UID]; ok {
214+
// Just report it as an error, but no need to stop the scheduler
215+
// because it likely doesn't cause any visible issues from the scheduling perspective.
216+
logger.Error(nil, "the same pod is tracked in multiple places in the scheduler, and just discard it", "pod", klog.KObj(pInfo.Pod))
217+
// Just ignore/discard this duplicated pod and try to pop the next one.
218+
return aq.unlockedPop(logger)
219+
}
220+
207221
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false)
208222
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
209223
}
224+
aq.schedCycle++
210225

211226
// Update metrics and reset the set of unschedulable plugins for the next attempt.
212227
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {

pkg/scheduler/backend/queue/scheduling_queue_test.go

Lines changed: 120 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -173,7 +173,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
173173

174174
func Test_InFlightPods(t *testing.T) {
175175
logger, _ := ktesting.NewTestContext(t)
176-
pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
176+
pod1 := st.MakePod().Name("targetpod").UID("pod1").Obj()
177177
pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj()
178178
pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj()
179179
var poppedPod, poppedPod2 *framework.QueuedPodInfo
@@ -182,8 +182,11 @@ func Test_InFlightPods(t *testing.T) {
182182
// ONLY ONE of the following should be set.
183183
eventHappens *framework.ClusterEvent
184184
podPopped *v1.Pod
185-
podEnqueued *framework.QueuedPodInfo
186-
callback func(t *testing.T, q *PriorityQueue)
185+
// podCreated is the Pod that is created and inserted into the activeQ.
186+
podCreated *v1.Pod
187+
// podEnqueued is the Pod that is enqueued back to activeQ.
188+
podEnqueued *framework.QueuedPodInfo
189+
callback func(t *testing.T, q *PriorityQueue)
187190
}
188191

189192
tests := []struct {
@@ -201,10 +204,10 @@ func Test_InFlightPods(t *testing.T) {
201204
}{
202205
{
203206
name: "when SchedulingQueueHint is disabled, inFlightPods and inFlightEvents should be empty",
204-
initialPods: []*v1.Pod{pod},
207+
initialPods: []*v1.Pod{pod1},
205208
actions: []action{
206209
// This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled.
207-
{podPopped: pod},
210+
{podPopped: pod1},
208211
// This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled.
209212
{eventHappens: &framework.PvAdd},
210213
},
@@ -224,18 +227,18 @@ func Test_InFlightPods(t *testing.T) {
224227
{
225228
name: "Pod and interested events are registered in inFlightPods/inFlightEvents",
226229
isSchedulingQueueHintEnabled: true,
227-
initialPods: []*v1.Pod{pod},
230+
initialPods: []*v1.Pod{pod1},
228231
actions: []action{
229232
// This won't be added to inFlightEvents because no inFlightPods at this point.
230233
{eventHappens: &framework.PvcAdd},
231-
{podPopped: pod},
234+
{podPopped: pod1},
232235
// This gets added for the pod.
233236
{eventHappens: &framework.PvAdd},
234237
// This doesn't get added because no plugin is interested in framework.PvUpdate.
235238
{eventHappens: &framework.PvUpdate},
236239
},
237-
wantInFlightPods: []*v1.Pod{pod},
238-
wantInFlightEvents: []interface{}{pod, framework.PvAdd},
240+
wantInFlightPods: []*v1.Pod{pod1},
241+
wantInFlightEvents: []interface{}{pod1, framework.PvAdd},
239242
queueingHintMap: QueueingHintMapPerProfile{
240243
"": {
241244
framework.PvAdd: {
@@ -250,16 +253,16 @@ func Test_InFlightPods(t *testing.T) {
250253
{
251254
name: "Pod, registered in inFlightPods, is enqueued back to activeQ",
252255
isSchedulingQueueHintEnabled: true,
253-
initialPods: []*v1.Pod{pod, pod2},
256+
initialPods: []*v1.Pod{pod1, pod2},
254257
actions: []action{
255258
// This won't be added to inFlightEvents because no inFlightPods at this point.
256259
{eventHappens: &framework.PvcAdd},
257-
{podPopped: pod},
260+
{podPopped: pod1},
258261
{eventHappens: &framework.PvAdd},
259262
{podPopped: pod2},
260263
{eventHappens: &framework.NodeAdd},
261264
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
262-
{podEnqueued: newQueuedPodInfoForLookup(pod)},
265+
{podEnqueued: newQueuedPodInfoForLookup(pod1)},
263266
},
264267
wantBackoffQPodNames: []string{"targetpod"},
265268
wantInFlightPods: []*v1.Pod{pod2}, // only pod2 is registered because pod1 is already enqueued back.
@@ -290,16 +293,16 @@ func Test_InFlightPods(t *testing.T) {
290293
{
291294
name: "All Pods registered in inFlightPods are enqueued back to activeQ",
292295
isSchedulingQueueHintEnabled: true,
293-
initialPods: []*v1.Pod{pod, pod2},
296+
initialPods: []*v1.Pod{pod1, pod2},
294297
actions: []action{
295298
// This won't be added to inFlightEvents because no inFlightPods at this point.
296299
{eventHappens: &framework.PvcAdd},
297-
{podPopped: pod},
300+
{podPopped: pod1},
298301
{eventHappens: &framework.PvAdd},
299302
{podPopped: pod2},
300303
{eventHappens: &framework.NodeAdd},
301304
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
302-
{podEnqueued: newQueuedPodInfoForLookup(pod)},
305+
{podEnqueued: newQueuedPodInfoForLookup(pod1)},
303306
{eventHappens: &framework.CSINodeUpdate},
304307
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
305308
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
@@ -338,11 +341,11 @@ func Test_InFlightPods(t *testing.T) {
338341
{
339342
name: "One intermediate Pod registered in inFlightPods is enqueued back to activeQ",
340343
isSchedulingQueueHintEnabled: true,
341-
initialPods: []*v1.Pod{pod, pod2, pod3},
344+
initialPods: []*v1.Pod{pod1, pod2, pod3},
342345
actions: []action{
343346
// This won't be added to inFlightEvents because no inFlightPods at this point.
344347
{eventHappens: &framework.PvcAdd},
345-
{podPopped: pod},
348+
{podPopped: pod1},
346349
{eventHappens: &framework.PvAdd},
347350
{podPopped: pod2},
348351
{eventHappens: &framework.NodeAdd},
@@ -352,8 +355,8 @@ func Test_InFlightPods(t *testing.T) {
352355
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
353356
},
354357
wantBackoffQPodNames: []string{"targetpod2"},
355-
wantInFlightPods: []*v1.Pod{pod, pod3},
356-
wantInFlightEvents: []interface{}{pod, framework.PvAdd, framework.NodeAdd, pod3, framework.AssignedPodAdd},
358+
wantInFlightPods: []*v1.Pod{pod1, pod3},
359+
wantInFlightEvents: []interface{}{pod1, framework.PvAdd, framework.NodeAdd, pod3, framework.AssignedPodAdd},
357360
queueingHintMap: QueueingHintMapPerProfile{
358361
"": {
359362
framework.PvAdd: {
@@ -379,11 +382,11 @@ func Test_InFlightPods(t *testing.T) {
379382
},
380383
{
381384
name: "pod is enqueued to queue without QueueingHint when SchedulingQueueHint is disabled",
382-
initialPods: []*v1.Pod{pod},
385+
initialPods: []*v1.Pod{pod1},
383386
actions: []action{
384-
{podPopped: pod},
387+
{podPopped: pod1},
385388
{eventHappens: &framework.AssignedPodAdd},
386-
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
389+
{podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")},
387390
},
388391
wantBackoffQPodNames: []string{"targetpod"},
389392
wantInFlightPods: nil,
@@ -404,13 +407,13 @@ func Test_InFlightPods(t *testing.T) {
404407
{
405408
name: "events before popping Pod are ignored when Pod is enqueued back to queue",
406409
isSchedulingQueueHintEnabled: true,
407-
initialPods: []*v1.Pod{pod},
410+
initialPods: []*v1.Pod{pod1},
408411
actions: []action{
409412
{eventHappens: &framework.WildCardEvent},
410-
{podPopped: pod},
413+
{podPopped: pod1},
411414
{eventHappens: &framework.AssignedPodAdd},
412415
// This Pod won't be requeued to activeQ/backoffQ because fooPlugin1 returns QueueSkip.
413-
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
416+
{podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")},
414417
},
415418
wantUnschedPodPoolPodNames: []string{"targetpod"},
416419
wantInFlightPods: nil,
@@ -431,11 +434,11 @@ func Test_InFlightPods(t *testing.T) {
431434
{
432435
name: "pod is enqueued to backoff if no failed plugin",
433436
isSchedulingQueueHintEnabled: true,
434-
initialPods: []*v1.Pod{pod},
437+
initialPods: []*v1.Pod{pod1},
435438
actions: []action{
436-
{podPopped: pod},
439+
{podPopped: pod1},
437440
{eventHappens: &framework.AssignedPodAdd},
438-
{podEnqueued: newQueuedPodInfoForLookup(pod)},
441+
{podEnqueued: newQueuedPodInfoForLookup(pod1)},
439442
},
440443
wantBackoffQPodNames: []string{"targetpod"},
441444
wantInFlightPods: nil,
@@ -455,11 +458,11 @@ func Test_InFlightPods(t *testing.T) {
455458
{
456459
name: "pod is enqueued to unschedulable pod pool if no events that can make the pod schedulable",
457460
isSchedulingQueueHintEnabled: true,
458-
initialPods: []*v1.Pod{pod},
461+
initialPods: []*v1.Pod{pod1},
459462
actions: []action{
460-
{podPopped: pod},
463+
{podPopped: pod1},
461464
{eventHappens: &framework.NodeAdd},
462-
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
465+
{podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")},
463466
},
464467
wantUnschedPodPoolPodNames: []string{"targetpod"},
465468
wantInFlightPods: nil,
@@ -480,11 +483,11 @@ func Test_InFlightPods(t *testing.T) {
480483
{
481484
name: "pod is enqueued to unschedulable pod pool because the failed plugin has a hint fn but it returns Skip",
482485
isSchedulingQueueHintEnabled: true,
483-
initialPods: []*v1.Pod{pod},
486+
initialPods: []*v1.Pod{pod1},
484487
actions: []action{
485-
{podPopped: pod},
488+
{podPopped: pod1},
486489
{eventHappens: &framework.AssignedPodAdd},
487-
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
490+
{podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")},
488491
},
489492
wantUnschedPodPoolPodNames: []string{"targetpod"},
490493
wantInFlightPods: nil,
@@ -505,12 +508,12 @@ func Test_InFlightPods(t *testing.T) {
505508
{
506509
name: "pod is enqueued to activeQ because the Pending plugins has a hint fn and it returns Queue",
507510
isSchedulingQueueHintEnabled: true,
508-
initialPods: []*v1.Pod{pod},
511+
initialPods: []*v1.Pod{pod1},
509512
actions: []action{
510-
{podPopped: pod},
513+
{podPopped: pod1},
511514
{eventHappens: &framework.AssignedPodAdd},
512515
{podEnqueued: &framework.QueuedPodInfo{
513-
PodInfo: mustNewPodInfo(pod),
516+
PodInfo: mustNewPodInfo(pod1),
514517
UnschedulablePlugins: sets.New("fooPlugin2", "fooPlugin3"),
515518
PendingPlugins: sets.New("fooPlugin1"),
516519
}},
@@ -541,11 +544,11 @@ func Test_InFlightPods(t *testing.T) {
541544
{
542545
name: "pod is enqueued to backoffQ because the failed plugin has a hint fn and it returns Queue",
543546
isSchedulingQueueHintEnabled: true,
544-
initialPods: []*v1.Pod{pod},
547+
initialPods: []*v1.Pod{pod1},
545548
actions: []action{
546-
{podPopped: pod},
549+
{podPopped: pod1},
547550
{eventHappens: &framework.AssignedPodAdd},
548-
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")},
551+
{podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1", "fooPlugin2")},
549552
},
550553
wantBackoffQPodNames: []string{"targetpod"},
551554
wantInFlightPods: nil,
@@ -570,9 +573,9 @@ func Test_InFlightPods(t *testing.T) {
570573
{
571574
name: "pod is enqueued to activeQ because the failed plugin has a hint fn and it returns Queue for a concurrent event that was received while some other pod was in flight",
572575
isSchedulingQueueHintEnabled: true,
573-
initialPods: []*v1.Pod{pod, pod2},
576+
initialPods: []*v1.Pod{pod1, pod2},
574577
actions: []action{
575-
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }},
578+
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod1) }},
576579
{eventHappens: &framework.NodeAdd},
577580
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, logger, q, pod2) }},
578581
{eventHappens: &framework.AssignedPodAdd},
@@ -622,9 +625,9 @@ func Test_InFlightPods(t *testing.T) {
622625
{
623626
name: "popped pod must have empty UnschedulablePlugins and PendingPlugins",
624627
isSchedulingQueueHintEnabled: true,
625-
initialPods: []*v1.Pod{pod},
628+
initialPods: []*v1.Pod{pod1},
626629
actions: []action{
627-
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }},
630+
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod1) }},
628631
{callback: func(t *testing.T, q *PriorityQueue) {
629632
logger, _ := ktesting.NewTestContext(t)
630633
// Unschedulable due to PendingPlugins.
@@ -636,7 +639,7 @@ func Test_InFlightPods(t *testing.T) {
636639
}},
637640
{eventHappens: &framework.PvAdd}, // Active again.
638641
{callback: func(t *testing.T, q *PriorityQueue) {
639-
poppedPod = popPod(t, logger, q, pod)
642+
poppedPod = popPod(t, logger, q, pod1)
640643
if len(poppedPod.UnschedulablePlugins) > 0 {
641644
t.Errorf("QueuedPodInfo from Pop should have empty UnschedulablePlugins, got instead: %+v", poppedPod)
642645
}
@@ -661,6 +664,76 @@ func Test_InFlightPods(t *testing.T) {
661664
},
662665
},
663666
},
667+
{
668+
// This scenario shouldn't happen unless we introduce a bug similar to https://github.com/kubernetes/kubernetes/issues/118226.
669+
// But, given that such a bug could cause a serious memory leak and would likely be hard to detect,
670+
// we should have a safeguard against such a bug so that, at least, we can prevent the memory leak.
671+
name: "Pop is made twice for the same Pod, but the cleanup still happen correctly",
672+
isSchedulingQueueHintEnabled: true,
673+
initialPods: []*v1.Pod{pod1, pod2},
674+
actions: []action{
675+
// This won't be added to inFlightEvents because no inFlightPods at this point.
676+
{eventHappens: &framework.PvcAdd},
677+
{podPopped: pod1},
678+
{eventHappens: &framework.PvAdd},
679+
{podPopped: pod2},
680+
// Simulate a bug: put pod1 into activeQ while pod1 is still being scheduled.
681+
{callback: func(t *testing.T, q *PriorityQueue) {
682+
q.activeQ.underLock(func(unlocked unlockedActiveQueuer) {
683+
unlocked.AddOrUpdate(newQueuedPodInfoForLookup(pod1))
684+
})
685+
}},
686+
// Once pod3 is created by the next action, the activeQ has pod1 and pod3 in this order.
687+
{podCreated: pod3},
688+
// pod3 is popped, not pod1.
689+
// In detail, this Pop() first tries to pop pod1, but it's already being scheduled and hence discarded.
690+
// Then, it pops the next pod, pod3.
691+
{podPopped: pod3},
692+
{callback: func(t *testing.T, q *PriorityQueue) {
693+
// Make sure that pod1 is discarded and hence no pod is left in activeQ.
694+
if len(q.activeQ.list()) != 0 {
695+
t.Fatalf("activeQ should be empty, but got: %v", q.activeQ.list())
696+
}
697+
}},
698+
{eventHappens: &framework.NodeAdd},
699+
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
700+
{podEnqueued: newQueuedPodInfoForLookup(pod1)},
701+
{eventHappens: &framework.CSINodeUpdate},
702+
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
703+
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
704+
{podEnqueued: newQueuedPodInfoForLookup(pod3)},
705+
},
706+
wantBackoffQPodNames: []string{"targetpod", "targetpod2", "targetpod3"},
707+
wantInFlightPods: nil, // should be empty
708+
queueingHintMap: QueueingHintMapPerProfile{
709+
"": {
710+
framework.PvAdd: {
711+
{
712+
PluginName: "fooPlugin1",
713+
QueueingHintFn: queueHintReturnQueue,
714+
},
715+
},
716+
framework.NodeAdd: {
717+
{
718+
PluginName: "fooPlugin1",
719+
QueueingHintFn: queueHintReturnQueue,
720+
},
721+
},
722+
framework.PvcAdd: {
723+
{
724+
PluginName: "fooPlugin1",
725+
QueueingHintFn: queueHintReturnQueue,
726+
},
727+
},
728+
framework.CSINodeUpdate: {
729+
{
730+
PluginName: "fooPlugin1",
731+
QueueingHintFn: queueHintReturnQueue,
732+
},
733+
},
734+
},
735+
},
736+
},
664737
}
665738

666739
for _, test := range tests {
@@ -692,6 +765,8 @@ func Test_InFlightPods(t *testing.T) {
692765

693766
for _, action := range test.actions {
694767
switch {
768+
case action.podCreated != nil:
769+
q.Add(logger, action.podCreated)
695770
case action.podPopped != nil:
696771
popPod(t, logger, q, action.podPopped)
697772
case action.eventHappens != nil:

0 commit comments

Comments
 (0)