Skip to content

Commit 7115527

Browse files
committed
Allow to prefix provisioningClassName to filter provisioning requests
1 parent 02e3d19 commit 7115527

File tree

10 files changed

+229
-86
lines changed

10 files changed

+229
-86
lines changed

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ type AutoscalingOptions struct {
313313
DynamicResourceAllocationEnabled bool
314314
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
315315
ClusterSnapshotParallelism int
316+
// CheckCapacityProvisioningClassPrefix is the prefix of provisioningClassName that will be filtered by processors.
317+
// Only ProvisioningRequests with this prefix in their class will be processed by this CA.
318+
// It only refers to check capacity ProvisioningRequests.
319+
CheckCapacityProvisioningClassPrefix string
316320
}
317321

318322
// KubeClientOptions specify options for kube client

cluster-autoscaler/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ var (
283283
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
284284
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
285285
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
286+
checkCapacityProvisioningClassPrefix = flag.String("check-capacity-provisioning-class-prefix", "", "Prefix of provisioningClassName that will be filtered by processors. Only ProvisioningRequests with this prefix in their class will be processed by this CA. It refers only to check capacity ProvisioningRequests.")
286287
)
287288

288289
func isFlagPassed(name string) bool {
@@ -464,6 +465,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
464465
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
465466
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
466467
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
468+
CheckCapacityProvisioningClassPrefix: *checkCapacityProvisioningClassPrefix,
467469
}
468470
}
469471

@@ -539,7 +541,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
539541
return nil, nil, err
540542
}
541543

542-
ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing)
544+
ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing, opts.CheckCapacityProvisioningClassPrefix)
543545
if err != nil {
544546
return nil, nil, err
545547
}
@@ -558,7 +560,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
558560

559561
scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
560562
opts.ScaleUpOrchestrator = scaleUpOrchestrator
561-
provreqProcesor := provreq.NewProvReqProcessor(client)
563+
provreqProcesor := provreq.NewProvReqProcessor(client, opts.CheckCapacityProvisioningClassPrefix)
562564
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
563565

564566
podListProcessor.AddProcessor(provreqProcesor)

cluster-autoscaler/processors/provreq/injector.go

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@ import (
3737

3838
// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
3939
type ProvisioningRequestPodsInjector struct {
40-
initialRetryTime time.Duration
41-
maxBackoffTime time.Duration
42-
backoffDuration *lru.Cache
43-
clock clock.PassiveClock
44-
client *provreqclient.ProvisioningRequestClient
45-
lastProvisioningRequestProcessTime time.Time
46-
checkCapacityBatchProcessing bool
40+
initialRetryTime time.Duration
41+
maxBackoffTime time.Duration
42+
backoffDuration *lru.Cache
43+
clock clock.PassiveClock
44+
client *provreqclient.ProvisioningRequestClient
45+
lastProvisioningRequestProcessTime time.Time
46+
checkCapacityBatchProcessing bool
47+
checkCapacityProvisioningClassPrefix string
4748
}
4849

4950
// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -93,16 +94,24 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
9394
p.UpdateLastProcessTime()
9495
}
9596

