Skip to content

Commit 027f346

Browse files
authored
Merge pull request kubernetes#124926 from kerthcet/feat/sharing-waitingPods
enhancement(scheduler): share waitingPods among profiles
2 parents 8c1983f + 31a4b13 commit 027f346

File tree

7 files changed

+229
-8
lines changed

7 files changed

+229
-8
lines changed

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ func TestPostFilter(t *testing.T) {
364364
frameworkruntime.WithExtenders(extenders),
365365
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
366366
frameworkruntime.WithLogger(logger),
367+
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
367368
)
368369
if err != nil {
369370
t.Fatal(err)
@@ -1702,6 +1703,8 @@ func TestPreempt(t *testing.T) {
17021703
ctx, cancel := context.WithCancel(ctx)
17031704
defer cancel()
17041705

1706+
waitingPods := frameworkruntime.NewWaitingPodsMap()
1707+
17051708
cache := internalcache.New(ctx, time.Duration(0))
17061709
for _, pod := range test.pods {
17071710
cache.AddPod(logger, pod)
@@ -1745,6 +1748,7 @@ func TestPreempt(t *testing.T) {
17451748
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
17461749
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
17471750
frameworkruntime.WithInformerFactory(informerFactory),
1751+
frameworkruntime.WithWaitingPods(waitingPods),
17481752
frameworkruntime.WithLogger(logger),
17491753
)
17501754
if err != nil {

pkg/scheduler/framework/runtime/framework.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ type frameworkOptions struct {
132132
extenders []framework.Extender
133133
captureProfile CaptureProfile
134134
parallelizer parallelize.Parallelizer
135+
waitingPods *waitingPodsMap
135136
logger *klog.Logger
136137
}
137138

@@ -221,6 +222,13 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
221222
}
222223
}
223224

225+
// WithWaitingPods sets waitingPods for the scheduling frameworkImpl.
226+
func WithWaitingPods(wp *waitingPodsMap) Option {
227+
return func(o *frameworkOptions) {
228+
o.waitingPods = wp
229+
}
230+
}
231+
224232
// WithLogger overrides the default logger from k8s.io/klog.
225233
func WithLogger(logger klog.Logger) Option {
226234
return func(o *frameworkOptions) {
@@ -254,7 +262,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
254262
registry: r,
255263
snapshotSharedLister: options.snapshotSharedLister,
256264
scorePluginWeight: make(map[string]int),
257-
waitingPods: newWaitingPodsMap(),
265+
waitingPods: options.waitingPods,
258266
clientSet: options.clientSet,
259267
kubeConfig: options.kubeConfig,
260268
eventRecorder: options.eventRecorder,

pkg/scheduler/framework/runtime/framework_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,7 +2808,9 @@ func TestPermitPlugins(t *testing.T) {
28082808
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
28092809
ctx, cancel := context.WithCancel(context.Background())
28102810
defer cancel()
2811-
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
2811+
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile,
2812+
WithWaitingPods(NewWaitingPodsMap()),
2813+
)
28122814
if err != nil {
28132815
t.Fatalf("fail to create framework: %s", err)
28142816
}
@@ -2990,7 +2992,10 @@ func TestRecordingMetrics(t *testing.T) {
29902992
SchedulerName: testProfileName,
29912993
Plugins: plugins,
29922994
}
2993-
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
2995+
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
2996+
withMetricsRecorder(recorder),
2997+
WithWaitingPods(NewWaitingPodsMap()),
2998+
)
29942999
if err != nil {
29953000
cancel()
29963001
t.Fatalf("Failed to create framework for testing: %v", err)
@@ -3160,7 +3165,9 @@ func TestPermitWaitDurationMetric(t *testing.T) {
31603165
profile := config.KubeSchedulerProfile{Plugins: plugins}
31613166
ctx, cancel := context.WithCancel(context.Background())
31623167
defer cancel()
3163-
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
3168+
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
3169+
WithWaitingPods(NewWaitingPodsMap()),
3170+
)
31643171
if err != nil {
31653172
t.Fatalf("Failed to create framework for testing: %v", err)
31663173
}
@@ -3216,7 +3223,9 @@ func TestWaitOnPermit(t *testing.T) {
32163223
profile := config.KubeSchedulerProfile{Plugins: plugins}
32173224
ctx, cancel := context.WithCancel(context.Background())
32183225
defer cancel()
3219-
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
3226+
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
3227+
WithWaitingPods(NewWaitingPodsMap()),
3228+
)
32203229
if err != nil {
32213230
t.Fatalf("Failed to create framework for testing: %v", err)
32223231
}

pkg/scheduler/framework/runtime/waiting_pods_map.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type waitingPodsMap struct {
3232
mu sync.RWMutex
3333
}
3434

35-
// newWaitingPodsMap returns a new waitingPodsMap.
36-
func newWaitingPodsMap() *waitingPodsMap {
35+
// NewWaitingPodsMap returns a new waitingPodsMap.
36+
func NewWaitingPodsMap() *waitingPodsMap {
3737
return &waitingPodsMap{
3838
pods: make(map[types.UID]*waitingPod),
3939
}

pkg/scheduler/schedule_one_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
778778
registerPluginFuncs,
779779
testSchedulerName,
780780
frameworkruntime.WithClientSet(client),
781-
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
781+
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
782+
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
783+
)
782784
if err != nil {
783785
t.Fatal(err)
784786
}
@@ -3539,6 +3541,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
35393541
informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
35403542
}
35413543
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
3544+
waitingPods := frameworkruntime.NewWaitingPodsMap()
35423545

35433546
fwk, _ := tf.NewFramework(
35443547
ctx,
@@ -3548,6 +3551,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
35483551
frameworkruntime.WithEventRecorder(recorder),
35493552
frameworkruntime.WithInformerFactory(informerFactory),
35503553
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
3554+
frameworkruntime.WithWaitingPods(waitingPods),
35513555
)
35523556

35533557
errChan := make(chan error, 1)

pkg/scheduler/scheduler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,8 @@ func New(ctx context.Context,
292292

293293
snapshot := internalcache.NewEmptySnapshot()
294294
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
295+
// waitingPods holds all the pods in the scheduler that are waiting in the permit stage; it is shared by every profile.
296+
waitingPods := frameworkruntime.NewWaitingPodsMap()
295297

296298
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
297299
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
@@ -303,6 +305,7 @@ func New(ctx context.Context,
303305
frameworkruntime.WithParallelism(int(options.parallelism)),
304306
frameworkruntime.WithExtenders(extenders),
305307
frameworkruntime.WithMetricsRecorder(metricsRecorder),
308+
frameworkruntime.WithWaitingPods(waitingPods),
306309
)
307310
if err != nil {
308311
return nil, fmt.Errorf("initializing profiles: %v", err)

pkg/scheduler/scheduler_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@ import (
2121
"fmt"
2222
"sort"
2323
"strings"
24+
"sync"
2425
"testing"
2526
"time"
2627

2728
"github.com/google/go-cmp/cmp"
2829
v1 "k8s.io/api/core/v1"
30+
eventsv1 "k8s.io/api/events/v1"
2931
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3032
"k8s.io/apimachinery/pkg/runtime"
33+
"k8s.io/apimachinery/pkg/util/sets"
3134
utilfeature "k8s.io/apiserver/pkg/util/feature"
3235
"k8s.io/client-go/informers"
3336
"k8s.io/client-go/kubernetes"
@@ -437,12 +440,14 @@ func initScheduler(ctx context.Context, cache internalcache.Cache, queue interna
437440
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
438441
}
439442
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
443+
waitingPods := frameworkruntime.NewWaitingPodsMap()
440444
fwk, err := tf.NewFramework(ctx,
441445
registerPluginFuncs,
442446
testSchedulerName,
443447
frameworkruntime.WithClientSet(client),
444448
frameworkruntime.WithInformerFactory(informerFactory),
445449
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
450+
frameworkruntime.WithWaitingPods(waitingPods),
446451
)
447452
if err != nil {
448453
return nil, nil, err
@@ -591,6 +596,7 @@ const (
591596
queueSort = "no-op-queue-sort-plugin"
592597
fakeBind = "bind-plugin"
593598
emptyEventExtensions = "emptyEventExtensions"
599+
fakePermit = "fakePermit"
594600
)
595601

596602
func Test_buildQueueingHintMap(t *testing.T) {
@@ -905,6 +911,160 @@ func newFramework(ctx context.Context, r frameworkruntime.Registry, profile sche
905911
)
906912
}
907913

914+
func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) {
915+
const (
916+
testSchedulerProfile1 = "test-scheduler-profile-1"
917+
testSchedulerProfile2 = "test-scheduler-profile-2"
918+
testSchedulerProfile3 = "test-scheduler-profile-3"
919+
)
920+
921+
nodes := []runtime.Object{
922+
st.MakeNode().Name("node1").UID("node1").Obj(),
923+
st.MakeNode().Name("node2").UID("node2").Obj(),
924+
st.MakeNode().Name("node3").UID("node3").Obj(),
925+
}
926+
927+
cases := []struct {
928+
name string
929+
profiles []schedulerapi.KubeSchedulerProfile
930+
waitSchedulingPods []*v1.Pod
931+
expectPodNamesInWaitingPods []string
932+
}{
933+
{
934+
name: "pods with same profile are waiting on permit stage",
935+
profiles: []schedulerapi.KubeSchedulerProfile{
936+
{
937+
SchedulerName: testSchedulerProfile1,
938+
Plugins: &schedulerapi.Plugins{
939+
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
940+
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
941+
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
942+
},
943+
},
944+
},
945+
waitSchedulingPods: []*v1.Pod{
946+
st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
947+
st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
948+
st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(),
949+
},
950+
expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"},
951+
},
952+
{
953+
name: "pods with different profiles are waiting on permit stage",
954+
profiles: []schedulerapi.KubeSchedulerProfile{
955+
{
956+
SchedulerName: testSchedulerProfile1,
957+
Plugins: &schedulerapi.Plugins{
958+
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
959+
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
960+
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
961+
},
962+
},
963+
{
964+
SchedulerName: testSchedulerProfile2,
965+
Plugins: &schedulerapi.Plugins{
966+
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
967+
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
968+
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
969+
},
970+
},
971+
{
972+
SchedulerName: testSchedulerProfile3,
973+
Plugins: &schedulerapi.Plugins{
974+
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
975+
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
976+
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
977+
},
978+
},
979+
},
980+
waitSchedulingPods: []*v1.Pod{
981+
st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
982+
st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
983+
st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(),
984+
st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(),
985+
},
986+
expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"},
987+
},
988+
}
989+
990+
for _, tc := range cases {
991+
t.Run(tc.name, func(t *testing.T) {
992+
// Set up scheduler for the 3 nodes.
993+
objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
994+
fakeClient := fake.NewSimpleClientset(objs...)
995+
informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
996+
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()})
997+
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit)
998+
999+
outOfTreeRegistry := frameworkruntime.Registry{
1000+
fakePermit: newFakePermitPlugin(eventRecorder),
1001+
}
1002+
1003+
_, ctx := ktesting.NewTestContext(t)
1004+
ctx, cancel := context.WithCancel(ctx)
1005+
defer cancel()
1006+
1007+
scheduler, err := New(
1008+
ctx,
1009+
fakeClient,
1010+
informerFactory,
1011+
nil,
1012+
profile.NewRecorderFactory(eventBroadcaster),
1013+
WithProfiles(tc.profiles...),
1014+
WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
1015+
)
1016+
1017+
if err != nil {
1018+
t.Fatalf("Failed to create scheduler: %v", err)
1019+
}
1020+
1021+
var wg sync.WaitGroup
1022+
waitSchedulingPodNumber := len(tc.waitSchedulingPods)
1023+
wg.Add(waitSchedulingPodNumber)
1024+
stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
1025+
e, ok := obj.(*eventsv1.Event)
1026+
if !ok || (e.Reason != podWaitingReason) {
1027+
return
1028+
}
1029+
wg.Done()
1030+
})
1031+
if err != nil {
1032+
t.Fatal(err)
1033+
}
1034+
defer stopFn()
1035+
1036+
// Run scheduler.
1037+
informerFactory.Start(ctx.Done())
1038+
informerFactory.WaitForCacheSync(ctx.Done())
1039+
go scheduler.Run(ctx)
1040+
1041+
// Send pods to be scheduled.
1042+
for _, p := range tc.waitSchedulingPods {
1043+
_, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
1044+
if err != nil {
1045+
t.Fatal(err)
1046+
}
1047+
}
1048+
1049+
// Wait until a podWaiting event has been observed for every pod in waitSchedulingPods.
1050+
wg.Wait()
1051+
1052+
// Ensure that all waitingPods in the scheduler can be obtained from any profile.
1053+
for _, fwk := range scheduler.Profiles {
1054+
actualPodNamesInWaitingPods := sets.NewString()
1055+
fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) {
1056+
actualPodNamesInWaitingPods.Insert(pod.GetPod().Name)
1057+
})
1058+
// Validate the name of pods in waitingPods matches expectations.
1059+
if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) ||
1060+
!actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) {
1061+
t.Fatalf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List())
1062+
}
1063+
}
1064+
})
1065+
}
1066+
}
1067+
9081068
var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
9091069

9101070
// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
@@ -1004,3 +1164,36 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle
10041164
}
10051165

10061166
func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
1167+
1168+
// fakePermitPlugin only implements the PermitPlugin interface.
1169+
type fakePermitPlugin struct {
1170+
eventRecorder events.EventRecorder
1171+
}
1172+
1173+
func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
1174+
return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
1175+
pl := &fakePermitPlugin{
1176+
eventRecorder: eventRecorder,
1177+
}
1178+
return pl, nil
1179+
}
1180+
}
1181+
1182+
func (f fakePermitPlugin) Name() string {
1183+
return fakePermit
1184+
}
1185+
1186+
const (
1187+
podWaitingReason = "podWaiting"
1188+
)
1189+
1190+
func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
1191+
defer func() {
1192+
// Send an event with the podWaiting reason to broadcast that this pod is waiting in the permit stage.
1193+
f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
1194+
}()
1195+
1196+
return framework.NewStatus(framework.Wait), 100 * time.Second
1197+
}
1198+
1199+
var _ framework.PermitPlugin = &fakePermitPlugin{}

0 commit comments

Comments
 (0)