Skip to content

Commit f898f45

Browse files
authored
Merge pull request kubernetes#89070 from alculquicondor/static-partitioning
Add chunk size option to ParallelizeUntil
2 parents e7852bf + b01e3dc commit f898f45

File tree

19 files changed

+256
-35
lines changed

19 files changed

+256
-35
lines changed

pkg/scheduler/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ filegroup(
123123
"//pkg/scheduler/framework:all-srcs",
124124
"//pkg/scheduler/internal/cache:all-srcs",
125125
"//pkg/scheduler/internal/heap:all-srcs",
126+
"//pkg/scheduler/internal/parallelize:all-srcs",
126127
"//pkg/scheduler/internal/queue:all-srcs",
127128
"//pkg/scheduler/listers:all-srcs",
128129
"//pkg/scheduler/metrics:all-srcs",

pkg/scheduler/core/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"//pkg/scheduler/apis/config:go_default_library",
1414
"//pkg/scheduler/framework/v1alpha1:go_default_library",
1515
"//pkg/scheduler/internal/cache:go_default_library",
16+
"//pkg/scheduler/internal/parallelize:go_default_library",
1617
"//pkg/scheduler/internal/queue:go_default_library",
1718
"//pkg/scheduler/listers:go_default_library",
1819
"//pkg/scheduler/metrics:go_default_library",
@@ -28,7 +29,6 @@ go_library(
2829
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
2930
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
3031
"//staging/src/k8s.io/client-go/rest:go_default_library",
31-
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
3232
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
3333
"//vendor/k8s.io/klog:go_default_library",
3434
"//vendor/k8s.io/utils/trace:go_default_library",

pkg/scheduler/core/generic_scheduler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ import (
2828
"time"
2929

3030
"k8s.io/klog"
31+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
3132

3233
v1 "k8s.io/api/core/v1"
3334
policy "k8s.io/api/policy/v1beta1"
3435
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3536
"k8s.io/apimachinery/pkg/labels"
3637
corelisters "k8s.io/client-go/listers/core/v1"
3738
policylisters "k8s.io/client-go/listers/policy/v1beta1"
38-
"k8s.io/client-go/util/workqueue"
3939
extenderv1 "k8s.io/kube-scheduler/extender/v1"
4040
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
4141
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -479,7 +479,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
479479

480480
// Stops searching for more nodes once the configured number of feasible nodes
481481
// are found.
482-
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
482+
parallelize.Until(ctx, len(allNodes), checkNode)
483483
processedNodes := int(filteredLen) + len(statuses)
484484
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
485485

@@ -870,7 +870,7 @@ func (g *genericScheduler) selectNodesForPreemption(
870870
resultLock.Unlock()
871871
}
872872
}
873-
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
873+
parallelize.Until(ctx, len(potentialNodes), checkNode)
874874
return nodeToVictims, nil
875875
}
876876

pkg/scheduler/framework/plugins/interpodaffinity/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
visibility = ["//visibility:public"],
1212
deps = [
1313
"//pkg/scheduler/framework/v1alpha1:go_default_library",
14+
"//pkg/scheduler/internal/parallelize:go_default_library",
1415
"//pkg/scheduler/listers:go_default_library",
1516
"//pkg/scheduler/nodeinfo:go_default_library",
1617
"//pkg/scheduler/util:go_default_library",
@@ -20,7 +21,6 @@ go_library(
2021
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2122
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2223
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
23-
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
2424
"//vendor/k8s.io/klog:go_default_library",
2525
"//vendor/k8s.io/utils/pointer:go_default_library",
2626
],

pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/labels"
2727
"k8s.io/apimachinery/pkg/util/sets"
28-
"k8s.io/client-go/util/workqueue"
2928
"k8s.io/klog"
3029
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
30+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
3131
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
3232
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
3333
)
@@ -240,7 +240,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.Node
240240
}
241241
}
242242
}
243-
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
243+
parallelize.Until(ctx, len(allNodes), processNode)
244244

245245
if err := errCh.ReceiveError(); err != nil {
246246
return nil, err
@@ -304,7 +304,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*nodei
304304
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
305305
}
306306
}
307-
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
307+
parallelize.Until(context.Background(), len(allNodes), processNode)
308308

309309
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
310310
}

pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import (
2222

2323
v1 "k8s.io/api/core/v1"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25-
"k8s.io/client-go/util/workqueue"
2625
"k8s.io/klog"
2726
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
27+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
2828
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
2929
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
3030
)
@@ -259,7 +259,7 @@ func (pl *InterPodAffinity) PreScore(
259259
pl.Unlock()
260260
}
261261
}
262-
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
262+
parallelize.Until(ctx, len(allNodes), processNode)
263263
if err := errCh.ReceiveError(); err != nil {
264264
return framework.NewStatus(framework.Error, err.Error())
265265
}

pkg/scheduler/framework/plugins/podtopologyspread/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
deps = [
1414
"//pkg/scheduler/framework/plugins/helper:go_default_library",
1515
"//pkg/scheduler/framework/v1alpha1:go_default_library",
16+
"//pkg/scheduler/internal/parallelize:go_default_library",
1617
"//pkg/scheduler/listers:go_default_library",
1718
"//pkg/scheduler/nodeinfo:go_default_library",
1819
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -25,7 +26,6 @@ go_library(
2526
"//staging/src/k8s.io/client-go/informers:go_default_library",
2627
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
2728
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
28-
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
2929
"//vendor/k8s.io/klog:go_default_library",
3030
],
3131
)

pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ import (
2424

2525
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/labels"
27-
"k8s.io/client-go/util/workqueue"
2827
"k8s.io/klog"
2928
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
3029
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
30+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
3131
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
3232
)
3333

@@ -267,7 +267,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
267267
addTopologyPairMatchNum(pair, matchTotal)
268268
}
269269
}
270-
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
270+
parallelize.Until(context.Background(), len(allNodes), processNode)
271271

272272
// calculate min match for each topology pair
273273
for i := 0; i < len(constraints); i++ {

pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ import (
2525
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/labels"
2727
"k8s.io/apimachinery/pkg/util/sets"
28-
"k8s.io/client-go/util/workqueue"
2928
"k8s.io/klog"
3029
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
3130
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
31+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
3232
)
3333

3434
const preScoreStateKey = "PreScore" + Name
@@ -153,7 +153,7 @@ func (pl *PodTopologySpread) PreScore(
153153
atomic.AddInt64(state.TopologyPairToPodCounts[pair], matchSum)
154154
}
155155
}
156-
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
156+
parallelize.Until(ctx, len(allNodes), processAllNode)
157157

158158
cycleState.Write(preScoreStateKey, state)
159159
return nil

pkg/scheduler/framework/v1alpha1/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
deps = [
1616
"//pkg/controller/volume/scheduling:go_default_library",
1717
"//pkg/scheduler/apis/config:go_default_library",
18+
"//pkg/scheduler/internal/parallelize:go_default_library",
1819
"//pkg/scheduler/listers:go_default_library",
1920
"//pkg/scheduler/metrics:go_default_library",
2021
"//pkg/scheduler/nodeinfo:go_default_library",
@@ -26,7 +27,6 @@ go_library(
2627
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2728
"//staging/src/k8s.io/client-go/informers:go_default_library",
2829
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
29-
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
3030
"//staging/src/k8s.io/component-base/metrics:go_default_library",
3131
"//vendor/k8s.io/klog:go_default_library",
3232
"//vendor/sigs.k8s.io/yaml:go_default_library",

0 commit comments

Comments
 (0)