Skip to content

Commit 4d479af

Browse files
committed
adds dynamic informer factory
1 parent ba66014 commit 4d479af

File tree

7 files changed

+492
-0
lines changed

7 files changed

+492
-0
lines changed

staging/src/k8s.io/client-go/dynamic/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ filegroup(
5959
name = "all-srcs",
6060
srcs = [
6161
":package-srcs",
62+
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:all-srcs",
6263
"//staging/src/k8s.io/client-go/dynamic/dynamiclister:all-srcs",
6364
"//staging/src/k8s.io/client-go/dynamic/fake:all-srcs",
6465
],
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = [
6+
"informer.go",
7+
"interface.go",
8+
],
9+
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/dynamic/dynamicinformer",
10+
importpath = "k8s.io/client-go/dynamic/dynamicinformer",
11+
visibility = ["//visibility:public"],
12+
deps = [
13+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
16+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
17+
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
18+
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
19+
"//staging/src/k8s.io/client-go/dynamic/dynamiclister:go_default_library",
20+
"//staging/src/k8s.io/client-go/informers:go_default_library",
21+
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
22+
],
23+
)
24+
25+
go_test(
26+
name = "go_default_test",
27+
srcs = ["informer_test.go"],
28+
embed = [":go_default_library"],
29+
deps = [
30+
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
31+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
32+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
33+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
34+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
35+
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
36+
"//staging/src/k8s.io/client-go/dynamic/fake:go_default_library",
37+
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
38+
],
39+
)
40+
41+
filegroup(
42+
name = "package-srcs",
43+
srcs = glob(["**"]),
44+
tags = ["automanaged"],
45+
visibility = ["//visibility:private"],
46+
)
47+
48+
filegroup(
49+
name = "all-srcs",
50+
srcs = [":package-srcs"],
51+
tags = ["automanaged"],
52+
visibility = ["//visibility:public"],
53+
)
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dynamicinformer
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
25+
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
27+
"k8s.io/apimachinery/pkg/watch"
28+
"k8s.io/client-go/dynamic"
29+
"k8s.io/client-go/dynamic/dynamiclister"
30+
"k8s.io/client-go/informers"
31+
"k8s.io/client-go/tools/cache"
32+
)
33+
34+
// NewDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory for all namespaces.
35+
func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory {
36+
return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
37+
}
38+
39+
// NewFilteredDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory.
40+
// Listers obtained via this factory will be subject to the same filters as specified here.
41+
func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory {
42+
return &dynamicSharedInformerFactory{
43+
client: client,
44+
defaultResync: defaultResync,
45+
namespace: metav1.NamespaceAll,
46+
informers: map[schema.GroupVersionResource]informers.GenericInformer{},
47+
startedInformers: make(map[schema.GroupVersionResource]bool),
48+
}
49+
}
50+
51+
type dynamicSharedInformerFactory struct {
52+
client dynamic.Interface
53+
defaultResync time.Duration
54+
namespace string
55+
56+
lock sync.Mutex
57+
informers map[schema.GroupVersionResource]informers.GenericInformer
58+
// startedInformers is used for tracking which informers have been started.
59+
// This allows Start() to be called multiple times safely.
60+
startedInformers map[schema.GroupVersionResource]bool
61+
}
62+
63+
var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
64+
65+
func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
66+
f.lock.Lock()
67+
defer f.lock.Unlock()
68+
69+
key := gvr
70+
informer, exists := f.informers[key]
71+
if exists {
72+
return informer
73+
}
74+
75+
informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil)
76+
f.informers[key] = informer
77+
78+
return informer
79+
}
80+
81+
// Start initializes all requested informers.
82+
func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
83+
f.lock.Lock()
84+
defer f.lock.Unlock()
85+
86+
for informerType, informer := range f.informers {
87+
if !f.startedInformers[informerType] {
88+
go informer.Informer().Run(stopCh)
89+
f.startedInformers[informerType] = true
90+
}
91+
}
92+
}
93+
94+
// WaitForCacheSync waits for all started informers' cache were synced.
95+
func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
96+
informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
97+
f.lock.Lock()
98+
defer f.lock.Unlock()
99+
100+
informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
101+
for informerType, informer := range f.informers {
102+
if f.startedInformers[informerType] {
103+
informers[informerType] = informer.Informer()
104+
}
105+
}
106+
return informers
107+
}()
108+
109+
res := map[schema.GroupVersionResource]bool{}
110+
for informType, informer := range informers {
111+
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
112+
}
113+
return res
114+
}
115+
116+
// NewFilteredDynamicInformer constructs a new informer for a dynamic type.
117+
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
118+
return &dynamicInformer{
119+
gvr: gvr,
120+
informer: cache.NewSharedIndexInformer(
121+
&cache.ListWatch{
122+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
123+
if tweakListOptions != nil {
124+
tweakListOptions(&options)
125+
}
126+
return client.Resource(gvr).Namespace(namespace).List(options)
127+
},
128+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
129+
if tweakListOptions != nil {
130+
tweakListOptions(&options)
131+
}
132+
return client.Resource(gvr).Namespace(namespace).Watch(options)
133+
},
134+
},
135+
&unstructured.Unstructured{},
136+
resyncPeriod,
137+
indexers,
138+
),
139+
}
140+
}
141+
142+
type dynamicInformer struct {
143+
informer cache.SharedIndexInformer
144+
gvr schema.GroupVersionResource
145+
}
146+
147+
var _ informers.GenericInformer = &dynamicInformer{}
148+
149+
func (d *dynamicInformer) Informer() cache.SharedIndexInformer {
150+
return d.informer
151+
}
152+
153+
func (d *dynamicInformer) Lister() cache.GenericLister {
154+
return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr))
155+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dynamicinformer_test
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
"k8s.io/apimachinery/pkg/api/equality"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/apimachinery/pkg/util/diff"
30+
"k8s.io/client-go/dynamic/dynamicinformer"
31+
"k8s.io/client-go/dynamic/fake"
32+
"k8s.io/client-go/tools/cache"
33+
)
34+
35+
func TestDynamicSharedInformerFactory(t *testing.T) {
36+
scenarios := []struct {
37+
name string
38+
existingObj *unstructured.Unstructured
39+
gvr schema.GroupVersionResource
40+
ns string
41+
trigger func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured
42+
handler func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs
43+
}{
44+
// scenario 1
45+
{
46+
name: "scenario 1: test if adding an object triggers AddFunc",
47+
ns: "ns-foo",
48+
gvr: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"},
49+
trigger: func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, _ *unstructured.Unstructured) *unstructured.Unstructured {
50+
testObject := newUnstructured("extensions/v1beta1", "Deployment", "ns-foo", "name-foo")
51+
createdObj, err := fakeClient.Resource(gvr).Namespace(ns).Create(testObject, metav1.CreateOptions{})
52+
if err != nil {
53+
t.Error(err)
54+
}
55+
return createdObj
56+
},
57+
handler: func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs {
58+
return &cache.ResourceEventHandlerFuncs{
59+
AddFunc: func(obj interface{}) {
60+
rcvCh <- obj.(*unstructured.Unstructured)
61+
},
62+
}
63+
},
64+
},
65+
66+
// scenario 2
67+
{
68+
name: "scenario 2: tests if updating an object triggers UpdateFunc",
69+
ns: "ns-foo",
70+
gvr: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"},
71+
existingObj: newUnstructured("extensions/v1beta1", "Deployment", "ns-foo", "name-foo"),
72+
trigger: func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured {
73+
testObject.Object["spec"] = "updatedName"
74+
updatedObj, err := fakeClient.Resource(gvr).Namespace(ns).Update(testObject, metav1.UpdateOptions{})
75+
if err != nil {
76+
t.Error(err)
77+
}
78+
return updatedObj
79+
},
80+
handler: func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs {
81+
return &cache.ResourceEventHandlerFuncs{
82+
UpdateFunc: func(old, updated interface{}) {
83+
rcvCh <- updated.(*unstructured.Unstructured)
84+
},
85+
}
86+
},
87+
},
88+
89+
// scenario 3
90+
{
91+
name: "scenario 3: test if deleting an object triggers DeleteFunc",
92+
ns: "ns-foo",
93+
gvr: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"},
94+
existingObj: newUnstructured("extensions/v1beta1", "Deployment", "ns-foo", "name-foo"),
95+
trigger: func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured {
96+
err := fakeClient.Resource(gvr).Namespace(ns).Delete(testObject.GetName(), &metav1.DeleteOptions{})
97+
if err != nil {
98+
t.Error(err)
99+
}
100+
return testObject
101+
},
102+
handler: func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs {
103+
return &cache.ResourceEventHandlerFuncs{
104+
DeleteFunc: func(obj interface{}) {
105+
rcvCh <- obj.(*unstructured.Unstructured)
106+
},
107+
}
108+
},
109+
},
110+
}
111+
112+
for _, ts := range scenarios {
113+
t.Run(ts.name, func(t *testing.T) {
114+
// test data
115+
timeout := time.Duration(3 * time.Second)
116+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
117+
defer cancel()
118+
scheme := runtime.NewScheme()
119+
informerReciveObjectCh := make(chan *unstructured.Unstructured, 1)
120+
objs := []runtime.Object{}
121+
if ts.existingObj != nil {
122+
objs = append(objs, ts.existingObj)
123+
}
124+
fakeClient := fake.NewSimpleDynamicClient(scheme, objs...)
125+
target := dynamicinformer.NewDynamicSharedInformerFactory(fakeClient, 0)
126+
127+
// act
128+
informerListerForGvr := target.ForResource(ts.gvr)
129+
informerListerForGvr.Informer().AddEventHandler(ts.handler(informerReciveObjectCh))
130+
target.Start(ctx.Done())
131+
if synced := target.WaitForCacheSync(ctx.Done()); !synced[ts.gvr] {
132+
t.Errorf("informer for %s hasn't synced", ts.gvr)
133+
}
134+
135+
testObject := ts.trigger(ts.gvr, ts.ns, fakeClient, ts.existingObj)
136+
select {
137+
case objFromInformer := <-informerReciveObjectCh:
138+
if !equality.Semantic.DeepEqual(testObject, objFromInformer) {
139+
t.Fatalf("%v", diff.ObjectDiff(testObject, objFromInformer))
140+
}
141+
case <-ctx.Done():
142+
t.Errorf("tested informer haven't received an object, waited %v", timeout)
143+
}
144+
})
145+
}
146+
}
147+
148+
func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured {
149+
return &unstructured.Unstructured{
150+
Object: map[string]interface{}{
151+
"apiVersion": apiVersion,
152+
"kind": kind,
153+
"metadata": map[string]interface{}{
154+
"namespace": namespace,
155+
"name": name,
156+
},
157+
"spec": name,
158+
},
159+
}
160+
}

0 commit comments

Comments
 (0)