@@ -19,7 +19,6 @@ package priorities
19
19
import (
20
20
"context"
21
21
"sync"
22
- "sync/atomic"
23
22
24
23
"k8s.io/api/core/v1"
25
24
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -64,15 +63,15 @@ type podAffinityPriorityMap struct {
64
63
nodes []* v1.Node
65
64
// counts store the mapping from node name to so-far computed score of
66
65
// the node.
67
- counts map [string ]* int64
66
+ counts map [string ]float64
68
67
// The first error that we faced.
69
68
firstError error
70
69
}
71
70
72
71
func newPodAffinityPriorityMap (nodes []* v1.Node ) * podAffinityPriorityMap {
73
72
return & podAffinityPriorityMap {
74
73
nodes : nodes ,
75
- counts : make (map [string ]* int64 , len (nodes )),
74
+ counts : make (map [string ]float64 , len (nodes )),
76
75
}
77
76
}
78
77
@@ -84,7 +83,7 @@ func (p *podAffinityPriorityMap) setError(err error) {
84
83
}
85
84
}
86
85
87
- func (p * podAffinityPriorityMap ) processTerm (term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , weight int64 ) {
86
+ func (p * podAffinityPriorityMap ) processTerm (term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , weight float64 ) {
88
87
namespaces := priorityutil .GetNamespacesFromPodAffinityTerm (podDefiningAffinityTerm , term )
89
88
selector , err := metav1 .LabelSelectorAsSelector (term .LabelSelector )
90
89
if err != nil {
@@ -93,18 +92,22 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini
93
92
}
94
93
match := priorityutil .PodMatchesTermsNamespaceAndSelector (podToCheck , namespaces , selector )
95
94
if match {
96
- for _ , node := range p .nodes {
97
- if priorityutil .NodesHaveSameTopologyKey (node , fixedNode , term .TopologyKey ) {
98
- atomic .AddInt64 (p .counts [node .Name ], weight )
95
+ func () {
96
+ p .Lock ()
97
+ defer p .Unlock ()
98
+ for _ , node := range p .nodes {
99
+ if priorityutil .NodesHaveSameTopologyKey (node , fixedNode , term .TopologyKey ) {
100
+ p .counts [node .Name ] += weight
101
+ }
99
102
}
100
- }
103
+ }()
101
104
}
102
105
}
103
106
104
107
func (p * podAffinityPriorityMap ) processTerms (terms []v1.WeightedPodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , multiplier int ) {
105
108
for i := range terms {
106
109
term := & terms [i ]
107
- p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , int64 (term .Weight * int32 (multiplier )))
110
+ p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , float64 (term .Weight * int32 (multiplier )))
108
111
}
109
112
}
110
113
@@ -118,17 +121,17 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
118
121
hasAffinityConstraints := affinity != nil && affinity .PodAffinity != nil
119
122
hasAntiAffinityConstraints := affinity != nil && affinity .PodAntiAffinity != nil
120
123
121
- // priorityMap stores the mapping from node name to so-far computed score of
122
- // the node.
123
- pm := newPodAffinityPriorityMap (nodes )
124
124
allNodeNames := make ([]string , 0 , len (nodeNameToInfo ))
125
125
for name := range nodeNameToInfo {
126
126
allNodeNames = append (allNodeNames , name )
127
- pm .counts [name ] = new (int64 )
128
127
}
129
128
130
129
// convert the topology key based weights to the node name based weights
131
- var maxCount , minCount int64
130
+ var maxCount float64
131
+ var minCount float64
132
+ // priorityMap stores the mapping from node name to so-far computed score of
133
+ // the node.
134
+ pm := newPodAffinityPriorityMap (nodes )
132
135
133
136
processPod := func (existingPod * v1.Pod ) error {
134
137
existingPodNode , err := ipa .info .GetNodeInfo (existingPod .Spec .NodeName )
@@ -169,7 +172,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
169
172
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
170
173
//}
171
174
for _ , term := range terms {
172
- pm .processTerm (& term , existingPod , pod , existingPodNode , int64 (ipa .hardPodAffinityWeight ))
175
+ pm .processTerm (& term , existingPod , pod , existingPodNode , float64 (ipa .hardPodAffinityWeight ))
173
176
}
174
177
}
175
178
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
@@ -214,11 +217,11 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
214
217
}
215
218
216
219
for _ , node := range nodes {
217
- if * pm .counts [node .Name ] > maxCount {
218
- maxCount = * pm .counts [node .Name ]
220
+ if pm .counts [node .Name ] > maxCount {
221
+ maxCount = pm .counts [node .Name ]
219
222
}
220
- if * pm .counts [node .Name ] < minCount {
221
- minCount = * pm .counts [node .Name ]
223
+ if pm .counts [node .Name ] < minCount {
224
+ minCount = pm .counts [node .Name ]
222
225
}
223
226
}
224
227
@@ -227,7 +230,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
227
230
for _ , node := range nodes {
228
231
fScore := float64 (0 )
229
232
if (maxCount - minCount ) > 0 {
230
- fScore = float64 (schedulerapi .MaxPriority ) * (float64 ( * pm .counts [node .Name ]- minCount ) / float64 (maxCount - minCount ))
233
+ fScore = float64 (schedulerapi .MaxPriority ) * (( pm .counts [node .Name ] - minCount ) / (maxCount - minCount ))
231
234
}
232
235
result = append (result , schedulerapi.HostPriority {Host : node .Name , Score : int (fScore )})
233
236
if klog .V (10 ) {
0 commit comments