Skip to content

Commit 5e84257

Browse files
author
draveness
committed
feat(scheduler): use reflect to reduce the similar pattern
1 parent 1fe922e commit 5e84257

File tree

3 files changed

+110
-137
lines changed

3 files changed

+110
-137
lines changed

pkg/scheduler/framework/v1alpha1/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_library(
2020
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2121
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
2222
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
23+
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2324
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
2425
"//vendor/k8s.io/klog:go_default_library",
2526
"//vendor/sigs.k8s.io/yaml:go_default_library",

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 63 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ package v1alpha1
1919
import (
2020
"context"
2121
"fmt"
22+
"reflect"
2223
"time"
2324

2425
v1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/runtime"
2627
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/apimachinery/pkg/util/sets"
2729
"k8s.io/client-go/util/workqueue"
2830
"k8s.io/klog"
2931
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -101,169 +103,93 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
101103
}
102104
}
103105

104-
if plugins.PreFilter != nil {
105-
for _, pf := range plugins.PreFilter.Enabled {
106-
if pg, ok := pluginsMap[pf.Name]; ok {
107-
p, ok := pg.(PreFilterPlugin)
108-
if !ok {
109-
return nil, fmt.Errorf("plugin %q does not extend prefilter plugin", pf.Name)
110-
}
111-
f.preFilterPlugins = append(f.preFilterPlugins, p)
112-
} else {
113-
return nil, fmt.Errorf("prefilter plugin %q does not exist", pf.Name)
114-
}
115-
}
106+
if err := updatePluginList(reflect.ValueOf(&f.preFilterPlugins), plugins.PreFilter, reflect.TypeOf((*PreFilterPlugin)(nil)), pluginsMap); err != nil {
107+
return nil, err
116108
}
117109

118-
if plugins.Filter != nil {
119-
for _, r := range plugins.Filter.Enabled {
120-
if pg, ok := pluginsMap[r.Name]; ok {
121-
p, ok := pg.(FilterPlugin)
122-
if !ok {
123-
return nil, fmt.Errorf("plugin %q does not extend filter plugin", r.Name)
124-
}
125-
f.filterPlugins = append(f.filterPlugins, p)
126-
} else {
127-
return nil, fmt.Errorf("filter plugin %q does not exist", r.Name)
128-
}
129-
}
110+
if err := updatePluginList(reflect.ValueOf(&f.filterPlugins), plugins.Filter, reflect.TypeOf((*FilterPlugin)(nil)), pluginsMap); err != nil {
111+
return nil, err
130112
}
131113

132-
if plugins.Score != nil {
133-
for _, sc := range plugins.Score.Enabled {
134-
if pg, ok := pluginsMap[sc.Name]; ok {
135-
p, ok := pg.(ScorePlugin)
136-
if !ok {
137-
return nil, fmt.Errorf("plugin %q does not extend score plugin", sc.Name)
138-
}
139-
if f.pluginNameToWeightMap[p.Name()] == 0 {
140-
return nil, fmt.Errorf("score plugin %q is not configured with weight", p.Name())
141-
}
142-
f.scorePlugins = append(f.scorePlugins, p)
143-
} else {
144-
return nil, fmt.Errorf("score plugin %q does not exist", sc.Name)
145-
}
146-
}
114+
if err := updatePluginList(reflect.ValueOf(&f.reservePlugins), plugins.Reserve, reflect.TypeOf((*ReservePlugin)(nil)), pluginsMap); err != nil {
115+
return nil, err
147116
}
148117

149-
if plugins.Reserve != nil {
150-
for _, r := range plugins.Reserve.Enabled {
151-
if pg, ok := pluginsMap[r.Name]; ok {
152-
p, ok := pg.(ReservePlugin)
153-
if !ok {
154-
return nil, fmt.Errorf("plugin %q does not extend reserve plugin", r.Name)
155-
}
156-
f.reservePlugins = append(f.reservePlugins, p)
157-
} else {
158-
return nil, fmt.Errorf("reserve plugin %q does not exist", r.Name)
159-
}
160-
}
118+
if err := updatePluginList(reflect.ValueOf(&f.postFilterPlugins), plugins.PostFilter, reflect.TypeOf((*PostFilterPlugin)(nil)), pluginsMap); err != nil {
119+
return nil, err
161120
}
162121

163-
if plugins.PostFilter != nil {
164-
for _, r := range plugins.PostFilter.Enabled {
165-
if pg, ok := pluginsMap[r.Name]; ok {
166-
p, ok := pg.(PostFilterPlugin)
167-
if !ok {
168-
return nil, fmt.Errorf("plugin %q does not extend post-filter plugin", r.Name)
169-
}
170-
f.postFilterPlugins = append(f.postFilterPlugins, p)
171-
} else {
172-
return nil, fmt.Errorf("post-filter plugin %q does not exist", r.Name)
173-
}
174-
}
122+
if err := updatePluginList(reflect.ValueOf(&f.scorePlugins), plugins.Score, reflect.TypeOf((*ScorePlugin)(nil)), pluginsMap); err != nil {
123+
return nil, err
175124
}
176125

177-
if plugins.PreBind != nil {
178-
for _, pb := range plugins.PreBind.Enabled {
179-
if pg, ok := pluginsMap[pb.Name]; ok {
180-
p, ok := pg.(PreBindPlugin)
181-
if !ok {
182-
return nil, fmt.Errorf("plugin %q does not extend prebind plugin", pb.Name)
183-
}
184-
f.preBindPlugins = append(f.preBindPlugins, p)
185-
} else {
186-
return nil, fmt.Errorf("prebind plugin %q does not exist", pb.Name)
187-
}
188-
}
126+
if err := updatePluginList(reflect.ValueOf(&f.preBindPlugins), plugins.PreBind, reflect.TypeOf((*PreBindPlugin)(nil)), pluginsMap); err != nil {
127+
return nil, err
189128
}
190129

191-
if plugins.Bind != nil {
192-
for _, pb := range plugins.Bind.Enabled {
193-
if pg, ok := pluginsMap[pb.Name]; ok {
194-
p, ok := pg.(BindPlugin)
195-
if !ok {
196-
return nil, fmt.Errorf("plugin %q does not extend bind plugin", pb.Name)
197-
}
198-
f.bindPlugins = append(f.bindPlugins, p)
199-
} else {
200-
return nil, fmt.Errorf("bind plugin %q does not exist", pb.Name)
201-
}
202-
}
130+
if err := updatePluginList(reflect.ValueOf(&f.bindPlugins), plugins.Bind, reflect.TypeOf((*BindPlugin)(nil)), pluginsMap); err != nil {
131+
return nil, err
203132
}
204133

205-
if plugins.PostBind != nil {
206-
for _, pb := range plugins.PostBind.Enabled {
207-
if pg, ok := pluginsMap[pb.Name]; ok {
208-
p, ok := pg.(PostBindPlugin)
209-
if !ok {
210-
return nil, fmt.Errorf("plugin %q does not extend postbind plugin", pb.Name)
211-
}
212-
f.postBindPlugins = append(f.postBindPlugins, p)
213-
} else {
214-
return nil, fmt.Errorf("postbind plugin %q does not exist", pb.Name)
215-
}
216-
}
134+
if err := updatePluginList(reflect.ValueOf(&f.postBindPlugins), plugins.PostBind, reflect.TypeOf((*PostBindPlugin)(nil)), pluginsMap); err != nil {
135+
return nil, err
217136
}
218137

219-
if plugins.Unreserve != nil {
220-
for _, ur := range plugins.Unreserve.Enabled {
221-
if pg, ok := pluginsMap[ur.Name]; ok {
222-
p, ok := pg.(UnreservePlugin)
223-
if !ok {
224-
return nil, fmt.Errorf("plugin %q does not extend unreserve plugin", ur.Name)
225-
}
226-
f.unreservePlugins = append(f.unreservePlugins, p)
227-
} else {
228-
return nil, fmt.Errorf("unreserve plugin %q does not exist", ur.Name)
229-
}
230-
}
138+
if err := updatePluginList(reflect.ValueOf(&f.unreservePlugins), plugins.Unreserve, reflect.TypeOf((*UnreservePlugin)(nil)), pluginsMap); err != nil {
139+
return nil, err
231140
}
232141

233-
if plugins.Permit != nil {
234-
for _, pr := range plugins.Permit.Enabled {
235-
if pg, ok := pluginsMap[pr.Name]; ok {
236-
p, ok := pg.(PermitPlugin)
237-
if !ok {
238-
return nil, fmt.Errorf("plugin %q does not extend permit plugin", pr.Name)
239-
}
240-
f.permitPlugins = append(f.permitPlugins, p)
241-
} else {
242-
return nil, fmt.Errorf("permit plugin %q does not exist", pr.Name)
243-
}
244-
}
142+
if err := updatePluginList(reflect.ValueOf(&f.permitPlugins), plugins.Permit, reflect.TypeOf((*PermitPlugin)(nil)), pluginsMap); err != nil {
143+
return nil, err
245144
}
246145

247-
if plugins.QueueSort != nil {
248-
for _, qs := range plugins.QueueSort.Enabled {
249-
if pg, ok := pluginsMap[qs.Name]; ok {
250-
p, ok := pg.(QueueSortPlugin)
251-
if !ok {
252-
return nil, fmt.Errorf("plugin %q does not extend queue sort plugin", qs.Name)
253-
}
254-
f.queueSortPlugins = append(f.queueSortPlugins, p)
255-
if len(f.queueSortPlugins) > 1 {
256-
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
257-
}
258-
} else {
259-
return nil, fmt.Errorf("queue sort plugin %q does not exist", qs.Name)
260-
}
146+
if err := updatePluginList(reflect.ValueOf(&f.queueSortPlugins), plugins.QueueSort, reflect.TypeOf((*QueueSortPlugin)(nil)), pluginsMap); err != nil {
147+
return nil, err
148+
}
149+
150+
for _, scorePlugin := range f.scorePlugins {
151+
if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 {
152+
return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
261153
}
262154
}
263155

156+
if len(f.queueSortPlugins) > 1 {
157+
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
158+
}
159+
264160
return f, nil
265161
}
266162

163+
func updatePluginList(pluginList reflect.Value, pluginSet *config.PluginSet, pluginType reflect.Type, pluginsMap map[string]Plugin) error {
164+
if pluginSet == nil {
165+
return nil
166+
}
167+
168+
plugins := pluginList.Elem()
169+
pluginType = pluginType.Elem()
170+
set := sets.NewString()
171+
for _, ep := range pluginSet.Enabled {
172+
pg, ok := pluginsMap[ep.Name]
173+
if !ok {
174+
return fmt.Errorf("%s %q does not exist", pluginType.String(), ep.Name)
175+
}
176+
177+
if !reflect.TypeOf(pg).Implements(pluginType) {
178+
return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.String())
179+
}
180+
181+
if set.Has(ep.Name) {
182+
return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.String())
183+
}
184+
185+
set.Insert(ep.Name)
186+
187+
newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
188+
plugins.Set(newPlugins)
189+
}
190+
return nil
191+
}
192+
267193
// QueueSortFunc returns the function to sort pods in scheduling queue
268194
func (f *framework) QueueSortFunc() LessFunc {
269195
if len(f.queueSortPlugins) == 0 {

pkg/scheduler/framework/v1alpha1/framework_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package v1alpha1
1919
import (
2020
"fmt"
2121
"reflect"
22+
"strings"
2223
"testing"
2324

2425
v1 "k8s.io/api/core/v1"
@@ -35,6 +36,7 @@ const (
3536
pluginNotImplementingScore = "plugin-not-implementing-score"
3637
preFilterPluginName = "prefilter-plugin"
3738
preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin"
39+
duplicatePluginName = "duplicate-plugin"
3840
)
3941

4042
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@@ -166,12 +168,34 @@ func (pl *TestPreFilterWithExtensionsPlugin) Extensions() PreFilterExtensions {
166168
return pl
167169
}
168170

171+
type TestDuplicatePlugin struct {
172+
}
173+
174+
func (dp *TestDuplicatePlugin) Name() string {
175+
return duplicatePluginName
176+
}
177+
178+
func (dp *TestDuplicatePlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
179+
return nil
180+
}
181+
182+
func (dp *TestDuplicatePlugin) Extensions() PreFilterExtensions {
183+
return nil
184+
}
185+
186+
var _ PreFilterPlugin = &TestDuplicatePlugin{}
187+
188+
func newDuplicatePlugin(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
189+
return &TestDuplicatePlugin{}, nil
190+
}
191+
169192
var registry Registry = func() Registry {
170193
r := make(Registry)
171194
r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1)
172195
r.Register(scoreWithNormalizePlugin2, newScoreWithNormalizePlugin2)
173196
r.Register(scorePlugin1, newScorePlugin1)
174197
r.Register(pluginNotImplementingScore, newPluginNotImplementingScore)
198+
r.Register(duplicatePluginName, newDuplicatePlugin)
175199
return r
176200
}()
177201

@@ -239,6 +263,28 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
239263
}
240264
}
241265

266+
func TestRegisterDuplicatePluginWouldFail(t *testing.T) {
267+
plugin := config.Plugin{Name: duplicatePluginName, Weight: 1}
268+
269+
pluginSet := config.PluginSet{
270+
Enabled: []config.Plugin{
271+
plugin,
272+
plugin,
273+
},
274+
}
275+
plugins := config.Plugins{}
276+
plugins.PreFilter = &pluginSet
277+
278+
_, err := NewFramework(registry, &plugins, emptyArgs)
279+
if err == nil {
280+
t.Fatal("Framework initialization should fail")
281+
}
282+
283+
if err != nil && !strings.Contains(err.Error(), "already registered") {
284+
t.Fatalf("Unexpected error, got %s, expect: plugin already registered", err.Error())
285+
}
286+
}
287+
242288
func TestRunScorePlugins(t *testing.T) {
243289
tests := []struct {
244290
name string

0 commit comments

Comments
 (0)