Skip to content

Commit c341417

Browse files
Wei WengWei Weng
authored andcommitted
decouple informer cache population and event handling
Signed-off-by: Wei Weng <[email protected]>
1 parent 6f3e972 commit c341417

File tree

12 files changed

+1354
-97
lines changed

12 files changed

+1354
-97
lines changed

cmd/hubagent/workload/setup.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,23 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
496496
}
497497
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, rateLimiter)
498498

499+
// Set up the InformerPopulator that runs on ALL pods (leader and followers)
500+
// This ensures all pods have synced informer caches for webhook validation
501+
klog.Info("Setting up informer populator")
502+
informerPopulator := &resourcewatcher.InformerPopulator{
503+
DiscoveryClient: discoverClient,
504+
RESTMapper: mgr.GetRESTMapper(),
505+
InformerManager: dynamicInformerManager,
506+
ResourceConfig: resourceConfig,
507+
}
508+
509+
if err := mgr.Add(informerPopulator); err != nil {
510+
klog.ErrorS(err, "Failed to setup informer populator")
511+
return err
512+
}
513+
499514
// Set up a runner that starts all the custom controllers we created above
515+
// This runs ONLY on the leader and adds event handlers to the informers created by InformerPopulator
500516
resourceChangeDetector := &resourcewatcher.ChangeDetector{
501517
DiscoveryClient: discoverClient,
502518
RESTMapper: mgr.GetRESTMapper(),

pkg/resourcewatcher/change_dector.go

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"golang.org/x/sync/errgroup"
2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26-
"k8s.io/apimachinery/pkg/runtime/schema"
2726
"k8s.io/apimachinery/pkg/util/wait"
2827
"k8s.io/client-go/discovery"
2928
"k8s.io/client-go/tools/cache"
@@ -45,7 +44,7 @@ var (
4544
// ChangeDetector is a resource watcher which watches all types of resources in the cluster and reconcile the events.
4645
type ChangeDetector struct {
4746
// DiscoveryClient is used to do resource discovery.
48-
DiscoveryClient *discovery.DiscoveryClient
47+
DiscoveryClient discovery.DiscoveryInterface
4948

5049
// RESTMapper is used to convert between GVK and GVR
5150
RESTMapper meta.RESTMapper
@@ -137,43 +136,20 @@ func (d *ChangeDetector) discoverAPIResourcesLoop(ctx context.Context, period ti
137136
}, period)
138137
}
139138

140-
// discoverResources goes through all the api resources in the cluster and create informers on selected types
139+
// discoverResources goes through all the api resources in the cluster and adds event handlers to informers
141140
func (d *ChangeDetector) discoverResources(dynamicResourceEventHandler cache.ResourceEventHandler) {
142-
newResources, err := d.getWatchableResources()
143-
var dynamicResources []informer.APIResourceMeta
144-
if err != nil {
145-
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
146-
}
147-
for _, res := range newResources {
148-
// all the static resources are disabled by default
149-
if d.shouldWatchResource(res.GroupVersionResource) {
150-
dynamicResources = append(dynamicResources, res)
151-
}
141+
resourcesToWatch := discoverWatchableResources(d.DiscoveryClient, d.RESTMapper, d.ResourceConfig)
142+
143+
// On the leader, add event handlers to informers that were already created by InformerPopulator
144+
// The informers exist on all pods, but only the leader adds handlers and processes events
145+
for _, res := range resourcesToWatch {
146+
d.InformerManager.AddEventHandlerToInformer(res.GroupVersionResource, dynamicResourceEventHandler)
152147
}
153-
d.InformerManager.AddDynamicResources(dynamicResources, dynamicResourceEventHandler, err == nil)
148+
154149
// this will start the newly added informers if there is any
155150
d.InformerManager.Start()
156-
}
157-
158-
// gvrDisabled returns whether GroupVersionResource is disabled.
159-
func (d *ChangeDetector) shouldWatchResource(gvr schema.GroupVersionResource) bool {
160-
// By default, all of the APIs are allowed.
161-
if d.ResourceConfig == nil {
162-
return true
163-
}
164151

165-
gvks, err := d.RESTMapper.KindsFor(gvr)
166-
if err != nil {
167-
klog.ErrorS(err, "gvr transform failed", "gvr", gvr.String())
168-
return false
169-
}
170-
for _, gvk := range gvks {
171-
if d.ResourceConfig.IsResourceDisabled(gvk) {
172-
klog.V(4).InfoS("Skip watch resource", "group version kind", gvk.String())
173-
return false
174-
}
175-
}
176-
return true
152+
klog.V(2).InfoS("Change detector: discovered resources", "count", len(resourcesToWatch))
177153
}
178154

179155
// dynamicResourceFilter filters out resources that we don't want to watch
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcewatcher
18+
19+
import (
20+
"testing"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/apimachinery/pkg/runtime/schema"
24+
fakediscovery "k8s.io/client-go/discovery/fake"
25+
"k8s.io/client-go/kubernetes/fake"
26+
"k8s.io/client-go/restmapper"
27+
"k8s.io/client-go/tools/cache"
28+
29+
"github.com/kubefleet-dev/kubefleet/pkg/utils"
30+
testinformer "github.com/kubefleet-dev/kubefleet/test/utils/informer"
31+
)
32+
33+
func TestChangeDetector_discoverResources(t *testing.T) {
34+
tests := []struct {
35+
name string
36+
discoveryResources []*metav1.APIResourceList
37+
resourceConfig *utils.ResourceConfig
38+
}{
39+
{
40+
name: "discovers and adds handlers for watchable resources",
41+
discoveryResources: []*metav1.APIResourceList{
42+
{
43+
GroupVersion: "v1",
44+
APIResources: []metav1.APIResource{
45+
{
46+
Name: "configmaps",
47+
Kind: "ConfigMap",
48+
Namespaced: true,
49+
Verbs: []string{"list", "watch", "get"},
50+
},
51+
{
52+
Name: "secrets",
53+
Kind: "Secret",
54+
Namespaced: true,
55+
Verbs: []string{"list", "watch", "get"},
56+
},
57+
},
58+
},
59+
},
60+
resourceConfig: nil, // Allow all resources
61+
},
62+
{
63+
name: "skips resources without list/watch verbs",
64+
discoveryResources: []*metav1.APIResourceList{
65+
{
66+
GroupVersion: "v1",
67+
APIResources: []metav1.APIResource{
68+
{
69+
Name: "configmaps",
70+
Kind: "ConfigMap",
71+
Namespaced: true,
72+
Verbs: []string{"get", "delete"}, // Missing list/watch
73+
},
74+
},
75+
},
76+
},
77+
resourceConfig: nil,
78+
},
79+
{
80+
name: "respects resource config filtering",
81+
discoveryResources: []*metav1.APIResourceList{
82+
{
83+
GroupVersion: "v1",
84+
APIResources: []metav1.APIResource{
85+
{
86+
Name: "configmaps",
87+
Kind: "ConfigMap",
88+
Namespaced: true,
89+
Verbs: []string{"list", "watch", "get"},
90+
},
91+
{
92+
Name: "secrets",
93+
Kind: "Secret",
94+
Namespaced: true,
95+
Verbs: []string{"list", "watch", "get"},
96+
},
97+
},
98+
},
99+
},
100+
resourceConfig: func() *utils.ResourceConfig {
101+
rc := utils.NewResourceConfig(false) // Skip mode
102+
_ = rc.Parse("v1/Secret") // Skip secrets
103+
return rc
104+
}(),
105+
},
106+
{
107+
name: "discovers apps group resources",
108+
discoveryResources: []*metav1.APIResourceList{
109+
{
110+
GroupVersion: "apps/v1",
111+
APIResources: []metav1.APIResource{
112+
{
113+
Name: "deployments",
114+
Kind: "Deployment",
115+
Namespaced: true,
116+
Verbs: []string{"list", "watch", "get"},
117+
},
118+
{
119+
Name: "statefulsets",
120+
Kind: "StatefulSet",
121+
Namespaced: true,
122+
Verbs: []string{"list", "watch", "get"},
123+
},
124+
},
125+
},
126+
},
127+
resourceConfig: nil,
128+
},
129+
}
130+
131+
for _, tt := range tests {
132+
t.Run(tt.name, func(t *testing.T) {
133+
// Create fake discovery client
134+
fakeClient := fake.NewSimpleClientset()
135+
fakeDiscovery, ok := fakeClient.Discovery().(*fakediscovery.FakeDiscovery)
136+
if !ok {
137+
t.Fatal("Failed to cast to FakeDiscovery")
138+
}
139+
fakeDiscovery.Resources = tt.discoveryResources
140+
141+
// Create REST mapper
142+
groupResources := []*restmapper.APIGroupResources{}
143+
for _, resourceList := range tt.discoveryResources {
144+
gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
145+
if err != nil {
146+
t.Fatalf("Failed to parse group version: %v", err)
147+
}
148+
149+
groupResources = append(groupResources, &restmapper.APIGroupResources{
150+
Group: metav1.APIGroup{
151+
Name: gv.Group,
152+
Versions: []metav1.GroupVersionForDiscovery{
153+
{GroupVersion: resourceList.GroupVersion, Version: gv.Version},
154+
},
155+
PreferredVersion: metav1.GroupVersionForDiscovery{
156+
GroupVersion: resourceList.GroupVersion,
157+
Version: gv.Version,
158+
},
159+
},
160+
VersionedResources: map[string][]metav1.APIResource{
161+
gv.Version: resourceList.APIResources,
162+
},
163+
})
164+
}
165+
restMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
166+
167+
// Create fake informer manager
168+
fakeInformerManager := &testinformer.FakeManager{
169+
APIResources: make(map[schema.GroupVersionKind]bool),
170+
}
171+
172+
// Track handler additions
173+
testHandler := cache.ResourceEventHandlerFuncs{
174+
AddFunc: func(obj interface{}) {},
175+
}
176+
177+
// Create ChangeDetector with the interface type
178+
detector := &ChangeDetector{
179+
DiscoveryClient: fakeDiscovery,
180+
RESTMapper: restMapper,
181+
InformerManager: fakeInformerManager,
182+
ResourceConfig: tt.resourceConfig,
183+
}
184+
185+
// Test discoverResources which discovers resources and adds handlers
186+
detector.discoverResources(testHandler)
187+
188+
// The main goal is to verify no panics occur during discovery and handler addition
189+
})
190+
}
191+
}
192+
193+
func TestChangeDetector_NeedLeaderElection(t *testing.T) {
194+
detector := &ChangeDetector{}
195+
196+
// ChangeDetector SHOULD need leader election so only the leader processes events
197+
if !detector.NeedLeaderElection() {
198+
t.Error("ChangeDetector should need leader election")
199+
}
200+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcewatcher
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/api/meta"
24+
"k8s.io/apimachinery/pkg/util/wait"
25+
"k8s.io/client-go/discovery"
26+
"k8s.io/klog/v2"
27+
"sigs.k8s.io/controller-runtime/pkg/manager"
28+
29+
"github.com/kubefleet-dev/kubefleet/pkg/utils"
30+
"github.com/kubefleet-dev/kubefleet/pkg/utils/informer"
31+
)
32+
33+
const (
34+
// informerPopulatorDiscoveryPeriod is how often the InformerPopulator rediscovers API resources
35+
informerPopulatorDiscoveryPeriod = 30 * time.Second
36+
)
37+
38+
// make sure that our InformerPopulator implements controller runtime interfaces
39+
var (
40+
_ manager.Runnable = &InformerPopulator{}
41+
_ manager.LeaderElectionRunnable = &InformerPopulator{}
42+
)
43+
44+
// InformerPopulator discovers API resources and creates informers for them WITHOUT adding event handlers.
45+
// This allows follower pods to have synced informer caches for webhook validation while the leader's
46+
// ChangeDetector adds event handlers and runs controllers.
47+
type InformerPopulator struct {
48+
// DiscoveryClient is used to do resource discovery.
49+
DiscoveryClient discovery.DiscoveryInterface
50+
51+
// RESTMapper is used to convert between GVK and GVR
52+
RESTMapper meta.RESTMapper
53+
54+
// InformerManager manages all the dynamic informers created by the discovery client
55+
InformerManager informer.Manager
56+
57+
// ResourceConfig contains all the API resources that we won't select based on the allowed or skipped propagating APIs option.
58+
ResourceConfig *utils.ResourceConfig
59+
}
60+
61+
// Start runs the informer populator, discovering resources and creating informers.
62+
// This runs on ALL pods (leader and followers) to ensure all have synced caches.
63+
func (p *InformerPopulator) Start(ctx context.Context) error {
64+
klog.InfoS("Starting the informer populator")
65+
defer klog.InfoS("The informer populator is stopped")
66+
67+
// Run initial discovery to create informers
68+
p.discoverAndCreateInformers()
69+
70+
// Wait for initial cache sync
71+
p.InformerManager.WaitForCacheSync()
72+
klog.InfoS("Informer populator: initial cache sync complete")
73+
74+
// Continue discovering resources periodically to handle CRD installations
75+
wait.UntilWithContext(ctx, func(ctx context.Context) {
76+
p.discoverAndCreateInformers()
77+
}, informerPopulatorDiscoveryPeriod)
78+
79+
return nil
80+
}
81+
82+
// discoverAndCreateInformers discovers API resources and creates informers WITHOUT adding event handlers
83+
func (p *InformerPopulator) discoverAndCreateInformers() {
84+
resourcesToWatch := discoverWatchableResources(p.DiscoveryClient, p.RESTMapper, p.ResourceConfig)
85+
86+
// Create informers directly without adding event handlers.
87+
// This avoids adding any event handlers on follower pods
88+
for _, res := range resourcesToWatch {
89+
p.InformerManager.CreateInformerForResource(res)
90+
}
91+
92+
// Start any newly created informers
93+
p.InformerManager.Start()
94+
95+
klog.V(2).InfoS("Informer populator: discovered resources", "count", len(resourcesToWatch))
96+
}
97+
98+
// NeedLeaderElection implements LeaderElectionRunnable interface.
99+
// Returns false so this runs on ALL pods (leader and followers).
100+
func (p *InformerPopulator) NeedLeaderElection() bool {
101+
return false
102+
}

0 commit comments

Comments
 (0)