Skip to content

Commit 64a815f

Browse files
authored
Merge pull request #8521 from abdelrahman882/capacity-buffer-controller
Add Capacity Buffer controller logic
2 parents bf86702 + fe61e26 commit 64a815f

File tree

15 files changed

+1068
-12
lines changed

15 files changed

+1068
-12
lines changed

cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1/types.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ type ResourceList map[ResourceName]resource.Quantity
9797
// CapacityBufferSpec defines the desired state of CapacityBuffer.
9898
type CapacityBufferSpec struct {
9999
// ProvisioningStrategy defines how the buffer is utilized.
100-
// "active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
101-
// +kubebuilder:validation:Enum=active-capacity
102-
// +kubebuilder:default="active-capacity"
100+
// "buffer.x-k8s.io/active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
101+
// +kubebuilder:validation:Enum=buffer.x-k8s.io/active-capacity
102+
// +kubebuilder:default="buffer.x-k8s.io/active-capacity"
103103
// +optional
104104
ProvisioningStrategy *string `json:"provisioningStrategy,omitempty" protobuf:"bytes,1,opt,name=provisioningStrategy"`
105105

@@ -123,24 +123,18 @@ type CapacityBufferSpec struct {
123123
// If neither `replicas` nor `percentage` is set, as many chunks as fit within
124124
// defined resource limits (if any) will be created. If both are set, the maximum
125125
// of the two will be used.
126-
// This field is mutually exclusive with `percentage` when `scalableRef` is set.
127126
// +optional
128127
// +kubebuilder:validation:Minimum=0
129128
// +kubebuilder:validation:ExclusiveMinimum=false
130-
// +kubebuilder:validation:Xor=replicas,percentage
131129
Replicas *int32 `json:"replicas,omitempty" protobuf:"varint,4,opt,name=replicas"`
132130

133131
// Percentage defines the desired buffer capacity as a percentage of the
134132
// `scalableRef`'s current replicas. This is only applicable if `scalableRef` is set.
135133
// The absolute number of replicas is calculated from the percentage by rounding up to a minimum of 1.
136134
// For example, if `scalableRef` has 10 replicas and `percentage` is 20, 2 buffer chunks will be created.
137-
// This field is mutually exclusive with `replicas`.
138135
// +optional
139136
// +kubebuilder:validation:Minimum=0
140-
// +kubebuilder:validation:Maximum=100
141-
// +kubebuilder:validation:ExclusiveMaximum=false
142137
// +kubebuilder:validation:ExclusiveMinimum=false
143-
// +kubebuilder:validation:Xor=replicas,percentage
144138
Percentage *int32 `json:"percentage,omitempty" protobuf:"varint,5,opt,name=percentage"`
145139

146140
// Limits, if specified, will limit the number of chunks created for this buffer

cluster-autoscaler/apis/config/crd/autoscaling.x-k8s.io_capacitybuffers.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,12 @@ spec:
100100
- name
101101
type: object
102102
provisioningStrategy:
103-
default: active-capacity
103+
default: buffer.x-k8s.io/active-capacity
104104
description: |-
105105
ProvisioningStrategy defines how the buffer is utilized.
106-
"active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
106+
"buffer.x-k8s.io/active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
107107
enum:
108-
- active-capacity
108+
- buffer.x-k8s.io/active-capacity
109109
type: string
110110
replicas:
111111
description: |-
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package common
18+
19+
import (
20+
"context"
21+
22+
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
23+
client "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/client-go/kubernetes"
28+
)
29+
30+
// Constants to use in Capacity Buffers objects
31+
const (
32+
ActiveProvisioningStrategy = "buffer.x-k8s.io/active-capacity"
33+
ReadyForProvisioningCondition = "ReadyForProvisioning"
34+
ProvisioningCondition = "Provisioning"
35+
ConditionTrue = "True"
36+
ConditionFalse = "False"
37+
DefaultNamespace = "default"
38+
)
39+
40+
// CreatePodTemplate creates a pod template object by calling API server
41+
func CreatePodTemplate(client *kubernetes.Clientset, podTemplate *corev1.PodTemplate) (*corev1.PodTemplate, error) {
42+
return client.CoreV1().PodTemplates(DefaultNamespace).Create(context.TODO(), podTemplate, metav1.CreateOptions{})
43+
}
44+
45+
// UpdateBufferStatus updates the passed buffer object with its defined status
46+
func UpdateBufferStatus(buffersClient client.Interface, buffer *v1.CapacityBuffer) error {
47+
_, err := buffersClient.AutoscalingV1().CapacityBuffers(DefaultNamespace).UpdateStatus(context.TODO(), buffer, metav1.UpdateOptions{})
48+
return err
49+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/klog/v2"
23+
24+
"k8s.io/apimachinery/pkg/labels"
25+
buffersclient "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
26+
27+
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/listers/autoscaling.x-k8s.io/v1"
28+
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
29+
30+
common "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
31+
filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
32+
translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
33+
updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
34+
35+
client "k8s.io/client-go/kubernetes"
36+
)
37+
38+
const loopInterval = time.Second * 5
39+
40+
// BufferController performs updates on Buffers and convert them to pods to be injected
41+
type BufferController interface {
42+
// Run to run the reconciliation loop frequently every x seconds
43+
Run(stopCh <-chan struct{})
44+
}
45+
46+
type bufferController struct {
47+
buffersLister v1.CapacityBufferLister
48+
strategyFilter filters.Filter
49+
statusFilter filters.Filter
50+
translator translators.Translator
51+
updater updater.StatusUpdater
52+
loopInterval time.Duration
53+
}
54+
55+
// NewBufferController creates new bufferController object
56+
func NewBufferController(
57+
buffersLister v1.CapacityBufferLister,
58+
strategyFilter filters.Filter,
59+
statusFilter filters.Filter,
60+
translator translators.Translator,
61+
updater updater.StatusUpdater,
62+
loopInterval time.Duration,
63+
) BufferController {
64+
return &bufferController{
65+
buffersLister: buffersLister,
66+
strategyFilter: strategyFilter,
67+
statusFilter: statusFilter,
68+
translator: translator,
69+
updater: updater,
70+
loopInterval: loopInterval,
71+
}
72+
}
73+
74+
// NewDefaultBufferController creates bufferController with default configs
75+
func NewDefaultBufferController(
76+
listerRegistry kubernetes.ListerRegistry,
77+
capacityBufferClinet buffersclient.Clientset,
78+
nodeBufferListener v1.CapacityBufferLister,
79+
kubeClient client.Clientset,
80+
) BufferController {
81+
return &bufferController{
82+
buffersLister: nodeBufferListener,
83+
// Accepting empty string as it represents nil value for ProvisioningStrategy
84+
strategyFilter: filters.NewStrategyFilter([]string{common.ActiveProvisioningStrategy, ""}),
85+
statusFilter: filters.NewStatusFilter(map[string]string{
86+
common.ReadyForProvisioningCondition: common.ConditionTrue,
87+
common.ProvisioningCondition: common.ConditionTrue,
88+
}),
89+
translator: translators.NewCombinedTranslator(
90+
[]translators.Translator{
91+
translators.NewPodTemplateBufferTranslator(),
92+
},
93+
),
94+
updater: *updater.NewStatusUpdater(&capacityBufferClinet),
95+
loopInterval: loopInterval,
96+
}
97+
}
98+
99+
// Run to run the controller reconcile loop
100+
func (c *bufferController) Run(stopCh <-chan struct{}) {
101+
for {
102+
select {
103+
case <-stopCh:
104+
return
105+
case <-time.After(c.loopInterval):
106+
c.reconcile()
107+
}
108+
}
109+
}
110+
111+
// Reconcile represents single iteration in the main-loop of Updater
112+
func (c *bufferController) reconcile() {
113+
114+
// List all capacity buffers objects
115+
buffers, err := c.buffersLister.List(labels.Everything())
116+
if err != nil {
117+
klog.Errorf("Capacity buffer controller failed to list buffers with error: %v", err.Error())
118+
return
119+
}
120+
klog.V(2).Infof("Capacity buffer controller listed [%v] buffers", len(buffers))
121+
122+
// Filter the desired provisioning strategy
123+
filteredBuffers, _ := c.strategyFilter.Filter(buffers)
124+
klog.V(2).Infof("Capacity buffer controller filtered %v buffers with buffers strategy filter", len(filteredBuffers))
125+
126+
// Filter the desired status
127+
toBeTranslatedBuffers, _ := c.statusFilter.Filter(filteredBuffers)
128+
klog.V(2).Infof("Capacity buffer controller filtered %v buffers with buffers status filter", len(filteredBuffers))
129+
130+
// Extract pod specs and number of replicas from filtered buffers
131+
errors := c.translator.Translate(toBeTranslatedBuffers)
132+
logErrors(errors)
133+
134+
// Update buffer status by calling API server
135+
errors = c.updater.Update(toBeTranslatedBuffers)
136+
logErrors(errors)
137+
}
138+
139+
func logErrors(errors []error) {
140+
for _, error := range errors {
141+
klog.Errorf("Capacity buffer controller error: %v", error.Error())
142+
}
143+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
21+
)
22+
23+
// Filter filters CapacityBuffer based on some criteria.
24+
type Filter interface {
25+
Filter(buffers []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer)
26+
CleanUp()
27+
}
28+
29+
// combinedFilter is a list of Filter
30+
type combinedFilter struct {
31+
filters []Filter
32+
}
33+
34+
// NewCombinedFilter construct combinedFilter.
35+
func NewCombinedFilter(filters []Filter) *combinedFilter {
36+
return &combinedFilter{filters}
37+
}
38+
39+
// AddFilter append a filter to the list.
40+
func (f *combinedFilter) AddFilter(filter Filter) {
41+
f.filters = append(f.filters, filter)
42+
}
43+
44+
// Filter runs sub-filters sequentially
45+
func (f *combinedFilter) Filter(buffers []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer) {
46+
var totalFilteredOutBuffers []*v1.CapacityBuffer
47+
for _, buffersFilter := range f.filters {
48+
updatedBuffersList, filteredOutBuffers := buffersFilter.Filter(buffers)
49+
buffers = updatedBuffersList
50+
totalFilteredOutBuffers = append(totalFilteredOutBuffers, filteredOutBuffers...)
51+
}
52+
return buffers, totalFilteredOutBuffers
53+
}
54+
55+
// CleanUp cleans up the filter's internal structures.
56+
func (f *combinedFilter) CleanUp() {
57+
for _, filter := range f.filters {
58+
filter.CleanUp()
59+
}
60+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
21+
)
22+
23+
// statusFilter filters out buffers with the defined conditions
24+
type statusFilter struct {
25+
conditions map[string]string
26+
}
27+
28+
// NewStatusFilter creates an instance of statusFilter that filters out the buffers with condition in passed conditions.
29+
func NewStatusFilter(conditions map[string]string) *statusFilter {
30+
return &statusFilter{
31+
conditions: conditions,
32+
}
33+
}
34+
35+
// Filter filters the passed buffers based on buffer status conditions
36+
func (f *statusFilter) Filter(buffersToFilter []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer) {
37+
var buffers []*v1.CapacityBuffer
38+
var filteredOutBuffers []*v1.CapacityBuffer
39+
40+
for _, buffer := range buffersToFilter {
41+
if !f.hasCondition(buffer) {
42+
buffers = append(buffers, buffer)
43+
} else {
44+
filteredOutBuffers = append(filteredOutBuffers, buffer)
45+
}
46+
}
47+
return buffers, filteredOutBuffers
48+
}
49+
50+
func (f *statusFilter) hasCondition(buffer *v1.CapacityBuffer) bool {
51+
bufferConditions := buffer.Status.Conditions
52+
for _, condition := range bufferConditions {
53+
if val, found := f.conditions[condition.Type]; found && val == string(condition.Status) {
54+
return true
55+
}
56+
}
57+
return false
58+
}
59+
60+
// CleanUp cleans up the filter's internal structures.
61+
func (f *statusFilter) CleanUp() {
62+
}

0 commit comments

Comments
 (0)