@@ -23,6 +23,8 @@ import (
23
23
24
24
v1 "k8s.io/api/core/v1"
25
25
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
+ "k8s.io/apimachinery/pkg/labels"
27
+ "k8s.io/apimachinery/pkg/util/sets"
26
28
"k8s.io/client-go/util/workqueue"
27
29
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
28
30
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -36,47 +38,67 @@ import (
36
38
// topologyPairToScore maps a topology key to a map from a value of that key
// (as found in a node's labels) to the score accumulated for that pair.
type topologyPairToScore map [string ]map [string ]int64
37
39
38
40
type podAffinityPriorityMap struct {
39
- // nodes contain all nodes that should be considered.
40
- nodes []* v1. Node
41
- // tracks a topology pair score so far.
42
- topologyScore topologyPairToScore
41
+ topologyScore topologyPairToScore
42
+ affinityTerms []* weightedAffinityTerm
43
+ antiAffinityTerms [] * weightedAffinityTerm
44
+ hardPodAffinityWeight int32
43
45
sync.Mutex
44
46
}
45
47
46
- func newPodAffinityPriorityMap (nodes []* v1.Node ) * podAffinityPriorityMap {
47
- return & podAffinityPriorityMap {
48
- nodes : nodes ,
49
- topologyScore : make (topologyPairToScore ),
50
- }
48
+ // A "processed" representation of v1.WeightedAffinityTerm.
49
+ type weightedAffinityTerm struct {
50
+ namespaces sets.String
51
+ selector labels.Selector
52
+ weight int32
53
+ topologyKey string
51
54
}
52
55
53
- func ( p * podAffinityPriorityMap ) processTerm ( term * v1.PodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1. Pod , fixedNode * v1.Node , weight int64 ) error {
54
- namespaces := priorityutil .GetNamespacesFromPodAffinityTerm (podDefiningAffinityTerm , term )
56
+ func newWeightedAffinityTerm ( pod * v1.Pod , term * v1.PodAffinityTerm , weight int32 ) ( * weightedAffinityTerm , error ) {
57
+ namespaces := priorityutil .GetNamespacesFromPodAffinityTerm (pod , term )
55
58
selector , err := metav1 .LabelSelectorAsSelector (term .LabelSelector )
56
59
if err != nil {
57
- return err
60
+ return nil , err
61
+ }
62
+ return & weightedAffinityTerm {namespaces : namespaces , selector : selector , topologyKey : term .TopologyKey , weight : weight }, nil
63
+ }
64
+
65
+ func getProcessedTerms (pod * v1.Pod , terms []v1.WeightedPodAffinityTerm ) ([]* weightedAffinityTerm , error ) {
66
+ if terms == nil {
67
+ return nil , nil
58
68
}
69
+
70
+ var processedTerms []* weightedAffinityTerm
71
+ for i := range terms {
72
+ p , err := newWeightedAffinityTerm (pod , & terms [i ].PodAffinityTerm , terms [i ].Weight )
73
+ if err != nil {
74
+ return nil , err
75
+ }
76
+ processedTerms = append (processedTerms , p )
77
+ }
78
+ return processedTerms , nil
79
+ }
80
+
81
+ func (p * podAffinityPriorityMap ) processTerm (term * weightedAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , multiplier int ) error {
59
82
if len (fixedNode .Labels ) == 0 {
60
83
return nil
61
84
}
62
85
63
- match := priorityutil .PodMatchesTermsNamespaceAndSelector (podToCheck , namespaces , selector )
64
- tpValue , tpValueExist := fixedNode .Labels [term .TopologyKey ]
86
+ match := priorityutil .PodMatchesTermsNamespaceAndSelector (podToCheck , term . namespaces , term . selector )
87
+ tpValue , tpValueExist := fixedNode .Labels [term .topologyKey ]
65
88
if match && tpValueExist {
66
89
p .Lock ()
67
- if p .topologyScore [term .TopologyKey ] == nil {
68
- p .topologyScore [term .TopologyKey ] = make (map [string ]int64 )
90
+ if p .topologyScore [term .topologyKey ] == nil {
91
+ p .topologyScore [term .topologyKey ] = make (map [string ]int64 )
69
92
}
70
- p.topologyScore [term.TopologyKey ][tpValue ] += weight
93
+ p.topologyScore [term.topologyKey ][tpValue ] += int64 ( term . weight * int32 ( multiplier ))
71
94
p .Unlock ()
72
95
}
73
96
return nil
74
97
}
75
98
76
- func (p * podAffinityPriorityMap ) processTerms (terms []v1.WeightedPodAffinityTerm , podDefiningAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , multiplier int ) error {
77
- for i := range terms {
78
- term := & terms [i ]
79
- if err := p .processTerm (& term .PodAffinityTerm , podDefiningAffinityTerm , podToCheck , fixedNode , int64 (term .Weight * int32 (multiplier ))); err != nil {
99
+ func (p * podAffinityPriorityMap ) processTerms (terms []* weightedAffinityTerm , podToCheck * v1.Pod , fixedNode * v1.Node , multiplier int ) error {
100
+ for _ , term := range terms {
101
+ if err := p .processTerm (term , podToCheck , fixedNode , multiplier ); err != nil {
80
102
return err
81
103
}
82
104
}
@@ -143,6 +165,75 @@ func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, shar
143
165
return nil
144
166
}
145
167
168
+ func (p * podAffinityPriorityMap ) processExistingPod (existingPod * v1.Pod , existingPodNodeInfo * schedulernodeinfo.NodeInfo , incomingPod * v1.Pod ) error {
169
+ existingPodAffinity := existingPod .Spec .Affinity
170
+ existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity .PodAffinity != nil
171
+ existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity .PodAntiAffinity != nil
172
+ existingPodNode := existingPodNodeInfo .Node ()
173
+
174
+ // For every soft pod affinity term of <pod>, if <existingPod> matches the term,
175
+ // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
176
+ // value as that of <existingPods>`s node by the term`s weight.
177
+ if err := p .processTerms (p .affinityTerms , existingPod , existingPodNode , 1 ); err != nil {
178
+ return err
179
+ }
180
+
181
+ // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
182
+ // decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
183
+ // value as that of <existingPod>`s node by the term`s weight.
184
+ if err := p .processTerms (p .antiAffinityTerms , existingPod , existingPodNode , - 1 ); err != nil {
185
+ return err
186
+ }
187
+
188
+ if existingHasAffinityConstraints {
189
+ // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
190
+ // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
191
+ // value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
192
+ if p .hardPodAffinityWeight > 0 {
193
+ terms := existingPodAffinity .PodAffinity .RequiredDuringSchedulingIgnoredDuringExecution
194
+ // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
195
+ //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
196
+ // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
197
+ //}
198
+ for i := range terms {
199
+ term := & terms [i ]
200
+ processedTerm , err := newWeightedAffinityTerm (existingPod , term , p .hardPodAffinityWeight )
201
+ if err != nil {
202
+ return err
203
+ }
204
+ if err := p .processTerm (processedTerm , incomingPod , existingPodNode , 1 ); err != nil {
205
+ return err
206
+ }
207
+ }
208
+ }
209
+ // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
210
+ // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
211
+ // value as that of <existingPod>'s node by the term's weight.
212
+ terms , err := getProcessedTerms (existingPod , existingPodAffinity .PodAffinity .PreferredDuringSchedulingIgnoredDuringExecution )
213
+ if err != nil {
214
+ klog .Error (err )
215
+ return nil
216
+ }
217
+
218
+ if err := p .processTerms (terms , incomingPod , existingPodNode , 1 ); err != nil {
219
+ return err
220
+ }
221
+ }
222
+ if existingHasAntiAffinityConstraints {
223
+ // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
224
+ // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
225
+ // value as that of <existingPod>'s node by the term's weight.
226
+ terms , err := getProcessedTerms (existingPod , existingPodAffinity .PodAntiAffinity .PreferredDuringSchedulingIgnoredDuringExecution )
227
+ if err != nil {
228
+ return err
229
+ }
230
+ if err := p .processTerms (terms , incomingPod , existingPodNode , - 1 ); err != nil {
231
+ return err
232
+ }
233
+ }
234
+ return nil
235
+ }
236
+
146
237
func buildTopologyPairToScore (
147
238
pod * v1.Pod ,
148
239
sharedLister schedulerlisters.SharedLister ,
@@ -158,9 +249,8 @@ func buildTopologyPairToScore(
158
249
hasAffinityConstraints := affinity != nil && affinity .PodAffinity != nil
159
250
hasAntiAffinityConstraints := affinity != nil && affinity .PodAntiAffinity != nil
160
251
161
- // pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
162
- pm := newPodAffinityPriorityMap (filteredNodes )
163
-
252
+ // Unless the pod being scheduled has affinity terms, we only
253
+ // need to process nodes hosting pods with affinity.
164
254
allNodes , err := sharedLister .NodeInfos ().HavePodsWithAffinityList ()
165
255
if err != nil {
166
256
klog .Errorf ("get pods with affinity list error, err: %v" , err )
@@ -174,93 +264,45 @@ func buildTopologyPairToScore(
174
264
}
175
265
}
176
266
177
- processPod := func (existingPod * v1.Pod ) error {
178
- existingPodNodeInfo , err := sharedLister .NodeInfos ().Get (existingPod .Spec .NodeName )
179
- if err != nil {
180
- klog .Errorf ("Node not found, %v" , existingPod .Spec .NodeName )
267
+ var affinityTerms []* weightedAffinityTerm
268
+ var antiAffinityTerms []* weightedAffinityTerm
269
+ if hasAffinityConstraints {
270
+ if affinityTerms , err = getProcessedTerms (pod , affinity .PodAffinity .PreferredDuringSchedulingIgnoredDuringExecution ); err != nil {
271
+ klog .Error (err )
181
272
return nil
182
273
}
183
- existingPodAffinity := existingPod .Spec .Affinity
184
- existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity .PodAffinity != nil
185
- existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity .PodAntiAffinity != nil
186
- existingPodNode := existingPodNodeInfo .Node ()
187
-
188
- if hasAffinityConstraints {
189
- // For every soft pod affinity term of <pod>, if <existingPod> matches the term,
190
- // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
191
- // value as that of <existingPods>`s node by the term`s weight.
192
- terms := affinity .PodAffinity .PreferredDuringSchedulingIgnoredDuringExecution
193
- if err := pm .processTerms (terms , pod , existingPod , existingPodNode , 1 ); err != nil {
194
- return err
195
- }
196
- }
197
- if hasAntiAffinityConstraints {
198
- // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
199
- // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
200
- // value as that of <existingPod>`s node by the term`s weight.
201
- terms := affinity .PodAntiAffinity .PreferredDuringSchedulingIgnoredDuringExecution
202
- if err := pm .processTerms (terms , pod , existingPod , existingPodNode , - 1 ); err != nil {
203
- return err
204
- }
274
+ }
275
+ if hasAntiAffinityConstraints {
276
+ if antiAffinityTerms , err = getProcessedTerms (pod , affinity .PodAntiAffinity .PreferredDuringSchedulingIgnoredDuringExecution ); err != nil {
277
+ klog .Error (err )
278
+ return nil
205
279
}
280
+ }
206
281
207
- if existingHasAffinityConstraints {
208
- // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
209
- // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
210
- // value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
211
- if hardPodAffinityWeight > 0 {
212
- terms := existingPodAffinity .PodAffinity .RequiredDuringSchedulingIgnoredDuringExecution
213
- // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
214
- //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
215
- // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
216
- //}
217
- for _ , term := range terms {
218
- if err := pm .processTerm (& term , existingPod , pod , existingPodNode , int64 (hardPodAffinityWeight )); err != nil {
219
- return err
220
- }
221
- }
222
- }
223
- // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
224
- // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
225
- // value as that of <existingPod>'s node by the term's weight.
226
- terms := existingPodAffinity .PodAffinity .PreferredDuringSchedulingIgnoredDuringExecution
227
- if err := pm .processTerms (terms , existingPod , pod , existingPodNode , 1 ); err != nil {
228
- return err
229
- }
230
- }
231
- if existingHasAntiAffinityConstraints {
232
- // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
233
- // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
234
- // value as that of <existingPod>'s node by the term's weight.
235
- terms := existingPodAffinity .PodAntiAffinity .PreferredDuringSchedulingIgnoredDuringExecution
236
- if err := pm .processTerms (terms , existingPod , pod , existingPodNode , - 1 ); err != nil {
237
- return err
238
- }
239
- }
240
- return nil
282
+ pm := podAffinityPriorityMap {
283
+ topologyScore : make (topologyPairToScore ),
284
+ affinityTerms : affinityTerms ,
285
+ antiAffinityTerms : antiAffinityTerms ,
286
+ hardPodAffinityWeight : hardPodAffinityWeight ,
241
287
}
242
288
243
289
errCh := schedutil .NewErrorChannel ()
244
290
ctx , cancel := context .WithCancel (context .Background ())
245
291
processNode := func (i int ) {
246
292
nodeInfo := allNodes [i ]
247
293
if nodeInfo .Node () != nil {
294
+ // Unless the pod being scheduled has affinity terms, we only
295
+ // need to process pods with affinity in the node.
296
+ podsToProcess := nodeInfo .PodsWithAffinity ()
248
297
if hasAffinityConstraints || hasAntiAffinityConstraints {
249
298
// We need to process all the pods.
250
- for _ , existingPod := range nodeInfo .Pods () {
251
- if err := processPod (existingPod ); err != nil {
252
- errCh .SendErrorWithCancel (err , cancel )
253
- return
254
- }
255
- }
256
- } else {
257
- // The pod doesn't have any constraints - we need to check only existing
258
- // ones that have some.
259
- for _ , existingPod := range nodeInfo .PodsWithAffinity () {
260
- if err := processPod (existingPod ); err != nil {
261
- errCh .SendErrorWithCancel (err , cancel )
262
- return
263
- }
299
+ podsToProcess = nodeInfo .Pods ()
300
+ }
301
+
302
+ for _ , existingPod := range podsToProcess {
303
+ if err := pm .processExistingPod (existingPod , nodeInfo , pod ); err != nil {
304
+ errCh .SendErrorWithCancel (err , cancel )
305
+ return
264
306
}
265
307
}
266
308
}
0 commit comments