Skip to content

Commit d5e0a94

Browse files
authored
Merge pull request kubernetes#88199 from mateuszlitwin/run-permit-plugins-in-scheduling-cycle
run permit plugins in the scheduling cycle
2 parents 1e12d92 + d221d82 commit d5e0a94

File tree

6 files changed

+154
-75
lines changed

6 files changed

+154
-75
lines changed

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -735,10 +735,9 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin,
735735
// RunPermitPlugins runs the set of configured permit plugins. If any of these
736736
// plugins returns a status other than "Success" or "Wait", it does not continue
737737
// running the remaining plugins and returns an error. Otherwise, if any of the
738-
// plugins returns "Wait", then this function will block for the timeout period
739-
// returned by the plugin, if the time expires, then it will return an error.
740-
// Note that if multiple plugins asked to wait, then we wait for the minimum
741-
// timeout duration.
738+
// plugins returns "Wait", then this function will create and add waiting pod
739+
// to a map of currently waiting pods and return status with "Wait" code.
740+
// The pod will remain a waiting pod for the minimum duration returned by the permit plugins.
742741
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
743742
startTime := time.Now()
744743
defer func() {
@@ -750,7 +749,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
750749
status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
751750
if !status.IsSuccess() {
752751
if status.IsUnschedulable() {
753-
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
752+
msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
754753
klog.V(4).Infof(msg)
755754
return NewStatus(status.Code(), msg)
756755
}
@@ -768,29 +767,13 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
768767
}
769768
}
770769
}
771-
772-
// We now wait for the minimum duration if at least one plugin asked to
773-
// wait (and no plugin rejected the pod)
774770
if statusCode == Wait {
775-
startTime := time.Now()
776-
w := newWaitingPod(pod, pluginsWaitTime)
777-
f.waitingPods.add(w)
778-
defer f.waitingPods.remove(pod.UID)
779-
klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
780-
s := <-w.s
781-
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
782-
if !s.IsSuccess() {
783-
if s.IsUnschedulable() {
784-
msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
785-
klog.V(4).Infof(msg)
786-
return NewStatus(s.Code(), msg)
787-
}
788-
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
789-
klog.Error(msg)
790-
return NewStatus(Error, msg)
791-
}
771+
waitingPod := newWaitingPod(pod, pluginsWaitTime)
772+
f.waitingPods.add(waitingPod)
773+
msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
774+
klog.V(4).Infof(msg)
775+
return NewStatus(Wait, msg)
792776
}
793-
794777
return nil
795778
}
796779

@@ -804,6 +787,32 @@ func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state
804787
return status, timeout
805788
}
806789

790+
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
791+
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
792+
waitingPod := f.waitingPods.get(pod.UID)
793+
if waitingPod == nil {
794+
return nil
795+
}
796+
defer f.waitingPods.remove(pod.UID)
797+
klog.V(4).Infof("pod %q waiting on permit", pod.Name)
798+
799+
startTime := time.Now()
800+
s := <-waitingPod.s
801+
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
802+
803+
if !s.IsSuccess() {
804+
if s.IsUnschedulable() {
805+
msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
806+
klog.V(4).Infof(msg)
807+
return NewStatus(s.Code(), msg)
808+
}
809+
msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
810+
klog.Error(msg)
811+
return NewStatus(Error, msg)
812+
}
813+
return nil
814+
}
815+
807816
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
808817
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
809818
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
@@ -819,7 +828,10 @@ func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
819828

820829
// GetWaitingPod returns a reference to a WaitingPod given its UID.
821830
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
822-
return f.waitingPods.get(uid)
831+
if wp := f.waitingPods.get(uid); wp != nil {
832+
return wp
833+
}
834+
return nil // Return an untyped nil: a nil *waitingPod stored in the WaitingPod interface would compare != nil.
823835
}
824836

825837
// RejectWaitingPod rejects a WaitingPod given its UID.

pkg/scheduler/framework/v1alpha1/framework_test.go

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,7 +1217,7 @@ func TestPermitPlugins(t *testing.T) {
12171217
inj: injectedResult{PermitStatus: int(Unschedulable)},
12181218
},
12191219
},
1220-
want: NewStatus(Unschedulable, `rejected by "TestPlugin" at permit: injected status`),
1220+
want: NewStatus(Unschedulable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
12211221
},
12221222
{
12231223
name: "ErrorPermitPlugin",
@@ -1237,7 +1237,7 @@ func TestPermitPlugins(t *testing.T) {
12371237
inj: injectedResult{PermitStatus: int(UnschedulableAndUnresolvable)},
12381238
},
12391239
},
1240-
want: NewStatus(UnschedulableAndUnresolvable, `rejected by "TestPlugin" at permit: injected status`),
1240+
want: NewStatus(UnschedulableAndUnresolvable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
12411241
},
12421242
{
12431243
name: "WaitPermitPlugin",
@@ -1247,7 +1247,7 @@ func TestPermitPlugins(t *testing.T) {
12471247
inj: injectedResult{PermitStatus: int(Wait)},
12481248
},
12491249
},
1250-
want: NewStatus(Unschedulable, `pod "" rejected while waiting at permit: rejected due to timeout after waiting 0s at plugin TestPlugin`),
1250+
want: NewStatus(Wait, `one or more plugins asked to wait and no plugin rejected pod ""`),
12511251
},
12521252
{
12531253
name: "SuccessSuccessPermitPlugin",
@@ -1425,6 +1425,13 @@ func TestRecordingMetrics(t *testing.T) {
14251425
wantExtensionPoint: "Permit",
14261426
wantStatus: Error,
14271427
},
1428+
{
1429+
name: "Permit - Wait",
1430+
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
1431+
inject: injectedResult{PermitStatus: int(Wait)},
1432+
wantExtensionPoint: "Permit",
1433+
wantStatus: Wait,
1434+
},
14281435
}
14291436

