Skip to content

Commit f58c2ae

Browse files
resourcequota: use dynamic informer
The resource quota controller should use a dynamic informer so it can create informer for custom resources.
1 parent a8cbb22 commit f58c2ae

File tree

10 files changed

+202
-27
lines changed

10 files changed

+202
-27
lines changed

cmd/kube-controller-manager/app/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ go_library(
122122
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
123123
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
124124
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
125+
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
125126
"//staging/src/k8s.io/client-go/informers:go_default_library",
126127
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
127128
"//staging/src/k8s.io/client-go/rest:go_default_library",

cmd/kube-controller-manager/app/controllermanager.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ import (
4343
utilfeature "k8s.io/apiserver/pkg/util/feature"
4444
"k8s.io/apiserver/pkg/util/term"
4545
cacheddiscovery "k8s.io/client-go/discovery/cached"
46+
"k8s.io/client-go/dynamic"
47+
"k8s.io/client-go/dynamic/dynamicinformer"
4648
"k8s.io/client-go/informers"
4749
clientset "k8s.io/client-go/kubernetes"
4850
restclient "k8s.io/client-go/rest"
@@ -234,6 +236,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
234236
}
235237

236238
controllerContext.InformerFactory.Start(controllerContext.Stop)
239+
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)
237240
close(controllerContext.InformersStarted)
238241

239242
select {}
@@ -288,6 +291,10 @@ type ControllerContext struct {
288291
// InformerFactory gives access to informers for the controller.
289292
InformerFactory informers.SharedInformerFactory
290293

294+
// GenericInformerFactory gives access to informers for typed resources
295+
// and dynamic resources.
296+
GenericInformerFactory controller.InformerFactory
297+
291298
// ComponentConfig provides access to init options for a given controller
292299
ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration
293300

@@ -433,6 +440,9 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
433440
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
434441
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
435442

443+
dynamicClient := dynamic.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("dynamic-informers"))
444+
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, ResyncPeriod(s)())
445+
436446
// If apiserver is not running we should wait for some time and fail only then. This is particularly
437447
// important when we start apiserver and controller manager at the same time.
438448
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
@@ -459,16 +469,17 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
459469
}
460470

461471
ctx := ControllerContext{
462-
ClientBuilder: clientBuilder,
463-
InformerFactory: sharedInformers,
464-
ComponentConfig: s.ComponentConfig,
465-
RESTMapper: restMapper,
466-
AvailableResources: availableResources,
467-
Cloud: cloud,
468-
LoopMode: loopMode,
469-
Stop: stop,
470-
InformersStarted: make(chan struct{}),
471-
ResyncPeriod: ResyncPeriod(s),
472+
ClientBuilder: clientBuilder,
473+
InformerFactory: sharedInformers,
474+
GenericInformerFactory: controller.NewInformerFactory(sharedInformers, dynamicInformers),
475+
ComponentConfig: s.ComponentConfig,
476+
RESTMapper: restMapper,
477+
AvailableResources: availableResources,
478+
Cloud: cloud,
479+
LoopMode: loopMode,
480+
Stop: stop,
481+
InformersStarted: make(chan struct{}),
482+
ResyncPeriod: ResyncPeriod(s),
472483
}
473484
return ctx, nil
474485
}

