Skip to content

Commit 4deb4f2

Browse files
committed
Trigger rescheduling on delete event also when unscheduled pod is removed
1 parent 4806519 commit 4deb4f2

File tree

10 files changed

+372
-43
lines changed

10 files changed

+372
-43
lines changed

pkg/scheduler/eventhandlers.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,16 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
160160

161161
logger.V(4).Info("Update event for unscheduled pod", "pod", klog.KObj(newPod))
162162
sched.SchedulingQueue.Update(logger, oldPod, newPod)
163+
if hasNominatedNodeNameChanged(oldPod, newPod) {
164+
// Nominated node changed in pod, so we need to treat it as if the pod was deleted from the old nominated node,
165+
// because the scheduler treats such a pod as if it was already assigned when scheduling lower or equal priority pods.
166+
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, oldPod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(oldPod)))
167+
}
168+
}
169+
170+
// hasNominatedNodeNameChanged returns true when the old pod had a nominated node name and the new pod's nominated node name differs from it (changed or cleared).
171+
func hasNominatedNodeNameChanged(oldPod, newPod *v1.Pod) bool {
172+
return len(oldPod.Status.NominatedNodeName) > 0 && oldPod.Status.NominatedNodeName != newPod.Status.NominatedNodeName
163173
}
164174

165175
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
@@ -195,8 +205,21 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
195205
// If a waiting pod is rejected, it indicates it's previously assumed and we're
196206
// removing it from the scheduler cache. In this case, signal an AssignedPodDelete
197207
// event to immediately retry some unscheduled Pods.
208+
// Similarly, when a pod that had a nominated node is deleted, it can unblock scheduling of other pods,
209+
// because the lower or equal priority pods treat such a pod as if it was assigned.
198210
if fwk.RejectWaitingPod(pod.UID) {
199211
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil)
212+
} else if pod.Status.NominatedNodeName != "" {
213+
// Note that a nominated pod can fall into `RejectWaitingPod` case as well,
214+
// but in that case the `MoveAllToActiveOrBackoffQueue` already covered lower priority pods.
215+
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(pod)))
216+
}
217+
}
218+
219+
// getLEPriorityPreCheck returns a PreEnqueueCheck function that selects only pods whose priority is lower than or equal to the given priority.
220+
func getLEPriorityPreCheck(priority int32) queue.PreEnqueueCheck {
221+
return func(pod *v1.Pod) bool {
222+
return corev1helpers.PodPriority(pod) <= priority
200223
}
201224
}
202225

pkg/scheduler/eventhandlers_test.go

