Skip to content

Commit e3bdb4b

Browse files
Store topology spread constraints in metadata with labels.Selector
Signed-off-by: Aldo Culquicondor <[email protected]>
1 parent ce11622 commit e3bdb4b

File tree

7 files changed

+336
-253
lines changed

7 files changed

+336
-253
lines changed

pkg/scheduler/algorithm/predicates/metadata.go

Lines changed: 63 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
116116
// (1) critical paths where the least pods are matched on each spread constraint.
117117
// (2) number of pods matched on each spread constraint.
118118
type evenPodsSpreadMetadata struct {
119+
constraints []topologySpreadConstraint
119120
// We record 2 critical paths instead of all critical paths here.
120121
// criticalPaths[0].matchNum always holds the minimum matching number.
121122
// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
@@ -125,6 +126,15 @@ type evenPodsSpreadMetadata struct {
125126
tpPairToMatchNum map[topologyPair]int32
126127
}
127128

129+
// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
130+
// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
131+
// selector is parsed.
132+
type topologySpreadConstraint struct {
133+
maxSkew int32
134+
topologyKey string
135+
selector labels.Selector
136+
}
137+
128138
type serviceAffinityMetadata struct {
129139
matchingPodList []*v1.Pod
130140
matchingPodServices []*v1.Service
@@ -420,17 +430,20 @@ func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo,
420430
func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
421431
// We have feature gating in APIServer to strip the spec
422432
// so don't need to re-check feature gate, just check length of constraints.
423-
constraints := getHardTopologySpreadConstraints(pod)
433+
constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
434+
if err != nil {
435+
return nil, err
436+
}
424437
if len(constraints) == 0 {
425438
return nil, nil
426439
}
427440

428-
errCh := schedutil.NewErrorChannel()
429441
var lock sync.Mutex
430442

431443
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
432444
// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
433445
m := evenPodsSpreadMetadata{
446+
constraints: constraints,
434447
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
435448
tpPairToMatchNum: make(map[topologyPair]int32),
436449
}
@@ -440,8 +453,6 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
440453
lock.Unlock()
441454
}
442455

443-
ctx, cancel := context.WithCancel(context.Background())
444-
445456
processNode := func(i int) {
446457
nodeInfo := allNodes[i]
447458
node := nodeInfo.Node()
@@ -466,28 +477,19 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
466477
if existingPod.Namespace != pod.Namespace {
467478
continue
468479
}
469-
ok, err := PodMatchesSpreadConstraint(existingPod.Labels, constraint)
470-
if err != nil {
471-
errCh.SendErrorWithCancel(err, cancel)
472-
return
473-
}
474-
if ok {
480+
if constraint.selector.Matches(labels.Set(existingPod.Labels)) {
475481
matchTotal++
476482
}
477483
}
478-
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
484+
pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
479485
addTopologyPairMatchNum(pair, matchTotal)
480486
}
481487
}
482-
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
483-
484-
if err := errCh.ReceiveError(); err != nil {
485-
return nil, err
486-
}
488+
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
487489

488490
// calculate min match for each topology pair
489491
for i := 0; i < len(constraints); i++ {
490-
key := constraints[i].TopologyKey
492+
key := constraints[i].topologyKey
491493
m.tpKeyToCriticalPaths[key] = newCriticalPaths()
492494
}
493495
for pair, num := range m.tpPairToMatchNum {
@@ -497,36 +499,28 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
497499
return &m, nil
498500
}
499501

