Skip to content

Commit fc5b4b2

Browse files
authored
Merge pull request #665 from Huang-Wei/elasticquota-migrate-controller-runtime
migrate elasticquota/capacityscheduling to controller-runtime
2 parents 862667a + 28e0736 commit fc5b4b2

File tree

6 files changed

+102
-89
lines changed

6 files changed

+102
-89
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ update-vendor:
112112
hack/update-vendor.sh
113113

114114
.PHONY: unit-test
115-
unit-test:
115+
unit-test: install-envtest
116116
hack/unit-test.sh
117117

118118
.PHONY: install-envtest

cmd/scheduler/main_test.go

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@ import (
2020
"context"
2121
"fmt"
2222
"net"
23-
"net/http"
24-
"net/http/httptest"
2523
"os"
2624
"path/filepath"
2725
"testing"
2826

2927
"github.com/google/go-cmp/cmp"
3028
"github.com/spf13/pflag"
3129

30+
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
3231
"k8s.io/kubernetes/cmd/kube-scheduler/app"
3332
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
3433
"k8s.io/kubernetes/pkg/scheduler/apis/config"
3534
"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
35+
"sigs.k8s.io/controller-runtime/pkg/envtest"
3636

3737
"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
3838
"sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
@@ -48,42 +48,47 @@ import (
4848
)
4949

5050
func TestSetup(t *testing.T) {
51+
testEnv := &envtest.Environment{
52+
CRDDirectoryPaths: []string{
53+
filepath.Join("..", "..", "manifests", "crds"),
54+
},
55+
}
56+
57+
// start envtest cluster
58+
cfg, err := testEnv.Start()
59+
defer testEnv.Stop()
60+
if err != nil {
61+
panic(err)
62+
}
63+
5164
// temp dir
5265
tmpDir, err := os.MkdirTemp("", "scheduler-options")
5366
if err != nil {
5467
t.Fatal(err)
5568
}
5669
defer os.RemoveAll(tmpDir)
5770

58-
// https server
59-
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
60-
w.Header().Set("Content-Type", "application/json")
61-
w.WriteHeader(http.StatusOK)
62-
w.Write([]byte(`{"metadata": {"name": "test"}}`))
63-
}))
64-
defer server.Close()
71+
clusters := make(map[string]*clientcmdapi.Cluster)
72+
clusters["default-cluster"] = &clientcmdapi.Cluster{
73+
Server: cfg.Host,
74+
CertificateAuthorityData: cfg.CAData,
75+
}
76+
// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.16.3/examples/scratch-env/main.go
77+
user, err := testEnv.ControlPlane.AddUser(envtest.User{
78+
Name: "envtest-admin",
79+
Groups: []string{"system:masters"},
80+
}, nil)
81+
if err != nil {
82+
t.Fatal(err)
83+
}
84+
kubeConfig, err := user.KubeConfig()
85+
if err != nil {
86+
t.Fatalf("unable to create kubeconfig: %v", err)
87+
}
6588