14301437
for _, tt := range tests {
@@ -1578,17 +1585,17 @@ func TestRunBindPlugins(t *testing.T) {
15781585
}
15791586
}
15801587

1581-
func TestPermitWaitingMetric(t *testing.T) {
1588+
func TestPermitWaitDurationMetric(t *testing.T) {
15821589
tests := []struct {
15831590
name string
15841591
inject injectedResult
15851592
wantRes string
15861593
}{
15871594
{
1588-
name: "Permit - Success",
1595+
name: "WaitOnPermit - No Wait",
15891596
},
15901597
{
1591-
name: "Permit - Wait Timeout",
1598+
name: "WaitOnPermit - Wait Timeout",
15921599
inject: injectedResult{PermitStatus: int(Wait)},
15931600
wantRes: "Unschedulable",
15941601
},
@@ -1617,47 +1624,80 @@ func TestPermitWaitingMetric(t *testing.T) {
16171624
}
16181625

16191626
f.RunPermitPlugins(context.TODO(), nil, pod, "")
1627+
f.WaitOnPermit(context.TODO(), pod)
16201628

16211629
collectAndComparePermitWaitDuration(t, tt.wantRes)
16221630
})
16231631
}
16241632
}
16251633

1626-
func TestRejectWaitingPod(t *testing.T) {
1634+
func TestWaitOnPermit(t *testing.T) {
16271635
pod := &v1.Pod{
16281636
ObjectMeta: metav1.ObjectMeta{
16291637
Name: "pod",
16301638
UID: types.UID("pod"),
16311639
},
16321640
}
16331641

1634-
testPermitPlugin := &TestPermitPlugin{}
1635-
r := make(Registry)
1636-
r.Register(permitPlugin,
1637-
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
1638-
return testPermitPlugin, nil
1639-
})
1640-
plugins := &config.Plugins{
1641-
Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}},
1642+
tests := []struct {
1643+
name string
1644+
action func(f Framework)
1645+
wantStatus Code
1646+
wantMessage string
1647+
}{
1648+
{
1649+
name: "Reject Waiting Pod",
1650+
action: func(f Framework) {
1651+
f.GetWaitingPod(pod.UID).Reject("reject message")
1652+
},
1653+
wantStatus: Unschedulable,
1654+
wantMessage: "pod \"pod\" rejected while waiting on permit: reject message",
1655+
},
1656+
{
1657+
name: "Allow Waiting Pod",
1658+
action: func(f Framework) {
1659+
f.GetWaitingPod(pod.UID).Allow(permitPlugin)
1660+
},
1661+
wantStatus: Success,
1662+
wantMessage: "",
1663+
},
16421664
}
16431665

1644-
f, err := newFrameworkWithQueueSortAndBind(r, plugins, emptyArgs)
1645-
if err != nil {
1646-
t.Fatalf("Failed to create framework for testing: %v", err)
1647-
}
1666+
for _, tt := range tests {
1667+
t.Run(tt.name, func(t *testing.T) {
1668+
testPermitPlugin := &TestPermitPlugin{}
1669+
r := make(Registry)
1670+
r.Register(permitPlugin,
1671+
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
1672+
return testPermitPlugin, nil
1673+
})
1674+
plugins := &config.Plugins{
1675+
Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}},
1676+
}
16481677

