@@ -16,252 +16,6 @@ limitations under the License.
 
 package predicates
 
-import (
-	"context"
-	"math"
-	"sync"
-
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/client-go/util/workqueue"
-	"k8s.io/klog"
-	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-)
-
 // Metadata interface represents anything that can access a predicate metadata.
 // DEPRECATED.
 type Metadata interface{}
-
-type criticalPath struct {
-	// topologyValue denotes the topology value mapping to topology key.
-	topologyValue string
-	// matchNum denotes the number of matching pods.
-	matchNum int32
-}
-
-// CAVEAT: the reason that `[2]criticalPath` can work is based on the implementation of the
-// current preemption algorithm, in particular the following 2 facts:
-// Fact 1: we only preempt pods on the same node, instead of pods on multiple nodes.
-// Fact 2: each node is evaluated on a separate copy of the metadata during its preemption cycle.
-// If we plan to turn to a more complex algorithm like "arbitrary pods on multiple nodes", this
-// structure needs to be revisited.
-type criticalPaths [2]criticalPath
-
-func newCriticalPaths() *criticalPaths {
-	return &criticalPaths{{matchNum: math.MaxInt32}, {matchNum: math.MaxInt32}}
-}
-
-func (paths *criticalPaths) update(tpVal string, num int32) {
-	// first verify if `tpVal` exists or not
-	i := -1
-	if tpVal == paths[0].topologyValue {
-		i = 0
-	} else if tpVal == paths[1].topologyValue {
-		i = 1
-	}
-
-	if i >= 0 {
-		// `tpVal` exists
-		paths[i].matchNum = num
-		if paths[0].matchNum > paths[1].matchNum {
-			// swap paths[0] and paths[1]
-			paths[0], paths[1] = paths[1], paths[0]
-		}
-	} else {
-		// `tpVal` doesn't exist
-		if num < paths[0].matchNum {
-			// update paths[1] with paths[0]
-			paths[1] = paths[0]
-			// update paths[0]
-			paths[0].topologyValue, paths[0].matchNum = tpVal, num
-		} else if num < paths[1].matchNum {
-			// update paths[1]
-			paths[1].topologyValue, paths[1].matchNum = tpVal, num
-		}
-	}
-}
-
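The two-slot scheme above is worth a concrete trace. Below is a minimal, self-contained mirror of the deleted logic (the `path`/`paths` names are illustrative, not part of the package) showing that slot 0 always ends up holding the global minimum match count, while slot 1 only bounds it from above:

```go
package main

import (
	"fmt"
	"math"
)

// Illustrative mirror of the deleted criticalPath/criticalPaths types.
type path struct {
	value    string
	matchNum int32
}

type paths [2]path

// update follows the same algorithm as the removed criticalPaths.update.
func (p *paths) update(val string, num int32) {
	i := -1
	if val == p[0].value {
		i = 0
	} else if val == p[1].value {
		i = 1
	}
	if i >= 0 {
		// Known topology value: refresh its count and restore ordering.
		p[i].matchNum = num
		if p[0].matchNum > p[1].matchNum {
			p[0], p[1] = p[1], p[0]
		}
	} else if num < p[0].matchNum {
		// New global minimum: demote slot 0 into slot 1.
		p[1] = p[0]
		p[0] = path{val, num}
	} else if num < p[1].matchNum {
		p[1] = path{val, num}
	}
}

func main() {
	p := paths{{matchNum: math.MaxInt32}, {matchNum: math.MaxInt32}}
	p.update("zone-a", 3) // zone-a becomes the minimum
	p.update("zone-b", 1) // zone-b displaces zone-a into slot 1
	p.update("zone-a", 0) // zone-a drops below zone-b; the slots swap
	fmt.Println(p)        // [{zone-a 0} {zone-b 1}]
}
```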
-type topologyPair struct {
-	key   string
-	value string
-}
-
-// PodTopologySpreadMetadata combines tpKeyToCriticalPaths and tpPairToMatchNum
-// to represent:
-// (1) critical paths where the least pods are matched on each spread constraint.
-// (2) number of pods matched on each spread constraint.
-type PodTopologySpreadMetadata struct {
-	constraints []topologySpreadConstraint
-	// We record 2 critical paths instead of all critical paths here.
-	// criticalPaths[0].matchNum always holds the minimum matching number.
-	// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
-	// it's not guaranteed to be the 2nd minimum match number.
-	tpKeyToCriticalPaths map[string]*criticalPaths
-	// tpPairToMatchNum is keyed with topologyPair, and valued with the number of matching pods.
-	tpPairToMatchNum map[topologyPair]int32
-}
-
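To make the documented invariant concrete, here is a hypothetical snapshot of these two maps for a single "zone" constraint over two zones (the mirror types and all values are invented for illustration):

```go
package main

import "fmt"

// Illustrative mirrors of the deleted types, to show the data shape only.
type topologyPair struct{ key, value string }
type criticalPath struct {
	topologyValue string
	matchNum      int32
}

func main() {
	// Hypothetical snapshot for a single "zone" constraint over two zones.
	tpPairToMatchNum := map[topologyPair]int32{
		{key: "zone", value: "zone-a"}: 3,
		{key: "zone", value: "zone-b"}: 1,
	}
	// Slot 0 holds the global minimum; slot 1 is only an upper bound on it,
	// not necessarily the second-smallest value.
	tpKeyToCriticalPaths := map[string]*[2]criticalPath{
		"zone": {{"zone-b", 1}, {"zone-a", 3}},
	}
	fmt.Println(tpPairToMatchNum, tpKeyToCriticalPaths["zone"])
}
```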
-// topologySpreadConstraint is an internal representation of a hard
-// (WhenUnsatisfiable=DoNotSchedule) v1.TopologySpreadConstraint, with the
-// label selector already parsed.
-type topologySpreadConstraint struct {
-	maxSkew     int32
-	topologyKey string
-	selector    labels.Selector
-}
-
-// GetPodTopologySpreadMetadata computes pod topology spread metadata.
-func GetPodTopologySpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*PodTopologySpreadMetadata, error) {
-	// We have feature gating in the API server to strip the spec, so we don't
-	// need to re-check the feature gate here; just check the length of constraints.
-	constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
-	if err != nil {
-		return nil, err
-	}
-	if len(constraints) == 0 {
-		return &PodTopologySpreadMetadata{}, nil
-	}
-
-	var lock sync.Mutex
-
-	// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
-	// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
-	m := PodTopologySpreadMetadata{
-		constraints:          constraints,
-		tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
-		tpPairToMatchNum:     make(map[topologyPair]int32),
-	}
-	addTopologyPairMatchNum := func(pair topologyPair, num int32) {
-		lock.Lock()
-		m.tpPairToMatchNum[pair] += num
-		lock.Unlock()
-	}
-
-	processNode := func(i int) {
-		nodeInfo := allNodes[i]
-		node := nodeInfo.Node()
-		if node == nil {
-			klog.Error("node not found")
-			return
-		}
-		// In accordance with the design, if NodeAffinity or NodeSelector is defined,
-		// spreading is applied to nodes that pass those filters.
-		if !PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
-			return
-		}
-
-		// Ensure the current node's labels contain all topologyKeys in 'constraints'.
-		if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
-			return
-		}
-		for _, constraint := range constraints {
-			matchTotal := int32(0)
-			// nodeInfo.Pods() can be empty; or none of the existing pods may match
-			for _, existingPod := range nodeInfo.Pods() {
-				if existingPod.Namespace != pod.Namespace {
-					continue
-				}
-				if constraint.selector.Matches(labels.Set(existingPod.Labels)) {
-					matchTotal++
-				}
-			}
-			pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
-			addTopologyPairMatchNum(pair, matchTotal)
-		}
-	}
-	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
-
-	// Calculate the minimum match for each topology key.
-	for i := 0; i < len(constraints); i++ {
-		key := constraints[i].topologyKey
-		m.tpKeyToCriticalPaths[key] = newCriticalPaths()
-	}
-	for pair, num := range m.tpPairToMatchNum {
-		m.tpKeyToCriticalPaths[pair.key].update(pair.value, num)
-	}
-
-	return &m, nil
-}
-
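The fan-out above is the stock client-go pattern: `workqueue.ParallelizeUntil` runs the per-node closure across 16 workers, and all writes to the shared map funnel through one mutex. A stripped-down sketch of just that pattern (node names and counts are placeholders, not scheduler data):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}

	var lock sync.Mutex
	counts := make(map[string]int32)

	// Each piece handles one node; the mutex serializes the map merge,
	// just like addTopologyPairMatchNum in the removed code.
	processNode := func(i int) {
		matched := int32(i) // stand-in for the real selector-matching loop
		lock.Lock()
		counts[nodes[i]] += matched
		lock.Unlock()
	}
	workqueue.ParallelizeUntil(context.Background(), 16, len(nodes), processNode)

	fmt.Println(counts) // map[node-a:0 node-b:1 node-c:2 node-d:3]
}
```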
-func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
-	var result []topologySpreadConstraint
-	for _, c := range constraints {
-		if c.WhenUnsatisfiable == v1.DoNotSchedule {
-			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
-			if err != nil {
-				return nil, err
-			}
-			result = append(result, topologySpreadConstraint{
-				maxSkew:     c.MaxSkew,
-				topologyKey: c.TopologyKey,
-				selector:    selector,
-			})
-		}
-	}
-	return result, nil
-}
-
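For reference, the selector handling this filter depends on is plain `metav1.LabelSelectorAsSelector`: it compiles the API-level selector into a `labels.Selector` that matches pod labels directly. A standalone sketch with an invented constraint:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// A hard constraint, i.e. one that filterHardTopologySpreadConstraints keeps.
	c := v1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
		LabelSelector: &metav1.LabelSelector{
			MatchLabels: map[string]string{"app": "web"},
		},
	}

	selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
	if err != nil {
		panic(err)
	}
	fmt.Println(selector.Matches(labels.Set{"app": "web"}))   // true
	fmt.Println(selector.Matches(labels.Set{"app": "batch"})) // false
}
```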
-// NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
-func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
-	for _, c := range constraints {
-		if _, ok := nodeLabels[c.topologyKey]; !ok {
-			return false
-		}
-	}
-	return true
-}
-
-// AddPod updates the metadata with addedPod.
-func (m *PodTopologySpreadMetadata) AddPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
-	m.updateWithPod(addedPod, preemptorPod, node, 1)
-}
-
-// RemovePod updates the metadata with deletedPod.
-func (m *PodTopologySpreadMetadata) RemovePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
-	m.updateWithPod(deletedPod, preemptorPod, node, -1)
-}
-
-func (m *PodTopologySpreadMetadata) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
-	if m == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
-		return
-	}
-	if !NodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
-		return
-	}
-
-	podLabelSet := labels.Set(updatedPod.Labels)
-	for _, constraint := range m.constraints {
-		if !constraint.selector.Matches(podLabelSet) {
-			continue
-		}
-
-		k, v := constraint.topologyKey, node.Labels[constraint.topologyKey]
-		pair := topologyPair{key: k, value: v}
-		m.tpPairToMatchNum[pair] = m.tpPairToMatchNum[pair] + delta
-
-		m.tpKeyToCriticalPaths[k].update(v, m.tpPairToMatchNum[pair])
-	}
-}
-
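The ±1 delta in AddPod/RemovePod is simple bookkeeping: during a preemption what-if, removing a victim decrements the pair's count, and re-adding it restores the count. A toy trace on a mirror of the map (types and numbers are illustrative):

```go
package main

import "fmt"

type topologyPair struct{ key, value string }

func main() {
	matchNum := map[topologyPair]int32{
		{key: "zone", value: "zone-a"}: 3,
		{key: "zone", value: "zone-b"}: 1,
	}
	pair := topologyPair{key: "zone", value: "zone-a"}

	matchNum[pair]--            // RemovePod: a victim in zone-a leaves the what-if view
	fmt.Println(matchNum[pair]) // 2

	matchNum[pair]++            // AddPod: the victim is added back
	fmt.Println(matchNum[pair]) // 3
}
```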
-// Clone makes a deep copy of PodTopologySpreadMetadata.
-func (m *PodTopologySpreadMetadata) Clone() *PodTopologySpreadMetadata {
-	// m could be nil when EvenPodsSpread feature is disabled
-	if m == nil {
-		return nil
-	}
-	cp := PodTopologySpreadMetadata{
-		// constraints are shared because they don't change.
-		constraints:          m.constraints,
-		tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(m.tpKeyToCriticalPaths)),
-		tpPairToMatchNum:     make(map[topologyPair]int32, len(m.tpPairToMatchNum)),
-	}
-	for tpKey, paths := range m.tpKeyToCriticalPaths {
-		cp.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
-	}
-	for tpPair, matchNum := range m.tpPairToMatchNum {
-		copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
-		cp.tpPairToMatchNum[copyPair] = matchNum
-	}
-	return &cp
-}
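Clone rebuilds both maps entry by entry because Go maps are reference types: a shallow copy would let one node's preemption cycle mutate another's metadata, violating Fact 2 of the caveat above. A quick standalone demonstration of that aliasing hazard:

```go
package main

import "fmt"

func main() {
	orig := map[string]int32{"zone-a": 3}

	shallow := orig // same backing map, not a copy

	deep := make(map[string]int32, len(orig)) // fresh map, copied entry by entry
	for k, v := range orig {
		deep[k] = v
	}

	shallow["zone-a"]--         // mutating through the shallow copy...
	fmt.Println(orig["zone-a"]) // ...changes the original too: 2
	fmt.Println(deep["zone-a"]) // the deep copy is unaffected: 3
}
```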