6689
configKubeconfig := filepath.Join(tmpDir, "config.kubeconfig")
67-
if err := os.WriteFile(configKubeconfig, []byte(fmt.Sprintf(`
68-
apiVersion: v1
69-
kind: Config
70-
clusters:
71-
- cluster:
72-
insecure-skip-tls-verify: true
73-
server: %s
74-
name: default
75-
contexts:
76-
- context:
77-
cluster: default
78-
user: default
79-
name: default
80-
current-context: default
81-
users:
82-
- name: default
83-
user:
84-
username: config
85-
`, server.URL)), os.FileMode(0600)); err != nil {
86-
t.Fatal(err)
90+
if err := os.WriteFile(configKubeconfig, kubeConfig, os.FileMode(0600)); err != nil {
91+
t.Fatalf("unable to create kubeconfig file: %v", err)
8792
}
8893

8994
// PodState plugin config

hack/unit-test.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ set -o pipefail
2121
SCRIPT_ROOT=$(dirname "${BASH_SOURCE}")/..
2222
source "${SCRIPT_ROOT}/hack/lib/init.sh"
2323

24+
kube::log::status "Configuring envtest"
25+
TEMP_DIR=${TMPDIR-/tmp}
26+
source "${TEMP_DIR}/setup-envtest"
27+
2428
# TODO: make args customizable.
2529
go test -mod=vendor \
2630
sigs.k8s.io/scheduler-plugins/cmd/... \

pkg/capacityscheduling/capacity_scheduling.go

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/apimachinery/pkg/runtime"
3030
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3131
"k8s.io/client-go/informers"
32+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3233
corelisters "k8s.io/client-go/listers/core/v1"
3334
policylisters "k8s.io/client-go/listers/policy/v1"
3435
"k8s.io/client-go/tools/cache"
@@ -39,23 +40,29 @@ import (
3940
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
4041
"k8s.io/kubernetes/pkg/scheduler/metrics"
4142
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
43+
ctrlruntimecache "sigs.k8s.io/controller-runtime/pkg/cache"
44+
"sigs.k8s.io/controller-runtime/pkg/client"
4245

4346
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
4447
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
45-
"sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
46-
schedinformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
47-
externalv1alpha1 "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"
4848
"sigs.k8s.io/scheduler-plugins/pkg/util"
4949
)
5050

51+
var scheme = runtime.NewScheme()
52+
53+
func init() {
54+
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
55+
utilruntime.Must(v1alpha1.AddToScheme(scheme))
56+
}
57+
5158
// CapacityScheduling is a plugin that implements the mechanism of capacity scheduling.
5259
type CapacityScheduling struct {
5360
sync.RWMutex
54-
fh framework.Handle
55-
podLister corelisters.PodLister
56-
pdbLister policylisters.PodDisruptionBudgetLister
57-
elasticQuotaLister externalv1alpha1.ElasticQuotaLister
58-
elasticQuotaInfos ElasticQuotaInfos
61+
fh framework.Handle
62+
podLister corelisters.PodLister
63+
pdbLister policylisters.PodDisruptionBudgetLister
64+
client client.Client
65+
elasticQuotaInfos ElasticQuotaInfos
5966
}
6067

6168
// PreFilterState computed at PreFilter and used at PostFilter or Reserve.
@@ -119,42 +126,43 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
119126
pdbLister: getPDBLister(handle.SharedInformerFactory()),
120127
}
121128

122-
client, err := versioned.NewForConfig(handle.KubeConfig())
129+
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
123130
if err != nil {
124131
return nil, err
125132
}
126133

