
Commit 20d7e08

Update code to enable CSI node awareness
1 parent b4a93df commit 20d7e08

File tree

24 files changed: +2256 −27 lines


cluster-autoscaler/Makefile

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ build:
 	@$(MAKE) build-arch-$(GOARCH)
 
 build-arch-%: clean-arch-%
-	$(ENVVAR) GOOS=$(GOOS) GOARCH=$* go build -o cluster-autoscaler-$* ${LDFLAGS_FLAG} ${TAGS_FLAG}
+	$(ENVVAR) GOOS=$(GOOS) GOARCH=$* go build -o cluster-autoscaler-$* -mod=vendor ${LDFLAGS_FLAG} ${TAGS_FLAG}
 
 test-build-tags:
 	@if [ -z "$(SUPPORTED_BUILD_TAGS)" ]; then \
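Note: the added -mod=vendor flag makes go build resolve dependencies from the repository's vendor/ directory instead of the module cache, so the binary is built against exactly the vendored package versions checked into the tree.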

cluster-autoscaler/cluster-autoscaler-code-analysis.org

Lines changed: 1820 additions & 0 deletions
Large diffs are not rendered by default.

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
@@ -314,6 +314,8 @@ type AutoscalingOptions struct {
 	ForceDeleteFailedNodes bool
 	// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
 	DynamicResourceAllocationEnabled bool
+	// CSINodeAwareSchedulingEnabled configures whether logic for handling CSINode objects is enabled.
+	CSINodeAwareSchedulingEnabled bool
 	// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
 	ClusterSnapshotParallelism int
 	// CheckCapacityProcessorInstance is the name of the processor instance.

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
@@ -226,6 +226,7 @@ var (
 	forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
 	forceDeleteFailedNodes = flag.Bool("force-delete-failed-nodes", false, "Whether to enable force deletion of failed nodes, regardless of the min size of the node group the belong to.")
 	enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
+	enableCSINodeAwareScheduling = flag.Bool("enable-csi-node-aware-scheduling", false, "Whether logic for handling CSINode objects is enabled.")
 	clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
 	checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
 	nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
@@ -399,6 +400,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
 		ForceDeleteFailedNodes:           *forceDeleteFailedNodes,
 		DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
+		CSINodeAwareSchedulingEnabled:    *enableCSINodeAwareScheduling,
 		ClusterSnapshotParallelism:       *clusterSnapshotParallelism,
 		CheckCapacityProcessorInstance:   *checkCapacityProcessorInstance,
 		MaxInactivityTime:                *maxInactivityTimeFlag,
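Usage note: the feature is switched on by adding --enable-csi-node-aware-scheduling=true to the cluster-autoscaler command line; the flag defaults to false, so existing deployments keep the current behavior.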

cluster-autoscaler/context/autoscaling_context.go

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
 	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -65,6 +66,8 @@ type AutoscalingContext struct {
 	ProvisioningRequestScaleUpMode bool
 	// DraProvider is the provider for dynamic resources allocation.
 	DraProvider *draprovider.Provider
+	// CsiProvider is the provider for CSI node aware scheduling.
+	CsiProvider *csinodeprovider.Provider
 }
 
 // AutoscalingKubeClients contains all Kubernetes API clients,
@@ -112,6 +115,7 @@ func NewAutoscalingContext(
 	remainingPdbTracker pdb.RemainingPdbTracker,
 	clusterStateRegistry *clusterstate.ClusterStateRegistry,
 	draProvider *draprovider.Provider,
+	csiProvider *csinodeprovider.Provider,
 ) *AutoscalingContext {
 	return &AutoscalingContext{
 		AutoscalingOptions: options,
@@ -125,6 +129,7 @@ func NewAutoscalingContext(
 		RemainingPdbTracker:  remainingPdbTracker,
 		ClusterStateRegistry: clusterStateRegistry,
 		DraProvider:          draProvider,
+		CsiProvider:          csiProvider,
 	}
 }
 
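The csinodeprovider package itself is among the 24 changed files but is not rendered on this page. Below is a minimal sketch of what the Provider referenced above might look like, assuming it mirrors the existing DRA provider pattern; only the Provider type and its Snapshot() method are visible in the diffs on this page, while the lister-based constructor and the csisnapshot.NewSnapshot helper are assumptions.

// Hypothetical sketch only -- not the actual file from this commit.
package provider

import (
	"k8s.io/apimachinery/pkg/labels"
	storagev1listers "k8s.io/client-go/listers/storage/v1"

	csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
)

// Provider builds point-in-time snapshots of the cluster's CSINode objects
// so that scheduling simulations can account for per-node volume limits.
type Provider struct {
	csiNodeLister storagev1listers.CSINodeLister // assumed data source
}

// NewProvider is an assumed constructor wiring in a CSINode lister.
func NewProvider(lister storagev1listers.CSINodeLister) *Provider {
	return &Provider{csiNodeLister: lister}
}

// Snapshot lists all CSINode objects and captures them for simulation.
// The call sites in this commit rely only on this method's signature.
func (p *Provider) Snapshot() (*csisnapshot.Snapshot, error) {
	csiNodes, err := p.csiNodeLister.List(labels.Everything())
	if err != nil {
		return nil, err
	}
	return csisnapshot.NewSnapshot(csiNodes), nil // NewSnapshot is assumed
}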

cluster-autoscaler/core/autoscaler.go

Lines changed: 2 additions & 1 deletion
@@ -76,6 +76,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
 		opts.DeleteOptions,
 		opts.DrainabilityRules,
 		opts.DraProvider,
+		opts.CsiProvider,
 	), nil
 }
 
@@ -98,7 +99,7 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
 		opts.FrameworkHandle = fwHandle
 	}
 	if opts.ClusterSnapshot == nil {
-		opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled)
+		opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled)
 	}
 	if opts.RemainingPdbTracker == nil {
 		opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()

cluster-autoscaler/core/options/autoscaler.go

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
 	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
 	DeleteOptions     options.NodeDeleteOptions
 	DrainabilityRules rules.Rules
 	DraProvider       *draprovider.Provider
+	CsiProvider       *csinodeprovider.Provider
 }

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 11 additions & 2 deletions
@@ -36,6 +36,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
+	csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -397,7 +398,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
 }
 
 func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
-	snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled)
+	snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.CSINodeAwareSchedulingEnabled)
 	pods, err := a.autoscalingCtx.AllPodLister().List()
 	if err != nil {
 		return nil, err
@@ -414,7 +415,15 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
 		}
 	}
 
-	err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
+	var csiSnapshot *csisnapshot.Snapshot
+	if a.autoscalingCtx.CSINodeAwareSchedulingEnabled {
+		csiSnapshot, err = a.autoscalingCtx.CsiProvider.Snapshot()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot, csiSnapshot)
 	if err != nil {
 		return nil, err
 	}
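The corresponding change to the ClusterSnapshot interface lives in another of the 24 changed files and is not rendered on this page. Judging from the call sites above and in static_autoscaler.go, the updated method presumably looks roughly like the sketch below; the exact parameter types are assumptions inferred from how the arguments are built, not copied from the commit.

// Hypothetical signature sketch, inferred from the call sites in this commit.
type ClusterSnapshot interface {
	// ...other methods unchanged...

	// SetClusterState resets the snapshot to the given cluster state. A nil
	// csiSnapshot presumably means CSI-node-aware scheduling is disabled for this run.
	SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod,
		draSnapshot *drasnapshot.Snapshot, csiSnapshot *csisnapshot.Snapshot) error
}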

cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go

Lines changed: 6 additions & 1 deletion
@@ -107,7 +107,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	if aErr != nil {
 		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
 	}
-	klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
+	klog.V(4).Infof("hemant Upcoming %d nodes", len(upcomingNodes))
 
 	nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
 	if o.processors != nil && o.processors.NodeGroupListProcessor != nil {
@@ -135,11 +135,14 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	for nodegroupID := range skippedNodeGroups {
 		o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingCtx, nodegroupID)
 	}
+	klog.V(4).Infof("hemant validNodeGroups %d", len(validNodeGroups))
 
 	// Calculate expansion options
 	schedulablePodGroups := map[string][]estimator.PodEquivalenceGroup{}
 	var options []expander.Option
 
+	// This code here runs a simulation to see which pods can be scheduled on which node groups.
+	// TODO: Fix bug with CSI node not being added to the simulation.
 	for _, nodeGroup := range validNodeGroups {
 		schedulablePodGroups[nodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()])
 	}
@@ -150,6 +153,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 
 		if len(option.Pods) == 0 || option.NodeCount == 0 {
 			klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
+			klog.Infof("hemant no pod can fit to %s", nodeGroup.Id())
 		} else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
 			klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
 		} else {
@@ -486,6 +490,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
 		o.autoscalingCtx.ClusterSnapshot,
 		estimator.NewEstimationContext(o.autoscalingCtx.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount),
 	)
+	klog.Infof("hemant about to run estimater for node group %s", nodeGroup.Id())
 	option.NodeCount, option.Pods = expansionEstimator.Estimate(podGroups, nodeInfo, nodeGroup)
 	metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart)
 
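Regarding the TODO added above: when the estimator binpacks pods onto template nodes for a node group, those simulated nodes have no corresponding CSINode object yet, so volume-limit checks cannot see the driver's allocatable count. One possible direction, sketched here purely as an illustration and not taken from this commit, would be to fabricate a CSINode for each simulated template node; the helper name, its placement, and how it would be hooked into NodeInfo creation are assumptions, while the storage.k8s.io/v1 CSINode type itself is standard Kubernetes API.

// Illustrative sketch only: fabricate a CSINode for a simulated template node
// so volume-limit checks have something to count against.
package orchestrator // hypothetical placement

import (
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildTemplateCSINode is a hypothetical helper; driverName and volumeLimit
// would have to come from the node group's template or the cloud provider.
func buildTemplateCSINode(nodeName, driverName string, volumeLimit int32) *storagev1.CSINode {
	return &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{Name: nodeName}, // a CSINode shares its node's name
		Spec: storagev1.CSINodeSpec{
			Drivers: []storagev1.CSINodeDriver{{
				Name:        driverName,
				NodeID:      nodeName,
				Allocatable: &storagev1.VolumeNodeResources{Count: &volumeLimit},
			}},
		},
	}
}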

cluster-autoscaler/core/static_autoscaler.go

Lines changed: 16 additions & 3 deletions
@@ -45,6 +45,8 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
+	csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
 	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
@@ -141,7 +143,8 @@ func NewStaticAutoscaler(
 	scaleUpOrchestrator scaleup.Orchestrator,
 	deleteOptions options.NodeDeleteOptions,
 	drainabilityRules rules.Rules,
-	draProvider *draprovider.Provider) *StaticAutoscaler {
+	draProvider *draprovider.Provider,
+	csiProvider *csinodeprovider.Provider) *StaticAutoscaler {
 
 	klog.V(4).Infof("Creating new static autoscaler with opts: %v", opts)
 
@@ -162,7 +165,8 @@ func NewStaticAutoscaler(
 		debuggingSnapshotter,
 		remainingPdbTracker,
 		clusterStateRegistry,
-		draProvider)
+		draProvider,
+		csiProvider)
 
 	taintConfig := taints.NewTaintConfig(opts)
 	processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
@@ -280,6 +284,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		}
 	}
 
+	var csiSnapshot *csisnapshot.Snapshot
+	if a.AutoscalingContext.CsiProvider != nil {
+		var err error
+		csiSnapshot, err = a.AutoscalingContext.CsiProvider.Snapshot()
+		if err != nil {
+			return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+		}
+	}
+
 	// Get nodes and pods currently living on cluster
 	allNodes, readyNodes, typedErr := a.obtainNodeLists(draSnapshot)
 	if typedErr != nil {
@@ -340,7 +353,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 	}
 	nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
 
-	if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
+	if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot, csiSnapshot); err != nil {
 		return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
 	}
 	// Initialize Pod Disruption Budget tracking
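How CsiProvider is constructed and handed to NewStaticAutoscaler is part of bootstrap code that is not rendered on this page. Since RunOnce guards on CsiProvider being non-nil, the wiring presumably builds the provider only when the new flag is set, roughly along the lines of the sketch below; the informer-based construction and the NewProvider name are assumptions modeled on how similar providers are typically wired, not copied from this commit.

// Hypothetical wiring sketch -- not taken from this commit.
func buildCsiProvider(opts config.AutoscalingOptions, informerFactory informers.SharedInformerFactory) *csinodeprovider.Provider {
	if !opts.CSINodeAwareSchedulingEnabled {
		// Leaving the provider nil keeps the feature inert; RunOnce above
		// only snapshots CSINodes when CsiProvider is non-nil.
		return nil
	}
	// Assumed constructor; the CSINode lister comes from the shared informer factory.
	return csinodeprovider.NewProvider(informerFactory.Storage().V1().CSINodes().Lister())
}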