500-
func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
501-
if pod != nil {
502-
for _, constraint := range pod.Spec.TopologySpreadConstraints {
503-
if constraint.WhenUnsatisfiable == v1.DoNotSchedule {
504-
constraints = append(constraints, constraint)
502+
func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
503+
var result []topologySpreadConstraint
504+
for _, c := range constraints {
505+
if c.WhenUnsatisfiable == v1.DoNotSchedule {
506+
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
507+
if err != nil {
508+
return nil, err
505509
}
510+
result = append(result, topologySpreadConstraint{
511+
maxSkew: c.MaxSkew,
512+
topologyKey: c.TopologyKey,
513+
selector: selector,
514+
})
506515
}
507516
}
508-
return
509-
}
510-
511-
// PodMatchesSpreadConstraint verifies if <constraint.LabelSelector> matches <podLabelSet>.
512-
// Some corner cases:
513-
// 1. podLabelSet = nil => returns (false, nil)
514-
// 2. constraint.LabelSelector = nil => returns (false, nil)
515-
func PodMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
516-
selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
517-
if err != nil {
518-
return false, err
519-
}
520-
if !selector.Matches(podLabelSet) {
521-
return false, nil
522-
}
523-
return true, nil
517+
return result, nil
524518
}
525519

526520
// NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
527-
func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
528-
for _, constraint := range constraints {
529-
if _, ok := nodeLabels[constraint.TopologyKey]; !ok {
521+
func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
522+
for _, c := range constraints {
523+
if _, ok := nodeLabels[c.topologyKey]; !ok {
530524
return false
531525
}
532526
}
@@ -581,57 +575,55 @@ func (m *topologyPairsMaps) clone() *topologyPairsMaps {
581575
return copy
582576
}
583577

584-
func (c *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
585-
return c.updatePod(addedPod, preemptorPod, node, 1)
578+
func (m *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
579+
m.updatePod(addedPod, preemptorPod, node, 1)
586580
}
587581

588-
func (c *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) error {
589-
return c.updatePod(deletedPod, preemptorPod, node, -1)
582+
func (m *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
583+
m.updatePod(deletedPod, preemptorPod, node, -1)
590584
}
591585

592-
func (c *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
593-
if updatedPod.Namespace != preemptorPod.Namespace || node == nil {
594-
return nil
586+
func (m *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
587+
if m == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
588+
return
595589
}
596-
constraints := getHardTopologySpreadConstraints(preemptorPod)
597-
if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
598-
return nil
590+
if !NodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
591+
return
599592
}
600593

601594
podLabelSet := labels.Set(updatedPod.Labels)
602-
for _, constraint := range constraints {
603-
if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
604-
return err
605-
} else if !match {
595+
for _, constraint := range m.constraints {
596+
if !constraint.selector.Matches(podLabelSet) {
606597
continue
607598
}
608599

609-
k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
600+
k, v := constraint.topologyKey, node.Labels[constraint.topologyKey]
610601
pair := topologyPair{key: k, value: v}
611-
c.tpPairToMatchNum[pair] = c.tpPairToMatchNum[pair] + delta
602+
m.tpPairToMatchNum[pair] = m.tpPairToMatchNum[pair] + delta
612603

613-
c.tpKeyToCriticalPaths[k].update(v, c.tpPairToMatchNum[pair])
604+
m.tpKeyToCriticalPaths[k].update(v, m.tpPairToMatchNum[pair])
614605
}
615-
return nil
616606
}
617607

618-
func (c *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
608+
func (m *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
619609
// c could be nil when EvenPodsSpread feature is disabled
620-
if c == nil {
610+
if m == nil {
621611
return nil
622612
}
623-
copy := evenPodsSpreadMetadata{
624-
tpKeyToCriticalPaths: make(map[string]*criticalPaths),
625-
tpPairToMatchNum: make(map[topologyPair]int32),
613+
cp := evenPodsSpreadMetadata{
614+
// constraints are shared because they don't change.
615+
constraints: m.constraints,
616+
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(m.tpKeyToCriticalPaths)),
617+
tpPairToMatchNum: make(map[topologyPair]int32, len(m.tpPairToMatchNum)),
626618
}
627-
for tpKey, paths := range c.tpKeyToCriticalPaths {
628-
copy.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
619+
for tpKey, paths := range m.tpKeyToCriticalPaths {
620+
cp.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
629621
}
630-
for tpPair, matchNum := range c.tpPairToMatchNum {
622+
for tpPair, matchNum := range m.tpPairToMatchNum {
631623
copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
632-
copy.tpPairToMatchNum[copyPair] = matchNum
624+
cp.tpPairToMatchNum[copyPair] = matchNum
633625
}
634-
return &copy
626+
return &cp
635627
}
636628

637629
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
@@ -642,10 +634,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) erro
642634
return fmt.Errorf("deletedPod and meta.pod must not be the same")
643635
}
644636
meta.podAffinityMetadata.removePod(deletedPod)
645-
// Delete pod from the pod spread topology maps.
646-
if err := meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node); err != nil {
647-
return err
648-
}
637+
meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node)
649638
meta.serviceAffinityMetadata.removePod(deletedPod, node)
650639

651640
return nil
@@ -667,9 +656,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
667656
}
668657
// Update meta.evenPodsSpreadMetadata if meta.pod has hard spread constraints
669658
// and addedPod matches that
670-
if err := meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node); err != nil {
671-
return err
672-
}
659+
meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node)
673660

674661
meta.serviceAffinityMetadata.addPod(addedPod, meta.pod, node)
675662

0 commit comments

Comments
 (0)