Skip to content

Commit c54b664

Browse files
authored
Merge pull request kubernetes#77559 from ahg-g/permit-extension-point
Implement the permit extension point in scheduler.
2 parents d881c0d + 98de316 commit c54b664

File tree

6 files changed

+496
-8
lines changed

6 files changed

+496
-8
lines changed

pkg/scheduler/framework/v1alpha1/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ go_library(
77
"framework.go",
88
"interface.go",
99
"registry.go",
10+
"waiting_pods_map.go",
1011
],
1112
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
1213
visibility = ["//visibility:public"],
1314
deps = [
1415
"//pkg/scheduler/internal/cache:go_default_library",
1516
"//staging/src/k8s.io/api/core/v1:go_default_library",
1617
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
18+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
1719
"//vendor/k8s.io/klog:go_default_library",
1820
],
1921
)

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ package v1alpha1
1818

1919
import (
2020
"fmt"
21+
"time"
2122

2223
"k8s.io/api/core/v1"
2324
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/types"
2426
"k8s.io/klog"
2527
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
2628
)
@@ -30,12 +32,19 @@ import (
3032
type framework struct {
3133
registry Registry
3234
nodeInfoSnapshot *cache.NodeInfoSnapshot
35+
waitingPods *waitingPodsMap
3336
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
3437
reservePlugins []ReservePlugin
3538
prebindPlugins []PrebindPlugin
3639
unreservePlugins []UnreservePlugin
40+
permitPlugins []PermitPlugin
3741
}
3842

43+
// maxTimeout is the upper bound on how long a pod may wait at the permit
// phase, regardless of the timeout a permit plugin returns.
const maxTimeout = 15 * time.Minute
47+
3948
var _ = Framework(&framework{})
4049

4150
// NewFramework initializes plugins given the configuration and the registry.
@@ -44,6 +53,7 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
4453
registry: r,
4554
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
4655
plugins: make(map[string]Plugin),
56+
waitingPods: newWaitingPodsMap(),
4757
}
4858

4959
// TODO: The framework needs to read the scheduler config and initialize only
@@ -68,6 +78,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
6878
if up, ok := p.(UnreservePlugin); ok {
6979
f.unreservePlugins = append(f.unreservePlugins, up)
7080
}
81+
if pr, ok := p.(PermitPlugin); ok {
82+
f.permitPlugins = append(f.permitPlugins, pr)
83+
}
7184
}
7285
return f, nil
7386
}
@@ -117,10 +130,83 @@ func (f *framework) RunUnreservePlugins(
117130
}
118131
}
119132

133+
// RunPermitPlugins runs the set of configured permit plugins. If any of these
134+
// plugins returns a status other than "Success" or "Wait", it does not continue
135+
// running the remaining plugins and returns an error. Otherwise, if any of the
136+
// plugins returns "Wait", then this function will block for the timeout period
137+
// returned by the plugin, if the time expires, then it will return an error.
138+
// Note that if multiple plugins asked to wait, then we wait for the minimum
139+
// timeout duration.
140+
func (f *framework) RunPermitPlugins(
141+
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
142+
timeout := maxTimeout
143+
statusCode := Success
144+
for _, pl := range f.permitPlugins {
145+
status, d := pl.Permit(pc, pod, nodeName)
146+
if !status.IsSuccess() {
147+
if status.Code() == Unschedulable {
148+
msg := fmt.Sprintf("rejected by %v at permit: %v", pl.Name(), status.Message())
149+
klog.V(4).Infof(msg)
150+
return NewStatus(status.Code(), msg)
151+
}
152+
if status.Code() == Wait {
153+
// Use the minimum timeout duration.
154+
if timeout > d {
155+
timeout = d
156+
}
157+
statusCode = Wait
158+
} else {
159+
msg := fmt.Sprintf("error while running %v permit plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
160+
klog.Error(msg)
161+
return NewStatus(Error, msg)
162+
}
163+
}
164+
}
165+
166+
// We now wait for the minimum duration if at least one plugin asked to
167+
// wait (and no plugin rejected the pod)
168+
if statusCode == Wait {
169+
w := newWaitingPod(pod)
170+
f.waitingPods.add(w)
171+
defer f.waitingPods.remove(pod.UID)
172+
timer := time.NewTimer(timeout)
173+
klog.V(4).Infof("waiting for %v for pod %v at permit", timeout, pod.Name)
174+
select {
175+
case <-timer.C:
176+
msg := fmt.Sprintf("pod %v rejected due to timeout after waiting %v at permit", pod.Name, timeout)
177+
klog.V(4).Infof(msg)
178+
return NewStatus(Unschedulable, msg)
179+
case s := <-w.s:
180+
if !s.IsSuccess() {
181+
if s.Code() == Unschedulable {
182+
msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
183+
klog.V(4).Infof(msg)
184+
return NewStatus(s.Code(), msg)
185+
}
186+
msg := fmt.Sprintf("error received while waiting at permit for pod %v: %v", pod.Name, s.Message())
187+
klog.Error(msg)
188+
return NewStatus(Error, msg)
189+
}
190+
}
191+
}
192+
193+
return nil
194+
}
195+
120196
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
121197
// is taken at the beginning of a scheduling cycle and remains unchanged until a
122198
// pod finishes "Reserve". There is no guarantee that the information remains
123199
// unchanged after "Reserve".
124200
func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot {
125201
return f.nodeInfoSnapshot
126202
}
203+
204+
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
205+
func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
206+
f.waitingPods.iterate(callback)
207+
}
208+
209+
// GetWaitingPod returns a reference to a WaitingPod given its UID.
210+
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
211+
return f.waitingPods.get(uid)
212+
}

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ package v1alpha1
2020

