Skip to content

Commit ce51061

Browse files
committed
migrate elalsticquota/capacityscheduling to controller-runtime
1 parent 862667a commit ce51061

File tree

3 files changed

+63
-59
lines changed

3 files changed

+63
-59
lines changed

pkg/capacityscheduling/capacity_scheduling.go

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/apimachinery/pkg/runtime"
3030
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3131
"k8s.io/client-go/informers"
32+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3233
corelisters "k8s.io/client-go/listers/core/v1"
3334
policylisters "k8s.io/client-go/listers/policy/v1"
3435
"k8s.io/client-go/tools/cache"
@@ -39,23 +40,29 @@ import (
3940
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
4041
"k8s.io/kubernetes/pkg/scheduler/metrics"
4142
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
43+
ctrlruntimecache "sigs.k8s.io/controller-runtime/pkg/cache"
44+
"sigs.k8s.io/controller-runtime/pkg/client"
4245

4346
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
4447
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
45-
"sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
46-
schedinformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
47-
externalv1alpha1 "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"
4848
"sigs.k8s.io/scheduler-plugins/pkg/util"
4949
)
5050

51+
var scheme = runtime.NewScheme()
52+
53+
func init() {
54+
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
55+
utilruntime.Must(v1alpha1.AddToScheme(scheme))
56+
}
57+
5158
// CapacityScheduling is a plugin that implements the mechanism of capacity scheduling.
5259
type CapacityScheduling struct {
5360
sync.RWMutex
54-
fh framework.Handle
55-
podLister corelisters.PodLister
56-
pdbLister policylisters.PodDisruptionBudgetLister
57-
elasticQuotaLister externalv1alpha1.ElasticQuotaLister
58-
elasticQuotaInfos ElasticQuotaInfos
61+
fh framework.Handle
62+
podLister corelisters.PodLister
63+
pdbLister policylisters.PodDisruptionBudgetLister
64+
client client.Client
65+
elasticQuotaInfos ElasticQuotaInfos
5966
}
6067

6168
// PreFilterState computed at PreFilter and used at PostFilter or Reserve.
@@ -119,42 +126,43 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
119126
pdbLister: getPDBLister(handle.SharedInformerFactory()),
120127
}
121128

122-
client, err := versioned.NewForConfig(handle.KubeConfig())
129+
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
123130
if err != nil {
124131
return nil, err
125132
}
126133

