Skip to content

Commit fd8a3d6

Browse files
authored
Merge pull request #895 from Huang-Wei/controller-runtime-cache
Compose a cached reader as a cacheOption when initializing a controller-runtime client
2 parents b6b8c13 + 16f553d commit fd8a3d6

File tree

14 files changed

+59
-39
lines changed

14 files changed

+59
-39
lines changed

pkg/capacityscheduling/capacity_scheduling.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ import (
4040
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
4141
"k8s.io/kubernetes/pkg/scheduler/metrics"
4242
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
43-
ctrlruntimecache "sigs.k8s.io/controller-runtime/pkg/cache"
4443
"sigs.k8s.io/controller-runtime/pkg/client"
4544

4645
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
@@ -130,18 +129,14 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
130129
}
131130
logger := klog.FromContext(ctx)
132131

133-
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
132+
client, ccache, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
134133
if err != nil {
135134
return nil, err
136135
}
137136

138137
c.client = client
139-
dynamicCache, err := ctrlruntimecache.New(handle.KubeConfig(), ctrlruntimecache.Options{Scheme: scheme})
140-
if err != nil {
141-
return nil, err
142-
}
143138

144-
elasticQuotaInformer, err := dynamicCache.GetInformer(ctx, &v1alpha1.ElasticQuota{})
139+
elasticQuotaInformer, err := ccache.GetInformer(ctx, &v1alpha1.ElasticQuota{})
145140
if err != nil {
146141
return nil, err
147142
}

pkg/coscheduling/coscheduling.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
3030
"k8s.io/klog/v2"
3131
"k8s.io/kubernetes/pkg/scheduler/framework"
32-
"sigs.k8s.io/controller-runtime/pkg/client"
3332

3433
"sigs.k8s.io/scheduler-plugins/apis/config"
3534
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
@@ -75,7 +74,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
7574
_ = clientscheme.AddToScheme(scheme)
7675
_ = v1.AddToScheme(scheme)
7776
_ = v1alpha1.AddToScheme(scheme)
78-
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
77+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
7978
if err != nil {
8079
return nil, err
8180
}
@@ -85,7 +84,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
8584