cmd/kube-controller-manager/app/core.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@ package app
2323
import (
2424
"fmt"
2525
"net"
26+
"net/http"
2627
"strings"
2728
"time"
2829

2930
"k8s.io/klog"
3031

31-
"net/http"
32-
3332
"k8s.io/api/core/v1"
3433
"k8s.io/apimachinery/pkg/runtime/schema"
3534
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -295,7 +294,7 @@ func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, er
295294
QuotaClient: resourceQuotaControllerClient.CoreV1(),
296295
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
297296
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
298-
InformerFactory: ctx.InformerFactory,
297+
InformerFactory: ctx.GenericInformerFactory,
299298
ReplenishmentResyncPeriod: ctx.ResyncPeriod,
300299
DiscoveryFunc: discoveryFunc,
301300
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,

cmd/kube-controller-manager/app/core_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,12 @@ func TestController_DiscoveryError(t *testing.T) {
121121
testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources}
122122
testClientset := NewFakeClientset(testDiscovery)
123123
testClientBuilder := TestClientBuilder{clientset: testClientset}
124+
testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1))
124125
ctx := ControllerContext{
125-
ClientBuilder: testClientBuilder,
126-
InformerFactory: informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)),
127-
InformersStarted: make(chan struct{}),
126+
ClientBuilder: testClientBuilder,
127+
InformerFactory: testInformerFactory,
128+
GenericInformerFactory: testInformerFactory,
129+
InformersStarted: make(chan struct{}),
128130
}
129131
for funcName, controllerInit := range controllerInitFuncMap {
130132
_, _, err := controllerInit(ctx)

pkg/controller/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ go_library(
4747
"controller_ref_manager.go",
4848
"controller_utils.go",
4949
"doc.go",
50+
"informer_factory.go",
5051
"lookup_cache.go",
5152
],
5253
importpath = "k8s.io/kubernetes/pkg/controller",
@@ -79,6 +80,8 @@ go_library(
7980
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
8081
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
8182
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
83+
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
84+
"//staging/src/k8s.io/client-go/informers:go_default_library",
8285
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
8386
"//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1:go_default_library",
8487
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",

pkg/controller/informer_factory.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"k8s.io/apimachinery/pkg/runtime/schema"
21+
"k8s.io/client-go/dynamic/dynamicinformer"
22+
"k8s.io/client-go/informers"
23+
)
24+
25+
// InformerFactory creates informers for each group version resource.
26+
type InformerFactory interface {
27+
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
28+
Start(stopCh <-chan struct{})
29+
}
30+
31+
type informerFactory struct {
32+
typedInformerFactory informers.SharedInformerFactory
33+
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
34+
}
35+
36+
func (i *informerFactory) ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) {
37+
informer, err := i.typedInformerFactory.ForResource(resource)
38+
if err != nil {
39+
return i.dynamicInformerFactory.ForResource(resource), nil
40+
}
41+
return informer, nil
42+
}
43+
44+
func (i *informerFactory) Start(stopCh <-chan struct{}) {
45+
i.typedInformerFactory.Start(stopCh)
46+
i.dynamicInformerFactory.Start(stopCh)
47+
}
48+
49+
// NewInformerFactory creates a new InformerFactory which works with both typed
50+
// resources and dynamic resources
51+
func NewInformerFactory(typedInformerFactory informers.SharedInformerFactory, dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory) InformerFactory {
52+
return &informerFactory{
53+
typedInformerFactory: typedInformerFactory,
54+
dynamicInformerFactory: dynamicInformerFactory,
55+
}
56+
}

pkg/controller/resourcequota/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ go_library(
3232
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
3333
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
3434
"//staging/src/k8s.io/client-go/discovery:go_default_library",
35-
"//staging/src/k8s.io/client-go/informers:go_default_library",
3635
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
3736
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
3837
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",

pkg/controller/resourcequota/resource_quota_controller.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"k8s.io/apimachinery/pkg/util/sets"
3636
"k8s.io/apimachinery/pkg/util/wait"
3737
"k8s.io/client-go/discovery"
38-
"k8s.io/client-go/informers"
3938
coreinformers "k8s.io/client-go/informers/core/v1"
4039
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
4140
corelisters "k8s.io/client-go/listers/core/v1"
@@ -52,12 +51,6 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
5251
// that may require quota to be recalculated.
5352
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)
5453

