Skip to content

Commit 4521ed4

Browse files
Wei WengWei Weng
authored andcommitted
refactor and test improvement
Signed-off-by: Wei Weng <[email protected]>
1 parent 87ee659 commit 4521ed4

File tree

6 files changed

+280
-57
lines changed

6 files changed

+280
-57
lines changed

pkg/resourcewatcher/change_dector.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ var (
4444
// ChangeDetector is a resource watcher which watches all types of resources in the cluster and reconcile the events.
4545
type ChangeDetector struct {
4646
// DiscoveryClient is used to do resource discovery.
47-
DiscoveryClient *discovery.DiscoveryClient
47+
DiscoveryClient discovery.DiscoveryInterface
4848

4949
// RESTMapper is used to convert between GVK and GVR
5050
RESTMapper meta.RESTMapper
@@ -138,26 +138,18 @@ func (d *ChangeDetector) discoverAPIResourcesLoop(ctx context.Context, period ti
138138

139139
// discoverResources goes through all the api resources in the cluster and adds event handlers to informers
140140
func (d *ChangeDetector) discoverResources(dynamicResourceEventHandler cache.ResourceEventHandler) {
141-
newResources, err := getWatchableResources(d.DiscoveryClient)
142-
var dynamicResources []informer.APIResourceMeta
143-
if err != nil {
144-
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
145-
}
146-
for _, res := range newResources {
147-
// all the static resources are disabled by default
148-
if shouldWatchResource(res.GroupVersionResource, d.RESTMapper, d.ResourceConfig) {
149-
dynamicResources = append(dynamicResources, res)
150-
}
151-
}
141+
resourcesToWatch := discoverWatchableResources(d.DiscoveryClient, d.RESTMapper, d.ResourceConfig)
152142

153143
// On the leader, add event handlers to informers that were already created by InformerPopulator
154144
// The informers exist on all pods, but only the leader adds handlers and processes events
155-
for _, res := range dynamicResources {
145+
for _, res := range resourcesToWatch {
156146
d.InformerManager.AddEventHandlerToInformer(res.GroupVersionResource, dynamicResourceEventHandler)
157147
}
158148

159149
// this will start the newly added informers if there is any
160150
d.InformerManager.Start()
151+
152+
klog.V(2).InfoS("Change detector: discovered resources", "count", len(resourcesToWatch))
161153
}
162154

163155
// dynamicResourceFilter filters out resources that we don't want to watch
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcewatcher
18+
19+
import (
20+
"testing"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/apimachinery/pkg/runtime/schema"
24+
fakediscovery "k8s.io/client-go/discovery/fake"
25+
"k8s.io/client-go/kubernetes/fake"
26+
"k8s.io/client-go/restmapper"
27+
"k8s.io/client-go/tools/cache"
28+
29+
"github.com/kubefleet-dev/kubefleet/pkg/utils"
30+
testinformer "github.com/kubefleet-dev/kubefleet/test/utils/informer"
31+
)
32+
33+
func TestChangeDetector_discoverResources(t *testing.T) {
34+
tests := []struct {
35+
name string
36+
discoveryResources []*metav1.APIResourceList
37+
resourceConfig *utils.ResourceConfig
38+
}{
39+
{
40+
name: "discovers and adds handlers for watchable resources",
41+
discoveryResources: []*metav1.APIResourceList{
42+
{
43+
GroupVersion: "v1",
44+
APIResources: []metav1.APIResource{
45+
{
46+
Name: "configmaps",
47+
Kind: "ConfigMap",
48+
Namespaced: true,
49+
Verbs: []string{"list", "watch", "get"},
50+
},
51+
{
52+
Name: "secrets",
53+
Kind: "Secret",
54+
Namespaced: true,
55+
Verbs: []string{"list", "watch", "get"},
56+
},
57+
},
58+
},
59+
},
60+
resourceConfig: nil, // Allow all resources
61+
},
62+
{
63+
name: "skips resources without list/watch verbs",
64+
discoveryResources: []*metav1.APIResourceList{
65+
{
66+
GroupVersion: "v1",
67+
APIResources: []metav1.APIResource{
68+
{
69+
Name: "configmaps",
70+
Kind: "ConfigMap",
71+
Namespaced: true,
72+
Verbs: []string{"get", "delete"}, // Missing list/watch
73+
},
74+
},
75+
},
76+
},
77+
resourceConfig: nil,
78+
},
79+
{
80+
name: "respects resource config filtering",
81+
discoveryResources: []*metav1.APIResourceList{
82+
{
83+
GroupVersion: "v1",
84+
APIResources: []metav1.APIResource{
85+
{
86+
Name: "configmaps",
87+
Kind: "ConfigMap",
88+
Namespaced: true,
89+
Verbs: []string{"list", "watch", "get"},
90+
},
91+
{
92+
Name: "secrets",
93+
Kind: "Secret",
94+
Namespaced: true,
95+
Verbs: []string{"list", "watch", "get"},
96+
},
97+
},
98+
},
99+
},
100+
resourceConfig: func() *utils.ResourceConfig {
101+
rc := utils.NewResourceConfig(false) // Skip mode
102+
_ = rc.Parse("v1/Secret") // Skip secrets
103+
return rc
104+
}(),
105+
},
106+
{
107+
name: "discovers apps group resources",
108+
discoveryResources: []*metav1.APIResourceList{
109+
{
110+
GroupVersion: "apps/v1",
111+
APIResources: []metav1.APIResource{
112+
{
113+
Name: "deployments",
114+
Kind: "Deployment",
115+
Namespaced: true,
116+
Verbs: []string{"list", "watch", "get"},
117+
},
118+
{
119+
Name: "statefulsets",
120+
Kind: "StatefulSet",
121+
Namespaced: true,
122+
Verbs: []string{"list", "watch", "get"},
123+
},
124+
},
125+
},
126+
},
127+
resourceConfig: nil,
128+
},
129+
}
130+
131+
for _, tt := range tests {
132+
t.Run(tt.name, func(t *testing.T) {
133+
// Create fake discovery client
134+
fakeClient := fake.NewSimpleClientset()
135+
fakeDiscovery, ok := fakeClient.Discovery().(*fakediscovery.FakeDiscovery)
136+
if !ok {
137+
t.Fatal("Failed to cast to FakeDiscovery")
138+
}
139+
fakeDiscovery.Resources = tt.discoveryResources
140+
141+
// Create REST mapper
142+
groupResources := []*restmapper.APIGroupResources{}
143+
for _, resourceList := range tt.discoveryResources {
144+
gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
145+
if err != nil {
146+
t.Fatalf("Failed to parse group version: %v", err)
147+
}
148+
149+
groupResources = append(groupResources, &restmapper.APIGroupResources{
150+
Group: metav1.APIGroup{
151+
Name: gv.Group,
152+
Versions: []metav1.GroupVersionForDiscovery{
153+
{GroupVersion: resourceList.GroupVersion, Version: gv.Version},
154+
},
155+
PreferredVersion: metav1.GroupVersionForDiscovery{
156+
GroupVersion: resourceList.GroupVersion,
157+
Version: gv.Version,
158+
},
159+
},
160+
VersionedResources: map[string][]metav1.APIResource{
161+
gv.Version: resourceList.APIResources,
162+
},
163+
})
164+
}
165+
restMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
166+
167+
// Create fake informer manager
168+
fakeInformerManager := &testinformer.FakeManager{
169+
APIResources: make(map[schema.GroupVersionKind]bool),
170+
}
171+
172+
// Track handler additions
173+
testHandler := cache.ResourceEventHandlerFuncs{
174+
AddFunc: func(obj interface{}) {},
175+
}
176+
177+
// Create ChangeDetector with the interface type
178+
detector := &ChangeDetector{
179+
DiscoveryClient: fakeDiscovery,
180+
RESTMapper: restMapper,
181+
InformerManager: fakeInformerManager,
182+
ResourceConfig: tt.resourceConfig,
183+
}
184+
185+
// Test discoverResources which discovers resources and adds handlers
186+
detector.discoverResources(testHandler)
187+
188+
// The main goal is to verify no panics occur during discovery and handler addition
189+
})
190+
}
191+
}
192+
193+
func TestChangeDetector_NeedLeaderElection(t *testing.T) {
194+
detector := &ChangeDetector{}
195+
196+
// ChangeDetector SHOULD need leader election so only the leader processes events
197+
if !detector.NeedLeaderElection() {
198+
t.Error("ChangeDetector should need leader election")
199+
}
200+
}

pkg/resourcewatcher/informer_populator.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,30 +81,18 @@ func (p *InformerPopulator) Start(ctx context.Context) error {
8181

8282
// discoverAndCreateInformers discovers API resources and creates informers WITHOUT adding event handlers
8383
func (p *InformerPopulator) discoverAndCreateInformers() {
84-
newResources, err := getWatchableResources(p.DiscoveryClient)
85-
if err != nil {
86-
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
87-
}
88-
89-
var resourcesToPopulate []informer.APIResourceMeta
90-
for _, res := range newResources {
91-
if shouldWatchResource(res.GroupVersionResource, p.RESTMapper, p.ResourceConfig) {
92-
resourcesToPopulate = append(resourcesToPopulate, res)
93-
}
94-
}
84+
resourcesToWatch := discoverWatchableResources(p.DiscoveryClient, p.RESTMapper, p.ResourceConfig)
9585

9686
// Create informers directly without adding event handlers.
9787
// This avoids adding any event handlers on follower pods
98-
for _, res := range resourcesToPopulate {
88+
for _, res := range resourcesToWatch {
9989
p.InformerManager.CreateInformerForResource(res)
10090
}
10191

10292
// Start any newly created informers
10393
p.InformerManager.Start()
10494

105-
if err == nil {
106-
klog.V(2).InfoS("Informer populator: discovered resources", "count", len(resourcesToPopulate))
107-
}
95+
klog.V(2).InfoS("Informer populator: discovered resources", "count", len(resourcesToWatch))
10896
}
10997

11098
// NeedLeaderElection implements LeaderElectionRunnable interface.

pkg/resourcewatcher/resource_collector.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,22 @@ func shouldWatchResource(gvr schema.GroupVersionResource, restMapper meta.RESTMa
107107
}
108108
return true
109109
}
110+
111+
// discoverWatchableResources discovers all API resources in the cluster and filters them
112+
// based on the resource configuration. This is a shared helper used by both InformerPopulator
113+
// and ChangeDetector to ensure consistent resource discovery logic.
114+
func discoverWatchableResources(discoveryClient discovery.DiscoveryInterface, restMapper meta.RESTMapper, resourceConfig *utils.ResourceConfig) []informer.APIResourceMeta {
115+
newResources, err := getWatchableResources(discoveryClient)
116+
if err != nil {
117+
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
118+
}
119+
120+
var resourcesToWatch []informer.APIResourceMeta
121+
for _, res := range newResources {
122+
if shouldWatchResource(res.GroupVersionResource, restMapper, resourceConfig) {
123+
resourcesToWatch = append(resourcesToWatch, res)
124+
}
125+
}
126+
127+
return resourcesToWatch
128+
}

pkg/utils/informer/informermanager_test.go

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"k8s.io/apimachinery/pkg/runtime/schema"
2323
"k8s.io/client-go/dynamic/fake"
2424
"k8s.io/client-go/kubernetes/scheme"
25+
26+
testhandler "github.com/kubefleet-dev/kubefleet/test/utils/handler"
2527
)
2628

2729
func TestGetAllResources(t *testing.T) {
@@ -324,8 +326,8 @@ func TestAddEventHandlerToInformer(t *testing.T) {
324326

325327
// Track handler calls
326328
callCount := 0
327-
handler := &testHandler{
328-
onAdd: func() { callCount++ },
329+
handler := &testhandler.TestHandler{
330+
OnAddFunc: func() { callCount++ },
329331
}
330332

331333
// Add the handler
@@ -340,8 +342,8 @@ func TestAddEventHandlerToInformer(t *testing.T) {
340342

341343
if tt.addTwice {
342344
// Add another handler to the same informer
343-
handler2 := &testHandler{
344-
onAdd: func() { callCount++ },
345+
handler2 := &testhandler.TestHandler{
346+
OnAddFunc: func() { callCount++ },
345347
}
346348
mgr.AddEventHandlerToInformer(tt.gvr, handler2)
347349
}
@@ -502,28 +504,3 @@ func TestCreateInformerForResource_IsIdempotent(t *testing.T) {
502504
t.Error("Expected resource to be marked as present")
503505
}
504506
}
505-
506-
// testHandler is a simple implementation of cache.ResourceEventHandler for testing
507-
type testHandler struct {
508-
onAdd func()
509-
onUpdate func()
510-
onDelete func()
511-
}
512-
513-
func (h *testHandler) OnAdd(obj interface{}, isInInitialList bool) {
514-
if h.onAdd != nil {
515-
h.onAdd()
516-
}
517-
}
518-
519-
func (h *testHandler) OnUpdate(oldObj, newObj interface{}) {
520-
if h.onUpdate != nil {
521-
h.onUpdate()
522-
}
523-
}
524-
525-
func (h *testHandler) OnDelete(obj interface{}) {
526-
if h.onDelete != nil {
527-
h.onDelete()
528-
}
529-
}

0 commit comments

Comments
 (0)