Skip to content

Commit 997648a

Browse files
committed
Add Un-reserve extension point for the scheduling framework
1 parent 086a86b commit 997648a

File tree

4 files changed

+165
-11
lines changed

4 files changed

+165
-11
lines changed

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type framework struct {
3333
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
3434
reservePlugins []ReservePlugin
3535
prebindPlugins []PrebindPlugin
36+
unreservePlugins []UnreservePlugin
3637
}
3738

3839
var _ = Framework(&framework{})
@@ -64,6 +65,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
6465
if pp, ok := p.(PrebindPlugin); ok {
6566
f.prebindPlugins = append(f.prebindPlugins, pp)
6667
}
68+
if up, ok := p.(UnreservePlugin); ok {
69+
f.unreservePlugins = append(f.unreservePlugins, up)
70+
}
6771
}
6872
return f, nil
6973
}
@@ -105,6 +109,14 @@ func (f *framework) RunReservePlugins(
105109
return nil
106110
}
107111

112+
// RunUnreservePlugins runs the set of configured unreserve plugins.
113+
func (f *framework) RunUnreservePlugins(
114+
pc *PluginContext, pod *v1.Pod, nodeName string) {
115+
for _, pl := range f.unreservePlugins {
116+
pl.Unreserve(pc, pod, nodeName)
117+
}
118+
}
119+
108120
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
109121
// is taken at the beginning of a scheduling cycle and remains unchanged until a
110122
// pod finishes "Reserve". There is no guarantee that the information remains

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,17 @@ type PrebindPlugin interface {
113113
Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
114114
}
115115

116+
// UnreservePlugin is an interface for Unreserve plugins. This is an informational
117+
// extension point. If a pod was reserved and then rejected in a later phase, then
118+
// un-reserve plugins will be notified. Un-reserve plugins should clean up state
119+
// associated with the reserved Pod.
120+
type UnreservePlugin interface {
121+
Plugin
122+
// Unreserve is called by the scheduling framework when a reserved pod was
123+
// rejected in a later phase.
124+
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
125+
}
126+
116127
// Framework manages the set of plugins in use by the scheduling framework.
117128
// Configured plugins are called at specified points in a scheduling context.
118129
type Framework interface {
@@ -128,6 +139,9 @@ type Framework interface {
128139
// plugins returns an error, it does not continue running the remaining ones and
129140
// returns the error. In such case, pod will not be scheduled.
130141
RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
142+
143+
// RunUnreservePlugins runs the set of configured unreserve plugins.
144+
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
131145
}
132146

133147
// FrameworkHandle provides data and some tools that plugins can use. It is

pkg/scheduler/scheduler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,8 @@ func (sched *Scheduler) scheduleOne() {
515515
if err != nil {
516516
klog.Errorf("error assuming pod: %v", err)
517517
metrics.PodScheduleErrors.Inc()
518+
// trigger un-reserve plugins to clean up state associated with the reserved Pod
519+
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
518520
return
519521
}
520522
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
@@ -525,6 +527,8 @@ func (sched *Scheduler) scheduleOne() {
525527
if err != nil {
526528
klog.Errorf("error binding volumes: %v", err)
527529
metrics.PodScheduleErrors.Inc()
530+
// trigger un-reserve plugins to clean up state associated with the reserved Pod
531+
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
528532
return
529533
}
530534
}
@@ -543,6 +547,8 @@ func (sched *Scheduler) scheduleOne() {
543547
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
544548
}
545549
sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message())
550+
// trigger un-reserve plugins to clean up state associated with the reserved Pod
551+
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
546552
return
547553
}
548554

