Skip to content

Commit 84750fe

Browse files
committed
Revert "enhancement(scheduler): share waitingPods among profiles"
This reverts commit 227c191.
1 parent a67d1dc commit 84750fe

File tree

7 files changed

+8
-229
lines changed

7 files changed

+8
-229
lines changed

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,6 @@ func TestPostFilter(t *testing.T) {
364364
frameworkruntime.WithExtenders(extenders),
365365
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
366366
frameworkruntime.WithLogger(logger),
367-
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
368367
)
369368
if err != nil {
370369
t.Fatal(err)
@@ -1703,8 +1702,6 @@ func TestPreempt(t *testing.T) {
17031702
ctx, cancel := context.WithCancel(ctx)
17041703
defer cancel()
17051704

1706-
waitingPods := frameworkruntime.NewWaitingPodsMap()
1707-
17081705
cache := internalcache.New(ctx, time.Duration(0))
17091706
for _, pod := range test.pods {
17101707
cache.AddPod(logger, pod)
@@ -1748,7 +1745,6 @@ func TestPreempt(t *testing.T) {
17481745
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
17491746
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
17501747
frameworkruntime.WithInformerFactory(informerFactory),
1751-
frameworkruntime.WithWaitingPods(waitingPods),
17521748
frameworkruntime.WithLogger(logger),
17531749
)
17541750
if err != nil {

pkg/scheduler/framework/runtime/framework.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ type frameworkOptions struct {
132132
extenders []framework.Extender
133133
captureProfile CaptureProfile
134134
parallelizer parallelize.Parallelizer
135-
waitingPods *waitingPodsMap
136135
logger *klog.Logger
137136
}
138137

@@ -222,13 +221,6 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
222221
}
223222
}
224223

225-
// WithWaitingPods sets waitingPods for the scheduling frameworkImpl.
226-
func WithWaitingPods(wp *waitingPodsMap) Option {
227-
return func(o *frameworkOptions) {
228-
o.waitingPods = wp
229-
}
230-
}
231-
232224
// WithLogger overrides the default logger from k8s.io/klog.
233225
func WithLogger(logger klog.Logger) Option {
234226
return func(o *frameworkOptions) {
@@ -262,7 +254,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
262254
registry: r,
263255
snapshotSharedLister: options.snapshotSharedLister,
264256
scorePluginWeight: make(map[string]int),
265-
waitingPods: options.waitingPods,
257+
waitingPods: newWaitingPodsMap(),
266258
clientSet: options.clientSet,
267259
kubeConfig: options.kubeConfig,
268260
eventRecorder: options.eventRecorder,

pkg/scheduler/framework/runtime/framework_test.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,9 +2808,7 @@ func TestPermitPlugins(t *testing.T) {
28082808
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
28092809
ctx, cancel := context.WithCancel(context.Background())
28102810
defer cancel()
2811-
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile,
2812-
WithWaitingPods(NewWaitingPodsMap()),
2813-
)
2811+
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
28142812
if err != nil {
28152813
t.Fatalf("fail to create framework: %s", err)
28162814
}
@@ -2992,10 +2990,7 @@ func TestRecordingMetrics(t *testing.T) {
29922990
SchedulerName: testProfileName,
29932991
Plugins: plugins,
29942992
}
2995-
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
2996-
withMetricsRecorder(recorder),
2997-
WithWaitingPods(NewWaitingPodsMap()),
2998-
)
2993+
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
29992994
if err != nil {
30002995
cancel()
30012996
t.Fatalf("Failed to create framework for testing: %v", err)
@@ -3165,9 +3160,7 @@ func TestPermitWaitDurationMetric(t *testing.T) {
31653160
profile := config.KubeSchedulerProfile{Plugins: plugins}
31663161
ctx, cancel := context.WithCancel(context.Background())
31673162
defer cancel()
3168-
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
3169-
WithWaitingPods(NewWaitingPodsMap()),
3170-
)
3163+
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
31713164
if err != nil {
31723165
t.Fatalf("Failed to create framework for testing: %v", err)
31733166
}
@@ -3223,9 +3216,7 @@ func TestWaitOnPermit(t *testing.T) {
32233216
profile := config.KubeSchedulerProfile{Plugins: plugins}
32243217
ctx, cancel := context.WithCancel(context.Background())
32253218
defer cancel()
3226-
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
3227-
WithWaitingPods(NewWaitingPodsMap()),
3228-
)
3219+
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
32293220
if err != nil {
32303221
t.Fatalf("Failed to create framework for testing: %v", err)
32313222
}

pkg/scheduler/framework/runtime/waiting_pods_map.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type waitingPodsMap struct {
3232
mu sync.RWMutex
3333
}
3434

35-
// NewWaitingPodsMap returns a new waitingPodsMap.
36-
func NewWaitingPodsMap() *waitingPodsMap {
35+
// newWaitingPodsMap returns a new waitingPodsMap.
36+
func newWaitingPodsMap() *waitingPodsMap {
3737
return &waitingPodsMap{
3838
pods: make(map[types.UID]*waitingPod),
3939
}

pkg/scheduler/schedule_one_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -773,9 +773,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
773773
registerPluginFuncs,
774774
testSchedulerName,
775775
frameworkruntime.WithClientSet(client),
776-
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
777-
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
778-
)
776+
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
779777
if err != nil {
780778
t.Fatal(err)
781779
}
@@ -3525,7 +3523,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
35253523
informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
35263524
}
35273525
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
3528-
waitingPods := frameworkruntime.NewWaitingPodsMap()
35293526

35303527
fwk, _ := tf.NewFramework(
35313528
ctx,
@@ -3535,7 +3532,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
35353532
frameworkruntime.WithEventRecorder(recorder),
35363533
frameworkruntime.WithInformerFactory(informerFactory),
35373534
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
3538-
frameworkruntime.WithWaitingPods(waitingPods),
35393535
)
35403536

35413537
errChan := make(chan error, 1)

pkg/scheduler/scheduler.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,6 @@ func New(ctx context.Context,
292292

293293
snapshot := internalcache.NewEmptySnapshot()
294294
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
295-
// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
296-
waitingPods := frameworkruntime.NewWaitingPodsMap()
297295

298296
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
299297
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
@@ -305,7 +303,6 @@ func New(ctx context.Context,
305303
frameworkruntime.WithParallelism(int(options.parallelism)),
306304
frameworkruntime.WithExtenders(extenders),
307305
frameworkruntime.WithMetricsRecorder(metricsRecorder),
308-
frameworkruntime.WithWaitingPods(waitingPods),
309306
)
310307
if err != nil {
311308
return nil, fmt.Errorf("initializing profiles: %v", err)

pkg/scheduler/scheduler_test.go

Lines changed: 0 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,13 @@ import (
2121
"fmt"
2222
"sort"
2323
"strings"
24-
"sync"
2524
"testing"
2625
"time"
2726

2827
"github.com/google/go-cmp/cmp"
2928
v1 "k8s.io/api/core/v1"
30-
eventsv1 "k8s.io/api/events/v1"
3129
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3230
"k8s.io/apimachinery/pkg/runtime"
33-
"k8s.io/apimachinery/pkg/util/sets"
3431
utilfeature "k8s.io/apiserver/pkg/util/feature"
3532
"k8s.io/client-go/informers"
3633
"k8s.io/client-go/kubernetes"
@@ -440,14 +437,12 @@ func initScheduler(ctx context.Context, cache internalcache.Cache, queue interna
440437
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
441438
}
442439
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
443-
waitingPods := frameworkruntime.NewWaitingPodsMap()
444440
fwk, err := tf.NewFramework(ctx,
445441
registerPluginFuncs,
446442
testSchedulerName,
447443
frameworkruntime.WithClientSet(client),
448444
frameworkruntime.WithInformerFactory(informerFactory),
449445
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
450-
frameworkruntime.WithWaitingPods(waitingPods),
451446
)
452447
if err != nil {
453448
return nil, nil, err
@@ -596,7 +591,6 @@ const (
596591
queueSort = "no-op-queue-sort-plugin"
597592
fakeBind = "bind-plugin"
598593
emptyEventExtensions = "emptyEventExtensions"
599-
fakePermit = "fakePermit"
600594
)
601595

602596
func Test_buildQueueingHintMap(t *testing.T) {
@@ -911,160 +905,6 @@ func newFramework(ctx context.Context, r frameworkruntime.Registry, profile sche
911905
)
912906
}
913907

914-
func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) {
915-
const (
916-
testSchedulerProfile1 = "test-scheduler-profile-1"
917-
testSchedulerProfile2 = "test-scheduler-profile-2"
918-
testSchedulerProfile3 = "test-scheduler-profile-3"
919-
)
920-
921-
nodes := []runtime.Object{
922-
st.MakeNode().Name("node1").UID("node1").Obj(),
923-
st.MakeNode().Name("node2").UID("node2").Obj(),
924-
st.MakeNode().Name("node3").UID("node3").Obj(),
925-
}
926-
927-
cases := []struct {
928-
name string
929-
profiles []schedulerapi.KubeSchedulerProfile
930-
waitSchedulingPods []*v1.Pod
931-
expectPodNamesInWaitingPods []string
932-
}{
933-
{
934-
name: "pods with same profile are waiting on permit stage",
935-
profiles: []schedulerapi.KubeSchedulerProfile{
936-
{
937-
SchedulerName: testSchedulerProfile1,
938-
Plugins: &schedulerapi.Plugins{
939-
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
940-
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
941-
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
942-
},
943-
},
944-
},
945-
waitSchedulingPods: []*v1.Pod{
946-
st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
947-
st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
948-
st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(),
949-
},
950-
expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"},
951-
},
952-
{
953-
name: "pods with different profiles are waiting on permit stage",
954-
profiles: []schedulerapi.KubeSchedulerProfile{
955-
{
956-
SchedulerName: testSchedulerProfile1,
957-
Plugins: &schedulerapi.Plugins{
958-
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
959-
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
960-
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
961-
},
962-
},
963-
{
964-
SchedulerName: testSchedulerProfile2,
965-
Plugins: &schedulerapi.Plugins{
966-
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
967-
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
968-
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
969-
},
970-
},
971-
{
972-
SchedulerName: testSchedulerProfile3,
973-
Plugins: &schedulerapi.Plugins{
974-
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
975-
Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
976-
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
977-
},
978-
},
979-
},
980-
waitSchedulingPods: []*v1.Pod{
981-
st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
982-
st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
983-
st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(),
984-
st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(),
985-
},
986-
expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"},
987-
},
988-
}
989-
990-
for _, tc := range cases {
991-
t.Run(tc.name, func(t *testing.T) {
992-
// Set up scheduler for the 3 nodes.
993-
objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
994-
fakeClient := fake.NewSimpleClientset(objs...)
995-
informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
996-
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()})
997-
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit)
998-
999-
outOfTreeRegistry := frameworkruntime.Registry{
1000-
fakePermit: newFakePermitPlugin(eventRecorder),
1001-
}
1002-
1003-
_, ctx := ktesting.NewTestContext(t)
1004-
ctx, cancel := context.WithCancel(ctx)
1005-
defer cancel()
1006-
1007-
scheduler, err := New(
1008-
ctx,
1009-
fakeClient,
1010-
informerFactory,
1011-
nil,
1012-
profile.NewRecorderFactory(eventBroadcaster),
1013-
WithProfiles(tc.profiles...),
1014-
WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
1015-
)
1016-
1017-
if err != nil {
1018-
t.Fatalf("Failed to create scheduler: %v", err)
1019-
}
1020-
1021-
var wg sync.WaitGroup
1022-
waitSchedulingPodNumber := len(tc.waitSchedulingPods)
1023-
wg.Add(waitSchedulingPodNumber)
1024-
stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
1025-
e, ok := obj.(*eventsv1.Event)
1026-
if !ok || (e.Reason != podWaitingReason) {
1027-
return
1028-
}
1029-
wg.Done()
1030-
})
1031-
if err != nil {
1032-
t.Fatal(err)
1033-
}
1034-
defer stopFn()
1035-
1036-
// Run scheduler.
1037-
informerFactory.Start(ctx.Done())
1038-
informerFactory.WaitForCacheSync(ctx.Done())
1039-
go scheduler.Run(ctx)
1040-
1041-
// Send pods to be scheduled.
1042-
for _, p := range tc.waitSchedulingPods {
1043-
_, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
1044-
if err != nil {
1045-
t.Fatal(err)
1046-
}
1047-
}
1048-
1049-
// Wait all pods in waitSchedulingPods to be scheduled.
1050-
wg.Wait()
1051-
1052-
// Ensure that all waitingPods in scheduler can be obtained from any profiles.
1053-
for _, fwk := range scheduler.Profiles {
1054-
actualPodNamesInWaitingPods := sets.NewString()
1055-
fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) {
1056-
actualPodNamesInWaitingPods.Insert(pod.GetPod().Name)
1057-
})
1058-
// Validate the name of pods in waitingPods matches expectations.
1059-
if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) ||
1060-
!actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) {
1061-
t.Fatalf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List())
1062-
}
1063-
}
1064-
})
1065-
}
1066-
}
1067-
1068908
var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
1069909

1070910
// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
@@ -1164,36 +1004,3 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle
11641004
}
11651005

11661006
func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
1167-
1168-
// fakePermitPlugin only implements PermitPlugin interface.
1169-
type fakePermitPlugin struct {
1170-
eventRecorder events.EventRecorder
1171-
}
1172-
1173-
func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
1174-
return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
1175-
pl := &fakePermitPlugin{
1176-
eventRecorder: eventRecorder,
1177-
}
1178-
return pl, nil
1179-
}
1180-
}
1181-
1182-
func (f fakePermitPlugin) Name() string {
1183-
return fakePermit
1184-
}
1185-
1186-
const (
1187-
podWaitingReason = "podWaiting"
1188-
)
1189-
1190-
func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
1191-
defer func() {
1192-
// Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage.
1193-
f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
1194-
}()
1195-
1196-
return framework.NewStatus(framework.Wait), 100 * time.Second
1197-
}
1198-
1199-
var _ framework.PermitPlugin = &fakePermitPlugin{}

0 commit comments

Comments
 (0)