Skip to content

Commit 5ce0bd9

Browse files
authored
Merge pull request kubernetes#121677 from kerthcet/cleanup/remove-evnet
Unregister events in schedulingGates for performance
2 parents 8a9b209 + f77a454 commit 5ce0bd9

File tree

6 files changed

+243
-9
lines changed

6 files changed

+243
-9
lines changed

pkg/scheduler/framework/interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,10 @@ type EnqueueExtensions interface {
370370
// and leveraged to build event handlers dynamically.
371371
// Note: the returned list needs to be static (not depend on configuration parameters);
372372
// otherwise it would lead to undefined behavior.
373+
//
374+
// The returned events could be nil to indicate that no events other than the pod's own update
375+
// can make the pod re-schedulable. An example is SchedulingGates plugin.
376+
// An appropriate implementation of this function makes Pod re-scheduling accurate and performant.
373377
EventsToRegister() []ClusterEventWithHint
374378
}
375379

pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const Name = names.SchedulingGates
3232

3333
// SchedulingGates checks if a Pod carries .spec.schedulingGates.
3434
type SchedulingGates struct {
35-
enablePodSchedulingReadiness bool
35+
EnablePodSchedulingReadiness bool
3636
}
3737

3838
var _ framework.PreEnqueuePlugin = &SchedulingGates{}
@@ -43,7 +43,7 @@ func (pl *SchedulingGates) Name() string {
4343
}
4444

4545
func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
46-
if !pl.enablePodSchedulingReadiness || len(p.Spec.SchedulingGates) == 0 {
46+
if !pl.EnablePodSchedulingReadiness || len(p.Spec.SchedulingGates) == 0 {
4747
return nil
4848
}
4949
gates := make([]string, 0, len(p.Spec.SchedulingGates))
@@ -53,15 +53,13 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework
5353
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates))
5454
}
5555

56-
// EventsToRegister returns the possible events that may make a Pod
57-
// failed by this plugin schedulable.
56+
// EventsToRegister returns nil here to indicate that the schedulingGates plugin is not
57+
// interested in any event other than the pod's own update.
5858
func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
59-
return []framework.ClusterEventWithHint{
60-
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
61-
}
59+
return nil
6260
}
6361

6462
// New initializes a new plugin and returns it.
6563
func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) {
66-
return &SchedulingGates{enablePodSchedulingReadiness: fts.EnablePodSchedulingReadiness}, nil
64+
return &SchedulingGates{EnablePodSchedulingReadiness: fts.EnablePodSchedulingReadiness}, nil
6765
}

pkg/scheduler/scheduler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,13 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei
373373
for _, e := range es {
374374
events := e.EventsToRegister()
375375

376+
// This will happen when a plugin registers with empty events; it's usually the case that a pod
377+
// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
378+
// will enter into the activeQ via priorityQueue.Update().
379+
if len(events) == 0 {
380+
continue
381+
}
382+
376383
// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
377384
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
378385
// cannot be moved by any regular cluster event.

pkg/scheduler/scheduler_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ const (
662662
emptyEventsToRegister = "emptyEventsToRegister"
663663
queueSort = "no-op-queue-sort-plugin"
664664
fakeBind = "bind-plugin"
665+
emptyEventExtensions = "emptyEventExtensions"
665666
)
666667

667668
func Test_buildQueueingHintMap(t *testing.T) {
@@ -729,6 +730,23 @@ func Test_buildQueueingHintMap(t *testing.T) {
729730
},
730731
},
731732
},
733+
{
734+
name: "register plugin with empty event",
735+
plugins: []framework.Plugin{&emptyEventPlugin{}},
736+
want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
737+
},
738+
{
739+
name: "register plugins including emptyEventPlugin",
740+
plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}},
741+
want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
742+
{Resource: framework.Pod, ActionType: framework.Add}: {
743+
{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
744+
},
745+
{Resource: framework.Node, ActionType: framework.Add}: {
746+
{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
747+
},
748+
},
749+
},
732750
}
733751

734752
for _, tt := range tests {
@@ -1009,6 +1027,18 @@ func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
10091027
}
10101028
}
10111029

