Skip to content

Commit e50a24d

Browse files
committed
Move RunNormalizeScorePlugins and ApplyScoreWeights into RunScorePlugins; Also add unit tests for RunScorePlugins.
1 parent 8f0d626 commit e50a24d

File tree

6 files changed

+192
-592
lines changed

6 files changed

+192
-592
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -784,18 +784,6 @@ func PrioritizeNodes(
784784
return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
785785
}
786786

787-
// Run the Normalize Score plugins.
788-
status := framework.RunNormalizeScorePlugins(pluginContext, pod, scoresMap)
789-
if !status.IsSuccess() {
790-
return schedulerapi.HostPriorityList{}, status.AsError()
791-
}
792-
793-
// Apply weights for scores.
794-
status = framework.ApplyScoreWeights(pluginContext, pod, scoresMap)
795-
if !status.IsSuccess() {
796-
return schedulerapi.HostPriorityList{}, status.AsError()
797-
}
798-
799787
// Summarize all scores.
800788
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
801789

pkg/scheduler/framework/v1alpha1/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ go_test(
4848
deps = [
4949
"//pkg/scheduler/apis/config:go_default_library",
5050
"//staging/src/k8s.io/api/core/v1:go_default_library",
51+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
5152
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
5253
],
5354
)

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 11 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,8 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
360360
}
361361
ctx, cancel := context.WithCancel(context.Background())
362362
errCh := schedutil.NewErrorChannel()
363+
364+
// Run Score method for each node in parallel.
363365
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
364366
for _, pl := range f.scorePlugins {
365367
nodeName := nodes[index].Name
@@ -374,83 +376,53 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
374376
}
375377
}
376378
})
377-
378379
if err := errCh.ReceiveError(); err != nil {
379380
msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
380381
klog.Error(msg)
381382
return nil, NewStatus(Error, msg)
382383
}
383384

384-
return pluginToNodeScores, nil
385-
}
386-
387-
// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins.
388-
// It should be called after RunScorePlugins with the PluginToNodeScores result.
389-
// It then modifies the list with normalized scores. It returns a non-success Status
390-
// if any of the NormalizeScore functions returns a non-success status.
391-
func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScores) *Status {
392-
ctx, cancel := context.WithCancel(context.Background())
393-
errCh := schedutil.NewErrorChannel()
385+
// Run NormalizeScore method for each ScoreWithNormalizePlugin in parallel.
394386
workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) {
395387
pl := f.scoreWithNormalizePlugins[index]
396-
nodeScoreList, ok := scores[pl.Name()]
397-
if !ok {
398-
err := fmt.Errorf("normalize score plugin %q has no corresponding scores in the PluginToNodeScores", pl.Name())
399-
errCh.SendErrorWithCancel(err, cancel)
400-
return
401-
}
388+
nodeScoreList := pluginToNodeScores[pl.Name()]
402389
status := pl.NormalizeScore(pc, pod, nodeScoreList)
403390
if !status.IsSuccess() {
404391
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
405392
errCh.SendErrorWithCancel(err, cancel)
406393
return
407394
}
408395
})
409-
410396
if err := errCh.ReceiveError(); err != nil {
411397
msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
412398
klog.Error(msg)
413-
return NewStatus(Error, msg)
399+
return nil, NewStatus(Error, msg)
414400
}
415401

416-
return nil
417-
}
418-
419-
// ApplyScoreWeights applies weights to the score results. It should be called after
420-
// RunNormalizeScorePlugins.
421-
func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScores) *Status {
422-
ctx, cancel := context.WithCancel(context.Background())
423-
errCh := schedutil.NewErrorChannel()
402+
// Apply score defaultWeights for each ScorePlugin in parallel.
424403
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
425404
pl := f.scorePlugins[index]
426405
// Score plugins' weight has been checked when they are initialized.
427406
weight := f.pluginNameToWeightMap[pl.Name()]
428-
nodeScoreList, ok := scores[pl.Name()]
429-
if !ok {
430-
err := fmt.Errorf("score plugin %q has no corresponding scores in the PluginToNodeScores", pl.Name())
431-
errCh.SendErrorWithCancel(err, cancel)
432-
return
433-
}
407+
nodeScoreList := pluginToNodeScores[pl.Name()]
434408

435409
for i, nodeScore := range nodeScoreList {
436410
// return error if score plugin returns invalid score.
437411
if nodeScore.Score > MaxNodeScore || nodeScore.Score < MinNodeScore {
438-
err := fmt.Errorf("score plugin %q returns an invalid score %q, it should in the range of [MinNodeScore, MaxNodeScore] after normalizing", pl.Name(), nodeScore.Score)
412+
err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, MinNodeScore, MaxNodeScore)
439413
errCh.SendErrorWithCancel(err, cancel)
440414
return
441415
}
442-
443416
nodeScoreList[i].Score = nodeScore.Score * weight
444417
}
445418
})
446-
447419
if err := errCh.ReceiveError(); err != nil {
448-
msg := fmt.Sprintf("error while applying score weights for pod %q: %v", pod.Name, err)
420+
msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err)
449421
klog.Error(msg)
450-
return NewStatus(Error, msg)
422+
return nil, NewStatus(Error, msg)
451423
}
452424

453-
return nil
425+
return pluginToNodeScores, nil
454426
}
455427

456428
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a

0 commit comments

Comments
 (0)