Skip to content

Commit 7225b1e

Browse files
authored
internal/controller: add registry (#2066)
1 parent f71b974 commit 7225b1e

File tree

9 files changed

+123
-169
lines changed

9 files changed

+123
-169
lines changed

internal/controller/registry.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"go.uber.org/zap"
8+
ctrl "sigs.k8s.io/controller-runtime"
9+
"sigs.k8s.io/controller-runtime/pkg/cluster"
10+
"sigs.k8s.io/controller-runtime/pkg/predicate"
11+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
12+
13+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlas"
14+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasbackupcompliancepolicy"
15+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlascustomrole"
16+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasdatabaseuser"
17+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasdatafederation"
18+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasdeployment"
19+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasfederatedauth"
20+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasprivateendpoint"
21+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasproject"
22+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlassearchindexconfig"
23+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasstream"
24+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags"
25+
)
26+
27+
type ManagerAware interface {
28+
SetupWithManager(mgr ctrl.Manager, skipNameValidation bool) error
29+
}
30+
31+
type AkoReconciler interface {
32+
reconcile.Reconciler
33+
ManagerAware
34+
}
35+
36+
type Registry struct {
37+
predicates []predicate.Predicate
38+
deletionProtection bool
39+
independentSyncPeriod time.Duration
40+
featureFlags *featureflags.FeatureFlags
41+
42+
logger *zap.Logger
43+
reconcilers []AkoReconciler
44+
}
45+
46+
func NewRegistry(predicates []predicate.Predicate, deletionProtection bool, logger *zap.Logger, independentSyncPeriod time.Duration, featureFlags *featureflags.FeatureFlags) *Registry {
47+
return &Registry{
48+
predicates: predicates,
49+
deletionProtection: deletionProtection,
50+
logger: logger,
51+
independentSyncPeriod: independentSyncPeriod,
52+
featureFlags: featureFlags,
53+
}
54+
}
55+
56+
func (r *Registry) RegisterWithManager(mgr ctrl.Manager, skipNameValidation bool, ap atlas.Provider) error {
57+
r.registerControllers(mgr, ap)
58+
59+
for _, reconciler := range r.reconcilers {
60+
if err := reconciler.SetupWithManager(mgr, skipNameValidation); err != nil {
61+
return fmt.Errorf("failed to set up with manager: %w", err)
62+
}
63+
}
64+
return nil
65+
}
66+
67+
func (r *Registry) registerControllers(c cluster.Cluster, ap atlas.Provider) {
68+
var reconcilers []AkoReconciler
69+
reconcilers = append(reconcilers, atlasproject.NewAtlasProjectReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
70+
reconcilers = append(reconcilers, atlasdeployment.NewAtlasDeploymentReconciler(c, r.predicates, ap, r.deletionProtection, r.independentSyncPeriod, r.logger))
71+
reconcilers = append(reconcilers, atlasdatabaseuser.NewAtlasDatabaseUserReconciler(c, r.predicates, ap, r.deletionProtection, r.independentSyncPeriod, r.featureFlags, r.logger))
72+
reconcilers = append(reconcilers, atlasdatafederation.NewAtlasDataFederationReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
73+
reconcilers = append(reconcilers, atlasfederatedauth.NewAtlasFederatedAuthReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
74+
reconcilers = append(reconcilers, atlasstream.NewAtlasStreamsInstanceReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
75+
reconcilers = append(reconcilers, atlasstream.NewAtlasStreamsConnectionReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
76+
reconcilers = append(reconcilers, atlassearchindexconfig.NewAtlasSearchIndexConfigReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
77+
reconcilers = append(reconcilers, atlasbackupcompliancepolicy.NewAtlasBackupCompliancePolicyReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
78+
reconcilers = append(reconcilers, atlascustomrole.NewAtlasCustomRoleReconciler(c, r.predicates, ap, r.deletionProtection, r.independentSyncPeriod, r.logger))
79+
reconcilers = append(reconcilers, atlasprivateendpoint.NewAtlasPrivateEndpointReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
80+
r.reconcilers = reconcilers
81+
}

internal/operator/builder.go

Lines changed: 10 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,16 @@ import (
1515
ctrl "sigs.k8s.io/controller-runtime"
1616
"sigs.k8s.io/controller-runtime/pkg/cache"
1717
"sigs.k8s.io/controller-runtime/pkg/client"
18+
"sigs.k8s.io/controller-runtime/pkg/cluster"
1819
"sigs.k8s.io/controller-runtime/pkg/healthz"
1920
ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
2021
"sigs.k8s.io/controller-runtime/pkg/manager"
2122
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2223
"sigs.k8s.io/controller-runtime/pkg/predicate"
2324
"sigs.k8s.io/controller-runtime/pkg/webhook"
2425

26+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller"
2527
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlas"
26-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasbackupcompliancepolicy"
27-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlascustomrole"
28-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasdatabaseuser"
29-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasdatafederation"
30-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasdeployment"
31-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasfederatedauth"
32-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasprivateendpoint"
33-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasproject"
34-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlassearchindexconfig"
35-
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasstream"
3628
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/connectionsecret"
3729
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/watch"
3830
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags"
@@ -159,8 +151,8 @@ func (b *Builder) WithSkipNameValidation(skip bool) *Builder {
159151
return b
160152
}
161153

162-
// Build builds the controller manager and configure operator controllers
163-
func (b *Builder) Build(ctx context.Context) (manager.Manager, error) {
154+
// Build builds the cluster object and configures operator controllers
155+
func (b *Builder) Build(ctx context.Context) (cluster.Cluster, error) {
164156
mergeDefaults(b)
165157

166158
if b.independentSyncPeriod < b.minimumIndependentSyncPeriod {
@@ -186,6 +178,8 @@ func (b *Builder) Build(ctx context.Context) (manager.Manager, error) {
186178
}
187179
}
188180

181+
controllerRegistry := controller.NewRegistry(b.predicates, b.deletionProtection, b.logger, b.independentSyncPeriod, b.featureFlags)
182+
189183
mgr, err := b.managerProvider.New(
190184
b.config,
191185
ctrl.Options{
@@ -213,137 +207,16 @@ func (b *Builder) Build(ctx context.Context) (manager.Manager, error) {
213207
return nil, err
214208
}
215209

216-
if err = indexer.RegisterAll(ctx, mgr, b.logger); err != nil {
217-
return nil, fmt.Errorf("unable to create indexers: %w", err)
218-
}
219-
220210
if b.atlasProvider == nil {
221211
b.atlasProvider = atlas.NewProductionProvider(b.atlasDomain, b.apiSecret, mgr.GetClient(), nil)
222212
}
223213

224-
projectReconciler := atlasproject.NewAtlasProjectReconciler(
225-
mgr,
226-
b.predicates,
227-
b.atlasProvider,
228-
b.deletionProtection,
229-
b.logger,
230-
)
231-
if err = projectReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
232-
return nil, fmt.Errorf("unable to create controller AtlasProject: %w", err)
233-
}
234-
235-
deploymentReconciler := atlasdeployment.NewAtlasDeploymentReconciler(
236-
mgr,
237-
b.predicates,
238-
b.atlasProvider,
239-
b.deletionProtection,
240-
b.independentSyncPeriod,
241-
b.logger,
242-
)
243-
if err = deploymentReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
244-
return nil, fmt.Errorf("unable to create controller AtlasDeployment: %w", err)
245-
}
246-
247-
dbUserReconciler := atlasdatabaseuser.NewAtlasDatabaseUserReconciler(
248-
mgr,
249-
b.predicates,
250-
b.atlasProvider,
251-
b.deletionProtection,
252-
b.independentSyncPeriod,
253-
b.featureFlags,
254-
b.logger,
255-
)
256-
if err = dbUserReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
257-
return nil, fmt.Errorf("unable to create controller AtlasDatabaseUser: %w", err)
258-
}
259-
260-
dataFedReconciler := atlasdatafederation.NewAtlasDataFederationReconciler(
261-
mgr,
262-
b.predicates,
263-
b.atlasProvider,
264-
b.deletionProtection,
265-
b.logger,
266-
)
267-
if err = dataFedReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
268-
return nil, fmt.Errorf("unable to create controller AtlasDataFederation: %w", err)
269-
}
270-
271-
fedAuthReconciler := atlasfederatedauth.NewAtlasFederatedAuthReconciler(
272-
mgr,
273-
b.predicates,
274-
b.atlasProvider,
275-
b.deletionProtection,
276-
b.logger,
277-
)
278-
if err = fedAuthReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
279-
return nil, fmt.Errorf("unable to create controller AtlasFederatedAuth: %w", err)
280-
}
281-
282-
streamsInstanceReconciler := atlasstream.NewAtlasStreamsInstanceReconciler(
283-
mgr,
284-
b.predicates,
285-
b.atlasProvider,
286-
b.deletionProtection,
287-
b.logger,
288-
)
289-
if err = streamsInstanceReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
290-
return nil, fmt.Errorf("unable to create controller AtlasStreamsInstance: %w", err)
291-
}
292-
293-
streamsConnReconciler := atlasstream.NewAtlasStreamsConnectionReconciler(
294-
mgr,
295-
b.predicates,
296-
b.atlasProvider,
297-
b.deletionProtection,
298-
b.logger,
299-
)
300-
if err = streamsConnReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
301-
return nil, fmt.Errorf("unable to create controller AtlasStreamsConnection: %w", err)
302-
}
303-
304-
searchIndexConfigReconciler := atlassearchindexconfig.NewAtlasSearchIndexConfigReconciler(
305-
mgr,
306-
b.predicates,
307-
b.atlasProvider,
308-
b.deletionProtection,
309-
b.logger,
310-
)
311-
if err = searchIndexConfigReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
312-
return nil, fmt.Errorf("unable to create controller AtlasSearchIndexConfig: %w", err)
313-
}
314-
315-
bcpReconciler := atlasbackupcompliancepolicy.NewAtlasBackupCompliancePolicyReconciler(
316-
mgr,
317-
b.predicates,
318-
b.atlasProvider,
319-
b.deletionProtection,
320-
b.logger,
321-
)
322-
if err = bcpReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
323-
return nil, fmt.Errorf("unable to create controller AtlasBackupCompliancePolicy: %w", err)
324-
}
325-
326-
customRolesReconciler := atlascustomrole.NewAtlasCustomRoleReconciler(
327-
mgr,
328-
b.predicates,
329-
b.atlasProvider,
330-
b.deletionProtection,
331-
b.independentSyncPeriod,
332-
b.logger,
333-
)
334-
if err = customRolesReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
335-
return nil, fmt.Errorf("unable to create controller AtlasCustomRole: %w", err)
214+
if err := controllerRegistry.RegisterWithManager(mgr, b.skipNameValidation, b.atlasProvider); err != nil {
215+
return nil, err
336216
}
337217

338-
peReconciler := atlasprivateendpoint.NewAtlasPrivateEndpointReconciler(
339-
mgr,
340-
b.predicates,
341-
b.atlasProvider,
342-
b.deletionProtection,
343-
b.logger,
344-
)
345-
if err = peReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil {
346-
return nil, fmt.Errorf("unable to create controller AtlasPrivateEndpoint: %w", err)
218+
if err := indexer.RegisterAll(ctx, mgr, b.logger); err != nil {
219+
return nil, fmt.Errorf("unable to create indexers: %w", err)
347220
}
348221

349222
return mgr, nil

test/e2e/cache_watch_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"k8s.io/apimachinery/pkg/types"
1313
"k8s.io/apimachinery/pkg/util/sets"
1414
"sigs.k8s.io/controller-runtime/pkg/client"
15-
"sigs.k8s.io/controller-runtime/pkg/manager"
15+
"sigs.k8s.io/controller-runtime/pkg/cluster"
1616

1717
"github.com/mongodb/mongodb-atlas-kubernetes/v2/api"
1818
akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1"
@@ -44,12 +44,12 @@ var _ = Describe("Kubernetes cache watch test:", Label("cache-watch"), func() {
4444
setupSecrets(ctx, testData, namespaces)
4545
defer clearSecrets(ctx, testData, namespaces)
4646

47-
mgr, stop := setupManager(ctx, namespaces, testCase.watchNamespaces)
47+
c, stop := setupCluster(ctx, namespaces, testCase.watchNamespaces)
4848
defer stop()
4949

5050
wantToFindSet := sets.NewString(testCase.wantToFind...)
5151
By("Using the manager cache to get all secrets in all namespaces and checking the expected results", func() {
52-
cache := mgr.GetCache()
52+
cache := c.GetCache()
5353
for _, ns := range namespaces {
5454
err := cache.Get(ctx, types.NamespacedName{Name: config.DefaultOperatorGlobalKey, Namespace: ns.GetName()}, &corev1.Secret{})
5555

@@ -96,12 +96,12 @@ var _ = Describe("Kubernetes cache watch test:", Label("cache-watch"), func() {
9696
setupSecrets(ctx, testData, namespaces)
9797
defer clearSecrets(ctx, testData, namespaces)
9898

99-
mgr, stop := setupManager(ctx, namespaces, testCase.watchNamespaces)
99+
c, stop := setupCluster(ctx, namespaces, testCase.watchNamespaces)
100100
defer stop()
101101

102102
wantToFindSet := sets.NewString(testCase.wantToFind...)
103103
By("Using the manager cache to list all secrets in all namespaces and checking the expected results", func() {
104-
cache := mgr.GetCache()
104+
cache := c.GetCache()
105105
for _, ns := range namespaces {
106106
err := cache.List(ctx, &corev1.SecretList{}, &client.ListOptions{Namespace: ns.GetName()})
107107

@@ -157,7 +157,7 @@ var _ = Describe("Reconciles test:", func() {
157157
setupSecrets(ctx, testData, namespaces)
158158
defer clearSecrets(ctx, testData, namespaces)
159159

160-
_, stop := setupManager(ctx, namespaces, testCase.watchNamespaces)
160+
_, stop := setupCluster(ctx, namespaces, testCase.watchNamespaces)
161161
defer stop()
162162

163163
By("Launching an atlas project on each namespace, expect only listened namespaces to update status", func() {
@@ -286,10 +286,10 @@ func setupSecrets(ctx context.Context, testData *model.TestDataProvider, namespa
286286

287287
type stopper func()
288288

289-
func setupManager(ctx context.Context, namespaces []*corev1.Namespace, wantToWatch []string) (manager.Manager, stopper) {
289+
func setupCluster(ctx context.Context, namespaces []*corev1.Namespace, wantToWatch []string) (cluster.Cluster, stopper) {
290290
var (
291-
wg sync.WaitGroup
292-
mgr manager.Manager
291+
wg sync.WaitGroup
292+
c cluster.Cluster
293293
)
294294

295295
wantToWatchSet := sets.NewString(wantToWatch...)
@@ -315,13 +315,13 @@ func setupManager(ctx context.Context, namespaces []*corev1.Namespace, wantToWat
315315
}
316316

317317
var err error
318-
mgr, err = k8s.BuildManager(managerConfig)
318+
c, err = k8s.BuildCluster(managerConfig)
319319
Expect(err).NotTo(HaveOccurred())
320320

321321
wg.Add(1)
322322
go func(ctx context.Context) {
323323
defer wg.Done()
324-
err := mgr.Start(ctx)
324+
err := c.Start(ctx)
325325
Expect(err).NotTo(HaveOccurred())
326326
}(mgrCtx)
327327
})
@@ -330,7 +330,7 @@ func setupManager(ctx context.Context, namespaces []*corev1.Namespace, wantToWat
330330
// the first namespace should always be cache-accessible
331331
Eventually(func(g Gomega) bool {
332332
return g.Expect(
333-
mgr.GetCache().Get(ctx, types.NamespacedName{Name: namespaces[0].GetName()}, &corev1.Namespace{}),
333+
c.GetCache().Get(ctx, types.NamespacedName{Name: namespaces[0].GetName()}, &corev1.Namespace{}),
334334
).To(Succeed())
335335
}).WithTimeout(time.Minute).Should(BeTrue())
336336
})
@@ -340,7 +340,7 @@ func setupManager(ctx context.Context, namespaces []*corev1.Namespace, wantToWat
340340
wg.Wait()
341341
}
342342

343-
return mgr, stopper
343+
return c, stopper
344344
}
345345

346346
func clearSecrets(ctx context.Context, testData *model.TestDataProvider, namespaces []*corev1.Namespace) {

test/e2e/configuration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,10 @@ var _ = Describe("Configuration namespaced. Deploy deployment", Label("deploymen
159159
})
160160

161161
func mainCycle(testData *model.TestDataProvider) {
162-
mgr := actions.PrepareOperatorConfigurations(testData)
162+
r := actions.PrepareOperatorConfigurations(testData)
163163
ctx := context.Background()
164164
go func(ctx context.Context) {
165-
err := mgr.Start(ctx)
165+
err := r.Start(ctx)
166166
Expect(err).NotTo(HaveOccurred())
167167
}(ctx)
168168

0 commit comments

Comments
 (0)