Skip to content

Commit 4bc6a11

Browse files
authored
Merge pull request kubernetes#127083 from sanposhiho/scheduler-smaller-event
feat: implement Pod smaller update events
2 parents 85384fe + 03e3779 commit 4bc6a11

File tree

8 files changed

+84
-52
lines changed

8 files changed

+84
-52
lines changed

pkg/scheduler/framework/events.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ var (
6161
PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"}
6262
// PodLabelChange is the event when a pod's label is changed.
6363
PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"}
64+
// PodTolerationChange is the event when a pod's toleration is changed.
65+
PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
66+
// PodSchedulingGateEliminatedChange is the event when all of a pod's scheduling gates are eliminated.
67+
PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
6468
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
6569
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
6670
// NodeAllocatableChange is the event when node allocatable is changed.
@@ -109,6 +113,8 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
109113
podChangeExtracters := []podChangeExtractor{
110114
extractPodLabelsChange,
111115
extractPodScaleDown,
116+
extractPodSchedulingGateEliminatedChange,
117+
extractPodTolerationChange,
112118
}
113119

114120
for _, fn := range podChangeExtracters {
@@ -159,6 +165,27 @@ func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
159165
return nil
160166
}
161167

168+
// extractPodTolerationChange returns a pointer to PodTolerationChange when the number of
// tolerations in newPod differs from the number in oldPod, and nil otherwise.
func extractPodTolerationChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
169+
if len(newPod.Spec.Tolerations) != len(oldPod.Spec.Tolerations) {
170+
// A Pod got a new toleration.
171+
// Due to API validation, the user can add, but cannot modify or remove tolerations.
172+
// So, it's enough to just compare the lengths of the tolerations to notice the update.
173+
// And, any update to the tolerations could make the Pod schedulable.
174+
return &PodTolerationChange
175+
}
176+
177+
return nil
178+
}
179+
180+
// extractPodSchedulingGateEliminatedChange returns a pointer to PodSchedulingGateEliminatedChange
// when oldPod had at least one scheduling gate and newPod has none, and nil otherwise.
// An update that removes only some of the gates yields nil.
func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
181+
if len(newPod.Spec.SchedulingGates) == 0 && len(oldPod.Spec.SchedulingGates) != 0 {
182+
// All scheduling gates on the pod have been removed.
183+
return &PodSchedulingGateEliminatedChange
184+
}
185+
186+
return nil
187+
}
188+
162189
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
163190
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
164191
nodeChangeExtracters := []nodeChangeExtractor{

pkg/scheduler/framework/events_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,24 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
370370
oldPod: st.MakePod().Annotation("foo", "bar2").Obj(),
371371
want: []ClusterEvent{assignedPodOtherUpdate},
372372
},
373+
{
374+
name: "scheduling gate is eliminated",
375+
newPod: st.MakePod().SchedulingGates([]string{}).Obj(),
376+
oldPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
377+
want: []ClusterEvent{PodSchedulingGateEliminatedChange},
378+
},
379+
{
380+
name: "scheduling gate is removed, but not completely eliminated",
381+
newPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
382+
oldPod: st.MakePod().SchedulingGates([]string{"foo", "bar"}).Obj(),
383+
want: []ClusterEvent{assignedPodOtherUpdate},
384+
},
385+
{
386+
name: "pod's tolerations are updated",
387+
newPod: st.MakePod().Toleration("key").Toleration("key2").Obj(),
388+
oldPod: st.MakePod().Toleration("key").Obj(),
389+
want: []ClusterEvent{PodTolerationChange},
390+
},
373391
}
374392
for _, tt := range tests {
375393
t.Run(tt.name, func(t *testing.T) {

pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.Clus
6868
// https://github.com/kubernetes/kubernetes/pull/122234
6969
return []framework.ClusterEventWithHint{
7070
// A Pod can become schedulable once all of its gates are removed.
71-
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
71+
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodSchedulingGatesEliminated}, QueueingHintFn: pl.isSchedulableAfterUpdatePodSchedulingGatesEliminated},
7272
}, nil
7373
}
7474

@@ -79,7 +79,7 @@ func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Fe
7979
}, nil
8080
}
8181

82-
func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
82+
func (pl *SchedulingGates) isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
8383
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
8484
if err != nil {
8585
return framework.Queue, err
@@ -90,8 +90,5 @@ func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *
9090
return framework.QueueSkip, nil
9191
}
9292

93-
if len(modifiedPod.Spec.SchedulingGates) == 0 {
94-
return framework.Queue, nil
95-
}
96-
return framework.QueueSkip, nil
93+
return framework.Queue, nil
9794
}

pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
8888
newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(),
8989
expectedHint: framework.QueueSkip,
9090
},
91-
"skip-queue-on-gates-not-empty": {
92-
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
93-
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
94-
newObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
95-
expectedHint: framework.QueueSkip,
96-
},
97-
"queue-on-gates-become-empty": {
91+
"queue-on-the-unsched-pod-updated": {
9892
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
9993
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
10094
newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(),
@@ -109,7 +103,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
109103
if err != nil {
110104
t.Fatalf("Creating plugin: %v", err)
111105
}
112-
actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
106+
actualHint, err := p.(*SchedulingGates).isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger, tc.pod, tc.oldObj, tc.newObj)
113107
if tc.expectedErr {
114108
require.Error(t, err)
115109
return

pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.Clus
7070
// to determine whether a Pod's update makes the Pod schedulable or not.
7171
// https://github.com/kubernetes/kubernetes/pull/122234
7272
clusterEventWithHint = append(clusterEventWithHint,
73-
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange})
73+
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange})
7474
return clusterEventWithHint, nil
7575
}
7676

