Skip to content

Commit 59c66da

Browse files
authored
Merge pull request kubernetes#89487 from alculquicondor/per_node_spreading
Optimize preferred spreading for hostname topology
2 parents e178cac + d2b1903 commit 59c66da

File tree

6 files changed

+50
-35
lines changed

6 files changed

+50
-35
lines changed

pkg/scheduler/framework/plugins/defaultpodtopologyspread/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ go_test(
2626
deps = [
2727
"//pkg/scheduler/framework/v1alpha1:go_default_library",
2828
"//pkg/scheduler/internal/cache:go_default_library",
29+
"//pkg/scheduler/internal/parallelize:go_default_library",
2930
"//pkg/scheduler/testing:go_default_library",
3031
"//staging/src/k8s.io/api/apps/v1:go_default_library",
3132
"//staging/src/k8s.io/api/core/v1:go_default_library",

pkg/scheduler/framework/plugins/defaultpodtopologyspread/default_pod_topology_spread_perf_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/client-go/kubernetes/fake"
2626
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
2727
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
28+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
2829
st "k8s.io/kubernetes/pkg/scheduler/testing"
2930
)
3031

@@ -76,15 +77,14 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
7677
if !status.IsSuccess() {
7778
b.Fatalf("unexpected error: %v", status)
7879
}
79-
var gotList framework.NodeScoreList
80-
for _, node := range filteredNodes {
81-
score, status := plugin.Score(ctx, state, pod, node.Name)
82-
if !status.IsSuccess() {
83-
b.Errorf("unexpected error: %v", status)
84-
}
85-
gotList = append(gotList, framework.NodeScore{Name: node.Name, Score: score})
80+
gotList := make(framework.NodeScoreList, len(filteredNodes))
81+
scoreNode := func(i int) {
82+
n := filteredNodes[i]
83+
score, _ := plugin.Score(ctx, state, pod, n.Name)
84+
gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
8685
}
87-
status = plugin.NormalizeScore(context.Background(), state, pod, gotList)
86+
parallelize.Until(ctx, len(filteredNodes), scoreNode)
87+
status = plugin.NormalizeScore(ctx, state, pod, gotList)
8888
if !status.IsSuccess() {
8989
b.Fatal(status)
9090
}

pkg/scheduler/framework/plugins/podtopologyspread/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ go_test(
4141
deps = [
4242
"//pkg/scheduler/framework/v1alpha1:go_default_library",
4343
"//pkg/scheduler/internal/cache:go_default_library",
44+
"//pkg/scheduler/internal/parallelize:go_default_library",
4445
"//pkg/scheduler/nodeinfo:go_default_library",
4546
"//pkg/scheduler/testing:go_default_library",
4647
"//staging/src/k8s.io/api/apps/v1:go_default_library",

pkg/scheduler/framework/plugins/podtopologyspread/common.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,17 @@ func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint,
8282
}
8383
return result, nil
8484
}
85+
86+
func countPodsMatchSelector(pods []*v1.Pod, selector labels.Selector, ns string) int {
87+
count := 0
88+
for _, p := range pods {
89+
// Bypass terminating Pod (see #87621).
90+
if p.DeletionTimestamp != nil || p.Namespace != ns {
91+
continue
92+
}
93+
if selector.Matches(labels.Set(p.Labels)) {
94+
count++
95+
}
96+
}
97+
return count
98+
}

pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"sync/atomic"
2424

2525
v1 "k8s.io/api/core/v1"
26-
"k8s.io/apimachinery/pkg/labels"
2726
"k8s.io/apimachinery/pkg/util/sets"
2827
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
2928
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -73,6 +72,10 @@ func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, fi
7372
continue
7473
}
7574
for _, constraint := range s.Constraints {
75+
// per-node counts are calculated during Score.
76+
if constraint.TopologyKey == v1.LabelHostname {
77+
continue
78+
}
7679
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
7780
if s.TopologyPairToPodCounts[pair] == nil {
7881
s.TopologyPairToPodCounts[pair] = new(int64)
@@ -103,7 +106,7 @@ func (pl *PodTopologySpread) PreScore(
103106
}
104107

105108
state := &preScoreState{
106-
NodeNameSet: sets.String{},
109+
NodeNameSet: make(sets.String, len(filteredNodes)),
107110
TopologyPairToPodCounts: make(map[topologyPair]*int64),
108111
}
109112
err = pl.initPreScoreState(state, pod, filteredNodes)
@@ -134,22 +137,13 @@ func (pl *PodTopologySpread) PreScore(
134137
pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
135138
// If current topology pair is not associated with any candidate node,
136139
// continue to avoid unnecessary calculation.
137-
if state.TopologyPairToPodCounts[pair] == nil {
140+
// Per-node counts are also skipped, as they are done during Score.
141+
tpCount := state.TopologyPairToPodCounts[pair]
142+
if tpCount == nil {
138143
continue
139144
}
140-
141-
// <matchSum> indicates how many pods (on current node) match the <constraint>.
142-
matchSum := int64(0)
143-
for _, existingPod := range nodeInfo.Pods() {
144-
// Bypass terminating Pod (see #87621).
145-
if existingPod.DeletionTimestamp != nil || existingPod.Namespace != pod.Namespace {
146-
continue
147-
}
148-
if c.Selector.Matches(labels.Set(existingPod.Labels)) {
149-
matchSum++
150-
}
151-
}
152-
atomic.AddInt64(state.TopologyPairToPodCounts[pair], matchSum)
145+
count := countPodsMatchSelector(nodeInfo.Pods(), c.Selector, pod.Namespace)
146+
atomic.AddInt64(tpCount, int64(count))
153147
}
154148
}
155149
parallelize.Until(ctx, len(allNodes), processAllNode)
@@ -183,9 +177,14 @@ func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.Cy
183177
var score int64
184178
for _, c := range s.Constraints {
185179
if tpVal, ok := node.Labels[c.TopologyKey]; ok {
186-
pair := topologyPair{key: c.TopologyKey, value: tpVal}
187-
matchSum := *s.TopologyPairToPodCounts[pair]
188-
score += matchSum
180+
if c.TopologyKey == v1.LabelHostname {
181+
count := countPodsMatchSelector(nodeInfo.Pods(), c.Selector, pod.Namespace)
182+
score += int64(count)
183+
} else {
184+
pair := topologyPair{key: c.TopologyKey, value: tpVal}
185+
matchSum := *s.TopologyPairToPodCounts[pair]
186+
score += matchSum
187+
}
189188
}
190189
}
191190
return score, nil

pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/client-go/kubernetes/fake"
3030
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
3131
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
32+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
3233
st "k8s.io/kubernetes/pkg/scheduler/testing"
3334
"k8s.io/utils/pointer"
3435
)
@@ -746,19 +747,18 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
746747
b.ResetTimer()
747748

748749
for i := 0; i < b.N; i++ {
749-
var gotList framework.NodeScoreList
750750
status := p.PreScore(ctx, state, pod, filteredNodes)
751751
if !status.IsSuccess() {
752752
b.Fatalf("unexpected error: %v", status)
753753
}
754-
for _, n := range filteredNodes {
755-
score, status := p.Score(context.Background(), state, pod, n.Name)
756-
if !status.IsSuccess() {
757-
b.Fatalf("unexpected error: %v", status)
758-
}
759-
gotList = append(gotList, framework.NodeScore{Name: n.Name, Score: score})
754+
gotList := make(framework.NodeScoreList, len(filteredNodes))
755+
scoreNode := func(i int) {
756+
n := filteredNodes[i]
757+
score, _ := p.Score(ctx, state, pod, n.Name)
758+
gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
760759
}
761-
status = p.NormalizeScore(context.Background(), state, pod, gotList)
760+
parallelize.Until(ctx, len(filteredNodes), scoreNode)
761+
status = p.NormalizeScore(ctx, state, pod, gotList)
762762
if !status.IsSuccess() {
763763
b.Fatal(status)
764764
}

0 commit comments

Comments
 (0)