Commit 81af5c4

Run scheduler predicates in parallel
Setting this initially to 4 goroutines, since going higher than 4 seems to yield diminishing returns at this point:

$ go test -bench=BenchmarkRunFiltersUntilPassingNode ./simulator/clustersnapshot/predicate/
(...)
BenchmarkRunFiltersUntilPassingNode/parallelism-1-16     141    8206978 ns/op
BenchmarkRunFiltersUntilPassingNode/parallelism-2-16     153    7123724 ns/op
BenchmarkRunFiltersUntilPassingNode/parallelism-4-16     183    6997209 ns/op
BenchmarkRunFiltersUntilPassingNode/parallelism-8-16     174    7161056 ns/op
BenchmarkRunFiltersUntilPassingNode/parallelism-16-16    178    7068643 ns/op

This is because the function is currently dominated by ListNodeInfos, which causes frequent memory allocation during WrapSchedulerNodeInfo calls. Since NodeInfo is now an interface, we should be able to avoid the costly object wrapping on listing, at which point it may make sense to bump this parallelism further.
1 parent 2cd7445 commit 81af5c4
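For context on the approach: the change fans the per-node Filter checks out over a fixed number of goroutines with workqueue.ParallelizeUntil, records the first passing node under a mutex, and cancels the shared context so remaining work is skipped. Below is a minimal, self-contained sketch of that pattern; the nodes slice and the nodePasses predicate are illustrative placeholders, not code from this commit. Note that with concurrent workers "first" means "first recorded", not necessarily the lowest index, which matches the real code, where any passing node is acceptable.

package main

import (
    "context"
    "fmt"
    "sync"

    "k8s.io/client-go/util/workqueue"
)

// firstPassingIndex checks items on `parallelism` goroutines and returns the index
// of a passing item, stopping further work once one has been recorded.
func firstPassingIndex(nodes []string, parallelism int, nodePasses func(string) bool) (int, bool) {
    var (
        mu    sync.Mutex
        found = -1
    )
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    check := func(i int) {
        if !nodePasses(nodes[i]) {
            return
        }
        mu.Lock()
        defer mu.Unlock()
        if found == -1 {
            found = i
            cancel() // stop handing out further pieces of work
        }
    }
    // ParallelizeUntil runs check(i) for i in [0, len(nodes)) on the given number
    // of goroutines and stops distributing new pieces once ctx is cancelled.
    workqueue.ParallelizeUntil(ctx, parallelism, len(nodes), check)
    return found, found >= 0
}

func main() {
    nodes := []string{"n-0", "n-1", "n-2", "n-3"}
    idx, ok := firstPassingIndex(nodes, 4, func(n string) bool { return n == "n-2" })
    fmt.Println(idx, ok)
}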

10 files changed: +113 -24 lines changed


cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
@@ -316,6 +316,8 @@ type AutoscalingOptions struct {
     DynamicResourceAllocationEnabled bool
     // ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
     ClusterSnapshotParallelism int
+    // PredicateParallelism is the number of goroutines to use for running scheduler predicates.
+    PredicateParallelism int
     // CheckCapacityProcessorInstance is the name of the processor instance.
     // Only ProvisioningRequests that define this name in their parameters with the key "processorInstance" will be processed by this CA instance.
     // It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance.

cluster-autoscaler/config/flags/flags.go

Lines changed: 6 additions & 0 deletions
@@ -227,6 +227,7 @@ var (
     forceDeleteFailedNodes = flag.Bool("force-delete-failed-nodes", false, "Whether to enable force deletion of failed nodes, regardless of the min size of the node group the belong to.")
     enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
     clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
+    predicateParallelism = flag.Int("predicate-parallelism", 4, "Maximum parallelism of scheduler predicate checking.")
     checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
     nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
     capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
@@ -286,6 +287,10 @@ func createAutoscalingOptions() config.AutoscalingOptions {
         }
     }
 
+    if *predicateParallelism < 1 {
+        klog.Fatalf("Invalid value for --predicate-parallelism flag: %d", *predicateParallelism)
+    }
+
     return config.AutoscalingOptions{
         NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
             ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
@@ -400,6 +405,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
         ForceDeleteFailedNodes: *forceDeleteFailedNodes,
         DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
         ClusterSnapshotParallelism: *clusterSnapshotParallelism,
+        PredicateParallelism: *predicateParallelism,
         CheckCapacityProcessorInstance: *checkCapacityProcessorInstance,
         MaxInactivityTime: *maxInactivityTimeFlag,
         MaxFailingTime: *maxFailingTimeFlag,

cluster-autoscaler/core/autoscaler.go

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
         opts.FrameworkHandle = fwHandle
     }
     if opts.ClusterSnapshot == nil {
-        opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled)
+        opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism)
     }
     if opts.RemainingPdbTracker == nil {
         opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 1 addition & 1 deletion
@@ -397,7 +397,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
 }
 
 func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
-    snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled)
+    snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism)
     pods, err := a.autoscalingCtx.AllPodLister().List()
     if err != nil {
         return nil, err

cluster-autoscaler/main.go

Lines changed: 1 addition & 1 deletion
@@ -126,7 +126,7 @@ func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot
     opts := coreoptions.AutoscalerOptions{
         AutoscalingOptions: autoscalingOptions,
         FrameworkHandle: fwHandle,
-        ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled),
+        ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism),
         KubeClient: kubeClient,
         InformerFactory: informerFactory,
         DebuggingSnapshotter: debuggingSnapshotter,

cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go

Lines changed: 42 additions & 14 deletions
@@ -20,24 +20,27 @@ import (
     "context"
     "fmt"
     "strings"
+    "sync"
 
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
+    "k8s.io/client-go/util/workqueue"
 
     apiv1 "k8s.io/api/core/v1"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
 // SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework.
 type SchedulerPluginRunner struct {
-    fwHandle  *framework.Handle
-    snapshot  clustersnapshot.ClusterSnapshot
-    lastIndex int
+    fwHandle    *framework.Handle
+    snapshot    clustersnapshot.ClusterSnapshot
+    lastIndex   int
+    parallelism int
 }
 
 // NewSchedulerPluginRunner builds a SchedulerPluginRunner.
-func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot) *SchedulerPluginRunner {
-    return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot}
+func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot, parallelism int) *SchedulerPluginRunner {
+    return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot, parallelism: parallelism}
 }
 
 // RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided
@@ -64,38 +67,63 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
         return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
     }
 
-    for i := range nodeInfosList {
-        // Determine which NodeInfo to check next.
-        nodeInfo := nodeInfosList[(p.lastIndex+i)%len(nodeInfosList)]
+    var (
+        foundNode       *apiv1.Node
+        foundCycleState *schedulerframework.CycleState
+        foundIndex      int
+        mu              sync.Mutex
+    )
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    checkNode := func(i int) {
+        nodeIndex := (p.lastIndex + i) % len(nodeInfosList)
+        nodeInfo := nodeInfosList[nodeIndex]
 
         // Plugins can filter some Nodes out during the PreFilter phase, if they're sure the Nodes won't work for the Pod at that stage.
         // Filters are only run for Nodes that haven't been filtered out during the PreFilter phase. Match that behavior here - skip such Nodes.
         if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
-            continue
+            return
         }
 
         // Nodes with the Unschedulable bit set will be rejected by one of the plugins during the Filter phase below. We can check that quickly here
         // and short-circuit to avoid running the expensive Filter phase at all in this case.
         if nodeInfo.Node().Spec.Unschedulable {
-            continue
+            return
         }
 
         // Check if the NodeInfo matches the provided filtering condition. This should be less expensive than running the Filter phase below, so
         // check this first.
         if !nodeMatches(nodeInfo) {
-            continue
+            return
         }
 
         // Run the Filter phase of the framework. Plugins retrieve the state they saved during PreFilter from CycleState, and answer whether the
         // given Pod can be scheduled on the given Node.
-        filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
+        clonedState := state.Clone()
+        filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), clonedState, pod, nodeInfo.ToScheduler())
         if filterStatus.IsSuccess() {
             // Filter passed for all plugins, so this pod can be scheduled on this Node.
-            p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
-            return nodeInfo.Node(), state, nil
+            mu.Lock()
+            defer mu.Unlock()
+            if foundNode == nil {
+                foundNode = nodeInfo.Node()
+                foundCycleState = clonedState.(*schedulerframework.CycleState)
+                foundIndex = nodeIndex
+                cancel()
+            }
         }
         // Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
     }
+
+    workqueue.ParallelizeUntil(ctx, p.parallelism, len(nodeInfosList), checkNode)
+
+    if foundNode != nil {
+        p.lastIndex = (foundIndex + 1) % len(nodeInfosList)
+        return foundNode, foundCycleState, nil
+    }
+
     return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
 }

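One detail in the hunk above worth calling out: the CycleState produced by PreFilter is shared, so each goroutine runs the Filter plugins against its own state.Clone() rather than the original, and only the winning clone is returned. A rough illustration of that clone-per-worker idea, with a plain map standing in for CycleState (assumed names, not code from this commit):

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Shared per-cycle state, computed once up front (like the PreFilter output).
    base := map[string]int{"prefilter-result": 42}

    var wg sync.WaitGroup
    for worker := 0; worker < 4; worker++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            // Clone before use so writes stay local to this worker and never
            // race on the shared map.
            local := make(map[string]int, len(base))
            for k, v := range base {
                local[k] = v
            }
            local["worker"] = id
        }(worker)
    }
    wg.Wait()
    fmt.Println(base) // still only the shared result: map[prefilter-result:42]
}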
cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go

Lines changed: 55 additions & 2 deletions
@@ -17,6 +17,7 @@ limitations under the License.
 package predicate
 
 import (
+    "fmt"
     "os"
     "path/filepath"
     "testing"
@@ -363,6 +364,58 @@ func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfigurati
     if err != nil {
         return nil, nil, err
     }
-    snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true)
-    return NewSchedulerPluginRunner(fwHandle, snapshot), snapshot, nil
+    snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1)
+    return NewSchedulerPluginRunner(fwHandle, snapshot, 1), snapshot, nil
+}
+
+func BenchmarkRunFiltersUntilPassingNode(b *testing.B) {
+    pod := BuildTestPod("p", 100, 1000)
+    nodes := make([]*apiv1.Node, 0, 5001)
+    podsOnNodes := make(map[string][]*apiv1.Pod)
+
+    for i := 0; i < 5000; i++ {
+        nodeName := fmt.Sprintf("n-%d", i)
+        node := BuildTestNode(nodeName, 10, 1000)
+        nodes = append(nodes, node)
+        // Add 10 small pods to each node
+        pods := make([]*apiv1.Pod, 0, 10)
+        for j := 0; j < 10; j++ {
+            pods = append(pods, BuildTestPod(fmt.Sprintf("p-%d-%d", i, j), 1, 1))
+        }
+        podsOnNodes[nodeName] = pods
+    }
+    // Last node is the only one that can fit the pod.
+    lastNodeName := fmt.Sprintf("n-%d", len(nodes))
+    lastNode := BuildTestNode(lastNodeName, 1000, 1000)
+    nodes = append(nodes, lastNode)
+
+    pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(nil)
+    assert.NoError(b, err)
+
+    for _, node := range nodes {
+        err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, podsOnNodes[node.Name]...))
+        assert.NoError(b, err)
+    }
+
+    testCases := []struct {
+        parallelism int
+    }{
+        {parallelism: 1},
+        {parallelism: 2},
+        {parallelism: 4},
+        {parallelism: 8},
+        {parallelism: 16},
+    }
+
+    for _, tc := range testCases {
+        b.Run(fmt.Sprintf("parallelism-%d", tc.parallelism), func(b *testing.B) {
+            pluginRunner.parallelism = tc.parallelism
+            b.ResetTimer()
+            for i := 0; i < b.N; i++ {
+                pluginRunner.lastIndex = 0 // Reset state for each run
+                _, _, err := pluginRunner.RunFiltersUntilPassingNode(pod, func(info *framework.NodeInfo) bool { return true })
+                assert.NoError(b, err)
+            }
+        })
+    }
 }

cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go

Lines changed: 2 additions & 2 deletions
@@ -37,7 +37,7 @@ type PredicateSnapshot struct {
 }
 
 // NewPredicateSnapshot builds a PredicateSnapshot.
-func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool) *PredicateSnapshot {
+func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool, parallelism int) *PredicateSnapshot {
     snapshot := &PredicateSnapshot{
         ClusterSnapshotStore: snapshotStore,
         draEnabled: draEnabled,
@@ -46,7 +46,7 @@ func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fw
     // which operate on *framework.NodeInfo. The only object that allows obtaining *framework.NodeInfos is PredicateSnapshot, so we have an ugly circular
     // dependency between PluginRunner and PredicateSnapshot.
     // TODO: Refactor PluginRunner so that it doesn't depend on PredicateSnapshot (e.g. move retrieving NodeInfos out of PluginRunner, to PredicateSnapshot).
-    snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot)
+    snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot, parallelism)
     return snapshot
 }

cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go

Lines changed: 2 additions & 2 deletions
@@ -49,14 +49,14 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
         if err != nil {
             return nil, err
         }
-        return NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true), nil
+        return NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1), nil
     },
     "delta": func() (clustersnapshot.ClusterSnapshot, error) {
         fwHandle, err := framework.NewTestFrameworkHandle()
         if err != nil {
             return nil, err
         }
-        return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
+        return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true, 1), nil
     },
 }

cluster-autoscaler/simulator/clustersnapshot/testsnapshot/test_snapshot.go

Lines changed: 1 addition & 1 deletion
@@ -57,5 +57,5 @@ func NewCustomTestSnapshotAndHandle(snapshotStore clustersnapshot.ClusterSnapsho
     if err != nil {
         return nil, nil, err
     }
-    return predicate.NewPredicateSnapshot(snapshotStore, testFwHandle, true), testFwHandle, nil
+    return predicate.NewPredicateSnapshot(snapshotStore, testFwHandle, true, 1), testFwHandle, nil
 }

0 commit comments
