Skip to content

Commit 343817e

Browse files
authored
Merge pull request kubernetes#93831 from cofyc/fix93830
scheduler: forget the pod when the reserve plugins fail
2 parents 0436287 + 1176ef9 commit 343817e

File tree

4 files changed

+157
-18
lines changed

4 files changed

+157
-18
lines changed

pkg/scheduler/scheduler.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -422,9 +422,6 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta
422422
}
423423
if err != nil {
424424
klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
425-
if err := sched.SchedulerCache.ForgetPod(assumed); err != nil {
426-
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
427-
}
428425
return
429426
}
430427

@@ -504,22 +501,25 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
504501
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
505502
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
506503
if err != nil {
504+
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
507505
// This is most probably result of a BUG in retrying logic.
508506
// We report an error here so that pod scheduling can be retried.
509507
// This relies on the fact that Error will check if the pod has been bound
510508
// to a node and if so will not add it back to the unscheduled pods queue
511509
// (otherwise this would cause an infinite loop).
512510
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
513-
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
514511
return
515512
}
516513

517514
// Run the Reserve method of reserve plugins.
518515
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
519-
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
520516
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
521517
// trigger un-reserve to clean up state associated with the reserved Pod
522518
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
519+
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
520+
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
521+
}
522+
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
523523
return
524524
}
525525

@@ -587,6 +587,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
587587
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
588588
// trigger un-reserve plugins to clean up state associated with the reserved Pod
589589
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
590+
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
591+
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
592+
}
590593
sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
591594
} else {
592595
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.

pkg/scheduler/scheduler_test.go

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -214,17 +214,57 @@ func TestSchedulerScheduleOne(t *testing.T) {
214214
errB := errors.New("binder")
215215

216216
table := []struct {
217-
name string
218-
injectBindError error
219-
sendPod *v1.Pod
220-
algo core.ScheduleAlgorithm
221-
expectErrorPod *v1.Pod
222-
expectForgetPod *v1.Pod
223-
expectAssumedPod *v1.Pod
224-
expectError error
225-
expectBind *v1.Binding
226-
eventReason string
217+
name string
218+
injectBindError error
219+
sendPod *v1.Pod
220+
algo core.ScheduleAlgorithm
221+
registerPluginFuncs []st.RegisterPluginFunc
222+
expectErrorPod *v1.Pod
223+
expectForgetPod *v1.Pod
224+
expectAssumedPod *v1.Pod
225+
expectError error
226+
expectBind *v1.Binding
227+
eventReason string
227228
}{
229+
{
230+
name: "error reserve pod",
231+
sendPod: podWithID("foo", ""),
232+
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
233+
registerPluginFuncs: []st.RegisterPluginFunc{
234+
st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
235+
},
236+
expectErrorPod: podWithID("foo", testNode.Name),
237+
expectForgetPod: podWithID("foo", testNode.Name),
238+
expectAssumedPod: podWithID("foo", testNode.Name),
239+
expectError: errors.New(`error while running Reserve in "FakeReserve" reserve plugin for pod "foo": reserve error`),
240+
eventReason: "FailedScheduling",
241+
},
242+
{
243+
name: "error permit pod",
244+
sendPod: podWithID("foo", ""),
245+
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
246+
registerPluginFuncs: []st.RegisterPluginFunc{
247+
st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
248+
},
249+
expectErrorPod: podWithID("foo", testNode.Name),
250+
expectForgetPod: podWithID("foo", testNode.Name),
251+
expectAssumedPod: podWithID("foo", testNode.Name),
252+
expectError: errors.New(`error while running "FakePermit" permit plugin for pod "foo": permit error`),
253+
eventReason: "FailedScheduling",
254+
},
255+
{
256+
name: "error prebind pod",
257+
sendPod: podWithID("foo", ""),
258+
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
259+
registerPluginFuncs: []st.RegisterPluginFunc{
260+
st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.NewStatus(framework.Error, "prebind error"))),
261+
},
262+
expectErrorPod: podWithID("foo", testNode.Name),
263+
expectForgetPod: podWithID("foo", testNode.Name),
264+
expectAssumedPod: podWithID("foo", testNode.Name),
265+
expectError: errors.New(`error while running "FakePreBind" prebind plugin for pod "foo": prebind error`),
266+
eventReason: "FailedScheduling",
267+
},
228268
{
229269
name: "bind assumed pod scheduled",
230270
sendPod: podWithID("foo", ""),
@@ -252,7 +292,8 @@ func TestSchedulerScheduleOne(t *testing.T) {
252292
expectErrorPod: podWithID("foo", testNode.Name),
253293
expectForgetPod: podWithID("foo", testNode.Name),
254294
eventReason: "FailedScheduling",
255-
}, {
295+
},
296+
{
256297
name: "deleting pod",
257298
sendPod: deletingPod("foo"),
258299
algo: mockScheduler{core.ScheduleResult{}, nil},
@@ -296,10 +337,11 @@ func TestSchedulerScheduleOne(t *testing.T) {
296337
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
297338
return true, gotBinding, item.injectBindError
298339
})
299-
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
340+
registerPluginFuncs := append(item.registerPluginFuncs,
300341
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
301342
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
302-
}, frameworkruntime.WithClientSet(client))
343+
)
344+
fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client))
303345
if err != nil {
304346
t.Fatal(err)
305347
}