127-
schedSharedInformerFactory := schedinformer.NewSharedInformerFactory(client, 0)
128-
c.elasticQuotaLister = schedSharedInformerFactory.Scheduling().V1alpha1().ElasticQuotas().Lister()
129-
elasticQuotaInformer := schedSharedInformerFactory.Scheduling().V1alpha1().ElasticQuotas().Informer()
130-
elasticQuotaInformer.AddEventHandler(
131-
cache.FilteringResourceEventHandler{
132-
FilterFunc: func(obj interface{}) bool {
133-
switch t := obj.(type) {
134-
case *v1alpha1.ElasticQuota:
134+
c.client = client
135+
dynamicCache, err := ctrlruntimecache.New(handle.KubeConfig(), ctrlruntimecache.Options{Scheme: scheme})
136+
if err != nil {
137+
return nil, err
138+
}
139+
// TODO: pass in context.
140+
elasticQuotaInformer, err := dynamicCache.GetInformer(context.Background(), &v1alpha1.ElasticQuota{})
141+
if err != nil {
142+
return nil, err
143+
}
144+
elasticQuotaInformer.AddEventHandler(cache.FilteringResourceEventHandler{
145+
FilterFunc: func(obj interface{}) bool {
146+
switch t := obj.(type) {
147+
case *v1alpha1.ElasticQuota:
148+
return true
149+
case cache.DeletedFinalStateUnknown:
150+
if _, ok := t.Obj.(*v1alpha1.ElasticQuota); ok {
135151
return true
136-
case cache.DeletedFinalStateUnknown:
137-
if _, ok := t.Obj.(*v1alpha1.ElasticQuota); ok {
138-
return true
139-
}
140-
utilruntime.HandleError(fmt.Errorf("cannot convert to *v1alpha1.ElasticQuota: %v", obj))
141-
return false
142-
default:
143-
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T", obj))
144-
return false
145152
}
146-
},
147-
Handler: cache.ResourceEventHandlerFuncs{
148-
AddFunc: c.addElasticQuota,
149-
UpdateFunc: c.updateElasticQuota,
150-
DeleteFunc: c.deleteElasticQuota,
151-
},
152-
})
153-
154-
schedSharedInformerFactory.Start(nil)
155-
if !cache.WaitForCacheSync(nil, elasticQuotaInformer.HasSynced) {
156-
return nil, fmt.Errorf("timed out waiting for caches to sync %v", Name)
157-
}
153+
utilruntime.HandleError(fmt.Errorf("cannot convert to *v1alpha1.ElasticQuota: %v", obj))
154+
return false
155+
default:
156+
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T", obj))
157+
return false
158+
}
159+
},
160+
Handler: cache.ResourceEventHandlerFuncs{
161+
AddFunc: c.addElasticQuota,
162+
UpdateFunc: c.updateElasticQuota,
163+
DeleteFunc: c.deleteElasticQuota,
164+
},
165+
})
158166

159167
podInformer := handle.SharedInformerFactory().Core().V1().Pods().Informer()
160168
podInformer.AddEventHandler(
@@ -688,12 +696,13 @@ func (c *CapacityScheduling) addPod(obj interface{}) {
688696
elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace]
689697
// If elasticQuotaInfo is nil, try to list ElasticQuotas through elasticQuotaLister
690698
if elasticQuotaInfo == nil {
691-
eqs, err := c.elasticQuotaLister.ElasticQuotas(pod.Namespace).List(labels.NewSelector())
692-
if err != nil {
699+
var eqList v1alpha1.ElasticQuotaList
700+
if err := c.client.List(context.Background(), &eqList, client.InNamespace(pod.Namespace)); err != nil {
693701
klog.ErrorS(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace)
694702
return
695703
}
696704

705+
eqs := eqList.Items
697706
// If the length of elasticQuotas is 0, return.
698707
if len(eqs) == 0 {
699708
return

test/integration/capacity_scheduling_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,20 @@ import (
2222
"time"
2323

2424
v1 "k8s.io/api/core/v1"
25-
"k8s.io/apimachinery/pkg/api/errors"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/api/resource"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/util/wait"
2929
"k8s.io/client-go/kubernetes"
30-
"k8s.io/klog/v2"
3130
"k8s.io/kubernetes/pkg/scheduler"
3231
schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
3332
fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
3433
st "k8s.io/kubernetes/pkg/scheduler/testing"
34+
"sigs.k8s.io/controller-runtime/pkg/client"
3535

3636
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
3737
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
3838
"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
39-
"sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
4039
"sigs.k8s.io/scheduler-plugins/test/util"
4140
)
4241

@@ -45,7 +44,7 @@ func TestCapacityScheduling(t *testing.T) {
4544
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
4645

4746
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
48-
extClient := versioned.NewForConfigOrDie(globalKubeConfig)
47+
extClient := util.NewClientOrDie(globalKubeConfig)
4948
testCtx.ClientSet = cs
5049
testCtx.KubeConfig = globalKubeConfig
5150

@@ -557,21 +556,17 @@ func TestCapacityScheduling(t *testing.T) {
557556
}
558557
}
559558

560-
func createElasticQuotas(ctx context.Context, client versioned.Interface, elasticQuotas []*v1alpha1.ElasticQuota) error {
559+
func createElasticQuotas(ctx context.Context, client client.Client, elasticQuotas []*v1alpha1.ElasticQuota) error {
561560
for _, eq := range elasticQuotas {
562-
_, err := client.SchedulingV1alpha1().ElasticQuotas(eq.Namespace).Create(ctx, eq, metav1.CreateOptions{})
563-
if err != nil && !errors.IsAlreadyExists(err) {
561+
if err := client.Create(ctx, eq); err != nil && !apierrors.IsAlreadyExists(err) {
564562
return err
565563
}
566564
}
567565
return nil
568566
}
569567

570-
func cleanupElasticQuotas(ctx context.Context, client versioned.Interface, elasticQuotas []*v1alpha1.ElasticQuota) {
568+
func cleanupElasticQuotas(ctx context.Context, client client.Client, elasticQuotas []*v1alpha1.ElasticQuota) {
571569
for _, eq := range elasticQuotas {
572-
err := client.SchedulingV1alpha1().ElasticQuotas(eq.Namespace).Delete(ctx, eq.Name, metav1.DeleteOptions{})
573-
if err != nil {
574-
klog.ErrorS(err, "Failed to clean up ElasticQuota", "elasticQuota", klog.KObj(eq))
575-
}
570+
_ = client.Delete(ctx, eq)
576571
}
577572
}

test/integration/elasticquota_controller_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/api/resource"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/apimachinery/pkg/util/runtime"
2930
"k8s.io/apimachinery/pkg/util/wait"
3031
quota "k8s.io/apiserver/pkg/quota/v1"
@@ -34,23 +35,22 @@ import (
3435
"k8s.io/kubernetes/pkg/scheduler"
3536
fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
3637
st "k8s.io/kubernetes/pkg/scheduler/testing"
37-
3838
ctrl "sigs.k8s.io/controller-runtime"
3939
"sigs.k8s.io/controller-runtime/pkg/manager"
4040

4141
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
4242
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
4343
"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
4444
"sigs.k8s.io/scheduler-plugins/pkg/controllers"
45-
"sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
45+
"sigs.k8s.io/scheduler-plugins/test/util"
4646
)
4747

4848
func TestElasticController(t *testing.T) {
4949
testCtx := &testContext{}
5050
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
5151

5252
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
53-
extClient := versioned.NewForConfigOrDie(globalKubeConfig)
53+
extClient := util.NewClientOrDie(globalKubeConfig)
5454
testCtx.ClientSet = cs
5555
testCtx.KubeConfig = globalKubeConfig
5656

@@ -311,8 +311,8 @@ func TestElasticController(t *testing.T) {
311311

312312
if err := wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) {
313313
for _, v := range tt.used {
314-
eq, err := extClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(testCtx.Ctx, v.Name, metav1.GetOptions{})
315-
if err != nil {
314+
var eq schedv1alpha1.ElasticQuota
315+
if err := extClient.Get(testCtx.Ctx, types.NamespacedName{Namespace: v.Namespace, Name: v.Name}, &eq); err != nil {
316316
// This could be a connection error so we want to retry.
317317
klog.ErrorS(err, "Failed to obtain the elasticQuota clientSet")
318318
return false, err
@@ -345,8 +345,8 @@ func TestElasticController(t *testing.T) {
345345

346346
if err := wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) {
347347
for _, v := range tt.want {
348-
eq, err := extClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(testCtx.Ctx, v.Name, metav1.GetOptions{})
349-
if err != nil {
348+
var eq schedv1alpha1.ElasticQuota
349+
if err := extClient.Get(testCtx.Ctx, types.NamespacedName{Namespace: v.Namespace, Name: v.Name}, &eq); err != nil {
350350
// This could be a connection error so we want to retry.
351351
klog.ErrorS(err, "Failed to obtain the elasticQuota clientSet")
352352
return false, err

0 commit comments

Comments
 (0)