Skip to content

Commit 528238f

Browse files
committed
fix: remediator missing custom resource events
Prior to this change, the remediator watches were only being started for new custom resources after the apply attempt had fully completed. This left some time after the object was applied that the remediator could miss events made by third-parties. Normally, this would be fine, because the remediator would revert any change after the watch was started. But if a DELETE event was missed, the object wouldn't be recreated until the next apply attempt. This change adds a CRD Controller to the remediator that watches CRDs and executes any registered handlers when the CRD is established, unestablished, or deleted. The remediator now registers CRD handlers for each resource type it watches, starting and stopping watchers as soon as possible, without waiting for a new apply attempt or watch update retry.
1 parent efad112 commit 528238f

File tree

10 files changed

+252
-103
lines changed

10 files changed

+252
-103
lines changed

cmd/reconciler-manager/main.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func main() {
9595
}
9696
watchFleetMembership := fleetMembershipCRDExists(dynamicClient, mgr.GetRESTMapper(), &setupLog)
9797

98-
crdController := controllers.NewCRDReconciler(
98+
crdController := controllers.NewCRDController(mgr.GetCache(),
9999
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
100100
if err := crdController.Register(mgr); err != nil {
101101
setupLog.Error(err, "failed to register controller", "controller", "CRD")
@@ -108,11 +108,14 @@ func main() {
108108
mgr.GetClient(), watcher, dynamicClient,
109109
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RepoSyncKind),
110110
mgr.GetScheme())
111-
crdController.SetCRDHandler(configsync.RepoSyncCRDName, func() error {
112-
if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil {
113-
return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err)
111+
crdController.SetCRDHandler(configsync.RepoSyncCRDName, func(_ context.Context, established bool) error {
112+
if established {
113+
if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil {
114+
return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err)
115+
}
116+
setupLog.Info("RepoSync controller registration successful")
114117
}
115-
setupLog.Info("RepoSync controller registration successful")
118+
// TODO: unregister RepoSync controller if CRD is deleted?
116119
return nil
117120
})
118121
setupLog.Info("RepoSync controller registration scheduled")
@@ -122,11 +125,14 @@ func main() {
122125
mgr.GetClient(), watcher, dynamicClient,
123126
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RootSyncKind),
124127
mgr.GetScheme())
125-
crdController.SetCRDHandler(configsync.RootSyncCRDName, func() error {
126-
if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil {
127-
return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err)
128+
crdController.SetCRDHandler(configsync.RootSyncCRDName, func(_ context.Context, established bool) error {
129+
if established {
130+
if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil {
131+
return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err)
132+
}
133+
setupLog.Info("RootSync controller registration successful")
128134
}
129-
setupLog.Info("RootSync controller registration successful")
135+
// TODO: unregister RootSync controller if CRD is deleted?
130136
return nil
131137
})
132138
setupLog.Info("RootSync controller registration scheduled")

manifests/ns-reconciler-base-cluster-role.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ rules:
3232
- apiGroups: ["kpt.dev"]
3333
resources: ["resourcegroups/status"]
3434
verbs: ["*"]
35+
- apiGroups: ["apiextensions.k8s.io"]
36+
resources: ["customresourcedefinitions"]
37+
verbs: ["get","list","watch"]

manifests/root-reconciler-base-cluster-role.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ rules:
3232
- apiGroups: ["kpt.dev"]
3333
resources: ["resourcegroups/status"]
3434
verbs: ["*"]
35+
- apiGroups: ["apiextensions.k8s.io"]
36+
resources: ["customresourcedefinitions"]
37+
verbs: ["get","list","watch"]

pkg/reconciler/reconciler.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"kpt.dev/configsync/pkg/parse/events"
3838
"kpt.dev/configsync/pkg/reconciler/finalizer"
3939
"kpt.dev/configsync/pkg/reconciler/namespacecontroller"
40+
"kpt.dev/configsync/pkg/reconcilermanager/controllers"
4041
"kpt.dev/configsync/pkg/remediator"
4142
"kpt.dev/configsync/pkg/remediator/conflict"
4243
"kpt.dev/configsync/pkg/remediator/watch"
@@ -219,10 +220,44 @@ func Run(opts Options) {
219220
klog.Fatalf("Error creating rest config for the remediator: %v", err)
220221
}
221222

223+
// Start listening to signals
224+
signalCtx := signals.SetupSignalHandler()
225+
226+
// Create the ControllerManager
227+
ctrl.SetLogger(textlogger.NewLogger(textlogger.NewConfig()))
228+
mgrOptions := ctrl.Options{
229+
Scheme: core.Scheme,
230+
MapperProvider: func(_ *rest.Config, _ *http.Client) (meta.RESTMapper, error) {
231+
return mapper, nil
232+
},
233+
BaseContext: func() context.Context {
234+
return signalCtx
235+
},
236+
}
237+
// For Namespaced Reconcilers, set the default namespace to watch.
238+
// Otherwise, all namespaced informers will watch at the cluster-scope.
239+
// This prevents Namespaced Reconcilers from needing cluster-scoped read
240+
// permissions.
241+
if opts.ReconcilerScope != declared.RootScope {
242+
mgrOptions.Cache.DefaultNamespaces = map[string]cache.Config{
243+
string(opts.ReconcilerScope): {},
244+
}
245+
}
246+
mgr, err := ctrl.NewManager(cfgForWatch, mgrOptions)
247+
if err != nil {
248+
klog.Fatalf("Instantiating Controller Manager: %v", err)
249+
}
250+
251+
crdController := controllers.NewCRDController(mgr.GetCache(),
252+
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
253+
if err := crdController.Register(mgr); err != nil {
254+
klog.Fatalf("Instantiating CRD Controller: %v", err)
255+
}
256+
222257
conflictHandler := conflict.NewHandler()
223258
fightHandler := fight.NewHandler()
224259

225-
rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, decls, opts.NumWorkers)
260+
rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, crdController, decls, opts.NumWorkers)
226261
if err != nil {
227262
klog.Fatalf("Instantiating Remediator: %v", err)
228263
}
@@ -303,34 +338,6 @@ func Run(opts Options) {
303338
parser = parse.NewNamespaceRunner(parseOpts)
304339
}
305340

306-
// Start listening to signals
307-
signalCtx := signals.SetupSignalHandler()
308-
309-
// Create the ControllerManager
310-
ctrl.SetLogger(textlogger.NewLogger(textlogger.NewConfig()))
311-
mgrOptions := ctrl.Options{
312-
Scheme: core.Scheme,
313-
MapperProvider: func(_ *rest.Config, _ *http.Client) (meta.RESTMapper, error) {
314-
return mapper, nil
315-
},
316-
BaseContext: func() context.Context {
317-
return signalCtx
318-
},
319-
}
320-
// For Namespaced Reconcilers, set the default namespace to watch.
321-
// Otherwise, all namespaced informers will watch at the cluster-scope.
322-
// This prevents Namespaced Reconcilers from needing cluster-scoped read
323-
// permissions.
324-
if opts.ReconcilerScope != declared.RootScope {
325-
mgrOptions.Cache.DefaultNamespaces = map[string]cache.Config{
326-
string(opts.ReconcilerScope): {},
327-
}
328-
}
329-
mgr, err := ctrl.NewManager(cfgForWatch, mgrOptions)
330-
if err != nil {
331-
klog.Fatalf("Instantiating Controller Manager: %v", err)
332-
}
333-
334341
// This cancelFunc will be used by the Finalizer to stop all the other
335342
// controllers (Parser & Remediator).
336343
ctx, stopControllers := context.WithCancel(signalCtx)

pkg/reconcilermanager/controllers/crd_controller.go

Lines changed: 66 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,57 +22,90 @@ import (
2222
"github.com/go-logr/logr"
2323
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2424
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2526
controllerruntime "sigs.k8s.io/controller-runtime"
26-
"sigs.k8s.io/controller-runtime/pkg/builder"
27-
"sigs.k8s.io/controller-runtime/pkg/client"
28-
"sigs.k8s.io/controller-runtime/pkg/event"
29-
"sigs.k8s.io/controller-runtime/pkg/predicate"
27+
"sigs.k8s.io/controller-runtime/pkg/cache"
3028
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3129
)
3230

3331
// CRDHandler is called by the CRDReconciler to handle establishment of a CRD.
34-
type CRDHandler func() error
32+
type CRDHandler func(ctx context.Context, established bool) error
3533

36-
var _ reconcile.Reconciler = &CRDReconciler{}
34+
// CRDWatcher allows callers to add CRDHandlers for specific CRDs by name.
35+
type CRDWatcher interface {
36+
SetCRDHandler(crdName string, handler CRDHandler)
37+
RemoveCRDHandler(crdName string)
38+
}
39+
40+
var _ reconcile.Reconciler = &CRDController{}
41+
var _ CRDWatcher = &CRDController{}
3742

38-
// CRDReconciler watches CRDs and calls handlers once they are established.
39-
type CRDReconciler struct {
43+
// CRDController watches CRDs and calls handlers once they are established.
44+
type CRDController struct {
4045
loggingController
46+
cache cache.Cache
4147

42-
registerLock sync.Mutex
43-
handlers map[string]CRDHandler
44-
handledCRDs map[string]struct{}
48+
registerLock sync.Mutex
49+
handlers map[string]CRDHandler
50+
handledCRDState map[string]bool
4551
}
4652

47-
// NewCRDReconciler constructs a new CRDReconciler.
48-
func NewCRDReconciler(log logr.Logger) *CRDReconciler {
49-
return &CRDReconciler{
53+
// NewCRDController constructs a new CRDReconciler.
54+
func NewCRDController(cache cache.Cache, log logr.Logger) *CRDController {
55+
return &CRDController{
5056
loggingController: loggingController{
5157
log: log,
5258
},
53-
handlers: make(map[string]CRDHandler),
54-
handledCRDs: make(map[string]struct{}),
59+
cache: cache,
60+
handlers: make(map[string]CRDHandler),
61+
handledCRDState: make(map[string]bool),
5562
}
5663
}
5764

5865
// SetCRDHandler adds an handler for the specified CRD.
5966
// The handler will be called when the CRD becomes established.
6067
// If the handler errors, it will be retried with backoff until success.
61-
// One the handler succeeds, it will not be called again, unless SetCRDHandler
62-
// is called again.
63-
func (r *CRDReconciler) SetCRDHandler(crdName string, crdHandler CRDHandler) {
68+
// One the handler succeeds as established, it will not be called again, unless
69+
// SetCRDHandler is called again or the CRD becomes unestablished or is deleted.
70+
func (r *CRDController) SetCRDHandler(crdName string, crdHandler CRDHandler) {
6471
r.registerLock.Lock()
6572
defer r.registerLock.Unlock()
6673

6774
r.handlers[crdName] = crdHandler
68-
delete(r.handledCRDs, crdName)
75+
delete(r.handledCRDState, crdName)
76+
}
77+
78+
// RemoveCRDHandler removes the handler for the specified CRD.
79+
func (r *CRDController) RemoveCRDHandler(crdName string) {
80+
r.registerLock.Lock()
81+
defer r.registerLock.Unlock()
82+
83+
delete(r.handlers, crdName)
84+
delete(r.handledCRDState, crdName)
6985
}
7086

7187
// Reconcile the otel ConfigMap and update the Deployment annotation.
72-
func (r *CRDReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
88+
func (r *CRDController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
7389
crdName := req.Name
7490
ctx = r.setLoggerValues(ctx, "crd", crdName)
7591

92+
// Established if CRD exists and .status.conditions[type="Established"].status = "True"
93+
var established bool
94+
crdObj := &apiextensionsv1.CustomResourceDefinition{}
95+
if err := r.cache.Get(ctx, req.NamespacedName, crdObj); err != nil {
96+
switch {
97+
// Should never run into NoMatchFound, since CRD is a built-in resource.
98+
case apierrors.IsNotFound(err):
99+
established = false
100+
default:
101+
// Retry with backoff
102+
return reconcile.Result{},
103+
fmt.Errorf("getting CRD from cache: %s: %w", crdName, err)
104+
}
105+
} else {
106+
established = crdIsEstablished(crdObj)
107+
}
108+
76109
r.registerLock.Lock()
77110
defer r.registerLock.Unlock()
78111

@@ -81,57 +114,35 @@ func (r *CRDReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
81114
// No handler for this CRD
82115
return reconcile.Result{}, nil
83116
}
84-
if _, handled := r.handledCRDs[crdName]; handled {
85-
// Already handled
86-
return reconcile.Result{}, nil
117+
if lastHandledState, found := r.handledCRDState[crdName]; found {
118+
if lastHandledState == established {
119+
// Already handled latest state
120+
return reconcile.Result{}, nil
121+
}
87122
}
88123

89-
r.logger(ctx).V(3).Info("reconciling CRD", "crd", crdName)
124+
r.logger(ctx).V(3).Info("reconciling CRD", "crd", crdName, "established", established)
90125

91-
if err := handler(); err != nil {
126+
if err := handler(ctx, established); err != nil {
92127
// Retry with backoff
93128
return reconcile.Result{},
94129
fmt.Errorf("reconciling CRD %s: %w", crdName, err)
95130
}
96-
// Mark CRD as handled
97-
r.handledCRDs[crdName] = struct{}{}
131+
// Record latest handled CRD state
132+
r.handledCRDState[crdName] = established
98133

99-
r.logger(ctx).V(3).Info("reconciling CRD successful", "crd", crdName)
134+
r.logger(ctx).V(3).Info("reconciling CRD successful", "crd", crdName, "established", established)
100135

101136
return controllerruntime.Result{}, nil
102137
}
103138

104139
// Register the CRD controller with reconciler-manager.
105-
func (r *CRDReconciler) Register(mgr controllerruntime.Manager) error {
140+
func (r *CRDController) Register(mgr controllerruntime.Manager) error {
106141
return controllerruntime.NewControllerManagedBy(mgr).
107-
For(&apiextensionsv1.CustomResourceDefinition{},
108-
builder.WithPredicates(
109-
ignoreDeletesPredicate(),
110-
crdIsEstablishedPredicate())).
142+
For(&apiextensionsv1.CustomResourceDefinition{}).
111143
Complete(r)
112144
}
113145

114-
// ignoreDeletesPredicate returns a predicate that handles CREATE, UPDATE, and
115-
// GENERIC events, but not DELETE events.
116-
func ignoreDeletesPredicate() predicate.Predicate {
117-
return predicate.Funcs{
118-
DeleteFunc: func(_ event.DeleteEvent) bool {
119-
return false
120-
},
121-
}
122-
}
123-
124-
// crdIsEstablishedPredicate returns a predicate that only processes events for
125-
// established CRDs.
126-
func crdIsEstablishedPredicate() predicate.Predicate {
127-
return predicate.NewPredicateFuncs(func(obj client.Object) bool {
128-
if crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition); ok {
129-
return crdIsEstablished(crd)
130-
}
131-
return false
132-
})
133-
}
134-
135146
// crdIsEstablished returns true if the given CRD is established on the cluster,
136147
// which indicates if discovery knows about it yet. For more info see
137148
// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#create-a-customresourcedefinition

pkg/reconcilermanager/controllers/reposync_controller.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type RepoSyncReconciler struct {
7474
// configMapWatches stores which namespaces where we are currently watching ConfigMaps
7575
configMapWatches map[string]bool
7676

77-
controller *controller.Controller
77+
controller controller.Controller
7878

7979
cache cache.Cache
8080
}
@@ -472,6 +472,14 @@ func (r *RepoSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile
472472

473473
// Register RepoSync controller with reconciler-manager.
474474
func (r *RepoSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetMembership bool) error {
475+
r.lock.Lock()
476+
defer r.lock.Unlock()
477+
478+
if r.controller != nil {
479+
// Already registered
480+
return nil
481+
}
482+
475483
controllerBuilder := controllerruntime.NewControllerManagedBy(mgr).
476484
WithOptions(controller.Options{
477485
MaxConcurrentReconciles: 1,
@@ -504,7 +512,7 @@ func (r *RepoSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetM
504512
}
505513

506514
ctrlr, err := controllerBuilder.Build(r)
507-
r.controller = &ctrlr
515+
r.controller = ctrlr
508516
r.cache = mgr.GetCache()
509517
return err
510518
}
@@ -524,7 +532,7 @@ func (r *RepoSyncReconciler) watchConfigMaps(rs *v1beta1.RepoSync) error {
524532

525533
if _, ok := r.configMapWatches[rs.Namespace]; !ok {
526534
klog.Infoln("Adding watch for ConfigMaps in namespace ", rs.Namespace)
527-
ctrlr := *r.controller
535+
ctrlr := r.controller
528536

529537
if err := ctrlr.Watch(source.Kind(r.cache, withNamespace(&corev1.ConfigMap{}, rs.Namespace),
530538
handler.EnqueueRequestsFromMapFunc(r.mapConfigMapToRepoSyncs),

0 commit comments

Comments
 (0)