pkg/scheduler/testing/fake_plugins.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"sync/atomic"
23+
"time"
2324

2425
v1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/runtime"
@@ -152,3 +153,81 @@ func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFac
152153
}, nil
153154
}
154155
}
156+
157+
// FakeReservePlugin is a test reserve plugin.
158+
type FakeReservePlugin struct {
159+
Status *framework.Status
160+
}
161+
162+
// Name returns name of the plugin.
163+
func (pl *FakeReservePlugin) Name() string {
164+
return "FakeReserve"
165+
}
166+
167+
// Reserve invoked at the Reserve extension point.
168+
func (pl *FakeReservePlugin) Reserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status {
169+
return pl.Status
170+
}
171+
172+
// Unreserve invoked at the Unreserve extension point.
173+
func (pl *FakeReservePlugin) Unreserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) {
174+
}
175+
176+
// NewFakeReservePlugin initializes a fakeReservePlugin and returns it.
177+
func NewFakeReservePlugin(status *framework.Status) frameworkruntime.PluginFactory {
178+
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
179+
return &FakeReservePlugin{
180+
Status: status,
181+
}, nil
182+
}
183+
}
184+
185+
// FakePreBindPlugin is a test prebind plugin.
186+
type FakePreBindPlugin struct {
187+
Status *framework.Status
188+
}
189+
190+
// Name returns name of the plugin.
191+
func (pl *FakePreBindPlugin) Name() string {
192+
return "FakePreBind"
193+
}
194+
195+
// PreBind invoked at the PreBind extension point.
196+
func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status {
197+
return pl.Status
198+
}
199+
200+
// NewFakePreBindPlugin initializes a fakePreBindPlugin and returns it.
201+
func NewFakePreBindPlugin(status *framework.Status) frameworkruntime.PluginFactory {
202+
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
203+
return &FakePreBindPlugin{
204+
Status: status,
205+
}, nil
206+
}
207+
}
208+
209+
// FakePermitPlugin is a test permit plugin.
210+
type FakePermitPlugin struct {
211+
Status *framework.Status
212+
Timeout time.Duration
213+
}
214+
215+
// Name returns name of the plugin.
216+
func (pl *FakePermitPlugin) Name() string {
217+
return "FakePermit"
218+
}
219+
220+
// Permit invoked at the Permit extension point.
221+
func (pl *FakePermitPlugin) Permit(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
222+
return pl.Status, pl.Timeout
223+
}
224+
225+
// NewFakePermitPlugin initializes a fakePermitPlugin and returns it.
226+
func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) frameworkruntime.PluginFactory {
227+
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
228+
return &FakePermitPlugin{
229+
Status: status,
230+
Timeout: timeout,
231+
}, nil
232+
}
233+
}

pkg/scheduler/testing/framework_helpers.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,21 @@ func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory
5252
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")
5353
}
5454

55+
// RegisterReservePlugin returns a function to register a Reserve Plugin to a given registry.
56+
func RegisterReservePlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
57+
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Reserve")
58+
}
59+
60+
// RegisterPermitPlugin returns a function to register a Permit Plugin to a given registry.
61+
func RegisterPermitPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
62+
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Permit")
63+
}
64+
65+
// RegisterPreBindPlugin returns a function to register a PreBind Plugin to a given registry.
66+
func RegisterPreBindPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
67+
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreBind")
68+
}
69+
5570
// RegisterScorePlugin returns a function to register a Score Plugin to a given registry.
5671
func RegisterScorePlugin(pluginName string, pluginNewFunc runtime.PluginFactory, weight int32) RegisterPluginFunc {
5772
return RegisterPluginAsExtensionsWithWeight(pluginName, weight, pluginNewFunc, "Score")

0 commit comments

Comments
 (0)