@@ -19,7 +19,7 @@ package interpodaffinity
19
19
import (
20
20
"context"
21
21
"fmt"
22
- "sync"
22
+ "sync/atomic "
23
23
24
24
v1 "k8s.io/api/core/v1"
25
25
"k8s.io/klog/v2"
@@ -128,7 +128,7 @@ func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, t
128
128
}
129
129
}
130
130
131
- // updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
131
+ // updateWithAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
132
132
// for each anti-affinity term matched the target pod.
133
133
func (m topologyToMatchedTermCount ) updateWithAntiAffinityTerms (targetPod * v1.Pod , targetPodNode * v1.Node , antiAffinityTerms []framework.AffinityTerm , value int64 ) {
134
134
// Check anti-affinity terms.
@@ -160,101 +160,82 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
160
160
return true
161
161
}
162
162
163
- // getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
164
- // (1) Whether it has PodAntiAffinity
165
- // (2) Whether ANY AffinityTerm matches the incoming pod
166
- func getMatchingAntiAffinityTopologyPairsOfPod (newPod * v1.Pod , existingPod * framework.PodInfo , node * v1.Node ) topologyToMatchedTermCount {
167
- topologyMap := make (topologyToMatchedTermCount )
168
- for _ , term := range existingPod .RequiredAntiAffinityTerms {
169
- if schedutil .PodMatchesTermsNamespaceAndSelector (newPod , term .Namespaces , term .Selector ) {
170
- if topologyValue , ok := node .Labels [term .TopologyKey ]; ok {
171
- pair := topologyPair {key : term .TopologyKey , value : topologyValue }
172
- topologyMap [pair ]++
173
- }
174
- }
175
- }
176
- return topologyMap
177
- }
178
-
179
163
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
180
164
// (1) Whether it has PodAntiAffinity
181
165
// (2) Whether any AffinityTerm matches the incoming pod
182
166
func getTPMapMatchingExistingAntiAffinity (pod * v1.Pod , allNodes []* framework.NodeInfo ) topologyToMatchedTermCount {
183
- var lock sync.Mutex
184
- topologyMap := make (topologyToMatchedTermCount )
185
-
186
- appendResult := func (toAppend topologyToMatchedTermCount ) {
187
- lock .Lock ()
188
- defer lock .Unlock ()
189
- topologyMap .append (toAppend )
190
- }
191
-
167
+ topoMaps := make ([]topologyToMatchedTermCount , len (allNodes ))
168
+ index := int32 (- 1 )
192
169
processNode := func (i int ) {
193
170
nodeInfo := allNodes [i ]
194
171
node := nodeInfo .Node ()
195
172
if node == nil {
196
173
klog .Error ("node not found" )
197
174
return
198
175
}
176
+ topoMap := make (topologyToMatchedTermCount )
199
177
for _ , existingPod := range nodeInfo .PodsWithAffinity {
200
- existingPodTopologyMaps := getMatchingAntiAffinityTopologyPairsOfPod (pod , existingPod , node )
201
- if len ( existingPodTopologyMaps ) != 0 {
202
- appendResult ( existingPodTopologyMaps )
203
- }
178
+ topoMap . updateWithAntiAffinityTerms (pod , node , existingPod . RequiredAntiAffinityTerms , 1 )
179
+ }
180
+ if len ( topoMap ) != 0 {
181
+ topoMaps [ atomic . AddInt32 ( & index , 1 )] = topoMap
204
182
}
205
183
}
206
184
parallelize .Until (context .Background (), len (allNodes ), processNode )
207
185
208
- return topologyMap
186
+ result := make (topologyToMatchedTermCount )
187
+ for i := 0 ; i <= int (index ); i ++ {
188
+ result .append (topoMaps [i ])
189
+ }
190
+
191
+ return result
209
192
}
210
193
211
194
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
212
195
// It returns a topologyToMatchedTermCount that are checked later by the affinity
213
196
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
214
197
// need to check all the pods in the cluster.
215
198
func getTPMapMatchingIncomingAffinityAntiAffinity (podInfo * framework.PodInfo , allNodes []* framework.NodeInfo ) (topologyToMatchedTermCount , topologyToMatchedTermCount ) {
216
- topologyPairsAffinityPodsMap := make (topologyToMatchedTermCount )
217
- topologyToMatchedExistingAntiAffinityTerms := make (topologyToMatchedTermCount )
199
+ affinityCounts := make (topologyToMatchedTermCount )
200
+ antiAffinityCounts := make (topologyToMatchedTermCount )
218
201
if len (podInfo .RequiredAffinityTerms ) == 0 && len (podInfo .RequiredAntiAffinityTerms ) == 0 {
219
- return topologyPairsAffinityPodsMap , topologyToMatchedExistingAntiAffinityTerms
220
- }
221
-
222
- var lock sync.Mutex
223
- appendResult := func (nodeName string , nodeTopologyPairsAffinityPodsMap , nodeTopologyPairsAntiAffinityPodsMap topologyToMatchedTermCount ) {
224
- lock .Lock ()
225
- defer lock .Unlock ()
226
- if len (nodeTopologyPairsAffinityPodsMap ) > 0 {
227
- topologyPairsAffinityPodsMap .append (nodeTopologyPairsAffinityPodsMap )
228
- }
229
- if len (nodeTopologyPairsAntiAffinityPodsMap ) > 0 {
230
- topologyToMatchedExistingAntiAffinityTerms .append (nodeTopologyPairsAntiAffinityPodsMap )
231
- }
202
+ return affinityCounts , antiAffinityCounts
232
203
}
233
204
205
+ affinityCountsList := make ([]topologyToMatchedTermCount , len (allNodes ))
206
+ antiAffinityCountsList := make ([]topologyToMatchedTermCount , len (allNodes ))
207
+ index := int32 (- 1 )
234
208
processNode := func (i int ) {
235
209
nodeInfo := allNodes [i ]
236
210
node := nodeInfo .Node ()
237
211
if node == nil {
238
212
klog .Error ("node not found" )
239
213
return
240
214
}
241
- nodeTopologyPairsAffinityPodsMap := make (topologyToMatchedTermCount )
242
- nodeTopologyPairsAntiAffinityPodsMap := make (topologyToMatchedTermCount )
215
+ affinity := make (topologyToMatchedTermCount )
216
+ antiAffinity := make (topologyToMatchedTermCount )
243
217
for _ , existingPod := range nodeInfo .Pods {
244
218
// Check affinity terms.
245
- nodeTopologyPairsAffinityPodsMap .updateWithAffinityTerms (existingPod .Pod , node , podInfo .RequiredAffinityTerms , 1 )
219
+ affinity .updateWithAffinityTerms (existingPod .Pod , node , podInfo .RequiredAffinityTerms , 1 )
246
220
247
221
// Check anti-affinity terms.
248
- nodeTopologyPairsAntiAffinityPodsMap .updateWithAntiAffinityTerms (existingPod .Pod , node , podInfo .RequiredAntiAffinityTerms , 1 )
222
+ antiAffinity .updateWithAntiAffinityTerms (existingPod .Pod , node , podInfo .RequiredAntiAffinityTerms , 1 )
249
223
}
250
224
251
- if len (nodeTopologyPairsAffinityPodsMap ) > 0 || len (nodeTopologyPairsAntiAffinityPodsMap ) > 0 {
252
- appendResult (node .Name , nodeTopologyPairsAffinityPodsMap , nodeTopologyPairsAntiAffinityPodsMap )
225
+ if len (affinity ) > 0 || len (antiAffinity ) > 0 {
226
+ k := atomic .AddInt32 (& index , 1 )
227
+ affinityCountsList [k ] = affinity
228
+ antiAffinityCountsList [k ] = antiAffinity
253
229
}
254
230
}
255
231
parallelize .Until (context .Background (), len (allNodes ), processNode )
256
232
257
- return topologyPairsAffinityPodsMap , topologyToMatchedExistingAntiAffinityTerms
233
+ for i := 0 ; i <= int (index ); i ++ {
234
+ affinityCounts .append (affinityCountsList [i ])
235
+ antiAffinityCounts .append (antiAffinityCountsList [i ])
236
+ }
237
+
238
+ return affinityCounts , antiAffinityCounts
258
239
}
259
240
260
241
// PreFilter invoked at the prefilter extension point.
0 commit comments