55-
// InformerFactory is all the quota system needs to interface with informers.
56-
type InformerFactory interface {
57-
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
58-
Start(stopCh <-chan struct{})
59-
}
60-
6154
// ResourceQuotaControllerOptions holds options for creating a quota controller
6255
type ResourceQuotaControllerOptions struct {
6356
// Must have authority to list all quotas, and update quota status
@@ -75,7 +68,7 @@ type ResourceQuotaControllerOptions struct {
7568
// InformersStarted knows if informers were started.
7669
InformersStarted <-chan struct{}
7770
// InformerFactory interfaces with informers.
78-
InformerFactory InformerFactory
71+
InformerFactory controller.InformerFactory
7972
// Controls full resync of objects monitored for replenishment.
8073
ReplenishmentResyncPeriod controller.ResyncPeriodFunc
8174
}

pkg/controller/resourcequota/resource_quota_monitor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type QuotaMonitor struct {
8686
resourceChanges workqueue.RateLimitingInterface
8787

8888
// interfaces with informers
89-
informerFactory InformerFactory
89+
informerFactory controller.InformerFactory
9090

9191
// list of resources to ignore
9292
ignoredResources map[schema.GroupResource]struct{}
@@ -101,7 +101,7 @@ type QuotaMonitor struct {
101101
registry quota.Registry
102102
}
103103

104-
func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
104+
func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory controller.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
105105
return &QuotaMonitor{
106106
informersStarted: informersStarted,
107107
informerFactory: informerFactory,

test/e2e/apimachinery/resource_quota.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ import (
2727
"k8s.io/apimachinery/pkg/api/errors"
2828
"k8s.io/apimachinery/pkg/api/resource"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3031
"k8s.io/apimachinery/pkg/util/intstr"
3132
"k8s.io/apimachinery/pkg/util/wait"
3233
clientset "k8s.io/client-go/kubernetes"
3334
"k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
3435
"k8s.io/kubernetes/test/e2e/framework"
36+
"k8s.io/kubernetes/test/utils/crd"
3537
imageutils "k8s.io/kubernetes/test/utils/image"
3638

3739
. "github.com/onsi/ginkgo"
@@ -487,6 +489,89 @@ var _ = SIGDescribe("ResourceQuota", func() {
487489
Expect(err).NotTo(HaveOccurred())
488490
})
489491

492+
It("should create a ResourceQuota and capture the life of a custom resource.", func() {
493+
By("Creating a Custom Resource Definition")
494+
testcrd, err := crd.CreateTestCRD(f)
495+
Expect(err).NotTo(HaveOccurred())
496+
defer testcrd.CleanUp()
497+
countResourceName := "count/" + testcrd.Crd.Spec.Names.Plural + "." + testcrd.Crd.Spec.Group
498+
// resourcequota controller needs to take 30 seconds at most to detect the new custom resource.
499+
// in order to make sure the resourcequota controller knows this resource, we create one test
500+
// resourcequota object, and triggering updates on it until the status is updated.
501+
quotaName := "quota-for-" + testcrd.Crd.Spec.Names.Plural
502+
resourceQuota, err := createResourceQuota(f.ClientSet, f.Namespace.Name, &v1.ResourceQuota{
503+
ObjectMeta: metav1.ObjectMeta{Name: quotaName},
504+
Spec: v1.ResourceQuotaSpec{
505+
Hard: v1.ResourceList{
506+
v1.ResourceName(countResourceName): resource.MustParse("0"),
507+
},
508+
},
509+
})
510+
err = updateResourceQuotaUntilUsageAppears(f.ClientSet, f.Namespace.Name, quotaName, v1.ResourceName(countResourceName))
511+
Expect(err).NotTo(HaveOccurred())
512+
err = f.ClientSet.CoreV1().ResourceQuotas(f.Namespace.Name).Delete(quotaName, nil)
513+
Expect(err).NotTo(HaveOccurred())
514+
515+
By("Counting existing ResourceQuota")
516+
c, err := countResourceQuota(f.ClientSet, f.Namespace.Name)
517+
Expect(err).NotTo(HaveOccurred())
518+
519+
By("Creating a ResourceQuota")
520+
quotaName = "test-quota"
521+
resourceQuota = newTestResourceQuota(quotaName)
522+
resourceQuota.Spec.Hard[v1.ResourceName(countResourceName)] = resource.MustParse("1")
523+
resourceQuota, err = createResourceQuota(f.ClientSet, f.Namespace.Name, resourceQuota)
524+
Expect(err).NotTo(HaveOccurred())
525+
526+
By("Ensuring resource quota status is calculated")
527+
usedResources := v1.ResourceList{}
528+
usedResources[v1.ResourceQuotas] = resource.MustParse(strconv.Itoa(c + 1))
529+
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0")
530+
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
531+
Expect(err).NotTo(HaveOccurred())
532+
533+
By("Creating a custom resource")
534+
resourceClient := testcrd.GetV1DynamicClient()
535+
testcr, err := instantiateCustomResource(&unstructured.Unstructured{
536+
Object: map[string]interface{}{
537+
"apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0],
538+
"kind": testcrd.Crd.Spec.Names.Kind,
539+
"metadata": map[string]interface{}{
540+
"name": "test-cr-1",
541+
},
542+
},
543+
}, resourceClient, testcrd.Crd)
544+
Expect(err).NotTo(HaveOccurred())
545+
546+
By("Ensuring resource quota status captures custom resource creation")
547+
usedResources = v1.ResourceList{}
548+
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("1")
549+
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
550+
Expect(err).NotTo(HaveOccurred())
551+
552+
By("Creating a second custom resource")
553+
_, err = instantiateCustomResource(&unstructured.Unstructured{
554+
Object: map[string]interface{}{
555+
"apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0],
556+
"kind": testcrd.Crd.Spec.Names.Kind,
557+
"metadata": map[string]interface{}{
558+
"name": "test-cr-2",
559+
},
560+
},
561+
}, resourceClient, testcrd.Crd)
562+
// since we only give one quota, this creation should fail.
563+
Expect(err).To(HaveOccurred())
564+
565+
By("Deleting a custom resource")
566+
err = deleteCustomResource(resourceClient, testcr.GetName())
567+
Expect(err).NotTo(HaveOccurred())
568+
569+
By("Ensuring resource quota status released usage")
570+
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0")
571+
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
572+
Expect(err).NotTo(HaveOccurred())
573+
})
574+
490575
It("should verify ResourceQuota with terminating scopes.", func() {
491576
By("Creating a ResourceQuota with terminating scope")
492577
quotaTerminatingName := "quota-terminating"
@@ -1524,3 +1609,29 @@ func waitForResourceQuota(c clientset.Interface, ns, quotaName string, used v1.R
15241609
return true, nil
15251610
})
15261611
}
1612+
1613+
// updateResourceQuotaUntilUsageAppears updates the resource quota object until the usage is populated
1614+
// for the specific resource name.
1615+
func updateResourceQuotaUntilUsageAppears(c clientset.Interface, ns, quotaName string, resourceName v1.ResourceName) error {
1616+
return wait.Poll(framework.Poll, 1*time.Minute, func() (bool, error) {
1617+
resourceQuota, err := c.CoreV1().ResourceQuotas(ns).Get(quotaName, metav1.GetOptions{})
1618+
if err != nil {
1619+
return false, err
1620+
}
1621+
// verify that the quota shows the expected used resource values
1622+
_, ok := resourceQuota.Status.Used[resourceName]
1623+
if ok {
1624+
return true, nil
1625+
}
1626+
1627+
current := resourceQuota.Spec.Hard[resourceName]
1628+
current.Add(resource.MustParse("1"))
1629+
resourceQuota.Spec.Hard[resourceName] = current
1630+
_, err = c.CoreV1().ResourceQuotas(ns).Update(resourceQuota)
1631+
// ignoring conflicts since someone else may already updated it.
1632+
if errors.IsConflict(err) {
1633+
return false, nil
1634+
}
1635+
return false, err
1636+
})
1637+
}

0 commit comments

Comments
 (0)