@@ -19,6 +19,7 @@ package priorities
19
19
import (
20
20
"context"
21
21
"sync"
22
+ "sync/atomic"
22
23
23
24
"k8s.io/api/core/v1"
24
25
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -63,15 +64,15 @@ type podAffinityPriorityMap struct {
63
64
nodes []* v1.Node
64
65
// counts store the mapping from node name to so-far computed score of
65
66
// the node.
66
- counts map [string ]float64
67
+ counts map [string ]* int64
67
68
// The first error that we faced.
68
69
firstError error
69
70
}
70
71
71
72
func newPodAffinityPriorityMap (nodes []* v1.Node ) * podAffinityPriorityMap {
72
73
return & podAffinityPriorityMap {
73
74
nodes : nodes ,
74
- counts : make (map [string ]float64 , len (nodes )),
75
+ counts : make (map [string ]* int64 , len (nodes )),
75
76
}
76
77
}
77
78
@@ -83,7 +84,7 @@ func (p *podAffinityPriorityMap) setError(err error) {
83
84
}
84
85
}
85
86
86
- func (p * podAffinityPriorityMap ) processTerm (term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , weight float64 ) {
87
+ func (p * podAffinityPriorityMap ) processTerm (term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , weight int64 ) {
87
88
namespaces := priorityutil .GetNamespacesFromPodAffinityTerm (podDefiningAffinityTerm , term )
88
89
selector , err := metav1 .LabelSelectorAsSelector (term .LabelSelector )
89
90
if err != nil {
@@ -92,22 +93,18 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini
92
93
}
93
94
match := priorityutil .PodMatchesTermsNamespaceAndSelector (podToCheck , namespaces , selector )
94
95
if match {
95
- func () {
96
- p .Lock ()
97
- defer p .Unlock ()
98
- for _ , node := range p .nodes {
99
- if priorityutil .NodesHaveSameTopologyKey (node , fixedNode , term .TopologyKey ) {
100
- p .counts [node .Name ] += weight
101
- }
96
+ for _ , node := range p .nodes {
97
+ if priorityutil .NodesHaveSameTopologyKey (node , fixedNode , term .TopologyKey ) {
98
+ atomic .AddInt64 (p .counts [node .Name ], weight )
102
99
}
103
- }()
100
+ }
104
101
}
105
102
}
106
103
107
104
func (p * podAffinityPriorityMap ) processTerms (terms []v1.WeightedPodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , multiplier int ) {
108
105
for i := range terms {
109
106
term := & terms [i ]
110
- p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , float64 (term .Weight * int32 (multiplier )))
107
+ p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , int64 (term .Weight * int32 (multiplier )))
111
108
}
112
109
}
113
110
@@ -121,17 +118,17 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
121
118
hasAffinityConstraints := affinity != nil && affinity .PodAffinity != nil
122
119
hasAntiAffinityConstraints := affinity != nil && affinity .PodAntiAffinity != nil
123
120
121
+ // priorityMap stores the mapping from node name to so-far computed score of
122
+ // the node.
123
+ pm := newPodAffinityPriorityMap (nodes )
124
124
allNodeNames := make ([]string , 0 , len (nodeNameToInfo ))
125
125
for name := range nodeNameToInfo {
126
126
allNodeNames = append (allNodeNames , name )
127
+ pm .counts [name ] = new (int64 )
127
128
}
128
129
129
130
// convert the topology key based weights to the node name based weights
130
- var maxCount float64
131
- var minCount float64
132
- // priorityMap stores the mapping from node name to so-far computed score of
133
- // the node.
134
- pm := newPodAffinityPriorityMap (nodes )
131
+ var maxCount , minCount int64
135
132
136
133
processPod := func (existingPod * v1.Pod ) error {
137
134
existingPodNode , err := ipa .info .GetNodeInfo (existingPod .Spec .NodeName )
@@ -172,7 +169,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
172
169
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
173
170
//}
174
171
for _ , term := range terms {
175
- pm .processTerm (& term , existingPod , pod , existingPodNode , float64 (ipa .hardPodAffinityWeight ))
172
+ pm .processTerm (& term , existingPod , pod , existingPodNode , int64 (ipa .hardPodAffinityWeight ))
176
173
}
177
174
}
178
175
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
@@ -217,11 +214,11 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
217
214
}
218
215
219
216
for _ , node := range nodes {
220
- if pm .counts [node .Name ] > maxCount {
221
- maxCount = pm .counts [node .Name ]
217
+ if * pm .counts [node .Name ] > maxCount {
218
+ maxCount = * pm .counts [node .Name ]
222
219
}
223
- if pm .counts [node .Name ] < minCount {
224
- minCount = pm .counts [node .Name ]
220
+ if * pm .counts [node .Name ] < minCount {
221
+ minCount = * pm .counts [node .Name ]
225
222
}
226
223
}
227
224
@@ -230,7 +227,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
230
227
for _ , node := range nodes {
231
228
fScore := float64 (0 )
232
229
if (maxCount - minCount ) > 0 {
233
- fScore = float64 (schedulerapi .MaxPriority ) * (( pm .counts [node .Name ] - minCount ) / (maxCount - minCount ))
230
+ fScore = float64 (schedulerapi .MaxPriority ) * (float64 ( * pm .counts [node .Name ]- minCount ) / float64 (maxCount - minCount ))
234
231
}
235
232
result = append (result , schedulerapi.HostPriority {Host : node .Name , Score : int (fScore )})
236
233
if klog .V (10 ) {
0 commit comments