2121
import (
2222
"errors"
23+
"time"
2324

2425
"k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/types"
2527
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
2628
)
2729

@@ -38,6 +40,8 @@ const (
3840
// Unschedulable is used when a plugin finds a pod unschedulable.
3941
// The accompanying status message should explain why the pod is unschedulable.
4042
Unschedulable Code = 2
43+
// Wait is used when a permit plugin finds a pod scheduling should wait.
44+
Wait Code = 3
4145
)
4246

4347
// Status indicates the result of running a plugin. It consists of a code and a
@@ -86,6 +90,18 @@ func NewStatus(code Code, msg string) *Status {
8690
}
8791
}
8892

93+
// WaitingPod represents a pod currently waiting in the permit phase.
94+
type WaitingPod interface {
95+
// GetPod returns a reference to the waiting pod.
96+
GetPod() *v1.Pod
97+
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
98+
// successfully delivered, false otherwise.
99+
Allow() bool
100+
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
101+
// was successfully delivered, false otherwise.
102+
Reject(msg string) bool
103+
}
104+
89105
// Plugin is the parent type for all the scheduling framework plugins.
90106
type Plugin interface {
91107
Name() string
@@ -105,7 +121,7 @@ type ReservePlugin interface {
105121
}
106122

107123
// PrebindPlugin is an interface that must be implemented by "prebind" plugins.
108-
// These plugins are called before a pod being scheduled
124+
// These plugins are called before a pod being scheduled.
109125
type PrebindPlugin interface {
110126
Plugin
111127
// Prebind is called before binding a pod. All prebind plugins must return
@@ -124,6 +140,19 @@ type UnreservePlugin interface {
124140
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
125141
}
126142

143+
// PermitPlugin is an interface that must be implemented by "permit" plugins.
144+
// These plugins are called before a pod is bound to a node.
145+
type PermitPlugin interface {
146+
Plugin
147+
// Permit is called before binding a pod (and before prebind plugins). Permit
148+
// plugins are used to prevent or delay the binding of a Pod. A permit plugin
149+
// must return success or wait with timeout duration, or the pod will be rejected.
150+
// The pod will also be rejected if the wait times out or the pod is rejected while
151+
// waiting. Note that if the plugin returns "wait", the framework will wait only
152+
// after running the remaining plugins given that no other plugin rejects the pod.
153+
Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)
154+
}
155+
127156
// Framework manages the set of plugins in use by the scheduling framework.
128157
// Configured plugins are called at specified points in a scheduling context.
129158
type Framework interface {
@@ -142,6 +171,15 @@ type Framework interface {
142171

143172
// RunUnreservePlugins runs the set of configured unreserve plugins.
144173
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
174+
175+
// RunPermitPlugins runs the set of configured permit plugins. If any of these
176+
// plugins returns a status other than "Success" or "Wait", it does not continue
177+
// running the remaining plugins and returns an error. Otherwise, if any of the
178+
// plugins returns "Wait", then this function will block for the timeout period
179+
// returned by the plugin, if the time expires, then it will return an error.
180+
// Note that if multiple plugins asked to wait, then we wait for the minimum
181+
// timeout duration.
182+
RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
145183
}
146184

147185
// FrameworkHandle provides data and some tools that plugins can use. It is
@@ -153,4 +191,10 @@ type FrameworkHandle interface {
153191
// a pod finishes "Reserve" point. There is no guarantee that the information
154192
// remains unchanged in the binding phase of scheduling.
155193
NodeInfoSnapshot() *internalcache.NodeInfoSnapshot
194+
195+
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
196+
IterateOverWaitingPods(callback func(WaitingPod))
197+
198+
// GetWaitingPod returns a waiting pod given its UID.
199+
GetWaitingPod(uid types.UID) WaitingPod
156200
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1alpha1
18+
19+
import (
20+
"sync"
21+
22+
"k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/types"
24+
)
25+
26+
// waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase.
27+
type waitingPodsMap struct {
28+
pods map[types.UID]WaitingPod
29+
mu sync.RWMutex
30+
}
31+
32+
// newWaitingPodsMap returns a new waitingPodsMap.
33+
func newWaitingPodsMap() *waitingPodsMap {
34+
return &waitingPodsMap{
35+
pods: make(map[types.UID]WaitingPod),
36+
}
37+
}
38+
39+
// add a new WaitingPod to the map.
40+
func (m *waitingPodsMap) add(wp WaitingPod) {
41+
m.mu.Lock()
42+
defer m.mu.Unlock()
43+
m.pods[wp.GetPod().UID] = wp
44+
}
45+
46+
// remove a WaitingPod from the map.
47+
func (m *waitingPodsMap) remove(uid types.UID) {
48+
m.mu.Lock()
49+
defer m.mu.Unlock()
50+
delete(m.pods, uid)
51+
}
52+
53+
// get a WaitingPod from the map.
54+
func (m *waitingPodsMap) get(uid types.UID) WaitingPod {
55+
m.mu.RLock()
56+
defer m.mu.RUnlock()
57+
return m.pods[uid]
58+
59+
}
60+
61+
// iterate acquires a read lock and iterates over the WaitingPods map.
62+
func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
63+
m.mu.RLock()
64+
defer m.mu.RUnlock()
65+
for _, v := range m.pods {
66+
callback(v)
67+
}
68+
}
69+
70+
// waitingPod represents a pod waiting in the permit phase.
71+
type waitingPod struct {
72+
pod *v1.Pod
73+
s chan *Status
74+
}
75+
76+
// newWaitingPod returns a new waitingPod instance.
77+
func newWaitingPod(pod *v1.Pod) *waitingPod {
78+
return &waitingPod{
79+
pod: pod,
80+
s: make(chan *Status),
81+
}
82+
}
83+
84+
// GetPod returns a reference to the waiting pod.
85+
func (w *waitingPod) GetPod() *v1.Pod {
86+
return w.pod
87+
}
88+
89+
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
90+
// successfully delivered, false otherwise.
91+
func (w *waitingPod) Allow() bool {
92+
select {
93+
case w.s <- NewStatus(Success, ""):
94+
return true
95+
default:
96+
return false
97+
}
98+
}
99+
100+
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
101+
// was successfully delivered, false otherwise.
102+
func (w *waitingPod) Reject(msg string) bool {
103+
select {
104+
case w.s <- NewStatus(Unschedulable, msg):
105+
return true
106+
default:
107+
return false
108+
}
109+
}

pkg/scheduler/scheduler.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,25 @@ func (sched *Scheduler) scheduleOne() {
533533
}
534534
}
535535

536+
// Run "permit" plugins.
537+
permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
538+
if !permitStatus.IsSuccess() {
539+
var reason string
540+
if permitStatus.Code() == framework.Unschedulable {
541+
reason = v1.PodReasonUnschedulable
542+
} else {
543+
metrics.PodScheduleErrors.Inc()
544+
reason = SchedulerError
545+
}
546+
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
547+
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
548+
}
549+
sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message())
550+
// trigger un-reserve plugins to clean up state associated with the reserved Pod
551+
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
552+
return
553+
}
554+
536555
// Run "prebind" plugins.
537556
prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
538557
if !prebindStatus.IsSuccess() {

0 commit comments

Comments
 (0)