Skip to content

Commit 429ad7d

Browse files
authored
Merge pull request kubernetes#86659 from Huang-Wei/eps-move-pred
Move pod topology spread predicate logic to its filter plugin
2 parents b52cdca + 8a4dce5 commit 429ad7d

File tree

9 files changed

+984
-1026
lines changed

9 files changed

+984
-1026
lines changed

pkg/scheduler/algorithm/predicates/BUILD

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ go_library(
2424
"//pkg/volume/util:go_default_library",
2525
"//staging/src/k8s.io/api/core/v1:go_default_library",
2626
"//staging/src/k8s.io/api/storage/v1:go_default_library",
27-
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
2827
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
2928
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
3029
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
3130
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
3231
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
3332
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
3433
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
35-
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
3634
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
3735
"//vendor/k8s.io/klog:go_default_library",
3836
],
@@ -42,7 +40,6 @@ go_test(
4240
name = "go_default_test",
4341
srcs = [
4442
"max_attachable_volume_predicate_test.go",
45-
"metadata_test.go",
4643
"predicates_test.go",
4744
"utils_test.go",
4845
],
@@ -53,8 +50,6 @@ go_test(
5350
"//pkg/features:go_default_library",
5451
"//pkg/scheduler/listers/fake:go_default_library",
5552
"//pkg/scheduler/nodeinfo:go_default_library",
56-
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
57-
"//pkg/scheduler/testing:go_default_library",
5853
"//pkg/volume/util:go_default_library",
5954
"//staging/src/k8s.io/api/core/v1:go_default_library",
6055
"//staging/src/k8s.io/api/storage/v1:go_default_library",

pkg/scheduler/algorithm/predicates/metadata.go

Lines changed: 0 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -16,252 +16,6 @@ limitations under the License.
1616

1717
package predicates
1818

19-
import (
20-
"context"
21-
"math"
22-
"sync"
23-
24-
v1 "k8s.io/api/core/v1"
25-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26-
"k8s.io/apimachinery/pkg/labels"
27-
"k8s.io/client-go/util/workqueue"
28-
"k8s.io/klog"
29-
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
30-
)
31-
3219
// Metadata interface represents anything that can access a predicate metadata.
//
// Deprecated: this interface is marked as deprecated; avoid adding new uses.
type Metadata interface{}
35-
36-
// criticalPath pairs one topology value with the number of pods matched there.
type criticalPath struct {
	// topologyValue denotes the topology value mapping to topology key.
	topologyValue string
	// matchNum denotes the number of matching pods.
	matchNum int32
}

// criticalPaths keeps two entries; after every update, entry 0 holds a
// matchNum that is less than or equal to the one held by entry 1.
//
// CAVEAT: keeping only two entries is sufficient because of how the current
// preemption algorithm behaves, in particular:
// Fact 1: we only preempt pods on the same node, instead of pods on multiple nodes.
// Fact 2: each node is evaluated on a separate copy of the metadata during its preemption cycle.
// If the algorithm ever becomes more complex (e.g. "arbitrary pods on multiple
// nodes"), this structure needs to be revisited.
type criticalPaths [2]criticalPath

// newCriticalPaths returns a criticalPaths whose two entries both start with
// an empty topology value and a matchNum of math.MaxInt32.
func newCriticalPaths() *criticalPaths {
	var fresh criticalPaths
	fresh[0].matchNum = math.MaxInt32
	fresh[1].matchNum = math.MaxInt32
	return &fresh
}

// update records that topology value tpVal now has num matching pods.
//
// When tpVal is already tracked, its count is overwritten and the two entries
// are swapped if that leaves entry 0 larger than entry 1. When tpVal is not
// tracked, it displaces entry 0 (pushing the old entry 0 down to entry 1) if
// num is below entry 0's count, displaces entry 1 if num is only below entry
// 1's count, and is ignored otherwise.
func (paths *criticalPaths) update(tpVal string, num int32) {
	lowest, other := &paths[0], &paths[1]

	switch tpVal {
	case lowest.topologyValue:
		lowest.matchNum = num
	case other.topologyValue:
		other.matchNum = num
	default:
		// tpVal is not tracked yet; see whether it belongs in either slot.
		incoming := criticalPath{topologyValue: tpVal, matchNum: num}
		switch {
		case num < lowest.matchNum:
			*other = *lowest
			*lowest = incoming
		case num < other.matchNum:
			*other = incoming
		}
		return
	}

	// A tracked entry changed; restore the "entry 0 <= entry 1" ordering.
	if lowest.matchNum > other.matchNum {
		*lowest, *other = *other, *lowest
	}
}
84-
85-
// topologyPair is a (topology key, topology value) tuple. It is a plain
// comparable struct so it can serve as a map key.
type topologyPair struct {
	key, value string
}
89-
90-
// PodTopologySpreadMetadata combines tpKeyToCriticalPaths and tpPairToMatchNum
91-
// to represent:
92-
// (1) critical paths where the least pods are matched on each spread constraint.
93-
// (2) number of pods matched on each spread constraint.
94-
type PodTopologySpreadMetadata struct {
95-
constraints []topologySpreadConstraint
96-
// We record 2 critical paths instead of all critical paths here.
97-
// criticalPaths[0].matchNum always holds the minimum matching number.
98-
// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
99-
// it's not guaranteed to be the 2nd minimum match number.
100-
tpKeyToCriticalPaths map[string]*criticalPaths
101-
// tpPairToMatchNum is keyed with topologyPair, and valued with the number of matching pods.
102-
tpPairToMatchNum map[topologyPair]int32
103-
}
104-
105-
// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
106-
// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
107-
// selector is parsed.
108-
type topologySpreadConstraint struct {
109-
maxSkew int32
110-
topologyKey string
111-
selector labels.Selector
112-
}
113-
114-
// GetPodTopologySpreadMetadata computes pod topology spread metadata.
115-
func GetPodTopologySpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*PodTopologySpreadMetadata, error) {
116-
// We have feature gating in APIServer to strip the spec
117-
// so don't need to re-check feature gate, just check length of constraints.
118-
constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
119-
if err != nil {
120-
return nil, err
121-
}
122-
if len(constraints) == 0 {
123-
return &PodTopologySpreadMetadata{}, nil
124-
}
125-
126-
var lock sync.Mutex
127-
128-
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
129-
// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
130-
m := PodTopologySpreadMetadata{
131-
constraints: constraints,
132-
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
133-
tpPairToMatchNum: make(map[topologyPair]int32),
134-
}
135-
addTopologyPairMatchNum := func(pair topologyPair, num int32) {
136-
lock.Lock()
137-
m.tpPairToMatchNum[pair] += num
138-
lock.Unlock()
139-
}
140-
141-
processNode := func(i int) {
142-
nodeInfo := allNodes[i]
143-
node := nodeInfo.Node()
144-
if node == nil {
145-
klog.Error("node not found")
146-
return
147-
}
148-
// In accordance to design, if NodeAffinity or NodeSelector is defined,
149-
// spreading is applied to nodes that pass those filters.
150-
if !PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
151-
return
152-
}
153-
154-
// Ensure current node's labels contains all topologyKeys in 'constraints'.
155-
if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
156-
return
157-
}
158-
for _, constraint := range constraints {
159-
matchTotal := int32(0)
160-
// nodeInfo.Pods() can be empty; or all pods don't fit
161-
for _, existingPod := range nodeInfo.Pods() {
162-
if existingPod.Namespace != pod.Namespace {
163-
continue
164-
}
165-
if constraint.selector.Matches(labels.Set(existingPod.Labels)) {
166-
matchTotal++
167-
}
168-
}
169-
pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
170-
addTopologyPairMatchNum(pair, matchTotal)
171-
}
172-
}
173-
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
174-
175-
// calculate min match for each topology pair
176-
for i := 0; i < len(constraints); i++ {
177-
key := constraints[i].topologyKey
178-
m.tpKeyToCriticalPaths[key] = newCriticalPaths()
179-
}
180-
for pair, num := range m.tpPairToMatchNum {
181-
m.tpKeyToCriticalPaths[pair.key].update(pair.value, num)
182-
}
183-
184-
return &m, nil
185-
}
186-
187-
func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
188-
var result []topologySpreadConstraint
189-
for _, c := range constraints {
190-
if c.WhenUnsatisfiable == v1.DoNotSchedule {
191-
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
192-
if err != nil {
193-
return nil, err
194-
}
195-
result = append(result, topologySpreadConstraint{
196-
maxSkew: c.MaxSkew,
197-
topologyKey: c.TopologyKey,
198-
selector: selector,
199-
})
200-
}
201-
}
202-
return result, nil
203-
}
204-
205-
// NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
206-
func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
207-
for _, c := range constraints {
208-
if _, ok := nodeLabels[c.topologyKey]; !ok {
209-
return false
210-
}
211-
}
212-
return true
213-
}
214-
215-
// AddPod updates the metadata with addedPod.
216-
func (m *PodTopologySpreadMetadata) AddPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
217-
m.updateWithPod(addedPod, preemptorPod, node, 1)
218-
}
219-
220-
// RemovePod updates the metadata with deletedPod.
221-
func (m *PodTopologySpreadMetadata) RemovePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
222-
m.updateWithPod(deletedPod, preemptorPod, node, -1)
223-
}
224-
225-
func (m *PodTopologySpreadMetadata) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
226-
if m == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
227-
return
228-
}
229-
if !NodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
230-
return
231-
}
232-
233-
podLabelSet := labels.Set(updatedPod.Labels)
234-
for _, constraint := range m.constraints {
235-
if !constraint.selector.Matches(podLabelSet) {
236-
continue
237-
}
238-
239-
k, v := constraint.topologyKey, node.Labels[constraint.topologyKey]
240-
pair := topologyPair{key: k, value: v}
241-
m.tpPairToMatchNum[pair] = m.tpPairToMatchNum[pair] + delta
242-
243-
m.tpKeyToCriticalPaths[k].update(v, m.tpPairToMatchNum[pair])
244-
}
245-
}
246-
247-
// Clone makes a deep copy of PodTopologySpreadMetadata.
248-
func (m *PodTopologySpreadMetadata) Clone() *PodTopologySpreadMetadata {
249-
// m could be nil when EvenPodsSpread feature is disabled
250-
if m == nil {
251-
return nil
252-
}
253-
cp := PodTopologySpreadMetadata{
254-
// constraints are shared because they don't change.
255-
constraints: m.constraints,
256-
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(m.tpKeyToCriticalPaths)),
257-
tpPairToMatchNum: make(map[topologyPair]int32, len(m.tpPairToMatchNum)),
258-
}
259-
for tpKey, paths := range m.tpKeyToCriticalPaths {
260-
cp.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
261-
}
262-
for tpPair, matchNum := range m.tpPairToMatchNum {
263-
copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
264-
cp.tpPairToMatchNum[copyPair] = matchNum
265-
}
266-
return &cp
267-
}