8685
scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second
8786
pgMgr := core.NewPodGroupManager(
88-
client,
87+
c,
8988
handle.SnapshotSharedLister(),
9089
&scheduleTimeDuration,
9190
// Keep the podInformer (from frameworkHandle) as the single source of Pods.

pkg/networkaware/networkoverhead/networkoverhead.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ import (
3030
corelisters "k8s.io/client-go/listers/core/v1"
3131
"k8s.io/klog/v2"
3232
"k8s.io/kubernetes/pkg/scheduler/framework"
33-
3433
"sigs.k8s.io/controller-runtime/pkg/client"
3534

3635
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
3736
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
37+
"sigs.k8s.io/scheduler-plugins/pkg/util"
3838

3939
agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
4040
ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
@@ -147,15 +147,13 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
147147
if err != nil {
148148
return nil, err
149149
}
150-
client, err := client.New(handle.KubeConfig(), client.Options{
151-
Scheme: scheme,
152-
})
150+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
153151
if err != nil {
154152
return nil, err
155153
}
156154

157155
no := &NetworkOverhead{
158-
Client: client,
156+
Client: c,
159157
logger: logger,
160158
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
161159
handle: handle,

pkg/networkaware/topologicalsort/topologicalsort.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"k8s.io/klog/v2"
2727
"k8s.io/kubernetes/pkg/scheduler/framework"
2828
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
29-
3029
"sigs.k8s.io/controller-runtime/pkg/client"
3130

3231
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
3332
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
33+
"sigs.k8s.io/scheduler-plugins/pkg/util"
3434

3535
agv1alpha "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
3636
)
@@ -82,15 +82,13 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
8282
return nil, err
8383
}
8484

85-
client, err := client.New(handle.KubeConfig(), client.Options{
86-
Scheme: scheme,
87-
})
85+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
8886
if err != nil {
8987
return nil, err
9088
}
9189

9290
pl := &TopologicalSort{
93-
Client: client,
91+
Client: c,
9492
logger: logger,
9593
handle: handle,
9694
namespaces: args.Namespaces,

pkg/sysched/sysched.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
2424
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
25+
"sigs.k8s.io/scheduler-plugins/pkg/util"
2526
)
2627

2728
type SySched struct {
@@ -429,12 +430,12 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
429430

430431
v1beta1.AddToScheme(scheme)
431432

432-
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
433+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
433434
if err != nil {
434435
return nil, err
435436
}
436437

437-
sc.client = client
438+
sc.client = c
438439

439440
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
440441

pkg/util/client_util.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"k8s.io/apimachinery/pkg/runtime"
7+
"k8s.io/client-go/rest"
8+
"sigs.k8s.io/controller-runtime/pkg/cache"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
)
11+
12+
// NewClientWithCachedReader returns a controller runtime Client with cache-baked client.
13+
func NewClientWithCachedReader(ctx context.Context, config *rest.Config, scheme *runtime.Scheme) (client.Client, cache.Cache, error) {
14+
ccache, err := cache.New(config, cache.Options{
15+
Scheme: scheme,
16+
})
17+
if err != nil {
18+
return nil, nil, err
19+
}
20+
go ccache.Start(ctx)
21+
ccache.WaitForCacheSync(ctx)
22+
c, err := client.New(config, client.Options{
23+
Scheme: scheme,
24+
Cache: &client.CacheOptions{
25+
Reader: ccache,
26+
},
27+
})
28+
return c, ccache, err
29+
}

test/integration/capacity_scheduling_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestCapacityScheduling(t *testing.T) {
4444
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
4545

4646
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
47-
extClient := util.NewClientOrDie(globalKubeConfig)
47+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
4848
testCtx.ClientSet = cs
4949
testCtx.KubeConfig = globalKubeConfig
5050

test/integration/coscheduling_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestCoschedulingPlugin(t *testing.T) {
4848
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
4949

5050
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
51-
extClient := util.NewClientOrDie(globalKubeConfig)
51+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
5252
testCtx.ClientSet = cs
5353
testCtx.KubeConfig = globalKubeConfig
5454

@@ -381,7 +381,7 @@ func TestPodCompleted(t *testing.T) {
381381
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
382382

383383
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
384-
extClient := util.NewClientOrDie(globalKubeConfig)
384+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
385385
testCtx.ClientSet = cs
386386
testCtx.KubeConfig = globalKubeConfig
387387

@@ -513,7 +513,7 @@ func TestPodgroupBackoff(t *testing.T) {
513513
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
514514

515515
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
516-
extClient := util.NewClientOrDie(globalKubeConfig)
516+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
517517
testCtx.ClientSet = cs
518518
testCtx.KubeConfig = globalKubeConfig
519519

test/integration/elasticquota_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestElasticController(t *testing.T) {
5050
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
5151

5252
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
53-
extClient := util.NewClientOrDie(globalKubeConfig)
53+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
5454
testCtx.ClientSet = cs
5555
testCtx.KubeConfig = globalKubeConfig
5656

test/integration/networkoverhead_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,10 @@ import (
3737
st "k8s.io/kubernetes/pkg/scheduler/testing"
3838
imageutils "k8s.io/kubernetes/test/utils/image"
3939

40-
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
41-
4240
scheconfig "sigs.k8s.io/scheduler-plugins/apis/config"
4341
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
4442
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
43+
clientutil "sigs.k8s.io/scheduler-plugins/pkg/util"
4544
"sigs.k8s.io/scheduler-plugins/test/util"
4645

4746
appgroupapi "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup"
@@ -59,7 +58,7 @@ func TestNetworkOverheadPlugin(t *testing.T) {
5958
utilruntime.Must(agv1alpha1.AddToScheme(scheme))
6059
utilruntime.Must(ntv1alpha1.AddToScheme(scheme))
6160

62-
client, err := ctrlclient.New(globalKubeConfig, ctrlclient.Options{Scheme: scheme})
61+
client, _, err := clientutil.NewClientWithCachedReader(testCtx.Ctx, globalKubeConfig, scheme)
6362

6463
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
6564
testCtx.ClientSet = cs

0 commit comments

Comments
 (0)