127-
schedSharedInformerFactory := schedinformer.NewSharedInformerFactory(client, 0)
128-
c.elasticQuotaLister = schedSharedInformerFactory.Scheduling().V1alpha1().ElasticQuotas().Lister()
129-
elasticQuotaInformer := schedSharedInformerFactory.Scheduling().V1alpha1().ElasticQuotas().Informer()
130-
elasticQuotaInformer.AddEventHandler(
131-
cache.FilteringResourceEventHandler{
132-
FilterFunc: func(obj interface{}) bool {
133-
switch t := obj.(type) {
134-
case *v1alpha1.ElasticQuota:
134+
c.client = client
135+
dynamicCache, err := ctrlruntimecache.New(handle.KubeConfig(), ctrlruntimecache.Options{Scheme: scheme})
136+
if err != nil {
137+
return nil, err
138+
}
139+
// TODO: pass in context.
140+
elasticQuotaInformer, err := dynamicCache.GetInformer(context.Background(), &v1alpha1.ElasticQuota{})
141+
if err != nil {
142+
return nil, err
143+
}
144+
elasticQuotaInformer.AddEventHandler(cache.FilteringResourceEventHandler{
145+
FilterFunc: func(obj interface{}) bool {
146+
switch t := obj.(type) {
147+
case *v1alpha1.ElasticQuota:
148+
return true
149+
case cache.DeletedFinalStateUnknown:
150+
if _, ok := t.Obj.(*v1alpha1.ElasticQuota); ok {
135151
return true
136-
case cache.DeletedFinalStateUnknown:
137-
if _, ok := t.Obj.(*v1alpha1.ElasticQuota); ok {
138-
return true
139-
}
140-
utilruntime.HandleError(fmt.Errorf("cannot convert to *v1alpha1.ElasticQuota: %v", obj))
141-
return false
142-
default:
143-
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T", obj))
144-
return false
145152
}
146-
},
147-
Handler: cache.ResourceEventHandlerFuncs{
148-
AddFunc: c.addElasticQuota,
149-
UpdateFunc: c.updateElasticQuota,
150-
DeleteFunc: c.deleteElasticQuota,
151-
},
152-
})
153-
154-
schedSharedInformerFactory.Start(nil)
155-
if !cache.WaitForCacheSync(nil, elasticQuotaInformer.HasSynced) {
156-
return nil, fmt.Errorf("timed out waiting for caches to sync %v", Name)
157-
}
153+
utilruntime.HandleError(fmt.Errorf("cannot convert to *v1alpha1.ElasticQuota: %v", obj))
154+
return false
155+
default:
156+
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T", obj))
157+
return false
158+
}
159+
},
160+
Handler: cache.ResourceEventHandlerFuncs{
161+
AddFunc: c.addElasticQuota,
162+
UpdateFunc: c.updateElasticQuota,
163+
DeleteFunc: c.deleteElasticQuota,
164+
},
165+
})
158166

159167
podInformer := handle.SharedInformerFactory().Core().V1().Pods().Informer()
160168
podInformer.AddEventHandler(
@@ -688,12 +696,13 @@ func (c *CapacityScheduling) addPod(obj interface{}) {
688696
elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace]
689697
// If elasticQuotaInfo is nil, try to list ElasticQuotas through elasticQuotaLister
690698
if elasticQuotaInfo == nil {
691-
eqs, err := c.elasticQuotaLister.ElasticQuotas(pod.Namespace).List(labels.NewSelector())
692-
if err != nil {
699+
var eqList v1alpha1.ElasticQuotaList
700+
if err := c.client.List(context.Background(), &eqList, client.InNamespace(pod.Namespace)); err != nil {
693701
klog.ErrorS(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace)
694702
return
695703
}
696704

705+
eqs := eqList.Items
697706
// If the length of elasticQuotas is 0, return.
698707
if len(eqs) == 0 {
699708
return

test/integration/capacity_scheduling_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,20 @@ import (
2222
"time"
2323

2424
v1 "k8s.io/api/core/v1"
25-
"k8s.io/apimachinery/pkg/api/errors"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/api/resource"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/util/wait"
2929
"k8s.io/client-go/kubernetes"
30-
"k8s.io/klog/v2"
3130
"k8s.io/kubernetes/pkg/scheduler"
3231
schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
3332
fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
3433
st "k8s.io/kubernetes/pkg/scheduler/testing"
34+
"sigs.k8s.io/controller-runtime/pkg/client"
3535

3636
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
3737
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
3838
"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
39-
"sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
4039
"sigs.k8s.io/scheduler-plugins/test/util"
4140
)
4241

@@ -45,7 +44,7 @@ func TestCapacityScheduling(t *testing.T) {
4544
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
4645

4746
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
48-
extClient := versioned.NewForConfigOrDie(globalKubeConfig)
47+
extClient := util.NewClientOrDie(globalKubeConfig)
4948
testCtx.ClientSet = cs
5049
testCtx.KubeConfig = globalKubeConfig
5150

@@ -557,21 +556,17 @@ func TestCapacityScheduling(t *testing.T) {
557556
}
558557
}
559558

560-
func createElasticQuotas(ctx context.Context, client versioned.Interface, elasticQuotas []*v1alpha1.ElasticQuota) error {
559+
func createElasticQuotas(ctx context.Context, client client.Client, elasticQuotas []*v1alpha1.ElasticQuota) error {
561560
for _, eq := range elasticQuotas {
562-
_, err := client.SchedulingV1alpha1().ElasticQuotas(eq.Namespace).Create(ctx, eq, metav1.CreateOptions{})
563-
if err != nil && !errors.IsAlreadyExists(err) {
561+
if err := client.Create(ctx, eq); err != nil && !apierrors.IsAlreadyExists(err) {
564562
return err
565563
}
566564
}
567565
return nil
568566
}
569567

570-
func cleanupElasticQuotas(ctx context.Context, client versioned.Interface, elasticQuotas []*v1alpha1.ElasticQuota) {
568+
func cleanupElasticQuotas(ctx context.Context, client client.Client, elasticQuotas []*v1alpha1.ElasticQuota) {
571569
for _, eq := range elasticQuotas {
572-
err := client.SchedulingV1alpha1().ElasticQuotas(eq.Namespace).Delete(ctx, eq.Name, metav1.DeleteOptions{})
573-
if err != nil {
574-
klog.ErrorS(err, "Failed to clean up ElasticQuota", "elasticQuota", klog.KObj(eq))
575-
}
570+
_ = client.Delete(ctx, eq)
576571
}
577572
}

test/integration/elasticquota_controller_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/api/resource"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/apimachinery/pkg/util/runtime"
2930
"k8s.io/apimachinery/pkg/util/wait"
3031
quota "k8s.io/apiserver/pkg/quota/v1"
@@ -34,23 +35,22 @@ import (
3435
"k8s.io/kubernetes/pkg/scheduler"
3536
fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
3637
st "k8s.io/kubernetes/pkg/scheduler/testing"
37-
3838
ctrl "sigs.k8s.io/controller-runtime"
3939
"sigs.k8s.io/controller-runtime/pkg/manager"
4040

4141
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
4242
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
4343
"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
4444
"sigs.k8s.io/scheduler-plugins/pkg/controllers"
45-
"sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
45+
"sigs.k8s.io/scheduler-plugins/test/util"
4646
)
4747

4848
func TestElasticController(t *testing.T) {
4949
testCtx := &testContext{}
5050
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
5151

5252
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
53-
extClient := versioned.NewForConfigOrDie(globalKubeConfig)
53+
extClient := util.NewClientOrDie(globalKubeConfig)
5454
testCtx.ClientSet = cs
5555
testCtx.KubeConfig = globalKubeConfig
5656

@@ -311,8 +311,8 @@ func TestElasticController(t *testing.T) {
311311

312312
if err := wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) {
313313
for _, v := range tt.used {
314-
eq, err := extClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(testCtx.Ctx, v.Name, metav1.GetOptions{})
315-
if err != nil {
314+
var eq schedv1alpha1.ElasticQuota
315+
if err := extClient.Get(testCtx.Ctx, types.NamespacedName{Namespace: v.Namespace, Name: v.Name}, &eq); err != nil {
316316
// This could be a connection error so we want to retry.
317317
klog.ErrorS(err, "Failed to obtain the elasticQuota clientSet")
318318
return false, err
@@ -345,8 +345,8 @@ func TestElasticController(t *testing.T) {
345345

346346
if err := wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) {
347347
for _, v := range tt.want {
348-
eq, err := extClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(testCtx.Ctx, v.Name, metav1.GetOptions{})
349-
if err != nil {
348+
var eq schedv1alpha1.ElasticQuota
349+
if err := extClient.Get(testCtx.Ctx, types.NamespacedName{Namespace: v.Namespace, Name: v.Name}, &eq); err != nil {
350350
// This could be a connection error so we want to retry.
351351
klog.ErrorS(err, "Failed to obtain the elasticQuota clientSet")
352352
return false, err

0 commit comments

Comments
 (0)