Skip to content

Commit 8737890

Browse files
authored
Merge pull request kubernetes#86459 from ahg-g/ahg1-affinity-pred
move inter pod affinity predicate logic to its Filter plugin
2 parents 3c854d6 + 429448c commit 8737890

File tree

14 files changed

+1686
-1713
lines changed

14 files changed

+1686
-1713
lines changed

pkg/scheduler/algorithm/predicates/BUILD

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ go_library(
2020
"//pkg/apis/core/v1/helper:go_default_library",
2121
"//pkg/features:go_default_library",
2222
"//pkg/scheduler/algorithm:go_default_library",
23-
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
24-
"//pkg/scheduler/listers:go_default_library",
2523
"//pkg/scheduler/nodeinfo:go_default_library",
2624
"//pkg/scheduler/util:go_default_library",
2725
"//pkg/scheduler/volumebinder:go_default_library",

pkg/scheduler/algorithm/predicates/metadata.go

Lines changed: 5 additions & 309 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,15 @@ import (
2424
v1 "k8s.io/api/core/v1"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/labels"
27-
"k8s.io/apimachinery/pkg/util/sets"
2827
"k8s.io/client-go/util/workqueue"
2928
"k8s.io/klog"
30-
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
3129
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
32-
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
3330
)
3431

3532
// Metadata interface represents anything that can access a predicate metadata.
3633
// DEPRECATED.
3734
type Metadata interface{}
3835

39-
// AntiAffinityTerm's topology key value used in predicate metadata
40-
type topologyPair struct {
41-
key string
42-
value string
43-
}
44-
45-
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int64)" so that
46-
// we can do atomic additions instead of using a global mutext, however we need to consider
47-
// how to init each topologyToMatchedTermCount.
48-
type topologyToMatchedTermCount map[topologyPair]int64
49-
5036
type criticalPath struct {
5137
// topologyValue denotes the topology value mapping to topology key.
5238
topologyValue string
@@ -96,6 +82,11 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
9682
}
9783
}
9884

85+
type topologyPair struct {
86+
key string
87+
value string
88+
}
89+
9990
// PodTopologySpreadMetadata combines tpKeyToCriticalPaths and tpPairToMatchNum
10091
// to represent:
10192
// (1) critical paths where the least pods are matched on each spread constraint.
@@ -120,129 +111,6 @@ type topologySpreadConstraint struct {
120111
selector labels.Selector
121112
}
122113

