@@ -20,12 +20,12 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"math"
23
- "sync"
23
+ "sync/atomic "
24
24
25
25
v1 "k8s.io/api/core/v1"
26
26
"k8s.io/apimachinery/pkg/labels"
27
27
"k8s.io/klog"
28
- pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
28
+ "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
29
29
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
30
30
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
31
31
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -48,7 +48,7 @@ type preFilterState struct {
48
48
// it's not guaranteed to be the 2nd minimum match number.
49
49
TpKeyToCriticalPaths map [string ]* criticalPaths
50
50
// TpPairToMatchNum is keyed with topologyPair, and valued with the number of matching pods.
51
- TpPairToMatchNum map [topologyPair ]int32
51
+ TpPairToMatchNum map [topologyPair ]* int32
52
52
}
53
53
54
54
// Clone makes a copy of the given state.
@@ -61,14 +61,15 @@ func (s *preFilterState) Clone() framework.StateData {
61
61
// Constraints are shared because they don't change.
62
62
Constraints : s .Constraints ,
63
63
TpKeyToCriticalPaths : make (map [string ]* criticalPaths , len (s .TpKeyToCriticalPaths )),
64
- TpPairToMatchNum : make (map [topologyPair ]int32 , len (s .TpPairToMatchNum )),
64
+ TpPairToMatchNum : make (map [topologyPair ]* int32 , len (s .TpPairToMatchNum )),
65
65
}
66
66
for tpKey , paths := range s .TpKeyToCriticalPaths {
67
67
copy .TpKeyToCriticalPaths [tpKey ] = & criticalPaths {paths [0 ], paths [1 ]}
68
68
}
69
69
for tpPair , matchNum := range s .TpPairToMatchNum {
70
70
copyPair := topologyPair {key : tpPair .key , value : tpPair .value }
71
- copy .TpPairToMatchNum [copyPair ] = matchNum
71
+ copyCount := * matchNum
72
+ copy .TpPairToMatchNum [copyPair ] = & copyCount
72
73
}
73
74
return & copy
74
75
}
@@ -137,9 +138,9 @@ func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v
137
138
138
139
k , v := constraint .TopologyKey , node .Labels [constraint .TopologyKey ]
139
140
pair := topologyPair {key : k , value : v }
140
- s .TpPairToMatchNum [pair ] = s . TpPairToMatchNum [ pair ] + delta
141
+ * s .TpPairToMatchNum [pair ] += delta
141
142
142
- s .TpKeyToCriticalPaths [k ].update (v , s .TpPairToMatchNum [pair ])
143
+ s .TpKeyToCriticalPaths [k ].update (v , * s .TpPairToMatchNum [pair ])
143
144
}
144
145
}
145
146
@@ -219,52 +220,44 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
219
220
return & preFilterState {}, nil
220
221
}
221
222
222
- var lock sync.Mutex
223
-
224
- // TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
225
- // In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
226
223
s := preFilterState {
227
224
Constraints : constraints ,
228
225
TpKeyToCriticalPaths : make (map [string ]* criticalPaths , len (constraints )),
229
- TpPairToMatchNum : make (map [topologyPair ]int32 ),
230
- }
231
- addTopologyPairMatchNum := func (pair topologyPair , num int32 ) {
232
- lock .Lock ()
233
- s .TpPairToMatchNum [pair ] += num
234
- lock .Unlock ()
226
+ TpPairToMatchNum : make (map [topologyPair ]* int32 ),
235
227
}
236
-
237
- processNode := func (i int ) {
238
- nodeInfo := allNodes [i ]
239
- node := nodeInfo .Node ()
228
+ for _ , n := range allNodes {
229
+ node := n .Node ()
240
230
if node == nil {
241
231
klog .Error ("node not found" )
242
- return
232
+ continue
243
233
}
244
234
// In accordance to design, if NodeAffinity or NodeSelector is defined,
245
235
// spreading is applied to nodes that pass those filters.
246
- if ! pluginhelper .PodMatchesNodeSelectorAndAffinityTerms (pod , node ) {
247
- return
236
+ if ! helper .PodMatchesNodeSelectorAndAffinityTerms (pod , node ) {
237
+ continue
248
238
}
249
-
250
239
// Ensure current node's labels contains all topologyKeys in 'Constraints'.
251
240
if ! nodeLabelsMatchSpreadConstraints (node .Labels , constraints ) {
252
- return
241
+ continue
253
242
}
243
+ for _ , c := range constraints {
244
+ pair := topologyPair {key : c .TopologyKey , value : node .Labels [c .TopologyKey ]}
245
+ s .TpPairToMatchNum [pair ] = new (int32 )
246
+ }
247
+ }
248
+
249
+ processNode := func (i int ) {
250
+ nodeInfo := allNodes [i ]
251
+ node := nodeInfo .Node ()
252
+
254
253
for _ , constraint := range constraints {
255
- matchTotal := int32 (0 )
256
- // nodeInfo.Pods() can be empty; or all pods don't fit
257
- for _ , existingPod := range nodeInfo .Pods () {
258
- // Bypass terminating Pod (see #87621).
259
- if existingPod .DeletionTimestamp != nil || existingPod .Namespace != pod .Namespace {
260
- continue
261
- }
262
- if constraint .Selector .Matches (labels .Set (existingPod .Labels )) {
263
- matchTotal ++
264
- }
265
- }
266
254
pair := topologyPair {key : constraint .TopologyKey , value : node .Labels [constraint .TopologyKey ]}
267
- addTopologyPairMatchNum (pair , matchTotal )
255
+ tpCount := s .TpPairToMatchNum [pair ]
256
+ if tpCount == nil {
257
+ continue
258
+ }
259
+ count := countPodsMatchSelector (nodeInfo .Pods (), constraint .Selector , pod .Namespace )
260
+ atomic .AddInt32 (tpCount , int32 (count ))
268
261
}
269
262
}
270
263
parallelize .Until (context .Background (), len (allNodes ), processNode )
@@ -275,7 +268,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
275
268
s .TpKeyToCriticalPaths [key ] = newCriticalPaths ()
276
269
}
277
270
for pair , num := range s .TpPairToMatchNum {
278
- s .TpKeyToCriticalPaths [pair .key ].update (pair .value , num )
271
+ s .TpKeyToCriticalPaths [pair .key ].update (pair .value , * num )
279
272
}
280
273
281
274
return & s , nil
@@ -322,7 +315,10 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
322
315
// judging criteria:
323
316
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
324
317
minMatchNum := paths [0 ].MatchNum
325
- matchNum := s .TpPairToMatchNum [pair ]
318
+ matchNum := int32 (0 )
319
+ if tpCount := s .TpPairToMatchNum [pair ]; tpCount != nil {
320
+ matchNum = * tpCount
321
+ }
326
322
skew := matchNum + selfMatchNum - minMatchNum
327
323
if skew > c .MaxSkew {
328
324
klog .V (5 ).Infof ("node '%s' failed spreadConstraint[%s]: MatchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)" , node .Name , tpKey , matchNum , selfMatchNum , minMatchNum , c .MaxSkew )
0 commit comments