pkg/scheduler/algorithm/predicates/predicates.go

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package predicates
1818

1919
import (
20-
"errors"
2120
"fmt"
2221
"os"
2322
"regexp"
@@ -837,57 +836,3 @@ func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, f
837836
func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// Unconditionally fails: none of the arguments are inspected.
	err := fmt.Errorf("this function should never be called")
	return false, nil, err
}
840-
841-
// PodTopologySpreadPredicate checks if a pod can be scheduled on a node which satisfies
842-
// its topologySpreadConstraints.
843-
func PodTopologySpreadPredicate(pod *v1.Pod, meta *PodTopologySpreadMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
844-
node := nodeInfo.Node()
845-
if node == nil {
846-
return false, nil, fmt.Errorf("node not found")
847-
}
848-
849-
// nil meta is illegal.
850-
if meta == nil {
851-
// TODO(autoscaler): get it implemented.
852-
return false, nil, errors.New("metadata not pre-computed for PodTopologySpreadPredicate")
853-
}
854-
855-
// However, "empty" meta is legit which tolerates every toSchedule Pod.
856-
if len(meta.tpPairToMatchNum) == 0 || len(meta.constraints) == 0 {
857-
return true, nil, nil
858-
}
859-
860-
podLabelSet := labels.Set(pod.Labels)
861-
for _, c := range meta.constraints {
862-
tpKey := c.topologyKey
863-
tpVal, ok := node.Labels[c.topologyKey]
864-
if !ok {
865-
klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
866-
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
867-
}
868-
869-
selfMatchNum := int32(0)
870-
if c.selector.Matches(podLabelSet) {
871-
selfMatchNum = 1
872-
}
873-
874-
pair := topologyPair{key: tpKey, value: tpVal}
875-
paths, ok := meta.tpKeyToCriticalPaths[tpKey]
876-
if !ok {
877-
// error which should not happen
878-
klog.Errorf("internal error: get paths from key %q of %#v", tpKey, meta.tpKeyToCriticalPaths)
879-
continue
880-
}
881-
// judging criteria:
882-
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
883-
minMatchNum := paths[0].matchNum
884-
matchNum := meta.tpPairToMatchNum[pair]
885-
skew := matchNum + selfMatchNum - minMatchNum
886-
if skew > c.maxSkew {
887-
klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.maxSkew)
888-
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
889-
}
890-
}
891-
892-
return true, nil, nil
893-
}

pkg/scheduler/framework/plugins/podtopologyspread/BUILD

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "go_default_library",
5-
srcs = ["pod_topology_spread.go"],
5+
srcs = [
6+
"filtering.go",
7+
"plugin.go",
8+
],
69
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread",
710
visibility = ["//visibility:public"],
811
deps = [
@@ -13,20 +16,25 @@ go_library(
1316
"//pkg/scheduler/listers:go_default_library",
1417
"//pkg/scheduler/nodeinfo:go_default_library",
1518
"//staging/src/k8s.io/api/core/v1:go_default_library",
19+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
20+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1621
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
22+
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
1723
"//vendor/k8s.io/klog:go_default_library",
1824
],
1925
)
2026

2127
go_test(
2228
name = "go_default_test",
23-
srcs = ["pod_topology_spread_test.go"],
29+
srcs = ["filtering_test.go"],
2430
embed = [":go_default_library"],
2531
deps = [
2632
"//pkg/scheduler/framework/v1alpha1:go_default_library",
2733
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
2834
"//pkg/scheduler/testing:go_default_library",
2935
"//staging/src/k8s.io/api/core/v1:go_default_library",
36+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
37+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
3038
],
3139
)
3240

0 commit comments

Comments
 (0)