123-
// PodAffinityMetadata pre-computed state for inter-pod affinity predicate.
124-
type PodAffinityMetadata struct {
125-
// A map of topology pairs to the number of existing pods that has anti-affinity terms that match the "pod".
126-
topologyToMatchedExistingAntiAffinityTerms topologyToMatchedTermCount
127-
// A map of topology pairs to the number of existing pods that match the affinity terms of the "pod".
128-
topologyToMatchedAffinityTerms topologyToMatchedTermCount
129-
// A map of topology pairs to the number of existing pods that match the anti-affinity terms of the "pod".
130-
topologyToMatchedAntiAffinityTerms topologyToMatchedTermCount
131-
}
132-
133-
// updateWithAffinityTerms updates the topologyToMatchedTermCount map with the specified value
134-
// for each affinity term if "targetPod" matches ALL terms.
135-
func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []*affinityTermProperties, value int64) {
136-
if podMatchesAllAffinityTermProperties(targetPod, affinityTerms) {
137-
for _, t := range affinityTerms {
138-
if topologyValue, ok := targetPodNode.Labels[t.topologyKey]; ok {
139-
pair := topologyPair{key: t.topologyKey, value: topologyValue}
140-
m[pair] += value
141-
// value could be a negative value, hence we delete the entry if
142-
// the entry is down to zero.
143-
if m[pair] == 0 {
144-
delete(m, pair)
145-
}
146-
}
147-
}
148-
}
149-
}
150-
151-
// updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
152-
// for each anti-affinity term matched the target pod.
153-
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []*affinityTermProperties, value int64) {
154-
// Check anti-affinity properties.
155-
for _, a := range antiAffinityTerms {
156-
if priorityutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.namespaces, a.selector) {
157-
if topologyValue, ok := targetPodNode.Labels[a.topologyKey]; ok {
158-
pair := topologyPair{key: a.topologyKey, value: topologyValue}
159-
m[pair] += value
160-
// value could be a negative value, hence we delete the entry if
161-
// the entry is down to zero.
162-
if m[pair] == 0 {
163-
delete(m, pair)
164-
}
165-
}
166-
}
167-
}
168-
}
169-
170-
// UpdateWithPod updates the metadata counters with the (anti)affinity matches for the given pod.
171-
func (m *PodAffinityMetadata) UpdateWithPod(updatedPod, pod *v1.Pod, node *v1.Node, multiplier int64) error {
172-
if m == nil {
173-
return nil
174-
}
175-
176-
// Update matching existing anti-affinity terms.
177-
updatedPodAffinity := updatedPod.Spec.Affinity
178-
if updatedPodAffinity != nil && updatedPodAffinity.PodAntiAffinity != nil {
179-
antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(updatedPodAffinity.PodAntiAffinity))
180-
if err != nil {
181-
klog.Errorf("error in getting anti-affinity properties of Pod %v", updatedPod.Name)
182-
return err
183-
}
184-
m.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(pod, node, antiAffinityProperties, multiplier)
185-
}
186-
187-
// Update matching incoming pod (anti)affinity terms.
188-
affinity := pod.Spec.Affinity
189-
podNodeName := updatedPod.Spec.NodeName
190-
if affinity != nil && len(podNodeName) > 0 {
191-
if affinity.PodAffinity == nil {
192-
affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
193-
if err != nil {
194-
klog.Errorf("error in getting affinity properties of Pod %v", pod.Name)
195-
return err
196-
}
197-
m.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, affinityProperties, multiplier)
198-
}
199-
if affinity.PodAntiAffinity != nil {
200-
antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
201-
if err != nil {
202-
klog.Errorf("error in getting anti-affinity properties of Pod %v", pod.Name)
203-
return err
204-
}
205-
m.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, antiAffinityProperties, multiplier)
206-
}
207-
}
208-
return nil
209-
}
210-
211-
// Clone makes a deep copy of PodAffinityMetadata.
212-
func (m *PodAffinityMetadata) Clone() *PodAffinityMetadata {
213-
if m == nil {
214-
return nil
215-
}
216-
217-
copy := PodAffinityMetadata{}
218-
copy.topologyToMatchedAffinityTerms = m.topologyToMatchedAffinityTerms.clone()
219-
copy.topologyToMatchedAntiAffinityTerms = m.topologyToMatchedAntiAffinityTerms.clone()
220-
copy.topologyToMatchedExistingAntiAffinityTerms = m.topologyToMatchedExistingAntiAffinityTerms.clone()
221-
222-
return &copy
223-
}
224-
225-
// GetPodAffinityMetadata computes inter-pod affinity metadata.
226-
func GetPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo, havePodsWithAffinityNodes []*schedulernodeinfo.NodeInfo) (*PodAffinityMetadata, error) {
227-
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
228-
existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
229-
if err != nil {
230-
return nil, err
231-
}
232-
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
233-
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
234-
incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, allNodes)
235-
if err != nil {
236-
return nil, err
237-
}
238-
239-
return &PodAffinityMetadata{
240-
topologyToMatchedAffinityTerms: incomingPodAffinityMap,
241-
topologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap,
242-
topologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
243-
}, nil
244-
}
245-
246114
// GetPodTopologySpreadMetadata computes pod topology spread metadata.
247115
func GetPodTopologySpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*PodTopologySpreadMetadata, error) {
248116
// We have feature gating in APIServer to strip the spec
@@ -344,18 +212,6 @@ func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints
344212
return true
345213
}
346214

