@@ -18,8 +18,10 @@ package scheduler
1818import (
1919 "sort"
2020 "strings"
21+ "sync"
2122
2223 corev1 "k8s.io/api/core/v1"
24+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
2325 "k8s.io/klog/v2"
2426
2527 "github.com/Project-HAMi/HAMi/pkg/device"
@@ -207,44 +209,59 @@ func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceR
207209 NodeList : make ([]* policy.NodeScore , 0 ),
208210 }
209211
210- //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {
211- // res := make(NodeScoreList, 0, len(*nodes))
212+ wg := sync.WaitGroup {}
213+ mutex := sync.Mutex {}
214+ errCh := make (chan error , len (* nodes ))
212215 for nodeID , node := range * nodes {
213- viewStatus (* node )
214- score := policy.NodeScore {NodeID : nodeID , Node : node .Node , Devices : make (util.PodDevices ), Score : 0 }
215- score .ComputeDefaultScore (node .Devices )
216-
217- //This loop is for different container request
218- ctrfit := false
219- for ctrid , n := range nums {
220- sums := 0
221- for _ , k := range n {
222- sums += int (k .Nums )
223- }
216+ wg .Add (1 )
217+ go func (nodeID string , node * NodeUsage ) {
218+ defer wg .Done ()
219+
220+ viewStatus (* node )
221+ score := policy.NodeScore {NodeID : nodeID , Node : node .Node , Devices : make (util.PodDevices ), Score : 0 }
222+ score .ComputeDefaultScore (node .Devices )
224223
225- if sums == 0 {
226- for idx := range score .Devices {
227- for len (score .Devices [idx ]) <= ctrid {
228- score .Devices [idx ] = append (score .Devices [idx ], util.ContainerDevices {})
224+ //This loop is for different container request
225+ ctrfit := false
226+ for ctrid , n := range nums {
227+ sums := 0
228+ for _ , k := range n {
229+ sums += int (k .Nums )
230+ }
231+
232+ if sums == 0 {
233+ for idx := range score .Devices {
234+ for len (score .Devices [idx ]) <= ctrid {
235+ score .Devices [idx ] = append (score .Devices [idx ], util.ContainerDevices {})
236+ }
237+ score.Devices [idx ][ctrid ] = append (score.Devices [idx ][ctrid ], util.ContainerDevice {})
238+ continue
229239 }
230- score.Devices [idx ][ctrid ] = append (score.Devices [idx ][ctrid ], util.ContainerDevice {})
231- continue
240+ }
241+ klog .V (5 ).InfoS ("fitInDevices" , "pod" , klog .KObj (task ), "node" , nodeID )
242+ fit , _ := fitInDevices (node , n , annos , task , & score .Devices )
243+ ctrfit = fit
244+ if ! fit {
245+ klog .InfoS ("calcScore:node not fit pod" , "pod" , klog .KObj (task ), "node" , nodeID )
246+ failedNodes [nodeID ] = "node not fit pod"
247+ break
232248 }
233249 }
234- klog .V (5 ).InfoS ("fitInDevices" , "pod" , klog .KObj (task ), "node" , nodeID )
235- fit , _ := fitInDevices (node , n , annos , task , & score .Devices )
236- ctrfit = fit
237- if ! fit {
238- klog .InfoS ("calcScore:node not fit pod" , "pod" , klog .KObj (task ), "node" , nodeID )
239- failedNodes [nodeID ] = "node not fit pod"
240- break
250+
251+ if ctrfit {
252+ mutex .Lock ()
253+ res .NodeList = append (res .NodeList , & score )
254+ mutex .Unlock ()
255+ score .OverrideScore (node .Devices , userNodePolicy )
241256 }
242- }
257+ }(nodeID , node )
258+ }
259+ wg .Wait ()
260+ close (errCh )
243261
244- if ctrfit {
245- res .NodeList = append (res .NodeList , & score )
246- score .OverrideScore (node .Devices , userNodePolicy )
247- }
262+ var errorsSlice []error
263+ for e := range errCh {
264+ errorsSlice = append (errorsSlice , e )
248265 }
249- return & res , nil
266+ return & res , utilerrors . NewAggregate ( errorsSlice )
250267}
0 commit comments