Skip to content

Commit 59673f0

Browse files
authored
Merge pull request kubernetes#125578 from nayihz/fix_sche_queue_update
skip update pod that exist in scheduling cycle
2 parents 2200f5e + 26dcab1 commit 59673f0

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,25 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
987987
p.lock.Lock()
988988
defer p.lock.Unlock()
989989

990+
if p.isSchedulingQueueHintEnabled {
991+
// the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
992+
if _, ok := p.inFlightPods[newPod.UID]; ok {
993+
logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
994+
995+
// Record this update as Pod/Update because
996+
// this update may make the Pod schedulable in case it gets rejected and comes back to the queue.
997+
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
998+
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
999+
p.inFlightEvents.PushBack(&clusterEvent{
1000+
event: UnscheduledPodUpdate,
1001+
oldObj: oldPod,
1002+
newObj: newPod,
1003+
})
1004+
1005+
return nil
1006+
}
1007+
}
1008+
9901009
if oldPod != nil {
9911010
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
9921011
// If the pod is already in the active queue, just update it there.

pkg/scheduler/internal/queue/scheduling_queue_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,7 @@ func TestPriorityQueue_Update(t *testing.T) {
994994
},
995995
}
996996

997+
notInAnyQueue := "NotInAnyQueue"
997998
tests := []struct {
998999
name string
9991000
wantQ string
@@ -1093,6 +1094,25 @@ func TestPriorityQueue_Update(t *testing.T) {
10931094
},
10941095
schedulingHintsEnablement: []bool{true},
10951096
},
1097+
{
1098+
name: "when updating a pod which is in flightPods, the pod will not be added to any queue",
1099+
wantQ: notInAnyQueue,
1100+
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
1101+
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
1102+
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
1103+
err := q.activeQ.Add(podInfo)
1104+
if err != nil {
1105+
t.Errorf("unexpected error from activeQ.Add: %v", err)
1106+
}
1107+
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
1108+
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
1109+
}
1110+
updatedPod := medPriorityPodInfo.Pod.DeepCopy()
1111+
updatedPod.Annotations["foo"] = "bar"
1112+
return medPriorityPodInfo.Pod, updatedPod
1113+
},
1114+
schedulingHintsEnablement: []bool{true},
1115+
},
10961116
}
10971117

10981118
for _, tt := range tests {
@@ -1135,6 +1155,11 @@ func TestPriorityQueue_Update(t *testing.T) {
11351155
pInfo = pInfoFromUnsched
11361156
}
11371157

1158+
if tt.wantQ == notInAnyQueue {
1159+
// skip the rest of the test if pod is not expected to be in any of the queues.
1160+
return
1161+
}
1162+
11381163
if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" {
11391164
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
11401165
}
@@ -1148,6 +1173,66 @@ func TestPriorityQueue_Update(t *testing.T) {
11481173
}
11491174
}
11501175

1176+
// TestPriorityQueue_UpdateWhenInflight ensures to requeue a Pod back to activeQ/backoffQ
1177+
// if it actually got an update that may make it schedulable while being scheduled.
1178+
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
1179+
func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
1180+
logger, ctx := ktesting.NewTestContext(t)
1181+
ctx, cancel := context.WithCancel(ctx)
1182+
defer cancel()
1183+
1184+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)
1185+
m := makeEmptyQueueingHintMapPerProfile()
1186+
// fakePlugin could change its scheduling result by any updates in Pods.
1187+
m[""][UnscheduledPodUpdate] = []*QueueingHintFunction{
1188+
{
1189+
PluginName: "fakePlugin",
1190+
QueueingHintFn: queueHintReturnQueue,
1191+
},
1192+
}
1193+
c := testingclock.NewFakeClock(time.Now())
1194+
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(c))
1195+
1196+
// test-pod is created and popped out from the queue
1197+
testPod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj()
1198+
if err := q.Add(logger, testPod); err != nil {
1199+
t.Errorf("add failed: %v", err)
1200+
}
1201+
if p, err := q.Pop(logger); err != nil || p.Pod != testPod {
1202+
t.Errorf("Expected: %v after Pop, but got: %v", testPod.Name, p.Pod.Name)
1203+
}
1204+
1205+
// testPod is updated while being scheduled.
1206+
updatedPod := testPod.DeepCopy()
1207+
updatedPod.Spec.Tolerations = []v1.Toleration{
1208+
{
1209+
Key: "foo",
1210+
Effect: v1.TaintEffectNoSchedule,
1211+
},
1212+
}
1213+
1214+
if err := q.Update(logger, testPod, updatedPod); err != nil {
1215+
t.Error("Error calling Update")
1216+
}
1217+
// test-pod got rejected by fakePlugin,
1218+
// but the update event that it just got may change this scheduling result,
1219+
// and hence we should put this pod to activeQ/backoffQ.
1220+
err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(updatedPod, "fakePlugin"), q.SchedulingCycle())
1221+
if err != nil {
1222+
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
1223+
}
1224+
1225+
var pInfo *framework.QueuedPodInfo
1226+
if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)); !exists {
1227+
t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name)
1228+
} else {
1229+
pInfo = obj.(*framework.QueuedPodInfo)
1230+
}
1231+
if diff := cmp.Diff(updatedPod, pInfo.PodInfo.Pod); diff != "" {
1232+
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
1233+
}
1234+
}
1235+
11511236
func TestPriorityQueue_Delete(t *testing.T) {
11521237
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
11531238
logger, ctx := ktesting.NewTestContext(t)

0 commit comments

Comments
 (0)