97+
func (p *ProvisioningRequestPodsInjector) isSupportedClass(pr *provreqwrapper.ProvisioningRequest) bool {
98+
return provisioningrequest.SupportedProvisioningClass(pr.Spec.ProvisioningClassName, p.checkCapacityProvisioningClassPrefix)
99+
}
100+
101+
func (p *ProvisioningRequestPodsInjector) shouldMarkAsAccepted(pr *provreqwrapper.ProvisioningRequest) bool {
102+
// Don't mark as accepted the check capacity ProvReq when batch processing is enabled.
103+
// It will be marked later, in parallel, during processing the requests.
104+
return !p.checkCapacityBatchProcessing || !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName)
105+
}
106+
96107
// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
97-
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
98-
isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool,
99-
) ([]*apiv1.Pod, error) {
108+
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest() ([]*apiv1.Pod, error) {
100109
provReqs, err := p.client.ProvisioningRequests()
101110
if err != nil {
102111
return nil, err
103112
}
104113
for _, pr := range provReqs {
105-
if !isSupportedClass(pr) {
114+
if !p.isSupportedClass(pr) {
106115
continue
107116
}
108117

@@ -117,16 +126,13 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
117126
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
118127
continue
119128
}
120-
// Don't mark as accepted the check capacity ProvReq when batch processing is enabled.
121-
// It will be marked later, in parallel, during processing the requests.
122-
if pr.Spec.ProvisioningClassName == v1.ProvisioningClassCheckCapacity && p.checkCapacityBatchProcessing {
123-
p.UpdateLastProcessTime()
129+
if p.shouldMarkAsAccepted(pr) {
130+
if err := p.MarkAsAccepted(pr); err != nil {
131+
continue
132+
}
124133
return podsFromProvReq, nil
125134
}
126-
if err := p.MarkAsAccepted(pr); err != nil {
127-
continue
128-
}
129-
135+
p.UpdateLastProcessTime()
130136
return podsFromProvReq, nil
131137
}
132138
return nil, nil
@@ -139,6 +145,10 @@ type ProvisioningRequestWithPods struct {
139145
Pods []*apiv1.Pod
140146
}
141147

148+
func (p *ProvisioningRequestPodsInjector) matchesCheckCapacityClass(provisioningClassName string) bool {
149+
return provisioningClassName == p.checkCapacityProvisioningClassPrefix+v1.ProvisioningClassCheckCapacity
150+
}
151+
142152
// GetCheckCapacityBatch returns up to the requested number of ProvisioningRequestWithPods.
143153
// We do not mark the PRs as accepted here.
144154
// If we fail to get the pods for a PR, we mark the PR as failed and issue an update.
@@ -152,7 +162,7 @@ func (p *ProvisioningRequestPodsInjector) GetCheckCapacityBatch(maxPrs int) ([]P
152162
if len(prsWithPods) >= maxPrs {
153163
break
154164
}
155-
if pr.Spec.ProvisioningClassName != v1.ProvisioningClassCheckCapacity {
165+
if !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName) {
156166
continue
157167
}
158168
if !p.IsAvailableForProvisioning(pr) {
@@ -175,15 +185,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
175185
_ *context.AutoscalingContext,
176186
unschedulablePods []*apiv1.Pod,
177187
) ([]*apiv1.Pod, error) {
178-
podsFromProvReq, err := p.GetPodsFromNextRequest(
179-
func(pr *provreqwrapper.ProvisioningRequest) bool {
180-
_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
181-
if !found {
182-
klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name)
183-
}
184-
return found
185-
})
186-
188+
podsFromProvReq, err := p.GetPodsFromNextRequest()
187189
if err != nil {
188190
return unschedulablePods, err
189191
}
@@ -195,19 +197,20 @@ func (p *ProvisioningRequestPodsInjector) Process(
195197
func (p *ProvisioningRequestPodsInjector) CleanUp() {}
196198

197199
// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
198-
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool) (*ProvisioningRequestPodsInjector, error) {
200+
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool, checkCapacityProvisioningClassPrefix string) (*ProvisioningRequestPodsInjector, error) {
199201
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
200202
if err != nil {
201203
return nil, err
202204
}
203205
return &ProvisioningRequestPodsInjector{
204-
initialRetryTime: initialBackoffTime,
205-
maxBackoffTime: maxBackoffTime,
206-
backoffDuration: lru.New(maxCacheSize),
207-
client: client,
208-
clock: clock.RealClock{},
209-
lastProvisioningRequestProcessTime: time.Now(),
210-
checkCapacityBatchProcessing: checkCapacityBatchProcessing,
206+
initialRetryTime: initialBackoffTime,
207+
maxBackoffTime: maxBackoffTime,
208+
backoffDuration: lru.New(maxCacheSize),
209+
client: client,
210+
clock: clock.RealClock{},
211+
lastProvisioningRequestProcessTime: time.Now(),
212+
checkCapacityBatchProcessing: checkCapacityBatchProcessing,
213+
checkCapacityProvisioningClassPrefix: checkCapacityProvisioningClassPrefix,
211214
}, nil
212215
}
213216

cluster-autoscaler/processors/provreq/injector_test.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
6969
podsA := 10
7070
newProvReqA := testProvisioningRequestWithCondition("new", podsA, v1.ProvisioningClassCheckCapacity)
7171
newAcceptedProvReqA := testProvisioningRequestWithCondition("new-accepted", podsA, v1.ProvisioningClassCheckCapacity, accepted)
72+
newProvReqAPrefixed := testProvisioningRequestWithCondition("new-prefixed", podsA, "test-prefix.check-capacity.autoscaling.x-k8s.io")
7273

7374
podsB := 20
7475
notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, v1.ProvisioningClassBestEffortAtomicScaleUp, notProvisioned, accepted)
@@ -79,20 +80,20 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
7980
unknownClass := testProvisioningRequestWithCondition("new-accepted", podsA, "unknown-class", accepted)
8081

8182
testCases := []struct {
82-
name string
83-
provReqs []*provreqwrapper.ProvisioningRequest
84-
existingUnsUnschedulablePodCount int
85-
checkCapacityBatchProcessing bool
86-
wantUnscheduledPodCount int
87-
wantUpdatedConditionName string
83+
name string
84+
provReqs []*provreqwrapper.ProvisioningRequest
85+
existingUnsUnschedulablePodCount int
86+
checkCapacityBatchProcessing bool
87+
checkCapacityProvisioningClassPrefix string
88+
wantUnscheduledPodCount int
89+
wantUpdatedConditionName string
8890
}{
8991
{
9092
name: "New ProvisioningRequest, pods are injected and Accepted condition is added",
9193
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
9294
wantUnscheduledPodCount: podsA,
9395
wantUpdatedConditionName: newProvReqA.Name,
9496
},
95-
9697
{
9798
name: "New check capacity ProvisioningRequest with batch processing, pods are injected and Accepted condition is not added",
9899
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
@@ -106,6 +107,22 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
106107
wantUnscheduledPodCount: podsA,
107108
wantUpdatedConditionName: newAcceptedProvReqA.Name,
108109
},
110+
{
111+
name: "New ProvisioningRequest with not matching custom prefix, no pods are injected",
112+
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed},
113+
},
114+
{
115+
name: "New ProvisioningRequest with not matching prefix, no pods are injected",
116+
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
117+
checkCapacityProvisioningClassPrefix: "test-prefix.",
118+
},
119+
{
120+
name: "New check capacity ProvisioningRequest with matching prefix, pods are injected and Accepted condition is added",
121+
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed, provisionedAcceptedProvReqB},
122+
checkCapacityProvisioningClassPrefix: "test-prefix.",
123+
wantUnscheduledPodCount: podsA,
124+
wantUpdatedConditionName: newProvReqAPrefixed.Name,
125+
},
109126
{
110127
name: "Provisioned=False, pods are injected",
111128
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq},
@@ -140,7 +157,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
140157
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
141158
backoffTime := lru.New(100)
142159
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
143-
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing}
160+
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing, tc.checkCapacityProvisioningClassPrefix}
144161
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
145162
if err != nil {
146163
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)