347-
func (m topologyToMatchedTermCount) appendMaps(toAppend topologyToMatchedTermCount) {
348-
for pair := range toAppend {
349-
m[pair] += toAppend[pair]
350-
}
351-
}
352-
353-
func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount {
354-
copy := make(topologyToMatchedTermCount, len(m))
355-
copy.appendMaps(m)
356-
return copy
357-
}
358-
359215
// AddPod updates the metadata with addedPod.
360216
func (m *PodTopologySpreadMetadata) AddPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
361217
m.updateWithPod(addedPod, preemptorPod, node, 1)
@@ -409,163 +265,3 @@ func (m *PodTopologySpreadMetadata) Clone() *PodTopologySpreadMetadata {
409265
}
410266
return &cp
411267
}
412-
413-
// A processed version of v1.PodAffinityTerm.
414-
type affinityTermProperties struct {
415-
namespaces sets.String
416-
selector labels.Selector
417-
topologyKey string
418-
}
419-
420-
// getAffinityTermProperties receives a Pod and affinity terms and returns the namespaces and
421-
// selectors of the terms.
422-
func getAffinityTermProperties(pod *v1.Pod, terms []v1.PodAffinityTerm) (properties []*affinityTermProperties, err error) {
423-
if terms == nil {
424-
return properties, nil
425-
}
426-
427-
for _, term := range terms {
428-
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
429-
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
430-
if err != nil {
431-
return nil, err
432-
}
433-
properties = append(properties, &affinityTermProperties{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey})
434-
}
435-
return properties, nil
436-
}
437-
438-
// podMatchesAllAffinityTermProperties returns true IFF the given pod matches all the given properties.
439-
func podMatchesAllAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool {
440-
if len(properties) == 0 {
441-
return false
442-
}
443-
for _, property := range properties {
444-
if !priorityutil.PodMatchesTermsNamespaceAndSelector(pod, property.namespaces, property.selector) {
445-
return false
446-
}
447-
}
448-
return true
449-
}
450-
451-
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
452-
// (1) Whether it has PodAntiAffinity
453-
// (2) Whether any AffinityTerm matches the incoming pod
454-
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyToMatchedTermCount, error) {
455-
errCh := schedutil.NewErrorChannel()
456-
var lock sync.Mutex
457-
topologyMap := make(topologyToMatchedTermCount)
458-
459-
appendResult := func(toAppend topologyToMatchedTermCount) {
460-
lock.Lock()
461-
defer lock.Unlock()
462-
topologyMap.appendMaps(toAppend)
463-
}
464-
465-
ctx, cancel := context.WithCancel(context.Background())
466-
467-
processNode := func(i int) {
468-
nodeInfo := allNodes[i]
469-
node := nodeInfo.Node()
470-
if node == nil {
471-
klog.Error("node not found")
472-
return
473-
}
474-
for _, existingPod := range nodeInfo.PodsWithAffinity() {
475-
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
476-
if err != nil {
477-
errCh.SendErrorWithCancel(err, cancel)
478-
return
479-
}
480-
if existingPodTopologyMaps != nil {
481-
appendResult(existingPodTopologyMaps)
482-
}
483-
}
484-
}
485-
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
486-
487-
if err := errCh.ReceiveError(); err != nil {
488-
return nil, err
489-
}
490-
491-
return topologyMap, nil
492-
}
493-
494-
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
495-
// It returns a topologyToMatchedTermCount that are checked later by the affinity
496-
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
497-
// need to check all the pods in the cluster.
498-
func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount, error) {
499-
topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
500-
topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount)
501-
affinity := pod.Spec.Affinity
502-
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
503-
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
504-
}
505-
506-
var lock sync.Mutex
507-
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap topologyToMatchedTermCount) {
508-
lock.Lock()
509-
defer lock.Unlock()
510-
if len(nodeTopologyPairsAffinityPodsMap) > 0 {
511-
topologyPairsAffinityPodsMap.appendMaps(nodeTopologyPairsAffinityPodsMap)
512-
}
513-
if len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
514-
topologyToMatchedExistingAntiAffinityTerms.appendMaps(nodeTopologyPairsAntiAffinityPodsMap)
515-
}
516-
}
517-
518-
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
519-
affinityProperties, err := getAffinityTermProperties(pod, affinityTerms)
520-
if err != nil {
521-
return nil, nil, err
522-
}
523-
524-
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
525-
antiAffinityProperties, err := getAffinityTermProperties(pod, antiAffinityTerms)
526-
if err != nil {
527-
return nil, nil, err
528-
}
529-
530-
processNode := func(i int) {
531-
nodeInfo := allNodes[i]
532-
node := nodeInfo.Node()
533-
if node == nil {
534-
klog.Error("node not found")
535-
return
536-
}
537-
nodeTopologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
538-
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount)
539-
for _, existingPod := range nodeInfo.Pods() {
540-
// Check affinity properties.
541-
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod, node, affinityProperties, 1)
542-
543-
// Check anti-affinity properties.
544-
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod, node, antiAffinityProperties, 1)
545-
}
546-
547-
if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
548-
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
549-
}
550-
}
551-
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
552-
553-
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
554-
}
555-
556-
// targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of
557-
// "pod". This function does not check topology.
558-
// So, whether the targetPod actually matches or not needs further checks for a specific
559-
// node.
560-
func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool {
561-
affinity := pod.Spec.Affinity
562-
if affinity == nil || affinity.PodAffinity == nil {
563-
return false
564-
}
565-
affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
566-
if err != nil {
567-
klog.Errorf("error in getting affinity properties of Pod %v", pod.Name)
568-
return false
569-
}
570-
return podMatchesAllAffinityTermProperties(targetPod, affinityProperties)
571-
}

0 commit comments

Comments
 (0)