Skip to content

Commit 8388db9

Browse files
committed
Update code to enable CSI node awareness
Fix code to use new framework
1 parent 9caf573 commit 8388db9

File tree

57 files changed

+685
-169
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

57 files changed

+685
-169
lines changed

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,8 @@ type AutoscalingOptions struct {
314314
ForceDeleteFailedNodes bool
315315
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
316316
DynamicResourceAllocationEnabled bool
317+
// CSINodeAwareSchedulingEnabled configures whether logic for handling CSINode objects is enabled.
318+
CSINodeAwareSchedulingEnabled bool
317319
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
318320
ClusterSnapshotParallelism int
319321
// PredicateParallelism is the number of goroutines to use for running scheduler predicates.

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ var (
226226
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
227227
forceDeleteFailedNodes = flag.Bool("force-delete-failed-nodes", false, "Whether to enable force deletion of failed nodes, regardless of the min size of the node group the belong to.")
228228
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
229+
enableCSINodeAwareScheduling = flag.Bool("enable-csi-node-aware-scheduling", false, "Whether logic for handling CSINode objects is enabled.")
229230
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
230231
predicateParallelism = flag.Int("predicate-parallelism", 4, "Maximum parallelism of scheduler predicate checking.")
231232
checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
@@ -405,6 +406,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
405406
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
406407
ForceDeleteFailedNodes: *forceDeleteFailedNodes,
407408
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
409+
CSINodeAwareSchedulingEnabled: *enableCSINodeAwareScheduling,
408410
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
409411
PredicateParallelism: *predicateParallelism,
410412
CheckCapacityProcessorInstance: *checkCapacityProcessorInstance,

cluster-autoscaler/context/autoscaling_context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/autoscaler/cluster-autoscaler/expander"
2828
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
2929
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
30+
csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
3031
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
3132
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
3233
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -65,6 +66,8 @@ type AutoscalingContext struct {
6566
ProvisioningRequestScaleUpMode bool
6667
// DraProvider is the provider for dynamic resources allocation.
6768
DraProvider *draprovider.Provider
69+
// CsiProvider is the provider for CSI node aware scheduling.
70+
CsiProvider *csinodeprovider.Provider
6871
}
6972

7073
// AutoscalingKubeClients contains all Kubernetes API clients,
@@ -112,6 +115,7 @@ func NewAutoscalingContext(
112115
remainingPdbTracker pdb.RemainingPdbTracker,
113116
clusterStateRegistry *clusterstate.ClusterStateRegistry,
114117
draProvider *draprovider.Provider,
118+
csiProvider *csinodeprovider.Provider,
115119
) *AutoscalingContext {
116120
return &AutoscalingContext{
117121
AutoscalingOptions: options,
@@ -125,6 +129,7 @@ func NewAutoscalingContext(
125129
RemainingPdbTracker: remainingPdbTracker,
126130
ClusterStateRegistry: clusterStateRegistry,
127131
DraProvider: draProvider,
132+
CsiProvider: csiProvider,
128133
}
129134
}
130135

cluster-autoscaler/core/autoscaler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
7676
opts.DeleteOptions,
7777
opts.DrainabilityRules,
7878
opts.DraProvider,
79+
opts.CsiProvider,
7980
), nil
8081
}
8182

@@ -91,14 +92,14 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
9192
opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
9293
}
9394
if opts.FrameworkHandle == nil {
94-
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
95+
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled)
9596
if err != nil {
9697
return err
9798
}
9899
opts.FrameworkHandle = fwHandle
99100
}
100101
if opts.ClusterSnapshot == nil {
101-
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism)
102+
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism, opts.CSINodeAwareSchedulingEnabled)
102103
}
103104
if opts.RemainingPdbTracker == nil {
104105
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()

cluster-autoscaler/core/options/autoscaler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
2929
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
3030
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
31+
csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
3132
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
3233
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
3334
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
5758
DeleteOptions options.NodeDeleteOptions
5859
DrainabilityRules rules.Rules
5960
DraProvider *draprovider.Provider
61+
CsiProvider *csinodeprovider.Provider
6062
}

cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestFilterOutExpendable(t *testing.T) {
110110
t.Run(tc.name, func(t *testing.T) {
111111
processor := NewFilterOutExpendablePodListProcessor()
112112
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
113-
err := snapshot.SetClusterState(tc.nodes, nil, nil)
113+
err := snapshot.SetClusterState(tc.nodes, nil, nil, nil)
114114
assert.NoError(t, err)
115115

116116
pods, err := processor.Process(&ca_context.AutoscalingContext{

cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
280280
}
281281

282282
clusterSnapshot := snapshotFactory()
283-
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, nil); err != nil {
283+
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, nil, nil); err != nil {
284284
assert.NoError(b, err)
285285
}
286286

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
3737
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
3838
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
39+
csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
3940
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
4041
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
4142
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -397,7 +398,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
397398
}
398399

399400
func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
400-
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism)
401+
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism, a.autoscalingCtx.CSINodeAwareSchedulingEnabled)
401402
pods, err := a.autoscalingCtx.AllPodLister().List()
402403
if err != nil {
403404
return nil, err
@@ -414,7 +415,15 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
414415
}
415416
}
416417

417-
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
418+
var csiSnapshot *csisnapshot.Snapshot
419+
if a.autoscalingCtx.CSINodeAwareSchedulingEnabled {
420+
csiSnapshot, err = a.autoscalingCtx.CsiProvider.Snapshot()
421+
if err != nil {
422+
return nil, err
423+
}
424+
}
425+
426+
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot, csiSnapshot)
418427
if err != nil {
419428
return nil, err
420429
}

cluster-autoscaler/core/scaledown/actuation/actuator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
12281228
t.Fatalf("Couldn't create daemonset lister")
12291229
}
12301230

1231-
registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
1231+
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
12321232
autoscalingCtx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
12331233
if err != nil {
12341234
t.Fatalf("Couldn't set up autoscaling context: %v", err)
@@ -1541,7 +1541,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
15411541

15421542
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
15431543
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
1544-
registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, nil, nil, nil, nil, nil)
1544+
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, pdbLister, nil, nil, nil, nil, nil)
15451545
autoscalingCtx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
15461546
if err != nil {
15471547
t.Fatalf("Couldn't set up autoscaling context: %v", err)

cluster-autoscaler/core/scaledown/actuation/drain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
139139
provider := testprovider.NewTestCloudProviderBuilder().Build()
140140
provider.AddNodeGroup("ng1", 1, 10, 1)
141141
provider.AddNode("ng1", n1)
142-
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
142+
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
143143

144144
autoscalingCtx, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
145145
assert.NoError(t, err)

0 commit comments

Comments
 (0)