cluster-autoscaler/processors/provreq/processor.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,16 @@ type injector interface {
5050
}
5151

5252
type provReqProcessor struct {
53-
now func() time.Time
54-
maxUpdated int
55-
client *provreqclient.ProvisioningRequestClient
56-
injector injector
53+
now func() time.Time
54+
maxUpdated int
55+
client *provreqclient.ProvisioningRequestClient
56+
injector injector
57+
checkCapacityProvisioningClassPrefix string
5758
}
5859

5960
// NewProvReqProcessor return ProvisioningRequestProcessor.
60-
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
61-
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator()}
61+
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, checkCapacityProvisioningClassPrefix string) *provReqProcessor {
62+
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(), checkCapacityProvisioningClassPrefix: checkCapacityProvisioningClassPrefix}
6263
}
6364

6465
// Refresh implements loop.Observer interface and will be run at the start
@@ -84,7 +85,7 @@ func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningReques
8485
if len(expiredProvReq) >= p.maxUpdated {
8586
break
8687
}
87-
if ok, found := provisioningrequest.SupportedProvisioningClasses[provReq.Spec.ProvisioningClassName]; !ok || !found {
88+
if !provisioningrequest.SupportedProvisioningClass(provReq.Spec.ProvisioningClassName, p.checkCapacityProvisioningClassPrefix) {
8889
continue
8990
}
9091
conditions := provReq.Status.Conditions
@@ -144,7 +145,7 @@ func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error {
144145
}
145146
podsToCreate := []*apiv1.Pod{}
146147
for _, provReq := range provReqs {
147-
if !conditions.ShouldCapacityBeBooked(provReq) {
148+
if !conditions.ShouldCapacityBeBooked(provReq, p.checkCapacityProvisioningClassPrefix) {
148149
continue
149150
}
150151
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)

cluster-autoscaler/processors/provreq/processor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func TestRefresh(t *testing.T) {
155155
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
156156
additionalPr.Spec.ProvisioningClassName = v1.ProvisioningClassCheckCapacity
157157

158-
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil}
158+
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil, ""}
159159
processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
160160

161161
assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
@@ -215,7 +215,7 @@ func TestDeleteOldProvReqs(t *testing.T) {
215215

216216
client := provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr, oldFailedPr, oldExpiredPr)
217217

218-
processor := provReqProcessor{func() time.Time { return now }, 1, client, nil}
218+
processor := provReqProcessor{func() time.Time { return now }, 1, client, nil, ""}
219219
processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr, oldFailedPr, oldExpiredPr})
220220

221221
_, err := client.ProvisioningRequestNoCache(oldFailedPr.Namespace, oldFailedPr.Name)

cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (o *checkCapacityProvClass) getProvisioningRequestsAndPods(unschedulablePod
132132
if !o.isBatchEnabled() {
133133
klog.Info("Processing single provisioning request (non-batch)")
134134
prs := provreqclient.ProvisioningRequestsForPods(o.client, unschedulablePods)
135-
prs = provreqclient.FilterOutProvisioningClass(prs, v1.ProvisioningClassCheckCapacity)
135+
prs = provreqclient.FilterOutProvisioningClass(prs, o.context.CheckCapacityProvisioningClassPrefix+v1.ProvisioningClassCheckCapacity)
136136
if len(prs) == 0 {
137137
return nil, nil
138138
}

0 commit comments

Comments
 (0)