1649-
go func() {
1650-
for {
1651-
waitingPod := f.GetWaitingPod(pod.UID)
1652-
if waitingPod != nil {
1653-
break
1678+
f, err := newFrameworkWithQueueSortAndBind(r, plugins, emptyArgs)
1679+
if err != nil {
1680+
t.Fatalf("Failed to create framework for testing: %v", err)
16541681
}
1655-
}
1656-
f.RejectWaitingPod(pod.UID)
1657-
}()
1658-
permitStatus := f.RunPermitPlugins(context.Background(), nil, pod, "")
1659-
if permitStatus.Message() != "pod \"pod\" rejected while waiting at permit: removed" {
1660-
t.Fatalf("RejectWaitingPod failed, permitStatus: %v", permitStatus)
1682+
1683+
runPermitPluginsStatus := f.RunPermitPlugins(context.Background(), nil, pod, "")
1684+
if runPermitPluginsStatus.Code() != Wait {
1685+
t.Fatalf("Expected RunPermitPlugins to return status %v, but got %v",
1686+
Wait, runPermitPluginsStatus.Code())
1687+
}
1688+
1689+
go tt.action(f)
1690+
1691+
waitOnPermitStatus := f.WaitOnPermit(context.Background(), pod)
1692+
if waitOnPermitStatus.Code() != tt.wantStatus {
1693+
t.Fatalf("Expected WaitOnPermit to return status %v, but got %v",
1694+
tt.wantStatus, waitOnPermitStatus.Code())
1695+
}
1696+
if waitOnPermitStatus.Message() != tt.wantMessage {
1697+
t.Fatalf("Expected WaitOnPermit to return status with message %q, but got %q",
1698+
tt.wantMessage, waitOnPermitStatus.Message())
1699+
}
1700+
})
16611701
}
16621702
}
16631703

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -468,12 +468,14 @@ type Framework interface {
468468
// RunPermitPlugins runs the set of configured permit plugins. If any of these
469469
// plugins returns a status other than "Success" or "Wait", it does not continue
470470
// running the remaining plugins and returns an error. Otherwise, if any of the
471-
// plugins returns "Wait", then this function will block for the timeout period
472-
// returned by the plugin, if the time expires, then it will return an error.
473-
// Note that if multiple plugins asked to wait, then we wait for the minimum
474-
// timeout duration.
471+
// plugins returns "Wait", then this function will create and add waiting pod
472+
// to a map of currently waiting pods and return status with "Wait" code.
473+
// The pod will remain a waiting pod for the minimum duration returned by the permit plugins.
475474
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
476475

476+
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
477+
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
478+
477479
// RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose
478480
// whether or not to handle the given Pod. If a bind plugin chooses to skip the
479481
// binding, it should return code=5("skip") status. Otherwise, it should return "Error"
@@ -497,9 +499,9 @@ type Framework interface {
497499
type FrameworkHandle interface {
498500
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
499501
// is taken at the beginning of a scheduling cycle and remains unchanged until
500-
// a pod finishes "Reserve" point. There is no guarantee that the information
502+
// a pod finishes "Permit" point. There is no guarantee that the information
501503
// remains unchanged in the binding phase of scheduling, so plugins in the binding
502-
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it,
504+
// cycle (pre-bind/bind/post-bind/un-reserve plugin) should not use it,
503505
// otherwise a concurrent read/write error might occur, they should use scheduler
504506
// cache instead.
505507
SnapshotSharedLister() schedulerlisters.SharedLister

pkg/scheduler/framework/v1alpha1/waiting_pods_map.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,19 @@ import (
2727

2828
// waitingPodsMap is a thread-safe map used to maintain pods waiting in the permit phase.
2929
type waitingPodsMap struct {
30-
pods map[types.UID]WaitingPod
30+
pods map[types.UID]*waitingPod
3131
mu sync.RWMutex
3232
}
3333

3434
// newWaitingPodsMap returns a new waitingPodsMap.
3535
func newWaitingPodsMap() *waitingPodsMap {
3636
return &waitingPodsMap{
37-
pods: make(map[types.UID]WaitingPod),
37+
pods: make(map[types.UID]*waitingPod),
3838
}
3939
}
4040

4141
// add a new WaitingPod to the map.
42-
func (m *waitingPodsMap) add(wp WaitingPod) {
42+
func (m *waitingPodsMap) add(wp *waitingPod) {
4343
m.mu.Lock()
4444
defer m.mu.Unlock()
4545
m.pods[wp.GetPod().UID] = wp
@@ -53,11 +53,10 @@ func (m *waitingPodsMap) remove(uid types.UID) {
5353
}
5454

5555
// get a WaitingPod from the map.
56-
func (m *waitingPodsMap) get(uid types.UID) WaitingPod {
56+
func (m *waitingPodsMap) get(uid types.UID) *waitingPod {
5757
m.mu.RLock()
5858
defer m.mu.RUnlock()
5959
return m.pods[uid]
60-
6160
}
6261

6362
// iterate acquires a read lock and iterates over the WaitingPods map.
@@ -77,11 +76,17 @@ type waitingPod struct {
7776
mu sync.RWMutex
7877
}
7978

79+
var _ WaitingPod = &waitingPod{}
80+
8081
// newWaitingPod returns a new waitingPod instance.
8182
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
8283
wp := &waitingPod{
8384
pod: pod,
84-
s: make(chan *Status),
85+
// Allow() and Reject() calls are non-blocking. This property is guaranteed
86+
// by using a non-blocking send to this channel. The channel has a buffer of size 1
87+
// so that a non-blocking send is not dropped when the receive from this
88+
// channel happens only after the send.
89+
s: make(chan *Status, 1),
8590
}
8691

8792
wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))

pkg/scheduler/metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ var (
218218
&metrics.HistogramOpts{
219219
Subsystem: SchedulerSubsystem,
220220
Name: "permit_wait_duration_seconds",
221-
Help: "Duration of waiting in RunPermitPlugins.",
221+
Help: "Duration of waiting on permit.",
222222
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
223223
StabilityLevel: metrics.ALPHA,
224224
},

0 commit comments

Comments
 (0)