@@ -210,25 +210,20 @@ func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Fe
210210
}, nil
211211
}
212212

213-
// isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether
214-
// that change made a previously unschedulable pod schedulable.
215-
// When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration,
216-
// it may make the Pod schedulable.
217-
func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
218-
originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
213+
// isSchedulableAfterPodTolerationChange is invoked whenever a pod's tolerations are changed.
214+
func (pl *TaintToleration) isSchedulableAfterPodTolerationChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
215+
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
219216
if err != nil {
220217
return framework.Queue, err
221218
}
222219

223-
if pod.UID == modifiedPod.UID &&
224-
len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) {
225-
// An unscheduled Pod got a new toleration.
226-
// Due to API validation, the user can add, but cannot modify or remove tolerations.
227-
// So, it's enough to just check the length of tolerations to notice the update.
228-
// And, any updates in tolerations could make Pod schedulable.
229-
logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
220+
if pod.UID == modifiedPod.UID {
221+
// The updated Pod is the unschedulable Pod.
222+
logger.V(5).Info("a new toleration is added for the unschedulable Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
230223
return framework.Queue, nil
231224
}
232225

226+
logger.V(5).Info("a new toleration is added for a Pod, but it's an unrelated Pod and wouldn't change the TaintToleration plugin's decision", "pod", klog.KObj(modifiedPod))
227+
233228
return framework.QueueSkip, nil
234229
}

pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
421421
}
422422
}
423423

424-
func Test_isSchedulableAfterPodChange(t *testing.T) {
424+
func Test_isSchedulableAfterPodTolerationChange(t *testing.T) {
425425
testcases := map[string]struct {
426426
pod *v1.Pod
427427
oldObj, newObj interface{}
@@ -472,27 +472,6 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
472472
expectedHint: framework.QueueSkip,
473473
expectedErr: false,
474474
},
475-
"skip-updates-not-toleration": {
476-
pod: &v1.Pod{
477-
ObjectMeta: metav1.ObjectMeta{
478-
Name: "pod-1",
479-
Namespace: "ns-1",
480-
}},
481-
oldObj: &v1.Pod{
482-
ObjectMeta: metav1.ObjectMeta{
483-
Name: "pod-1",
484-
Namespace: "ns-1",
485-
}},
486-
newObj: &v1.Pod{
487-
ObjectMeta: metav1.ObjectMeta{
488-
Name: "pod-1",
489-
Namespace: "ns-1",
490-
Labels: map[string]string{"foo": "bar"},
491-
},
492-
},
493-
expectedHint: framework.QueueSkip,
494-
expectedErr: false,
495-
},
496475
"queue-on-toleration-added": {
497476
pod: &v1.Pod{
498477
ObjectMeta: metav1.ObjectMeta{
@@ -530,7 +509,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
530509
if err != nil {
531510
t.Fatalf("creating plugin: %v", err)
532511
}
533-
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
512+
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodTolerationChange(logger, tc.pod, tc.oldObj, tc.newObj)
534513
if tc.expectedErr {
535514
if err == nil {
536515
t.Errorf("unexpected success")

pkg/scheduler/framework/types.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ const (
6868
UpdatePodLabel
6969
// UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced).
7070
UpdatePodScaleDown
71+
// UpdatePodTolerations is an update for pod's tolerations.
72+
UpdatePodTolerations
73+
// UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod.
74+
UpdatePodSchedulingGatesEliminated
7175

7276
// updatePodOther is an update for pod's other fields.
7377
// It's used only for the internal event handling, and thus unexported.
@@ -76,7 +80,7 @@ const (
7680
All ActionType = 1<<iota - 1
7781

7882
// Use the general Update type if you either don't know or don't care which specific sub-Update type to use.
79-
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | updatePodOther
83+
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | updatePodOther
8084
)
8185

8286
// GVK is short for group/version/kind, which can uniquely represent a particular API resource.

pkg/scheduler/scheduler_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,7 @@ func Test_UnionedGVKs(t *testing.T) {
784784
plugins schedulerapi.PluginSet
785785
want map[framework.GVK]framework.ActionType
786786
enableInPlacePodVerticalScaling bool
787+
enableSchedulerQueueingHints bool
787788
}{
788789
{
789790
name: "filter without EnqueueExtensions plugin",
@@ -894,10 +895,27 @@ func Test_UnionedGVKs(t *testing.T) {
894895
},
895896
enableInPlacePodVerticalScaling: true,
896897
},
898+
{
899+
name: "plugins with default profile (queueingHint: enabled)",
900+
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
901+
want: map[framework.GVK]framework.ActionType{
902+
framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.UpdatePodTolerations | framework.UpdatePodSchedulingGatesEliminated | framework.Delete,
903+
framework.Node: framework.All,
904+
framework.CSINode: framework.All - framework.Delete,
905+
framework.CSIDriver: framework.All - framework.Delete,
906+
framework.CSIStorageCapacity: framework.All - framework.Delete,
907+
framework.PersistentVolume: framework.All - framework.Delete,
908+
framework.PersistentVolumeClaim: framework.All - framework.Delete,
909+
framework.StorageClass: framework.All - framework.Delete,
910+
},
911+
enableInPlacePodVerticalScaling: true,
912+
enableSchedulerQueueingHints: true,
913+
},
897914
}
898915
for _, tt := range tests {
899916
t.Run(tt.name, func(t *testing.T) {
900917
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling)
918+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.enableSchedulerQueueingHints)
901919

902920
_, ctx := ktesting.NewTestContext(t)
903921
ctx, cancel := context.WithCancel(ctx)

0 commit comments

Comments
 (0)