Skip to content

Commit 7abd4b0

Browse files
committed
fix: remediator missing custom resource events
Prior to this change, remediator watches for new custom resources were only started after the apply attempt had fully completed. This left a window after an object was applied during which the remediator could miss events caused by third parties. Normally, this would be fine, because the remediator would revert any change once the watch was started. But if a DELETE event was missed, the object wouldn't be recreated until the next apply attempt. This change adds a CRD Controller to the remediator that watches CRDs and executes any registered handlers when a CRD is established, unestablished, or deleted. The remediator now registers a CRD handler for each resource type it watches, starting and stopping watchers as soon as possible, without waiting for a new apply attempt or watch update retry.
1 parent efad112 commit 7abd4b0

File tree

14 files changed

+331
-139
lines changed

14 files changed

+331
-139
lines changed

cmd/reconciler-manager/main.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func main() {
9595
}
9696
watchFleetMembership := fleetMembershipCRDExists(dynamicClient, mgr.GetRESTMapper(), &setupLog)
9797

98-
crdController := controllers.NewCRDReconciler(
98+
crdController := controllers.NewCRDController(mgr.GetCache(),
9999
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
100100
if err := crdController.Register(mgr); err != nil {
101101
setupLog.Error(err, "failed to register controller", "controller", "CRD")
@@ -108,11 +108,14 @@ func main() {
108108
mgr.GetClient(), watcher, dynamicClient,
109109
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RepoSyncKind),
110110
mgr.GetScheme())
111-
crdController.SetCRDHandler(configsync.RepoSyncCRDName, func() error {
112-
if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil {
113-
return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err)
111+
crdController.SetCRDHandler(configsync.RepoSyncCRDName, func(_ context.Context, established bool) error {
112+
if established {
113+
if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil {
114+
return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err)
115+
}
116+
setupLog.Info("RepoSync controller registration successful")
114117
}
115-
setupLog.Info("RepoSync controller registration successful")
118+
// TODO: unregister RepoSync controller if CRD is deleted?
116119
return nil
117120
})
118121
setupLog.Info("RepoSync controller registration scheduled")
@@ -122,11 +125,14 @@ func main() {
122125
mgr.GetClient(), watcher, dynamicClient,
123126
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RootSyncKind),
124127
mgr.GetScheme())
125-
crdController.SetCRDHandler(configsync.RootSyncCRDName, func() error {
126-
if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil {
127-
return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err)
128+
crdController.SetCRDHandler(configsync.RootSyncCRDName, func(_ context.Context, established bool) error {
129+
if established {
130+
if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil {
131+
return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err)
132+
}
133+
setupLog.Info("RootSync controller registration successful")
128134
}
129-
setupLog.Info("RootSync controller registration successful")
135+
// TODO: unregister RootSync controller if CRD is deleted?
130136
return nil
131137
})
132138
setupLog.Info("RootSync controller registration scheduled")

manifests/base/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ resources:
1919
# Applying hierarchyconfig-crd.yaml allows client-side validation of the HierarchyConfig resources.
2020
- ../hierarchyconfig-crd.yaml
2121
- ../namespace-selector-crd.yaml
22+
- ../ns-reconciler-cluster-scope-cluster-role.yaml
2223
- ../ns-reconciler-base-cluster-role.yaml
2324
- ../root-reconciler-base-cluster-role.yaml
2425
- ../otel-agent-cm.yaml
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This ClusterRole is used by both root-reconcilers and ns-reconcilers.
16+
# It includes read access for cluster-scope resources.
17+
apiVersion: rbac.authorization.k8s.io/v1
18+
kind: ClusterRole
19+
metadata:
20+
name: configsync.gke.io:ns-reconciler:cluster-scope
21+
labels:
22+
configmanagement.gke.io/system: "true"
23+
configmanagement.gke.io/arch: "csmr"
24+
rules:
25+
- apiGroups: ["apiextensions.k8s.io"]
26+
resources: ["customresourcedefinitions"]
27+
verbs: ["get","list","watch"]

manifests/root-reconciler-base-cluster-role.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ rules:
3232
- apiGroups: ["kpt.dev"]
3333
resources: ["resourcegroups/status"]
3434
verbs: ["*"]
35+
- apiGroups: ["apiextensions.k8s.io"]
36+
resources: ["customresourcedefinitions"]
37+
verbs: ["get","list","watch"]

pkg/reconciler/reconciler.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"kpt.dev/configsync/pkg/parse/events"
3838
"kpt.dev/configsync/pkg/reconciler/finalizer"
3939
"kpt.dev/configsync/pkg/reconciler/namespacecontroller"
40+
"kpt.dev/configsync/pkg/reconcilermanager/controllers"
4041
"kpt.dev/configsync/pkg/remediator"
4142
"kpt.dev/configsync/pkg/remediator/conflict"
4243
"kpt.dev/configsync/pkg/remediator/watch"
@@ -219,10 +220,44 @@ func Run(opts Options) {
219220
klog.Fatalf("Error creating rest config for the remediator: %v", err)
220221
}
221222

223+
// Start listening to signals
224+
signalCtx := signals.SetupSignalHandler()
225+
226+
// Create the ControllerManager
227+
ctrl.SetLogger(textlogger.NewLogger(textlogger.NewConfig()))
228+
mgrOptions := ctrl.Options{
229+
Scheme: core.Scheme,
230+
MapperProvider: func(_ *rest.Config, _ *http.Client) (meta.RESTMapper, error) {
231+
return mapper, nil
232+
},
233+
BaseContext: func() context.Context {
234+
return signalCtx
235+
},
236+
}
237+
// For Namespaced Reconcilers, set the default namespace to watch.
238+
// Otherwise, all namespaced informers will watch at the cluster-scope.
239+
// This prevents Namespaced Reconcilers from needing cluster-scoped read
240+
// permissions.
241+
if opts.ReconcilerScope != declared.RootScope {
242+
mgrOptions.Cache.DefaultNamespaces = map[string]cache.Config{
243+
string(opts.ReconcilerScope): {},
244+
}
245+
}
246+
mgr, err := ctrl.NewManager(cfgForWatch, mgrOptions)
247+
if err != nil {
248+
klog.Fatalf("Instantiating Controller Manager: %v", err)
249+
}
250+
251+
crdController := controllers.NewCRDController(mgr.GetCache(),
252+
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
253+
if err := crdController.Register(mgr); err != nil {
254+
klog.Fatalf("Instantiating CRD Controller: %v", err)
255+
}
256+
222257
conflictHandler := conflict.NewHandler()
223258
fightHandler := fight.NewHandler()
224259

225-
rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, decls, opts.NumWorkers)
260+
rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, crdController, decls, opts.NumWorkers)
226261
if err != nil {
227262
klog.Fatalf("Instantiating Remediator: %v", err)
228263
}
@@ -303,34 +338,6 @@ func Run(opts Options) {
303338
parser = parse.NewNamespaceRunner(parseOpts)
304339
}
305340

306-
// Start listening to signals
307-
signalCtx := signals.SetupSignalHandler()
308-
309-
// Create the ControllerManager
310-
ctrl.SetLogger(textlogger.NewLogger(textlogger.NewConfig()))
311-
mgrOptions := ctrl.Options{
312-
Scheme: core.Scheme,
313-
MapperProvider: func(_ *rest.Config, _ *http.Client) (meta.RESTMapper, error) {
314-
return mapper, nil
315-
},
316-
BaseContext: func() context.Context {
317-
return signalCtx
318-
},
319-
}
320-
// For Namespaced Reconcilers, set the default namespace to watch.
321-
// Otherwise, all namespaced informers will watch at the cluster-scope.
322-
// This prevents Namespaced Reconcilers from needing cluster-scoped read
323-
// permissions.
324-
if opts.ReconcilerScope != declared.RootScope {
325-
mgrOptions.Cache.DefaultNamespaces = map[string]cache.Config{
326-
string(opts.ReconcilerScope): {},
327-
}
328-
}
329-
mgr, err := ctrl.NewManager(cfgForWatch, mgrOptions)
330-
if err != nil {
331-
klog.Fatalf("Instantiating Controller Manager: %v", err)
332-
}
333-
334341
// This cancelFunc will be used by the Finalizer to stop all the other
335342
// controllers (Parser & Remediator).
336343
ctx, stopControllers := context.WithCancel(signalCtx)

pkg/reconcilermanager/controllers/build_names.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,25 @@ import (
2222
)
2323

2424
const (
25+
// RepoSyncClusterScopeClusterRoleName is the name of the ClusterRole with
26+
// cluster-scoped read permissions for the namespace reconciler.
27+
// e.g. configsync.gke.io:ns-reconciler:cluster-scope
28+
RepoSyncClusterScopeClusterRoleName = configsync.GroupName + ":" + core.NsReconcilerPrefix + ":cluster-scope"
2529
// RepoSyncBaseClusterRoleName is the namespace reconciler permissions name.
2630
// e.g. configsync.gke.io:ns-reconciler
2731
RepoSyncBaseClusterRoleName = configsync.GroupName + ":" + core.NsReconcilerPrefix
2832
// RootSyncBaseClusterRoleName is the root reconciler base ClusterRole name.
2933
// e.g. configsync.gke.io:root-reconciler
3034
RootSyncBaseClusterRoleName = configsync.GroupName + ":" + core.RootReconcilerPrefix
35+
// RepoSyncClusterScopeClusterRoleBindingName is the name of the default
36+
// ClusterRoleBinding created for RepoSync objects. This contains basic
37+
// cluster-scoped permissions for RepoSync reconcilers
38+
// (e.g. CustomResourceDefinition watch).
39+
RepoSyncClusterScopeClusterRoleBindingName = RepoSyncClusterScopeClusterRoleName
3140
// RepoSyncBaseRoleBindingName is the name of the default RoleBinding created
32-
// for RepoSync objects. This contains basic permissions for RepoSync reconcilers
33-
//(e.g. RepoSync status update).
41+
// for RepoSync objects. This contains basic namespace-scoped permissions
42+
// for RepoSync reconcilers
43+
// (e.g. RepoSync status update).
3444
RepoSyncBaseRoleBindingName = RepoSyncBaseClusterRoleName
3545
// RootSyncLegacyClusterRoleBindingName is the name of the legacy ClusterRoleBinding created
3646
// for RootSync objects. It is always bound to cluster-admin.

pkg/reconcilermanager/controllers/crd_controller.go

Lines changed: 66 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,57 +22,90 @@ import (
2222
"github.com/go-logr/logr"
2323
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2424
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2526
controllerruntime "sigs.k8s.io/controller-runtime"
26-
"sigs.k8s.io/controller-runtime/pkg/builder"
27-
"sigs.k8s.io/controller-runtime/pkg/client"
28-
"sigs.k8s.io/controller-runtime/pkg/event"
29-
"sigs.k8s.io/controller-runtime/pkg/predicate"
27+
"sigs.k8s.io/controller-runtime/pkg/cache"
3028
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3129
)
3230

3331
// CRDHandler is called by the CRDController when a CRD becomes established, unestablished, or is deleted.
34-
type CRDHandler func() error
32+
type CRDHandler func(ctx context.Context, established bool) error
3533

36-
var _ reconcile.Reconciler = &CRDReconciler{}
34+
// CRDWatcher allows callers to add CRDHandlers for specific CRDs by name.
35+
type CRDWatcher interface {
36+
SetCRDHandler(crdName string, handler CRDHandler)
37+
RemoveCRDHandler(crdName string)
38+
}
39+
40+
var _ reconcile.Reconciler = &CRDController{}
41+
var _ CRDWatcher = &CRDController{}
3742

38-
// CRDReconciler watches CRDs and calls handlers once they are established.
39-
type CRDReconciler struct {
43+
// CRDController watches CRDs and calls handlers once they are established.
44+
type CRDController struct {
4045
loggingController
46+
cache cache.Cache
4147

42-
registerLock sync.Mutex
43-
handlers map[string]CRDHandler
44-
handledCRDs map[string]struct{}
48+
registerLock sync.Mutex
49+
handlers map[string]CRDHandler
50+
handledCRDState map[string]bool
4551
}
4652

47-
// NewCRDReconciler constructs a new CRDReconciler.
48-
func NewCRDReconciler(log logr.Logger) *CRDReconciler {
49-
return &CRDReconciler{
53+
// NewCRDController constructs a new CRDController.
54+
func NewCRDController(cache cache.Cache, log logr.Logger) *CRDController {
55+
return &CRDController{
5056
loggingController: loggingController{
5157
log: log,
5258
},
53-
handlers: make(map[string]CRDHandler),
54-
handledCRDs: make(map[string]struct{}),
59+
cache: cache,
60+
handlers: make(map[string]CRDHandler),
61+
handledCRDState: make(map[string]bool),
5562
}
5663
}
5764

5865
// SetCRDHandler adds a handler for the specified CRD.
5966
// The handler will be called when the CRD becomes established.
6067
// If the handler errors, it will be retried with backoff until success.
61-
// One the handler succeeds, it will not be called again, unless SetCRDHandler
62-
// is called again.
63-
func (r *CRDReconciler) SetCRDHandler(crdName string, crdHandler CRDHandler) {
68+
// Once the handler succeeds as established, it will not be called again, unless
69+
// SetCRDHandler is called again or the CRD becomes unestablished or is deleted.
70+
func (r *CRDController) SetCRDHandler(crdName string, crdHandler CRDHandler) {
6471
r.registerLock.Lock()
6572
defer r.registerLock.Unlock()
6673

6774
r.handlers[crdName] = crdHandler
68-
delete(r.handledCRDs, crdName)
75+
delete(r.handledCRDState, crdName)
76+
}
77+
78+
// RemoveCRDHandler removes the handler for the specified CRD.
79+
func (r *CRDController) RemoveCRDHandler(crdName string) {
80+
r.registerLock.Lock()
81+
defer r.registerLock.Unlock()
82+
83+
delete(r.handlers, crdName)
84+
delete(r.handledCRDState, crdName)
6985
}
7086

7187
// Reconcile gets the CRD from the cache, determines whether it is established, and calls the registered handler (if any) unless that state was already handled.
72-
func (r *CRDReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
88+
func (r *CRDController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
7389
crdName := req.Name
7490
ctx = r.setLoggerValues(ctx, "crd", crdName)
7591

92+
// Established if CRD exists and .status.conditions[type="Established"].status = "True"
93+
var established bool
94+
crdObj := &apiextensionsv1.CustomResourceDefinition{}
95+
if err := r.cache.Get(ctx, req.NamespacedName, crdObj); err != nil {
96+
switch {
97+
// Should never run into NoMatchFound, since CRD is a built-in resource.
98+
case apierrors.IsNotFound(err):
99+
established = false
100+
default:
101+
// Retry with backoff
102+
return reconcile.Result{},
103+
fmt.Errorf("getting CRD from cache: %s: %w", crdName, err)
104+
}
105+
} else {
106+
established = crdIsEstablished(crdObj)
107+
}
108+
76109
r.registerLock.Lock()
77110
defer r.registerLock.Unlock()
78111

@@ -81,57 +114,35 @@ func (r *CRDReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
81114
// No handler for this CRD
82115
return reconcile.Result{}, nil
83116
}
84-
if _, handled := r.handledCRDs[crdName]; handled {
85-
// Already handled
86-
return reconcile.Result{}, nil
117+
if lastHandledState, found := r.handledCRDState[crdName]; found {
118+
if lastHandledState == established {
119+
// Already handled latest state
120+
return reconcile.Result{}, nil
121+
}
87122
}
88123

89-
r.logger(ctx).V(3).Info("reconciling CRD", "crd", crdName)
124+
r.logger(ctx).V(3).Info("reconciling CRD", "crd", crdName, "established", established)
90125

91-
if err := handler(); err != nil {
126+
if err := handler(ctx, established); err != nil {
92127
// Retry with backoff
93128
return reconcile.Result{},
94129
fmt.Errorf("reconciling CRD %s: %w", crdName, err)
95130
}
96-
// Mark CRD as handled
97-
r.handledCRDs[crdName] = struct{}{}
131+
// Record latest handled CRD state
132+
r.handledCRDState[crdName] = established
98133

99-
r.logger(ctx).V(3).Info("reconciling CRD successful", "crd", crdName)
134+
r.logger(ctx).V(3).Info("reconciling CRD successful", "crd", crdName, "established", established)
100135

101136
return controllerruntime.Result{}, nil
102137
}
103138

104139
// Register the CRD controller with reconciler-manager.
105-
func (r *CRDReconciler) Register(mgr controllerruntime.Manager) error {
140+
func (r *CRDController) Register(mgr controllerruntime.Manager) error {
106141
return controllerruntime.NewControllerManagedBy(mgr).
107-
For(&apiextensionsv1.CustomResourceDefinition{},
108-
builder.WithPredicates(
109-
ignoreDeletesPredicate(),
110-
crdIsEstablishedPredicate())).
142+
For(&apiextensionsv1.CustomResourceDefinition{}).
111143
Complete(r)
112144
}
113145

114-
// ignoreDeletesPredicate returns a predicate that handles CREATE, UPDATE, and
115-
// GENERIC events, but not DELETE events.
116-
func ignoreDeletesPredicate() predicate.Predicate {
117-
return predicate.Funcs{
118-
DeleteFunc: func(_ event.DeleteEvent) bool {
119-
return false
120-
},
121-
}
122-
}
123-
124-
// crdIsEstablishedPredicate returns a predicate that only processes events for
125-
// established CRDs.
126-
func crdIsEstablishedPredicate() predicate.Predicate {
127-
return predicate.NewPredicateFuncs(func(obj client.Object) bool {
128-
if crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition); ok {
129-
return crdIsEstablished(crd)
130-
}
131-
return false
132-
})
133-
}
134-
135146
// crdIsEstablished returns true if the given CRD is established on the cluster,
136147
// which indicates if discovery knows about it yet. For more info see
137148
// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#create-a-customresourcedefinition

0 commit comments

Comments
 (0)