Lines changed: 158 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package scheduler
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"reflect"
2223
"testing"
2324
"time"
@@ -30,8 +31,10 @@ import (
3031
resourceapi "k8s.io/api/resource/v1beta1"
3132
storagev1 "k8s.io/api/storage/v1"
3233
"k8s.io/apimachinery/pkg/api/resource"
34+
"k8s.io/apimachinery/pkg/util/sets"
3335
utilfeature "k8s.io/apiserver/pkg/util/feature"
3436
featuregatetesting "k8s.io/component-base/featuregate/testing"
37+
"k8s.io/klog/v2"
3538
"k8s.io/klog/v2/ktesting"
3639

3740
"k8s.io/apimachinery/pkg/runtime"
@@ -42,18 +45,168 @@ import (
4245
"k8s.io/client-go/kubernetes/fake"
4346

4447
"k8s.io/kubernetes/pkg/features"
45-
"k8s.io/kubernetes/pkg/scheduler/backend/cache"
46-
"k8s.io/kubernetes/pkg/scheduler/backend/queue"
48+
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
49+
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
4750
"k8s.io/kubernetes/pkg/scheduler/framework"
4851
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
4952
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
5053
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
5154
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
55+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
5256
"k8s.io/kubernetes/pkg/scheduler/metrics"
5357
st "k8s.io/kubernetes/pkg/scheduler/testing"
58+
"k8s.io/kubernetes/pkg/scheduler/util"
5459
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
5560
)
5661

62+
func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
63+
metrics.Register()
64+
highPriorityPod :=
65+
st.MakePod().Name("hpp").Namespace("ns1").UID("hppns1").Priority(highPriority).SchedulerName(testSchedulerName).Obj()
66+
67+
medNominatedPriorityPod :=
68+
st.MakePod().Name("mpp").Namespace("ns2").UID("mppns1").Priority(midPriority).SchedulerName(testSchedulerName).NominatedNodeName("node1").Obj()
69+
medPriorityPod :=
70+
st.MakePod().Name("smpp").Namespace("ns3").UID("mppns2").Priority(midPriority).SchedulerName(testSchedulerName).Obj()
71+
72+
lowPriorityPod :=
73+
st.MakePod().Name("lpp").Namespace("ns4").UID("lppns1").Priority(lowPriority).SchedulerName(testSchedulerName).Obj()
74+
75+
unschedulablePods := []*v1.Pod{highPriorityPod, medNominatedPriorityPod, medPriorityPod, lowPriorityPod}
76+
77+
// Make pods schedulable on a Delete event when QHints are enabled, but only if the old pod had a nominated node name (i.e. not when a nominated node first appears).
78+
queueHintForPodDelete := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
79+
oldPod, _, err := util.As[*v1.Pod](oldObj, newObj)
80+
if err != nil {
81+
t.Errorf("Failed to convert objects to pods: %v", err)
82+
}
83+
if oldPod.Status.NominatedNodeName == "" {
84+
return framework.QueueSkip, nil
85+
}
86+
return framework.Queue, nil
87+
}
88+
queueingHintMap := internalqueue.QueueingHintMapPerProfile{
89+
testSchedulerName: {
90+
framework.EventAssignedPodDelete: {
91+
{
92+
PluginName: "fooPlugin1",
93+
QueueingHintFn: queueHintForPodDelete,
94+
},
95+
},
96+
},
97+
}
98+
99+
tests := []struct {
100+
name string
101+
updateFunc func(s *Scheduler)
102+
wantInActive sets.Set[string]
103+
}{
104+
{
105+
name: "Update of a nominated node name to a different value should trigger rescheduling of lower priority pods",
106+
updateFunc: func(s *Scheduler) {
107+
updatedPod := medNominatedPriorityPod.DeepCopy()
108+
updatedPod.Status.NominatedNodeName = "node2"
109+
updatedPod.ResourceVersion = "1"
110+
s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod)
111+
},
112+
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
113+
},
114+
{
115+
name: "Removal of a nominated node name should trigger rescheduling of lower priority pods",
116+
updateFunc: func(s *Scheduler) {
117+
updatedPod := medNominatedPriorityPod.DeepCopy()
118+
updatedPod.Status.NominatedNodeName = ""
119+
updatedPod.ResourceVersion = "1"
120+
s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod)
121+
},
122+
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
123+
},
124+
{
125+
name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods",
126+
updateFunc: func(s *Scheduler) {
127+
s.deletePodFromSchedulingQueue(medNominatedPriorityPod)
128+
},
129+
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name),
130+
},
131+
{
132+
name: "Addition of a nominated node name to the high priority pod that did not have it before shouldn't trigger rescheduling",
133+
updateFunc: func(s *Scheduler) {
134+
updatedPod := highPriorityPod.DeepCopy()
135+
updatedPod.Status.NominatedNodeName = "node2"
136+
updatedPod.ResourceVersion = "1"
137+
s.updatePodInSchedulingQueue(highPriorityPod, updatedPod)
138+
},
139+
wantInActive: sets.New[string](),
140+
},
141+
}
142+
143+
for _, tt := range tests {
144+
for _, qHintEnabled := range []bool{false, true} {
145+
t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) {
146+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
147+
148+
logger, ctx := ktesting.NewTestContext(t)
149+
ctx, cancel := context.WithCancel(ctx)
150+
defer cancel()
151+
152+
var objs []runtime.Object
153+
for _, pod := range unschedulablePods {
154+
objs = append(objs, pod)
155+
}
156+
client := fake.NewClientset(objs...)
157+
informerFactory := informers.NewSharedInformerFactory(client, 0)
158+
159+
recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done())
160+
queue := internalqueue.NewPriorityQueue(
161+
newDefaultQueueSort(),
162+
informerFactory,
163+
internalqueue.WithMetricsRecorder(*recorder),
164+
internalqueue.WithQueueingHintMapPerProfile(queueingHintMap),
165+
// disable backoff queue
166+
internalqueue.WithPodInitialBackoffDuration(0),
167+
internalqueue.WithPodMaxBackoffDuration(0))
168+
schedulerCache := internalcache.New(ctx, 30*time.Second)
169+
170+
// Put test pods into unschedulable queue
171+
for _, pod := range unschedulablePods {
172+
queue.Add(logger, pod)
173+
poppedPod, err := queue.Pop(logger)
174+
if err != nil {
175+
t.Fatalf("Pop failed: %v", err)
176+
}
177+
poppedPod.UnschedulablePlugins = sets.New("fooPlugin1")
178+
if err := queue.AddUnschedulableIfNotPresent(logger, poppedPod, queue.SchedulingCycle()); err != nil {
179+
t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
180+
}
181+
}
182+
183+
s, _, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
184+
if err != nil {
185+
t.Fatalf("Failed to initialize test scheduler: %v", err)
186+
}
187+
188+
if len(s.SchedulingQueue.PodsInActiveQ()) > 0 {
189+
t.Errorf("No pods were expected to be in the activeQ before the update, but there were %v", s.SchedulingQueue.PodsInActiveQ())
190+
}
191+
tt.updateFunc(s)
192+
if len(s.SchedulingQueue.PodsInActiveQ()) != len(tt.wantInActive) {
193+
t.Errorf("Different number of pods were expected to be in the activeQ, but found actual %v vs. expected %v", s.SchedulingQueue.PodsInActiveQ(), tt.wantInActive)
194+
}
195+
for _, pod := range s.SchedulingQueue.PodsInActiveQ() {
196+
if !tt.wantInActive.Has(pod.Name) {
197+
t.Errorf("Found unexpected pod in activeQ: %s", pod.Name)
198+
}
199+
}
200+
})
201+
}
202+
}
203+
}
204+
205+
func newDefaultQueueSort() framework.LessFunc {
206+
sort := &queuesort.PrioritySort{}
207+
return sort.Less
208+
}
209+
57210
func TestUpdatePodInCache(t *testing.T) {
58211
ttl := 10 * time.Second
59212
nodeName := "node"
@@ -81,8 +234,8 @@ func TestUpdatePodInCache(t *testing.T) {
81234
ctx, cancel := context.WithCancel(ctx)
82235
defer cancel()
83236
sched := &Scheduler{
84-
Cache: cache.New(ctx, ttl),
85-
SchedulingQueue: queue.NewTestQueue(ctx, nil),
237+
Cache: internalcache.New(ctx, ttl),
238+
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
86239
logger: logger,
87240
}
88241
sched.addPodToCache(tt.oldObj)
@@ -354,7 +507,7 @@ func TestAddAllEventHandlers(t *testing.T) {
354507
defer cancel()
355508

356509
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(), 0)
357-
schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
510+
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
358511
testSched := Scheduler{
359512
StopEverything: ctx.Done(),
360513
SchedulingQueue: schedulingQueue,

pkg/scheduler/framework/plugins/nodeports/node_ports.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Po
143143
}
144144

145145
// If the deleted pod is unscheduled and has no nominated node, it doesn't make the target pod schedulable.
146-
if deletedPod.Spec.NodeName == "" {
146+
if deletedPod.Spec.NodeName == "" && deletedPod.Status.NominatedNodeName == "" {
147147
logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
148148
return framework.QueueSkip, nil
149149
}

pkg/scheduler/framework/plugins/noderesources/fit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ func (f *Fit) isSchedulableAfterPodEvent(logger klog.Logger, pod *v1.Pod, oldObj
294294
}
295295

296296
if modifiedPod == nil {
297-
if originalPod.Spec.NodeName == "" {
297+
if originalPod.Spec.NodeName == "" && originalPod.Status.NominatedNodeName == "" {
298298
logger.V(5).Info("the deleted pod was unscheduled and it wouldn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
299299
return framework.QueueSkip, nil
300300
}

pkg/scheduler/framework/plugins/nodevolumelimits/csi.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Po
104104
return framework.QueueSkip, nil
105105
}
106106

107-
if deletedPod.Spec.NodeName == "" {
107+
if deletedPod.Spec.NodeName == "" && deletedPod.Status.NominatedNodeName == "" {
108108
return framework.QueueSkip, nil
109109
}
110110

pkg/scheduler/framework/runtime/framework.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
10111011
nodeInfoToUse := info
10121012
if i == 0 {
10131013
var err error
1014-
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
1014+
podsAdded, stateToUse, nodeInfoToUse, err = addGENominatedPods(ctx, f, pod, state, info)
10151015
if err != nil {
10161016
return framework.AsStatus(err)
10171017
}
@@ -1028,10 +1028,10 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
10281028
return status
10291029
}
10301030

1031-
// addNominatedPods adds pods with equal or greater priority which are nominated
1031+
// addGENominatedPods adds pods with equal or greater priority which are nominated
10321032
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
10331033
// 3) augmented nodeInfo.
1034-
func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
1034+
func addGENominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
10351035
if fh == nil {
10361036
// This may happen only in tests.
10371037
return false, state, nodeInfo, nil

pkg/scheduler/scheduler.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"k8s.io/kubernetes/pkg/scheduler/metrics"
5151
"k8s.io/kubernetes/pkg/scheduler/profile"
5252
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
53+
"k8s.io/utils/clock"
5354
)
5455

5556
const (
@@ -116,6 +117,7 @@ func (sched *Scheduler) applyDefaultHandlers() {
116117
}
117118

118119
type schedulerOptions struct {
120+
clock clock.Clock
119121
componentConfigVersion string
120122
kubeConfig *restclient.Config
121123
// Overridden by profile level percentageOfNodesToScore if set in v1.
@@ -227,6 +229,13 @@ func WithExtenders(e ...schedulerapi.Extender) Option {
227229
}
228230
}
229231

232+
// WithClock sets the clock for PriorityQueue; the default clock is clock.RealClock.
233+
func WithClock(clock clock.Clock) Option {
234+
return func(o *schedulerOptions) {
235+
o.clock = clock
236+
}
237+
}
238+
230239
// FrameworkCapturer is used for registering a notify function in building framework.
231240
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
232241

@@ -238,6 +247,7 @@ func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
238247
}
239248

240249
var defaultSchedulerOptions = schedulerOptions{
250+
clock: clock.RealClock{},
241251
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
242252
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
243253
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
@@ -343,6 +353,7 @@ func New(ctx context.Context,
343353
podQueue := internalqueue.NewSchedulingQueue(
344354
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
345355
informerFactory,
356+
internalqueue.WithClock(options.clock),
346357
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
347358
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
348359
internalqueue.WithPodLister(podLister),

0 commit comments

Comments
 (0)