Skip to content

Commit 38b7668

Browse files
committed
Refactor scheduler's framework permit API
1 parent 534051a commit 38b7668

File tree

4 files changed

+164
-115
lines changed

4 files changed

+164
-115
lines changed

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -542,21 +542,22 @@ func (f *framework) RunPermitPlugins(
542542
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
543543
startTime := time.Now()
544544
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
545-
timeout := maxTimeout
545+
pluginsWaitTime := make(map[string]time.Duration)
546546
statusCode := Success
547547
for _, pl := range f.permitPlugins {
548-
status, d := pl.Permit(ctx, state, pod, nodeName)
548+
status, timeout := pl.Permit(ctx, state, pod, nodeName)
549549
if !status.IsSuccess() {
550550
if status.IsUnschedulable() {
551551
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
552552
klog.V(4).Infof(msg)
553553
return NewStatus(status.Code(), msg)
554554
}
555555
if status.Code() == Wait {
556-
// Use the minimum timeout duration.
557-
if timeout > d {
558-
timeout = d
556+
// Not allowed to be greater than maxTimeout.
557+
if timeout > maxTimeout {
558+
timeout = maxTimeout
559559
}
560+
pluginsWaitTime[pl.Name()] = timeout
560561
statusCode = Wait
561562
} else {
562563
msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
@@ -569,27 +570,20 @@ func (f *framework) RunPermitPlugins(
569570
// If at least one plugin asked to wait (and no plugin rejected the pod), block
570571
// until every waiting plugin allows the pod, or one rejects it or times out.
571572
if statusCode == Wait {
572-
w := newWaitingPod(pod)
573+
w := newWaitingPod(pod, pluginsWaitTime)
573574
f.waitingPods.add(w)
574575
defer f.waitingPods.remove(pod.UID)
575-
timer := time.NewTimer(timeout)
576-
klog.V(4).Infof("waiting for %v for pod %q at permit", timeout, pod.Name)
577-
select {
578-
case <-timer.C:
579-
msg := fmt.Sprintf("pod %q rejected due to timeout after waiting %v at permit", pod.Name, timeout)
580-
klog.V(4).Infof(msg)
581-
return NewStatus(Unschedulable, msg)
582-
case s := <-w.s:
583-
if !s.IsSuccess() {
584-
if s.IsUnschedulable() {
585-
msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
586-
klog.V(4).Infof(msg)
587-
return NewStatus(s.Code(), msg)
588-
}
589-
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
590-
klog.Error(msg)
591-
return NewStatus(Error, msg)
576+
klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
577+
s := <-w.s
578+
if !s.IsSuccess() {
579+
if s.IsUnschedulable() {
580+
msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
581+
klog.V(4).Infof(msg)
582+
return NewStatus(s.Code(), msg)
592583
}
584+
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
585+
klog.Error(msg)
586+
return NewStatus(Error, msg)
593587
}
594588
}
595589

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,14 @@ func NewStatus(code Code, msg string) *Status {
150150
type WaitingPod interface {
151151
// GetPod returns a reference to the waiting pod.
152152
GetPod() *v1.Pod
153-
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
154-
// successfully delivered, false otherwise.
155-
Allow() bool
156-
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
153+
// GetPendingPlugins returns the names of the permit plugins that are still pending.
154+
GetPendingPlugins() []string
155+
// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
156+
// If this is the last remaining plugin to allow, then a success signal is delivered
157+
// to unblock the pod.
158+
// Returns true if the allow signal was successfully dealt with, false otherwise.
159+
Allow(pluginName string) bool
160+
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
157161
// was successfully delivered, false otherwise.
158162
Reject(msg string) bool
159163
}

pkg/scheduler/framework/v1alpha1/waiting_pods_map.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ limitations under the License.
1717
package v1alpha1
1818

1919
import (
20+
"fmt"
2021
"sync"
22+
"time"
2123

2224
"k8s.io/api/core/v1"
2325
"k8s.io/apimachinery/pkg/types"
@@ -69,26 +71,66 @@ func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
6971

7072
// waitingPod represents a pod waiting in the permit phase.
7173
type waitingPod struct {
72-
pod *v1.Pod
73-
s chan *Status
74+
pod *v1.Pod
75+
pendingPlugins map[string]*time.Timer
76+
s chan *Status
77+
mu sync.RWMutex
7478
}
7579

7680
// newWaitingPod returns a new waitingPod instance.
77-
func newWaitingPod(pod *v1.Pod) *waitingPod {
78-
return &waitingPod{
81+
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
82+
wp := &waitingPod{
7983
pod: pod,
8084
s: make(chan *Status),
8185
}
86+
87+
wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
88+
for k, v := range pluginsMaxWaitTime {
89+
plugin, waitTime := k, v
90+
wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
91+
msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
92+
waitTime, plugin)
93+
wp.Reject(msg)
94+
})
95+
}
96+
97+
return wp
8298
}
8399

84100
// GetPod returns the pod held by this waitingPod. The stored pointer is
// returned as-is, not a copy.
85101
func (w *waitingPod) GetPod() *v1.Pod {
86102
return w.pod
87103
}
88104

89-
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
90-
// successfully delivered, false otherwise.
91-
func (w *waitingPod) Allow() bool {
105+
// GetPendingPlugins returns a list of pending permit plugin's name.
106+
func (w *waitingPod) GetPendingPlugins() []string {
107+
w.mu.RLock()
108+
defer w.mu.RUnlock()
109+
plugins := make([]string, 0, len(w.pendingPlugins))
110+
for p := range w.pendingPlugins {
111+
plugins = append(plugins, p)
112+
}
113+
114+
return plugins
115+
}
116+
117+
// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
118+
// If this is the last remaining plugin to allow, then a success signal is delivered
119+
// to unblock the pod.
120+
// Returns true if the allow signal was successfully dealt with, false otherwise.
121+
func (w *waitingPod) Allow(pluginName string) bool {
122+
w.mu.Lock()
123+
defer w.mu.Unlock()
124+
if timer, exist := w.pendingPlugins[pluginName]; exist {
125+
timer.Stop()
126+
delete(w.pendingPlugins, pluginName)
127+
}
128+
129+
// Only signal success status after all plugins have allowed
130+
if len(w.pendingPlugins) != 0 {
131+
return true
132+
}
133+
92134
select {
93135
case w.s <- NewStatus(Success, ""):
94136
return true
@@ -97,9 +139,15 @@ func (w *waitingPod) Allow() bool {
97139
}
98140
}
99141

100-
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
142+
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
101143
// was successfully delivered, false otherwise.
102144
func (w *waitingPod) Reject(msg string) bool {
145+
w.mu.RLock()
146+
defer w.mu.RUnlock()
147+
for _, timer := range w.pendingPlugins {
148+
timer.Stop()
149+
}
150+
103151
select {
104152
case w.s <- NewStatus(Unschedulable, msg):
105153
return true

0 commit comments

Comments
 (0)