@@ -558,6 +564,8 @@ func (sched *Scheduler) scheduleOne() {
558564
if err != nil {
559565
klog.Errorf("error binding pod: %v", err)
560566
metrics.PodScheduleErrors.Inc()
567+
// trigger un-reserve plugins to clean up state associated with the reserved Pod
568+
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
561569
} else {
562570
klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
563571
metrics.PodScheduleSuccesses.Inc()

test/integration/scheduler/framework_test.go

Lines changed: 131 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,24 @@ package scheduler
1818

1919
import (
2020
"fmt"
21-
"k8s.io/apimachinery/pkg/runtime"
2221
"testing"
2322
"time"
2423

2524
"k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/apimachinery/pkg/util/wait"
2727
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
2828
)
2929

3030
// TesterPlugin is common ancestor for a test plugin that allows injection of
3131
// failures and some other test functionalities.
3232
type TesterPlugin struct {
33-
numReserveCalled int
34-
numPrebindCalled int
35-
failReserve bool
36-
failPrebind bool
37-
rejectPrebind bool
33+
numReserveCalled int
34+
numPrebindCalled int
35+
numUnreserveCalled int
36+
failReserve bool
37+
failPrebind bool
38+
rejectPrebind bool
3839
}
3940

4041
type ReservePlugin struct {
@@ -45,19 +46,27 @@ type PrebindPlugin struct {
4546
TesterPlugin
4647
}
4748

49+
type UnreservePlugin struct {
50+
TesterPlugin
51+
}
52+
4853
const (
49-
reservePluginName = "reserve-plugin"
50-
prebindPluginName = "prebind-plugin"
54+
reservePluginName = "reserve-plugin"
55+
prebindPluginName = "prebind-plugin"
56+
unreservePluginName = "unreserve-plugin"
5157
)
5258

5359
var _ = framework.ReservePlugin(&ReservePlugin{})
5460
var _ = framework.PrebindPlugin(&PrebindPlugin{})
61+
var _ = framework.UnreservePlugin(&UnreservePlugin{})
5562

5663
// Name returns name of the plugin.
5764
func (rp *ReservePlugin) Name() string {
5865
return reservePluginName
5966
}
6067

68+
var resPlugin = &ReservePlugin{}
69+
6170
// Reserve is a test function that returns an error or nil, depending on the
6271
// value of "failReserve".
6372
func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
@@ -68,14 +77,13 @@ func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeN
6877
return nil
6978
}
7079

71-
var resPlugin = &ReservePlugin{}
72-
var pbdPlugin = &PrebindPlugin{}
73-
7480
// NewReservePlugin is the factory for reserve plugin.
7581
func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
7682
return resPlugin, nil
7783
}
7884

85+
var pbdPlugin = &PrebindPlugin{}
86+
7987
// Name returns name of the plugin.
8088
func (pp *PrebindPlugin) Name() string {
8189
return prebindPluginName
@@ -93,11 +101,39 @@ func (pp *PrebindPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeN
93101
return nil
94102
}
95103

104+
// reset used to reset numPrebindCalled.
105+
func (pp *PrebindPlugin) reset() {
106+
pp.numPrebindCalled = 0
107+
}
108+
96109
// NewPrebindPlugin is the factory for prebind plugin.
97110
func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
98111
return pbdPlugin, nil
99112
}
100113

114+
var unresPlugin = &UnreservePlugin{}
115+
116+
// Name returns name of the plugin.
117+
func (up *UnreservePlugin) Name() string {
118+
return unreservePluginName
119+
}
120+
121+
// Unreserve is a test function that returns an error or nil, depending on the
122+
// value of "failUnreserve".
123+
func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
124+
up.numUnreserveCalled++
125+
}
126+
127+
// reset used to reset numUnreserveCalled.
128+
func (up *UnreservePlugin) reset() {
129+
up.numUnreserveCalled = 0
130+
}
131+
132+
// NewUnreservePlugin is the factory for unreserve plugin.
133+
func NewUnreservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
134+
return unresPlugin, nil
135+
}
136+
101137
// TestReservePlugin tests invocation of reserve plugins.
102138
func TestReservePlugin(t *testing.T) {
103139
// Create a plugin registry for testing. Register only a reserve plugin.
@@ -216,3 +252,87 @@ func TestPrebindPlugin(t *testing.T) {
216252
cleanupPods(cs, t, []*v1.Pod{pod})
217253
}
218254
}
255+
256+
// TestUnreservePlugin tests invocation of un-reserve plugin
257+
func TestUnreservePlugin(t *testing.T) {
258+
// TODO: register more plugin which would trigger un-reserve plugin
259+
registry := framework.Registry{
260+
unreservePluginName: NewUnreservePlugin,
261+
prebindPluginName: NewPrebindPlugin,
262+
}
263+
264+
// Create the master and the scheduler with the test plugin set.
265+
context := initTestSchedulerWithOptions(t,
266+
initTestMaster(t, "unreserve-plugin", nil),
267+
false, nil, registry, false, time.Second)
268+
defer cleanupTest(t, context)
269+
270+
cs := context.clientSet
271+
// Add a few nodes.
272+
_, err := createNodes(cs, "test-node", nil, 2)
273+
if err != nil {
274+
t.Fatalf("Cannot create nodes: %v", err)
275+
}
276+
277+
tests := []struct {
278+
prebindFail bool
279+
prebindReject bool
280+
}{
281+
{
282+
prebindFail: false,
283+
prebindReject: false,
284+
},
285+
{
286+
prebindFail: true,
287+
prebindReject: false,
288+
},
289+
{
290+
prebindFail: false,
291+
prebindReject: true,
292+
},
293+
{
294+
prebindFail: true,
295+
prebindReject: true,
296+
},
297+
}
298+
299+
for i, test := range tests {
300+
pbdPlugin.failPrebind = test.prebindFail
301+
pbdPlugin.rejectPrebind = test.prebindReject
302+
303+
// Create a best effort pod.
304+
pod, err := createPausePod(cs,
305+
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
306+
if err != nil {
307+
t.Errorf("Error while creating a test pod: %v", err)
308+
}
309+
310+
if test.prebindFail {
311+
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
312+
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
313+
}
314+
if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled {
315+
t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled)
316+
}
317+
} else {
318+
if test.prebindReject {
319+
if err = waitForPodUnschedulable(cs, pod); err != nil {
320+
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
321+
}
322+
if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled {
323+
t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled)
324+
}
325+
} else {
326+
if err = waitForPodToSchedule(cs, pod); err != nil {
327+
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
328+
}
329+
if unresPlugin.numUnreserveCalled > 0 {
330+
t.Errorf("test #%v: Didn't expected the unreserve plugin to be called, was called %d times.", i, unresPlugin.numUnreserveCalled)
331+
}
332+
}
333+
}
334+
unresPlugin.reset()
335+
pbdPlugin.reset()
336+
cleanupPods(cs, t, []*v1.Pod{pod})
337+
}
338+
}

0 commit comments

Comments
 (0)