1030+
type emptyEventPlugin struct{}
1031+
1032+
func (*emptyEventPlugin) Name() string { return emptyEventExtensions }
1033+
1034+
func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
1035+
return nil
1036+
}
1037+
1038+
func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
1039+
return nil
1040+
}
1041+
10121042
// emptyEventsToRegisterPlugin implements interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
10131043
// This can simulate a plugin registered at scheduler setup, but does nothing
10141044
// due to some disabled feature gate.

test/integration/scheduler/plugins/plugins_test.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"k8s.io/kubernetes/pkg/scheduler/framework"
4646
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
4747
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
48+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
4849
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
4950
st "k8s.io/kubernetes/pkg/scheduler/testing"
5051
schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
@@ -2588,3 +2589,197 @@ func TestActivatePods(t *testing.T) {
25882589
t.Errorf("JobPlugin's pods activation logic is not called")
25892590
}
25902591
}
2592+
2593+
var _ framework.PreEnqueuePlugin = &SchedulingGatesPluginWithEvents{}
2594+
var _ framework.EnqueueExtensions = &SchedulingGatesPluginWithEvents{}
2595+
var _ framework.PreEnqueuePlugin = &SchedulingGatesPluginWOEvents{}
2596+
var _ framework.EnqueueExtensions = &SchedulingGatesPluginWOEvents{}
2597+
2598+
const (
2599+
schedulingGatesPluginWithEvents = "scheduling-gates-with-events"
2600+
schedulingGatesPluginWOEvents = "scheduling-gates-without-events"
2601+
)
2602+
2603+
type SchedulingGatesPluginWithEvents struct {
2604+
called int
2605+
schedulinggates.SchedulingGates
2606+
}
2607+
2608+
func (pl *SchedulingGatesPluginWithEvents) Name() string {
2609+
return schedulingGatesPluginWithEvents
2610+
}
2611+
2612+
func (pl *SchedulingGatesPluginWithEvents) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
2613+
pl.called++
2614+
return pl.SchedulingGates.PreEnqueue(ctx, p)
2615+
}
2616+
2617+
func (pl *SchedulingGatesPluginWithEvents) EventsToRegister() []framework.ClusterEventWithHint {
2618+
return []framework.ClusterEventWithHint{
2619+
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
2620+
}
2621+
}
2622+
2623+
type SchedulingGatesPluginWOEvents struct {
2624+
called int
2625+
schedulinggates.SchedulingGates
2626+
}
2627+
2628+
func (pl *SchedulingGatesPluginWOEvents) Name() string {
2629+
return schedulingGatesPluginWOEvents
2630+
}
2631+
2632+
func (pl *SchedulingGatesPluginWOEvents) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
2633+
pl.called++
2634+
return pl.SchedulingGates.PreEnqueue(ctx, p)
2635+
}
2636+
2637+
func (pl *SchedulingGatesPluginWOEvents) EventsToRegister() []framework.ClusterEventWithHint {
2638+
return nil
2639+
}
2640+
2641+
// TestSchedulingGatesPluginEventsToRegister verifies that registering nil events for the schedulingGates plugin works as expected.
2642+
func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
2643+
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)()
2644+
2645+
testContext := testutils.InitTestAPIServer(t, "preenqueue-plugin", nil)
2646+
2647+
num := func(pl framework.Plugin) int {
2648+
switch item := pl.(type) {
2649+
case *SchedulingGatesPluginWithEvents:
2650+
return item.called
2651+
case *SchedulingGatesPluginWOEvents:
2652+
return item.called
2653+
default:
2654+
t.Error("unsupported plugin")
2655+
}
2656+
return 0
2657+
}
2658+
2659+
tests := []struct {
2660+
name string
2661+
enqueuePlugin framework.PreEnqueuePlugin
2662+
count int
2663+
}{
2664+
{
2665+
name: "preEnqueue plugin without event registered",
2666+
enqueuePlugin: &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{EnablePodSchedulingReadiness: true}},
2667+
count: 2,
2668+
},
2669+
{
2670+
name: "preEnqueue plugin with event registered",
2671+
enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{EnablePodSchedulingReadiness: true}},
2672+
count: 3,
2673+
},
2674+
}
2675+
2676+
for _, tt := range tests {
2677+
t.Run(tt.name, func(t *testing.T) {
2678+
registry := frameworkruntime.Registry{
2679+
tt.enqueuePlugin.Name(): newPlugin(tt.enqueuePlugin),
2680+
}
2681+
2682+
// Setup plugins for testing.
2683+
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
2684+
Profiles: []configv1.KubeSchedulerProfile{{
2685+
SchedulerName: pointer.String(v1.DefaultSchedulerName),
2686+
Plugins: &configv1.Plugins{
2687+
PreEnqueue: configv1.PluginSet{
2688+
Enabled: []configv1.Plugin{
2689+
{Name: tt.enqueuePlugin.Name()},
2690+
},
2691+
Disabled: []configv1.Plugin{
2692+
{Name: "*"},
2693+
},
2694+
},
2695+
},
2696+
}},
2697+
})
2698+
2699+
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
2700+
scheduler.WithProfiles(cfg.Profiles...),
2701+
scheduler.WithFrameworkOutOfTreeRegistry(registry),
2702+
)
2703+
defer teardown()
2704+
2705+
// Create a pod with schedulingGates.
2706+
gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name).
2707+
SchedulingGates([]string{"foo"}).
2708+
PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq).
2709+
Container("pause").Obj()
2710+
gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod)
2711+
if err != nil {
2712+
t.Errorf("Error while creating a gated pod: %v", err)
2713+
return
2714+
}
2715+
2716+
if err := testutils.WaitForPodSchedulingGated(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
2717+
t.Errorf("Expected the pod to be gated, but got: %v", err)
2718+
return
2719+
}
2720+
if num(tt.enqueuePlugin) != 1 {
2721+
t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(tt.enqueuePlugin))
2722+
return
2723+
}
2724+
2725+
// Create a best effort pod.
2726+
pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{
2727+
Name: "pause-pod",
2728+
Namespace: testCtx.NS.Name,
2729+
Labels: map[string]string{"foo": "bar"},
2730+
}))
2731+
if err != nil {
2732+
t.Errorf("Error while creating a pod: %v", err)
2733+
return
2734+
}
2735+
2736+
// Wait for the pod to be scheduled.
2737+
if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil {
2738+
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
2739+
return
2740+
}
2741+
2742+
// Update the pod, which will trigger the requeue logic if the plugin registers the events.
2743+
pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{})
2744+
if err != nil {
2745+
t.Errorf("Error while getting a pod: %v", err)
2746+
return
2747+
}
2748+
pausePod.Annotations = map[string]string{"foo": "bar"}
2749+
_, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{})
2750+
if err != nil {
2751+
t.Errorf("Error while updating a pod: %v", err)
2752+
return
2753+
}
2754+
2755+
// The pod should still be unschedulable because its scheduling gates still exist; theoretically, rescheduling it is wasted work.
2756+
if err := testutils.WaitForPodSchedulingGated(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
2757+
t.Errorf("Expected the pod to be gated, but got: %v", err)
2758+
return
2759+
}
2760+
if num(tt.enqueuePlugin) != tt.count {
2761+
t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(tt.enqueuePlugin))
2762+
return
2763+
}
2764+
2765+
// Remove gated pod's scheduling gates.
2766+
gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{})
2767+
if err != nil {
2768+
t.Errorf("Error while getting a pod: %v", err)
2769+
return
2770+
}
2771+
gatedPod.Spec.SchedulingGates = nil
2772+
_, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{})
2773+
if err != nil {
2774+
t.Errorf("Error while updating a pod: %v", err)
2775+
return
2776+
}
2777+
2778+
// Ungated pod should be schedulable now.
2779+
if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
2780+
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
2781+
return
2782+
}
2783+
})
2784+
}
2785+
}

test/integration/util/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper,
764764
nodes := make([]*v1.Node, numNodes)
765765
for i := 0; i < numNodes; i++ {
766766
nodeName := fmt.Sprintf("%v-%d", prefix, i)
767-
node, err := CreateNode(cs, wrapper.Name(nodeName).Obj())
767+
node, err := CreateNode(cs, wrapper.Name(nodeName).Label("kubernetes.io/hostname", nodeName).Obj())
768768
if err != nil {
769769
return nodes[:], err
770770
}

0 commit comments

Comments
 (0)