Commit 8e7d62b

Merge pull request #8109 from abdelrahman882/dra-node-readiness
Handle node readiness for DRA after a scale-up
2 parents 5bc430e + b07e1e4 commit 8e7d62b

11 files changed: +836 −48 lines

cluster-autoscaler/core/static_autoscaler.go

Lines changed: 13 additions & 11 deletions
@@ -276,8 +276,17 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 
     stateUpdateStart := time.Now()
 
+    var draSnapshot *drasnapshot.Snapshot
+    if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.AutoscalingContext.DraProvider != nil {
+        var err error
+        draSnapshot, err = a.AutoscalingContext.DraProvider.Snapshot()
+        if err != nil {
+            return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+        }
+    }
+
     // Get nodes and pods currently living on cluster
-    allNodes, readyNodes, typedErr := a.obtainNodeLists()
+    allNodes, readyNodes, typedErr := a.obtainNodeLists(draSnapshot)
     if typedErr != nil {
         klog.Errorf("Failed to get node list: %v", typedErr)
         return typedErr
@@ -302,6 +311,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
         klog.Errorf("Failed to get daemonset list: %v", err)
         return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
     }
+
     // Snapshot scale-down actuation status before cache refresh.
     scaleDownActuationStatus := a.scaleDownActuator.CheckStatus()
     // Call CloudProvider.Refresh before any other calls to cloud provider.
@@ -335,14 +345,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
     }
     nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
 
-    var draSnapshot *drasnapshot.Snapshot
-    if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.AutoscalingContext.DraProvider != nil {
-        draSnapshot, err = a.AutoscalingContext.DraProvider.Snapshot()
-        if err != nil {
-            return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
-        }
-    }
-
     if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
         return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
     }
@@ -980,7 +982,7 @@ func (a *StaticAutoscaler) ExitCleanUp() {
     a.clusterStateRegistry.Stop()
 }
 
-func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) {
+func (a *StaticAutoscaler) obtainNodeLists(draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) {
     allNodes, err := a.AllNodeLister().List()
     if err != nil {
         klog.Errorf("Failed to list all nodes: %v", err)
@@ -998,7 +1000,7 @@ func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, caer
     // Treat those nodes as unready until GPU actually becomes available and let
     // our normal handling for booting up nodes deal with this.
     // TODO: Remove this call when we handle dynamically provisioned resources.
-    allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
+    allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes, draSnapshot)
     allNodes, readyNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, allNodes, readyNodes)
     return allNodes, readyNodes, nil
 }
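
The reordering above exists so that node-readiness filtering can see DRA state: the DRA snapshot is now taken before the node lists are read and is passed down to the custom resources processor. The DRA-aware filtering itself lives in a processor added elsewhere in this PR and is not shown in this excerpt; purely as an illustration of the readiness idea, with hypothetical stand-in types and a hypothetical slice-lookup callback rather than the autoscaler's real API, such a check might look like this sketch:

```go
package main

import "fmt"

// nodeInfo is a hypothetical stand-in for the real node/template pairing.
type nodeInfo struct {
	name       string
	ready      bool
	expectsDRA bool // the node group template advertises DRA devices
}

// filterDRAUnready mirrors the GPU handling referenced in the TODO above:
// a node that is expected to expose DRA resources but whose driver has not
// published them yet is kept out of the ready set, so the normal
// "still booting" handling applies to it.
func filterDRAUnready(nodes []nodeInfo, hasPublishedSlices func(node string) bool) []nodeInfo {
	var ready []nodeInfo
	for _, n := range nodes {
		if !n.ready {
			continue
		}
		if n.expectsDRA && !hasPublishedSlices(n.name) {
			continue // treat as unready until its ResourceSlices show up
		}
		ready = append(ready, n)
	}
	return ready
}

func main() {
	nodes := []nodeInfo{
		{name: "gpu-node-0", ready: true, expectsDRA: true},
		{name: "cpu-node-0", ready: true, expectsDRA: false},
	}
	published := map[string]bool{} // gpu-node-0 has not published slices yet
	ready := filterDRAUnready(nodes, func(n string) bool { return published[n] })
	fmt.Println(ready) // only cpu-node-0 remains ready
}
```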

cluster-autoscaler/core/static_autoscaler_dra_test.go

Lines changed: 9 additions & 26 deletions
@@ -181,8 +181,8 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) {
     req1Nic := testDeviceRequest{name: "req1Nic", count: 1, selectors: singleAttrSelector(exampleDriver, nicAttribute, nicTypeA)}
     req1Global := testDeviceRequest{name: "req1Global", count: 1, selectors: singleAttrSelector(exampleDriver, globalDevAttribute, globalDevTypeA)}
 
-    sharedGpuBClaim := testResourceClaim("sharedGpuBClaim", nil, "", []testDeviceRequest{req1GpuB}, nil, nil)
-    sharedAllocatedGlobalClaim := testResourceClaim("sharedGlobalClaim", nil, "", []testDeviceRequest{req1Global}, []testAllocation{{request: req1Global.name, driver: exampleDriver, pool: "global-pool", device: globalDevice + "-0"}}, nil)
+    sharedGpuBClaim := testResourceClaim("sharedGpuBClaim", nil, "", []testDeviceRequest{req1GpuB}, nil)
+    sharedAllocatedGlobalClaim := testResourceClaim("sharedGlobalClaim", nil, "", []testDeviceRequest{req1Global}, []testAllocation{{request: req1Global.name, driver: exampleDriver, pool: "global-pool", device: globalDevice + "-0"}})
 
     testCases := map[string]struct {
         nodeGroups map[*testNodeGroupDef]int
@@ -250,10 +250,8 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) {
             expectedScaleUps: map[string]int{node1Gpu1Nic1slice.name: 3},
         },
         "scale-up: scale from 0 nodes in a node group": {
-            nodeGroups: map[*testNodeGroupDef]int{node1Gpu1Nic1slice: 0},
-            pods: append(
-                unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}),
-            ),
+            nodeGroups: map[*testNodeGroupDef]int{node1Gpu1Nic1slice: 0},
+            pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}),
             expectedScaleUps: map[string]int{node1Gpu1Nic1slice.name: 3},
         },
         "scale-up: scale from 0 nodes in a node group, with pods on the template nodes consuming DRA resources": {
@@ -264,9 +262,7 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) {
                     scheduledPod(baseSmallPod, "template-1", node3GpuA1slice.name+"-template", map[*testDeviceRequest][]string{&req1GpuA: {gpuDevice + "-1"}}),
                 },
             },
-            pods: append(
-                unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}),
-            ),
+            pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}),
             expectedScaleUps: map[string]int{node3GpuA1slice.name: 3},
         },
         "scale-up: scale from 0 nodes in a node group, with pods on the template nodes consuming DRA resources, including shared claims": {
@@ -278,16 +274,12 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) {
                     scheduledPod(baseSmallPod, "template-1", node3GpuA1slice.name+"-template", map[*testDeviceRequest][]string{&req1GpuA: {gpuDevice + "-1"}}, sharedAllocatedGlobalClaim),
                 },
             },
-            pods: append(
-                unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}, sharedAllocatedGlobalClaim),
-            ),
+            pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}, sharedAllocatedGlobalClaim),
             expectedScaleUps: map[string]int{node3GpuA1slice.name: 3},
         },
         "no scale-up: pods requesting multiple different devices, but they're on different nodes": {
             nodeGroups: map[*testNodeGroupDef]int{node1GpuA1slice: 1, node1Nic1slice: 1},
-            pods: append(
-                unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}),
-            ),
+            pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}),
         },
         "scale-up: pods requesting a shared, unallocated claim": {
             extraResourceClaims: []*resourceapi.ResourceClaim{sharedGpuBClaim},
@@ -597,13 +589,13 @@ func resourceClaimsForPod(pod *apiv1.Pod, nodeName string, claimCount int, reque
            }
         }
 
-        claims = append(claims, testResourceClaim(name, pod, nodeName, claimRequests, claimAllocations, nil))
+        claims = append(claims, testResourceClaim(name, pod, nodeName, claimRequests, claimAllocations))
     }
 
     return claims
 }
 
-func testResourceClaim(claimName string, owningPod *apiv1.Pod, nodeName string, requests []testDeviceRequest, allocations []testAllocation, reservedFor []*apiv1.Pod) *resourceapi.ResourceClaim {
+func testResourceClaim(claimName string, owningPod *apiv1.Pod, nodeName string, requests []testDeviceRequest, allocations []testAllocation) *resourceapi.ResourceClaim {
     var deviceRequests []resourceapi.DeviceRequest
     for _, request := range requests {
         var selectors []resourceapi.DeviceSelector
@@ -673,15 +665,6 @@ func testResourceClaim(claimName string, owningPod *apiv1.Pod, nodeName string,
                 UID: owningPod.UID,
             },
         }
-    } else {
-        for _, pod := range podReservations {
-            podReservations = append(podReservations, resourceapi.ResourceClaimConsumerReference{
-                APIGroup: "",
-                Resource: "pods",
-                Name: pod.Name,
-                UID: pod.UID,
-            })
-        }
     }
     claim.Status = resourceapi.ResourceClaimStatus{
         Allocation: &resourceapi.AllocationResult{

cluster-autoscaler/processors/customresources/custom_resources_processor.go

Lines changed: 2 additions & 6 deletions
@@ -20,6 +20,7 @@ import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
     "k8s.io/autoscaler/cluster-autoscaler/context"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 )
 
@@ -35,14 +36,9 @@ type CustomResourceTarget struct {
 type CustomResourcesProcessor interface {
     // FilterOutNodesWithUnreadyResources removes nodes that should have a custom resource, but don't have
     // it in allocatable from ready nodes list and updates their status to unready on all nodes list.
-    FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node)
+    FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node)
     // GetNodeResourceTargets returns mapping of resource names to their targets.
     GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError)
     // CleanUp cleans up processor's internal structures.
     CleanUp()
 }
-
-// NewDefaultCustomResourcesProcessor returns a default instance of CustomResourcesProcessor.
-func NewDefaultCustomResourcesProcessor() CustomResourcesProcessor {
-    return &GpuCustomResourcesProcessor{}
-}
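
Every implementation of CustomResourcesProcessor, in-tree or downstream, must now accept the extra draSnapshot argument. A minimal conforming implementation might look like the sketch below; the passthroughProcessor type is hypothetical and only illustrates the updated signatures shown above, while a real processor would inspect draSnapshot inside FilterOutNodesWithUnreadyResources before deciding which nodes count as ready.

```go
package customresources

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// passthroughProcessor is a hypothetical no-op example of the updated interface.
type passthroughProcessor struct{}

// FilterOutNodesWithUnreadyResources now receives the DRA snapshot; this
// example leaves both node lists untouched.
func (p *passthroughProcessor) FilterOutNodesWithUnreadyResources(ctx *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) {
	return allNodes, readyNodes
}

// GetNodeResourceTargets reports no custom resource targets for this example.
func (p *passthroughProcessor) GetNodeResourceTargets(ctx *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) {
	return nil, nil
}

// CleanUp has nothing to release in this example.
func (p *passthroughProcessor) CleanUp() {}
```
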
New file (package customresources; path not shown in this excerpt)

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package customresources
+
+import (
+    apiv1 "k8s.io/api/core/v1"
+    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+    "k8s.io/autoscaler/cluster-autoscaler/context"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+)
+
+// DefaultCustomResourcesProcessor handles multiple custom resource processors and
+// executes them in order.
+type DefaultCustomResourcesProcessor struct {
+    customResourcesProcessors []CustomResourcesProcessor
+}
+
+// NewDefaultCustomResourcesProcessor returns an instance of DefaultCustomResourcesProcessor.
+func NewDefaultCustomResourcesProcessor(draEnabled bool) CustomResourcesProcessor {
+    customProcessors := []CustomResourcesProcessor{&GpuCustomResourcesProcessor{}}
+    if draEnabled {
+        customProcessors = append(customProcessors, &DraCustomResourcesProcessor{})
+    }
+    return &DefaultCustomResourcesProcessor{customProcessors}
+}
+
+// FilterOutNodesWithUnreadyResources calls the corresponding method for internal custom resources processors in order.
+func (p *DefaultCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) {
+    newAllNodes := allNodes
+    newReadyNodes := readyNodes
+    for _, processor := range p.customResourcesProcessors {
+        newAllNodes, newReadyNodes = processor.FilterOutNodesWithUnreadyResources(context, newAllNodes, newReadyNodes, draSnapshot)
+    }
+    return newAllNodes, newReadyNodes
+}
+
+// GetNodeResourceTargets calls the corresponding method for internal custom resources processors in order.
+func (p *DefaultCustomResourcesProcessor) GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) {
+    customResourcesTargets := []CustomResourceTarget{}
+    for _, processor := range p.customResourcesProcessors {
+        targets, err := processor.GetNodeResourceTargets(context, node, nodeGroup)
+        if err != nil {
+            return nil, err
+        }
+        customResourcesTargets = append(customResourcesTargets, targets...)
+    }
+    return customResourcesTargets, nil
+}
+
+// CleanUp cleans up all internal custom resources processors.
+func (p *DefaultCustomResourcesProcessor) CleanUp() {
+    for _, processor := range p.customResourcesProcessors {
+        processor.CleanUp()
+    }
+}
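
With the constructor shown above, callers get only the GPU processor when DRA is disabled and the GPU + DRA chain when it is enabled, and every processor in the chain receives the same draSnapshot. A simplified wiring sketch follows; the filterNodes helper and its call site are illustrative only (in the autoscaler the processor is constructed once as part of the processors set, not per call), and a nil snapshot is passed when DRA is off, matching the RunOnce change above.

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// filterNodes runs the chained custom resources processors over the node lists:
// it builds the default processor according to whether DRA is enabled, then
// filters readiness using the current DRA snapshot (nil when DRA is disabled).
func filterNodes(ctx *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot, draEnabled bool) ([]*apiv1.Node, []*apiv1.Node) {
	processor := customresources.NewDefaultCustomResourcesProcessor(draEnabled)
	defer processor.CleanUp()
	return processor.FilterOutNodesWithUnreadyResources(ctx, allNodes, readyNodes, draSnapshot)
}
```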
