@@ -19,6 +19,7 @@ package priorities
19
19
import (
20
20
"context"
21
21
"sync"
22
+ "sync/atomic"
22
23
23
24
"k8s.io/api/core/v1"
24
25
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -63,15 +64,15 @@ type podAffinityPriorityMap struct {
63
64
nodes []* v1.Node
64
65
// counts store the mapping from node name to so-far computed score of
65
66
// the node.
66
- counts map [string ]float64
67
+ counts map [string ]* int64
67
68
// The first error that we faced.
68
69
firstError error
69
70
}
70
71
71
72
func newPodAffinityPriorityMap (nodes []* v1.Node ) * podAffinityPriorityMap {
72
73
return & podAffinityPriorityMap {
73
74
nodes : nodes ,
74
- counts : make (map [string ]float64 , len (nodes )),
75
+ counts : make (map [string ]* int64 , len (nodes )),
75
76
}
76
77
}
77
78
@@ -83,7 +84,7 @@ func (p *podAffinityPriorityMap) setError(err error) {
83
84
}
84
85
}
85
86
86
- func (p * podAffinityPriorityMap ) processTerm (term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , weight float64 ) {
87
+ func (p * podAffinityPriorityMap ) processTerm (term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , weight int64 ) {
87
88
namespaces := priorityutil .GetNamespacesFromPodAffinityTerm (podDefiningAffinityTerm , term )
88
89
selector , err := metav1 .LabelSelectorAsSelector (term .LabelSelector )
89
90
if err != nil {
@@ -92,22 +93,18 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini
92
93
}
93
94
match := priorityutil .PodMatchesTermsNamespaceAndSelector (podToCheck , namespaces , selector )
94
95
if match {
95
- func () {
96
- p .Lock ()
97
- defer p .Unlock ()
98
- for _ , node := range p .nodes {
99
- if priorityutil .NodesHaveSameTopologyKey (node , fixedNode , term .TopologyKey ) {
100
- p .counts [node .Name ] += weight
101
- }
96
+ for _ , node := range p .nodes {
97
+ if priorityutil .NodesHaveSameTopologyKey (node , fixedNode , term .TopologyKey ) {
98
+ atomic .AddInt64 (p .counts [node .Name ], weight )
102
99
}
103
- }()
100
+ }
104
101
}
105
102
}
106
103
107
104
func (p * podAffinityPriorityMap ) processTerms (terms []v1.WeightedPodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , multiplier int ) {
108
105
for i := range terms {
109
106
term := & terms [i ]
110
- p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , float64 (term .Weight * int32 (multiplier )))
107
+ p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , int64 (term .Weight * int32 (multiplier )))
111
108
}
112
109
}
113
110
@@ -121,17 +118,21 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
121
118
hasAffinityConstraints := affinity != nil && affinity .PodAffinity != nil
122
119
hasAntiAffinityConstraints := affinity != nil && affinity .PodAntiAffinity != nil
123
120
121
+ // priorityMap stores the mapping from node name to so-far computed score of
122
+ // the node.
123
+ pm := newPodAffinityPriorityMap (nodes )
124
124
allNodeNames := make ([]string , 0 , len (nodeNameToInfo ))
125
+ lazyInit := hasAffinityConstraints || hasAntiAffinityConstraints
125
126
for name := range nodeNameToInfo {
126
127
allNodeNames = append (allNodeNames , name )
128
+ // if pod has affinity defined, or target node has affinityPods
129
+ if lazyInit || len (nodeNameToInfo [name ].PodsWithAffinity ()) != 0 {
130
+ pm .counts [name ] = new (int64 )
131
+ }
127
132
}
128
133
129
134
// convert the topology key based weights to the node name based weights
130
- var maxCount float64
131
- var minCount float64
132
- // priorityMap stores the mapping from node name to so-far computed score of
133
- // the node.
134
- pm := newPodAffinityPriorityMap (nodes )
135
+ var maxCount , minCount int64
135
136
136
137
processPod := func (existingPod * v1.Pod ) error {
137
138
existingPodNode , err := ipa .info .GetNodeInfo (existingPod .Spec .NodeName )
@@ -172,7 +173,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
172
173
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
173
174
//}
174
175
for _ , term := range terms {
175
- pm .processTerm (& term , existingPod , pod , existingPodNode , float64 (ipa .hardPodAffinityWeight ))
176
+ pm .processTerm (& term , existingPod , pod , existingPodNode , int64 (ipa .hardPodAffinityWeight ))
176
177
}
177
178
}
178
179
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
@@ -194,7 +195,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
194
195
nodeInfo := nodeNameToInfo [allNodeNames [i ]]
195
196
if nodeInfo .Node () != nil {
196
197
if hasAffinityConstraints || hasAntiAffinityConstraints {
197
- // We need to process all the nodes .
198
+ // We need to process all the pods .
198
199
for _ , existingPod := range nodeInfo .Pods () {
199
200
if err := processPod (existingPod ); err != nil {
200
201
pm .setError (err )
@@ -217,20 +218,24 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
217
218
}
218
219
219
220
for _ , node := range nodes {
220
- if pm .counts [node .Name ] > maxCount {
221
- maxCount = pm .counts [node .Name ]
221
+ if pm .counts [node .Name ] == nil {
222
+ continue
223
+ }
224
+ if * pm .counts [node .Name ] > maxCount {
225
+ maxCount = * pm .counts [node .Name ]
222
226
}
223
- if pm .counts [node .Name ] < minCount {
224
- minCount = pm .counts [node .Name ]
227
+ if * pm .counts [node .Name ] < minCount {
228
+ minCount = * pm .counts [node .Name ]
225
229
}
226
230
}
227
231
228
232
// calculate final priority score for each node
229
233
result := make (schedulerapi.HostPriorityList , 0 , len (nodes ))
234
+ maxMinDiff := maxCount - minCount
230
235
for _ , node := range nodes {
231
236
fScore := float64 (0 )
232
- if ( maxCount - minCount ) > 0 {
233
- fScore = float64 (schedulerapi .MaxPriority ) * (( pm .counts [node .Name ] - minCount ) / (maxCount - minCount ))
237
+ if maxMinDiff > 0 && pm . counts [ node . Name ] != nil {
238
+ fScore = float64 (schedulerapi .MaxPriority ) * (float64 ( * pm .counts [node .Name ]- minCount ) / float64 (maxCount - minCount ))
234
239
}
235
240
result = append (result , schedulerapi.HostPriority {Host : node .Name , Score : int (fScore )})
236
241
if klog .V (10 ) {
0 commit comments