Skip to content

Commit 796ecb9

Browse files
authored
Merge pull request kubernetes#77529 from draveness/feature/add-queuesort-extension-point
feat: implement "queue-sort" extension point for scheduling framework
2 parents aba8040 + d60bccc commit 796ecb9

File tree

10 files changed

+260
-162
lines changed

10 files changed

+260
-162
lines changed

pkg/scheduler/core/extender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
536536
for _, name := range test.nodes {
537537
cache.AddNode(createNode(name))
538538
}
539-
queue := internalqueue.NewSchedulingQueue(nil)
539+
queue := internalqueue.NewSchedulingQueue(nil, nil)
540540
scheduler := NewGenericScheduler(
541541
cache,
542542
queue,

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ func TestGenericScheduler(t *testing.T) {
452452

453453
scheduler := NewGenericScheduler(
454454
cache,
455-
internalqueue.NewSchedulingQueue(nil),
455+
internalqueue.NewSchedulingQueue(nil, nil),
456456
test.predicates,
457457
algorithmpredicates.EmptyPredicateMetadataProducer,
458458
test.prioritizers,
@@ -488,7 +488,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
488488

489489
s := NewGenericScheduler(
490490
cache,
491-
internalqueue.NewSchedulingQueue(nil),
491+
internalqueue.NewSchedulingQueue(nil, nil),
492492
predicates,
493493
algorithmpredicates.EmptyPredicateMetadataProducer,
494494
prioritizers,
@@ -1491,7 +1491,7 @@ func TestPreempt(t *testing.T) {
14911491
}
14921492
scheduler := NewGenericScheduler(
14931493
cache,
1494-
internalqueue.NewSchedulingQueue(nil),
1494+
internalqueue.NewSchedulingQueue(nil, nil),
14951495
map[string]algorithmpredicates.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
14961496
algorithmpredicates.EmptyPredicateMetadataProducer,
14971497
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},

pkg/scheduler/factory/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
262262
c := &configFactory{
263263
client: args.Client,
264264
podLister: schedulerCache,
265-
podQueue: internalqueue.NewSchedulingQueue(stopEverything),
265+
podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
266266
nodeLister: args.NodeInformer.Lister(),
267267
pVLister: args.PvInformer.Lister(),
268268
pVCLister: args.PvcInformer.Lister(),

pkg/scheduler/factory/factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func TestDefaultErrorFunc(t *testing.T) {
256256
defer close(stopCh)
257257

258258
timestamp := time.Now()
259-
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
259+
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
260260
schedulerCache := internalcache.New(30*time.Second, stopCh)
261261
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh)
262262

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type framework struct {
3434
nodeInfoSnapshot *cache.NodeInfoSnapshot
3535
waitingPods *waitingPodsMap
3636
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
37+
queueSortPlugins []QueueSortPlugin
3738
reservePlugins []ReservePlugin
3839
prebindPlugins []PrebindPlugin
3940
unreservePlugins []UnreservePlugin
@@ -69,6 +70,10 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
6970
// TODO: For now, we assume any plugins that implements an extension
7071
// point wants to be called at that extension point. We should change this
7172
// later and add these plugins based on the configuration.
73+
if qsp, ok := p.(QueueSortPlugin); ok {
74+
f.queueSortPlugins = append(f.queueSortPlugins, qsp)
75+
}
76+
7277
if rp, ok := p.(ReservePlugin); ok {
7378
f.reservePlugins = append(f.reservePlugins, rp)
7479
}
@@ -85,6 +90,16 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
8590
return f, nil
8691
}
8792

93+
// QueueSortFunc returns the function to sort pods in scheduling queue
94+
func (f *framework) QueueSortFunc() LessFunc {
95+
if len(f.queueSortPlugins) == 0 {
96+
return nil
97+
}
98+
99+
// Only one QueueSort plugin can be enabled.
100+
return f.queueSortPlugins[0].Less
101+
}
102+
88103
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
89104
// failure (bool) if any of the plugins returns an error. It also returns an
90105
// error containing the rejection message or the error occurred in the plugin.

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,25 @@ type Plugin interface {
107107
Name() string
108108
}
109109

110+
// PodInfo is minimum cell in the scheduling queue.
111+
type PodInfo struct {
112+
Pod *v1.Pod
113+
// The time pod added to the scheduling queue.
114+
Timestamp time.Time
115+
}
116+
117+
// LessFunc is the function to sort pod info
118+
type LessFunc func(podInfo1, podInfo2 *PodInfo) bool
119+
120+
// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
121+
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
122+
// plugin may be enabled at a time.
123+
type QueueSortPlugin interface {
124+
Plugin
125+
// Less are used to sort pods in the scheduling queue.
126+
Less(*PodInfo, *PodInfo) bool
127+
}
128+
110129
// ReservePlugin is an interface for Reserve plugins. These plugins are called
111130
// at the reservation point. These are meant to update the state of the plugin.
112131
// This concept used to be called 'assume' in the original scheduler.
@@ -157,6 +176,9 @@ type PermitPlugin interface {
157176
// Configured plugins are called at specified points in a scheduling context.
158177
type Framework interface {
159178
FrameworkHandle
179+
// QueueSortFunc returns the function to sort pods in scheduling queue
180+
QueueSortFunc() LessFunc
181+
160182
// RunPrebindPlugins runs the set of configured prebind plugins. It returns
161183
// *Status and its code is set to non-success if any of the plugins returns
162184
// anything but Success. If the Status code is "Unschedulable", it is

pkg/scheduler/internal/queue/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
deps = [
1212
"//pkg/scheduler/algorithm/predicates:go_default_library",
1313
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
14+
"//pkg/scheduler/framework/v1alpha1:go_default_library",
1415
"//pkg/scheduler/metrics:go_default_library",
1516
"//pkg/scheduler/util:go_default_library",
1617
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -31,6 +32,8 @@ go_test(
3132
embed = [":go_default_library"],
3233
deps = [
3334
"//pkg/api/v1/pod:go_default_library",
35+
"//pkg/scheduler/framework/v1alpha1:go_default_library",
36+
"//pkg/scheduler/internal/cache:go_default_library",
3437
"//pkg/scheduler/metrics:go_default_library",
3538
"//pkg/scheduler/util:go_default_library",
3639
"//staging/src/k8s.io/api/core/v1:go_default_library",

0 commit comments

Comments
 (0)