Skip to content

Commit d784969

Browse files
authored
Make reconcilers multicluster aware (#1215)
* Make reconcilers multicluster aware * Fix linter issues * Re-run generate for license header
1 parent a15aa2a commit d784969

23 files changed

+524
-420
lines changed

operator/cmd/run/run.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,12 @@ func Run(
397397
if v2Controllers {
398398
// Redpanda Reconciler
399399
if err := (&redpandacontrollers.RedpandaReconciler{
400-
KubeConfig: mgr.GetConfig(),
401-
Client: mgr.GetClient(),
402-
EventRecorder: mgr.GetEventRecorderFor("RedpandaReconciler"),
403-
LifecycleClient: lifecycle.NewResourceClient(mgr, lifecycle.V2ResourceManagers(redpandaImage, sidecarImage, cloudSecrets)),
400+
Manager: mcmanager,
401+
LifecycleClient: lifecycle.NewResourceClient(mcmanager, lifecycle.V2ResourceManagers(redpandaImage, sidecarImage, cloudSecrets)),
404402
ClientFactory: factory,
405403
CloudSecretsExpander: cloudExpander,
406404
UseNodePools: opts.enableV2NodepoolController,
407-
}).SetupWithManager(ctx, mgr); err != nil {
405+
}).SetupWithManager(ctx, mcmanager); err != nil {
408406
setupLog.Error(err, "unable to create controller", "controller", "Redpanda")
409407
return err
410408
}
@@ -415,8 +413,8 @@ func Run(
415413
// NodePool Reconciler
416414
if opts.enableV2NodepoolController {
417415
if err := (&redpandacontrollers.NodePoolReconciler{
418-
Client: mgr.GetClient(),
419-
}).SetupWithManager(ctx, mgr); err != nil {
416+
Manager: mcmanager,
417+
}).SetupWithManager(ctx, mcmanager); err != nil {
420418
setupLog.Error(err, "unable to create controller", "controller", "NodePool")
421419
return err
422420
}
@@ -438,7 +436,7 @@ func Run(
438436
return err
439437
}
440438

441-
if err := (&consolecontroller.Controller{Ctl: ctl}).SetupWithManager(ctx, mgr); err != nil {
439+
if err := (&consolecontroller.Controller{Ctl: ctl}).SetupWithManager(ctx, mcmanager); err != nil {
442440
setupLog.Error(err, "unable to create controller", "controller", "Console")
443441
return err
444442
}

operator/internal/controller/console/controller.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ import (
2626
ctrl "sigs.k8s.io/controller-runtime"
2727
"sigs.k8s.io/controller-runtime/pkg/client"
2828
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
29+
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
30+
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
2931

3032
"github.com/redpanda-data/redpanda-operator/charts/console/v3"
3133
redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
3234
"github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2/conversion"
3335
"github.com/redpanda-data/redpanda-operator/operator/internal/controller"
3436
"github.com/redpanda-data/redpanda-operator/operator/pkg/functional"
3537
"github.com/redpanda-data/redpanda-operator/pkg/kube"
38+
"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
3639
"github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
3740
)
3841

@@ -58,35 +61,37 @@ type Controller struct {
5861
rng *rand.Rand
5962
}
6063

61-
func (c *Controller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
64+
func (c *Controller) SetupWithManager(ctx context.Context, mgr multicluster.Manager) error {
6265
// If rng is not set for testing, create and seed a new one.
6366
if c.rng == nil {
6467
// TODO: Weak RNG is probably acceptable here but best to doublecheck
6568
c.rng = rand.New(rand.NewSource(time.Now().UnixMicro())) //nolint:gosec
6669
}
6770

68-
builder := ctrl.NewControllerManagedBy(mgr)
71+
builder := mcbuilder.ControllerManagedBy(mgr).
72+
For(&redpandav1alpha2.Console{}, mcbuilder.WithEngageWithLocalCluster(true), mcbuilder.WithEngageWithProviderClusters(true))
6973

7074
// NB: As of writing, all console types are namespace scoped.
7175
for _, t := range console.Types() {
72-
builder = builder.Owns(t)
76+
builder = builder.Owns(t, mcbuilder.WithEngageWithLocalCluster(true), mcbuilder.WithEngageWithProviderClusters(true))
7377
}
7478

75-
eventHandler, err := controller.RegisterClusterSourceIndex(ctx, mgr, "console", &redpandav1alpha2.Console{}, &redpandav1alpha2.ConsoleList{})
76-
if err != nil {
77-
return err
78-
}
79+
for _, clusterName := range mgr.GetClusterNames() {
80+
eventHandler, err := controller.RegisterClusterSourceIndex(ctx, mgr, "console", clusterName, &redpandav1alpha2.Console{}, &redpandav1alpha2.ConsoleList{})
81+
if err != nil {
82+
return err
83+
}
7984

80-
return builder.
81-
For(&redpandav1alpha2.Console{}).
8285
// Configure a watch on redpandas using controller-runtime's indexing.
8386
// If a redpanda is updated, any console's referring to it will be
8487
// re-reconciled.
85-
Watches(&redpandav1alpha2.Redpanda{}, eventHandler).
86-
Complete(c)
88+
builder.Watches(&redpandav1alpha2.Redpanda{}, eventHandler, controller.WatchOptions(clusterName)...)
89+
}
90+
91+
return builder.Complete(c)
8792
}
8893

89-
func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
94+
func (c *Controller) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
9095
cr, err := kube.Get[redpandav1alpha2.Console](ctx, c.Ctl, req.NamespacedName)
9196
if err != nil {
9297
if apierrors.IsNotFound(err) {

operator/internal/controller/console/controller_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"k8s.io/utils/ptr"
2727
ctrl "sigs.k8s.io/controller-runtime"
2828
"sigs.k8s.io/controller-runtime/pkg/client"
29+
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
30+
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
2931
"sigs.k8s.io/yaml"
3032

3133
consolechart "github.com/redpanda-data/redpanda-operator/charts/console/v3"
@@ -168,7 +170,9 @@ func TestController(t *testing.T) {
168170

169171
// Reconcile the console a few times to ensure determinism.
170172
for range 3 {
171-
_, err = consoleCtrl.Reconcile(t.Context(), ctrl.Request{NamespacedName: kube.AsKey(console)})
173+
req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: kube.AsKey(console)}, ClusterName: mcmanager.LocalCluster}
174+
175+
_, err = consoleCtrl.Reconcile(t.Context(), req)
172176
require.NoError(t, err)
173177

174178
// Get updated console status
@@ -203,7 +207,9 @@ func TestController(t *testing.T) {
203207

204208
// Reconcile the deletion a few times.
205209
for range 3 {
206-
_, err = consoleCtrl.Reconcile(t.Context(), ctrl.Request{NamespacedName: kube.AsKey(console)})
210+
req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: kube.AsKey(console)}, ClusterName: mcmanager.LocalCluster}
211+
212+
_, err = consoleCtrl.Reconcile(t.Context(), req)
207213
require.NoError(t, err)
208214
}
209215

operator/internal/controller/index.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ import (
1616

1717
"k8s.io/apimachinery/pkg/fields"
1818
"k8s.io/apimachinery/pkg/types"
19-
ctrl "sigs.k8s.io/controller-runtime"
2019
"sigs.k8s.io/controller-runtime/pkg/client"
21-
"sigs.k8s.io/controller-runtime/pkg/handler"
2220
"sigs.k8s.io/controller-runtime/pkg/reconcile"
21+
mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler"
2322

2423
redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
24+
"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
2525
)
2626

2727
type clientList[T client.Object] interface {
@@ -33,24 +33,32 @@ func clusterReferenceIndexName(name string) string {
3333
return fmt.Sprintf("__%s_referencing_cluster", name)
3434
}
3535

36-
func RegisterClusterSourceIndex[T redpandav1alpha2.ClusterReferencingObject, U clientList[T]](ctx context.Context, mgr ctrl.Manager, name string, o T, l U) (handler.EventHandler, error) {
36+
func RegisterClusterSourceIndex[T redpandav1alpha2.ClusterReferencingObject, U clientList[T]](ctx context.Context, mgr multicluster.Manager, name, clusterName string, o T, l U) (mchandler.EventHandlerFunc, error) {
3737
indexName := clusterReferenceIndexName(name)
38-
if err := mgr.GetFieldIndexer().IndexField(ctx, o, indexName, indexByClusterSource(func(cr *redpandav1alpha2.ClusterRef) bool {
38+
cluster, err := mgr.GetCluster(ctx, clusterName)
39+
if err != nil {
40+
return nil, err
41+
}
42+
if err := cluster.GetFieldIndexer().IndexField(ctx, o, indexName, indexByClusterSource(func(cr *redpandav1alpha2.ClusterRef) bool {
3943
return cr.IsV2()
4044
})); err != nil {
4145
return nil, err
4246
}
43-
return enqueueFromSourceCluster(mgr, name, l), nil
47+
return enqueueFromSourceCluster(mgr, name, clusterName, l), nil
4448
}
4549

46-
func RegisterV1ClusterSourceIndex[T redpandav1alpha2.ClusterReferencingObject, U clientList[T]](ctx context.Context, mgr ctrl.Manager, name string, o T, l U) (handler.EventHandler, error) {
50+
func RegisterV1ClusterSourceIndex[T redpandav1alpha2.ClusterReferencingObject, U clientList[T]](ctx context.Context, mgr multicluster.Manager, name, clusterName string, o T, l U) (mchandler.EventHandlerFunc, error) {
4751
indexName := clusterReferenceIndexName(name)
48-
if err := mgr.GetFieldIndexer().IndexField(ctx, o, indexName, indexByClusterSource(func(cr *redpandav1alpha2.ClusterRef) bool {
52+
cluster, err := mgr.GetCluster(ctx, clusterName)
53+
if err != nil {
54+
return nil, err
55+
}
56+
if err := cluster.GetFieldIndexer().IndexField(ctx, o, indexName, indexByClusterSource(func(cr *redpandav1alpha2.ClusterRef) bool {
4957
return cr.IsV1()
5058
})); err != nil {
5159
return nil, err
5260
}
53-
return enqueueFromSourceCluster(mgr, name, l), nil
61+
return enqueueFromSourceCluster(mgr, name, clusterName, l), nil
5462
}
5563

5664
func indexByClusterSource(checkRef func(*redpandav1alpha2.ClusterRef) bool) func(o client.Object) []string {
@@ -97,10 +105,15 @@ func sourceClusters[T client.Object, U clientList[T]](ctx context.Context, c cli
97105
return requests, nil
98106
}
99107

100-
func enqueueFromSourceCluster[T client.Object, U clientList[T]](mgr ctrl.Manager, name string, l U) handler.EventHandler {
101-
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
108+
func enqueueFromSourceCluster[T client.Object, U clientList[T]](mgr multicluster.Manager, name string, clusterName string, l U) mchandler.EventHandlerFunc {
109+
return mchandler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
110+
cluster, err := mgr.GetCluster(ctx, clusterName)
111+
if err != nil {
112+
mgr.GetLogger().V(1).Info(fmt.Sprintf("possibly skipping %s reconciliation due to failure to fetch %s associated with cluster", name, name), "error", err)
113+
return nil
114+
}
102115
list := reflect.New(reflect.TypeOf(l).Elem()).Interface().(U)
103-
requests, err := sourceClusters(ctx, mgr.GetClient(), list, name, client.ObjectKeyFromObject(o))
116+
requests, err := sourceClusters(ctx, cluster.GetClient(), list, name, client.ObjectKeyFromObject(o))
104117
if err != nil {
105118
mgr.GetLogger().V(1).Info(fmt.Sprintf("possibly skipping %s reconciliation due to failure to fetch %s associated with cluster", name, name), "error", err)
106119
return nil

operator/internal/controller/redpanda/nodepool_controller.go

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"context"
1414
"reflect"
1515
"strconv"
16+
"time"
1617

1718
"go.opentelemetry.io/otel/attribute"
1819
appsv1 "k8s.io/api/apps/v1"
@@ -22,15 +23,18 @@ import (
2223
ctrl "sigs.k8s.io/controller-runtime"
2324
"sigs.k8s.io/controller-runtime/pkg/client"
2425
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
25-
"sigs.k8s.io/controller-runtime/pkg/handler"
2626
"sigs.k8s.io/controller-runtime/pkg/reconcile"
27+
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
28+
mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler"
29+
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
2730

2831
"github.com/redpanda-data/redpanda-operator/charts/redpanda/v25"
2932
redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
3033
"github.com/redpanda-data/redpanda-operator/operator/internal/controller"
3134
"github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
3235
"github.com/redpanda-data/redpanda-operator/operator/internal/statuses"
3336
"github.com/redpanda-data/redpanda-operator/operator/pkg/feature"
37+
"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
3438
"github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
3539
"github.com/redpanda-data/redpanda-operator/pkg/otelutil/otelkube"
3640
"github.com/redpanda-data/redpanda-operator/pkg/otelutil/trace"
@@ -45,20 +49,14 @@ import (
4549
// NodePoolReconciler reconciles a NodePool object. This reconciler in particular should only update status
4650
// fields and finalizers on the NodePool objects, rendering of NodePools takes place within the RedpandaReconciler.
4751
type NodePoolReconciler struct {
48-
Client client.Client
52+
Manager multicluster.Manager
4953
}
5054

5155
// SetupWithManager sets up the controller with the Manager.
52-
func (r *NodePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
53-
enqueueNodePoolFromCluster, err := controller.RegisterClusterSourceIndex(ctx, mgr, "pool", &redpandav1alpha2.NodePool{}, &redpandav1alpha2.NodePoolList{})
54-
if err != nil {
55-
return err
56-
}
57-
58-
return ctrl.NewControllerManagedBy(mgr).
59-
For(&redpandav1alpha2.NodePool{}).
60-
Watches(&redpandav1alpha2.Redpanda{}, enqueueNodePoolFromCluster).
61-
Watches(&appsv1.StatefulSet{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
56+
func (r *NodePoolReconciler) SetupWithManager(ctx context.Context, mgr multicluster.Manager) error {
57+
builder := mcbuilder.ControllerManagedBy(mgr).
58+
For(&redpandav1alpha2.NodePool{}, mcbuilder.WithEngageWithLocalCluster(true), mcbuilder.WithEngageWithProviderClusters(true)).
59+
Watches(&appsv1.StatefulSet{}, mchandler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
6260
labels := o.GetLabels()
6361
if labels == nil {
6462
return nil
@@ -77,14 +75,40 @@ func (r *NodePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Mana
7775
Name: name,
7876
},
7977
}}
80-
})).
81-
Complete(r)
78+
}))
79+
80+
for _, clusterName := range mgr.GetClusterNames() {
81+
enqueueNodePoolFromCluster, err := controller.RegisterClusterSourceIndex(ctx, mgr, "pool", clusterName, &redpandav1alpha2.NodePool{}, &redpandav1alpha2.NodePoolList{})
82+
if err != nil {
83+
return err
84+
}
85+
86+
builder.Watches(&redpandav1alpha2.Redpanda{}, enqueueNodePoolFromCluster, controller.WatchOptions(clusterName)...)
87+
}
88+
89+
return builder.Complete(r)
8290
}
8391

8492
// Reconcile reconciles NodePool objects
85-
func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
93+
func (r *NodePoolReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (result ctrl.Result, err error) {
94+
l := log.FromContext(ctx).WithName("NodePoolReconciler.Reconcile")
95+
l.V(1).Info("Starting reconcile loop")
96+
start := time.Now()
97+
defer func() {
98+
l.V(1).Info("Finished reconciling", "elapsed", time.Since(start))
99+
}()
100+
101+
k8sCluster, err := r.Manager.GetCluster(ctx, req.ClusterName)
102+
if err != nil {
103+
l.Error(err, "unable to fetch cluster, skipping reconciliation")
104+
return ctrl.Result{}, nil
105+
}
106+
107+
k8sClient := k8sCluster.GetClient()
108+
86109
pool := &redpandav1alpha2.NodePool{}
87-
if err := r.Client.Get(ctx, req.NamespacedName, pool); err != nil {
110+
111+
if err := k8sClient.Get(ctx, req.NamespacedName, pool); err != nil {
88112
return ctrl.Result{}, client.IgnoreNotFound(err)
89113
}
90114

@@ -98,7 +122,7 @@ func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
98122

99123
if !feature.V2Managed.Get(ctx, pool) {
100124
if controllerutil.RemoveFinalizer(pool, FinalizerKey) {
101-
if err := r.Client.Update(ctx, pool); err != nil {
125+
if err := k8sClient.Update(ctx, pool); err != nil {
102126
logger.Error(err, "updating cluster finalizer")
103127
// no need to update the status at this point since the
104128
// previous update failed
@@ -111,7 +135,7 @@ func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
111135
// Examine if the object is under deletion
112136
if !pool.ObjectMeta.DeletionTimestamp.IsZero() {
113137
if controllerutil.RemoveFinalizer(pool, FinalizerKey) {
114-
if err := r.Client.Update(ctx, pool); err != nil {
138+
if err := k8sClient.Update(ctx, pool); err != nil {
115139
logger.Error(err, "updating cluster finalizer")
116140
// no need to update the status at this point since the
117141
// previous update failed
@@ -125,7 +149,7 @@ func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
125149
// If any changes are made, persist the changes and immediately requeue to
126150
// prevent any cache / resource version synchronization issues.
127151
if controllerutil.AddFinalizer(pool, FinalizerKey) || feature.SetDefaults(ctx, feature.V2Flags, pool) {
128-
if err := r.Client.Update(ctx, pool); err != nil {
152+
if err := k8sClient.Update(ctx, pool); err != nil {
129153
logger.Error(err, "updating cluster finalizer or Annotation")
130154
return ignoreConflict(err)
131155
}
@@ -134,7 +158,7 @@ func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
134158

135159
var status statuses.NodePoolStatus
136160
var statefulSets appsv1.StatefulSetList
137-
if err := r.Client.List(ctx, &statefulSets, client.MatchingLabels{
161+
if err := k8sClient.List(ctx, &statefulSets, client.MatchingLabels{
138162
lifecycle.DefaultNamespaceLabel: pool.Namespace,
139163
redpanda.NodePoolLabelName: pool.Name,
140164
}); err != nil {
@@ -191,7 +215,7 @@ func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
191215
}
192216

193217
cluster := &redpandav1alpha2.Redpanda{}
194-
if err := r.Client.Get(ctx, types.NamespacedName{Name: pool.Spec.ClusterRef.Name, Namespace: req.Namespace}, cluster); err != nil {
218+
if err := k8sClient.Get(ctx, types.NamespacedName{Name: pool.Spec.ClusterRef.Name, Namespace: req.Namespace}, cluster); err != nil {
195219
if apierrors.IsNotFound(err) {
196220
status.SetBound(statuses.NodePoolBoundReasonNotBound)
197221
} else {
@@ -204,7 +228,7 @@ func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
204228
if status.UpdateConditions(pool) ||
205229
!reflect.DeepEqual(originalPoolStatus, pool.Status.EmbeddedNodePoolStatus) ||
206230
(pool.Status.DeployedGeneration != originalPoolGeneration) {
207-
return ignoreConflict(r.Client.Status().Update(ctx, pool))
231+
return ignoreConflict(k8sClient.Status().Update(ctx, pool))
208232
}
209233

210234
return ctrl.Result{}, nil

0 commit comments

Comments
 (0)