Skip to content

Commit 134d636

Browse files
authored
Merge pull request #8090 from mtrqq/dra-shapshot-patch
Patches based implementation for DRA snapshot.
2 parents a880a2b + 98f86a7 commit 134d636

31 files changed

+3054
-1012
lines changed

cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"k8s.io/autoscaler/cluster-autoscaler/config"
2727
"k8s.io/autoscaler/cluster-autoscaler/context"
2828
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
29-
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3029
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
3130
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
3231
)
@@ -111,7 +110,7 @@ func TestFilterOutExpendable(t *testing.T) {
111110
t.Run(tc.name, func(t *testing.T) {
112111
processor := NewFilterOutExpendablePodListProcessor()
113112
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
114-
err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
113+
err := snapshot.SetClusterState(tc.nodes, nil, nil)
115114
assert.NoError(t, err)
116115

117116
pods, err := processor.Process(&context.AutoscalingContext{

cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
2828
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
2929
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
30-
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3130
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
3231
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
3332
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -281,7 +280,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
281280
}
282281

283282
clusterSnapshot := snapshotFactory()
284-
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{}); err != nil {
283+
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, nil); err != nil {
285284
assert.NoError(b, err)
286285
}
287286

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
406406
scheduledPods := kube_util.ScheduledPods(pods)
407407
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)
408408

409-
var draSnapshot drasnapshot.Snapshot
409+
var draSnapshot *drasnapshot.Snapshot
410410
if a.ctx.DynamicResourceAllocationEnabled && a.ctx.DraProvider != nil {
411411
draSnapshot, err = a.ctx.DraProvider.Snapshot()
412412
if err != nil {

cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type testCase struct {
4242
desc string
4343
nodes []*apiv1.Node
4444
pods []*apiv1.Pod
45-
draSnapshot drasnapshot.Snapshot
45+
draSnapshot *drasnapshot.Snapshot
4646
draEnabled bool
4747
wantUnneeded []string
4848
wantUnremovable []*simulator.UnremovableNode

cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ import (
4646
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
4747
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
4848
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
49-
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
5049
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
5150
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
5251
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1044,7 +1043,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
10441043
// build orchestrator
10451044
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
10461045
assert.NoError(t, err)
1047-
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), drasnapshot.Snapshot{})
1046+
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), nil)
10481047
assert.NoError(t, err)
10491048
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
10501049
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1154,7 +1153,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
11541153
}
11551154
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
11561155
assert.NoError(t, err)
1157-
err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
1156+
err = context.ClusterSnapshot.SetClusterState(nodes, pods, nil)
11581157
assert.NoError(t, err)
11591158
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
11601159
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1197,7 +1196,7 @@ func TestBinpackingLimiter(t *testing.T) {
11971196

11981197
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
11991198
assert.NoError(t, err)
1200-
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
1199+
err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
12011200
assert.NoError(t, err)
12021201
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
12031202
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1257,7 +1256,7 @@ func TestScaleUpNoHelp(t *testing.T) {
12571256
}
12581257
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
12591258
assert.NoError(t, err)
1260-
err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
1259+
err = context.ClusterSnapshot.SetClusterState(nodes, pods, nil)
12611260
assert.NoError(t, err)
12621261
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
12631262
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1412,7 +1411,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
14121411
listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
14131412
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
14141413
assert.NoError(t, err)
1415-
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
1414+
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
14161415
assert.NoError(t, err)
14171416
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
14181417
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1496,7 +1495,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
14961495
}
14971496
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
14981497
assert.NoError(t, err)
1499-
err = context.ClusterSnapshot.SetClusterState(nodes, podList, drasnapshot.Snapshot{})
1498+
err = context.ClusterSnapshot.SetClusterState(nodes, podList, nil)
15001499
assert.NoError(t, err)
15011500
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
15021501
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1672,7 +1671,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
16721671
assert.NoError(t, err)
16731672

16741673
nodes := []*apiv1.Node{n1, n2}
1675-
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
1674+
err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
16761675
assert.NoError(t, err)
16771676
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
16781677
processors := processorstest.NewTestProcessors(&context)

cluster-autoscaler/core/scaleup/resource/manager_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"k8s.io/autoscaler/cluster-autoscaler/core/test"
3333
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
3434
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
35-
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3635
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
3736
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
3837
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -72,7 +71,7 @@ func TestDeltaForNode(t *testing.T) {
7271

7372
ng := testCase.nodeGroupConfig
7473
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
75-
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
74+
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
7675
assert.NoError(t, err)
7776
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
7877

@@ -115,7 +114,7 @@ func TestResourcesLeft(t *testing.T) {
115114

116115
ng := testCase.nodeGroupConfig
117116
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
118-
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
117+
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
119118
assert.NoError(t, err)
120119
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
121120

@@ -168,7 +167,7 @@ func TestApplyLimits(t *testing.T) {
168167

169168
ng := testCase.nodeGroupConfig
170169
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
171-
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
170+
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
172171
assert.NoError(t, err)
173172
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
174173

@@ -235,7 +234,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
235234
assert.NoError(t, err)
236235

237236
nodes := []*corev1.Node{n1}
238-
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
237+
err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
239238
assert.NoError(t, err)
240239
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
241240

cluster-autoscaler/core/static_autoscaler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
335335
}
336336
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
337337

338-
var draSnapshot drasnapshot.Snapshot
338+
var draSnapshot *drasnapshot.Snapshot
339339
if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.AutoscalingContext.DraProvider != nil {
340340
draSnapshot, err = a.AutoscalingContext.DraProvider.Snapshot()
341341
if err != nil {

cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
2929
"k8s.io/autoscaler/cluster-autoscaler/context"
3030
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
31-
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3231
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
3332
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
3433
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -86,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
8685

8786
nodes := []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1, ready7, readyToBeDeleted6}
8887
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
89-
err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
88+
err := snapshot.SetClusterState(nodes, nil, nil)
9089
assert.NoError(t, err)
9190

9291
ctx := context.AutoscalingContext{
@@ -173,7 +172,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
173172

174173
nodes := []*apiv1.Node{unready4, unready3, ready2, ready1}
175174
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
176-
err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
175+
err := snapshot.SetClusterState(nodes, nil, nil)
177176
assert.NoError(t, err)
178177

179178
// Fill cache
@@ -264,7 +263,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
264263

265264
nodes := []*apiv1.Node{ready1}
266265
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
267-
err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
266+
err := snapshot.SetClusterState(nodes, nil, nil)
268267
assert.NoError(t, err)
269268

270269
ctx := context.AutoscalingContext{

cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ type ClusterSnapshotStore interface {
7777

7878
// SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot
7979
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
80-
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error
80+
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot *drasnapshot.Snapshot) error
8181

8282
// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot without checking scheduler predicates.
8383
ForceAddPod(pod *apiv1.Pod, nodeName string) error
@@ -93,7 +93,7 @@ type ClusterSnapshotStore interface {
9393
RemoveSchedulerNodeInfo(nodeName string) error
9494

9595
// DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot.
96-
DraSnapshot() drasnapshot.Snapshot
96+
DraSnapshot() *drasnapshot.Snapshot
9797

9898
// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert().
9999
// Use WithForkedSnapshot() helper function instead if possible.

cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, e
5656
if err != nil {
5757
return nil, err
5858
}
59+
5960
if s.draEnabled {
6061
return s.ClusterSnapshotStore.DraSnapshot().WrapSchedulerNodeInfo(schedNodeInfo)
6162
}

0 commit comments

Comments
 (0)