@@ -32,7 +32,6 @@ import (
3232 "golang.org/x/sync/errgroup"
3333
3434 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3635 "k8s.io/apimachinery/pkg/runtime"
3736 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3837 "k8s.io/client-go/rest"
@@ -50,8 +49,8 @@ import (
5049 brokerv1alpha1 "github.com/platform-mesh/resource-broker/api/broker/v1alpha1"
5150 kcpacceptapi "github.com/platform-mesh/resource-broker/contrib/kcp/pkg/acceptapi"
5251 "github.com/platform-mesh/resource-broker/pkg/broker"
53- brokergeneric "github.com/platform-mesh/resource-broker/pkg/generic"
54- "github.com/platform-mesh/resource-broker/pkg/migration"
52+ genericreconciler "github.com/platform-mesh/resource-broker/pkg/broker /generic"
53+ "github.com/platform-mesh/resource-broker/pkg/broker/ migration"
5554)
5655
5756// Options are the options for creating a Broker.
@@ -134,7 +133,8 @@ func New(opts Options) (*Broker, error) { //nolint:gocyclo
134133
135134 multiProvider := multi .New (multi.Options {})
136135
137- // ------------- kcp acceptapi
136+ /////////////////////////////////////////////////////////////////////////////
137+ // AcceptAPI Controller
138138
139139 // The kcp AcceptAPI provider watches the VW of the AcceptAPI export and
140140 // produces provider clusters.
@@ -218,7 +218,8 @@ func New(opts Options) (*Broker, error) { //nolint:gocyclo
218218 return nil , fmt .Errorf ("error adding acceptapi provider to multi provider: %w" , err )
219219 }
220220
221- // ------------- migration
221+ /////////////////////////////////////////////////////////////////////////////
222+ // Migration Controllers
222223
223224 b .migrationConfigurations = make (map [metav1.GroupVersionKind ]map [metav1.GroupVersionKind ]brokerv1alpha1.MigrationConfiguration )
224225 // using the migrationScheme for both migration and migration config
@@ -248,33 +249,32 @@ func New(opts Options) (*Broker, error) { //nolint:gocyclo
248249 if err := migrationMgr .GetLocalManager ().Add (manager .RunnableFunc (migrationCluster .Start )); err != nil {
249250 return nil , fmt .Errorf ("error adding migration coordination cluster to migration manager: %w" , err )
250251 }
252+
251253 // b.managers["migration"] = migrationMgr
252- if err := mcbuilder .ControllerManagedBy (migrationMgr ).
253- Named (b .opts .Name + "-migration-configuration" ).
254- For (& brokerv1alpha1.MigrationConfiguration {}).
255- Complete (
256- migration .ConfigurationReconcilerFunc (
257- migration.ConfigurationOptions {
258- GetCluster : migrationMgr .GetCluster , // migration configurations can only come from the migration coordination cluster
259- SetMigrationConfiguration : func (from metav1.GroupVersionKind , to metav1.GroupVersionKind , config brokerv1alpha1.MigrationConfiguration ) {
260- b .lock .Lock ()
261- defer b .lock .Unlock ()
262- if _ , ok := b .migrationConfigurations [from ]; ! ok {
263- b .migrationConfigurations [from ] = make (map [metav1.GroupVersionKind ]brokerv1alpha1.MigrationConfiguration )
264- }
265- b.migrationConfigurations [from ][to ] = config
266- },
267- DeleteMigrationConfiguration : func (from metav1.GroupVersionKind , to metav1.GroupVersionKind ) {
268- b .lock .Lock ()
269- defer b .lock .Unlock ()
270- delete (b .migrationConfigurations [from ], to )
271- if len (b .migrationConfigurations [from ]) == 0 {
272- delete (b .migrationConfigurations , from )
273- }
274- },
275- }),
276- ); err != nil {
277- return nil , fmt .Errorf ("failed to create migration configuration reconciler: %w" , err )
254+
255+ migrationConfigOptions := migration.ConfigurationOptions {
256+ GetCluster : migrationMgr .GetCluster , // migration configurations can only come from the migration coordination cluster
257+ ControllerNamePrefix : b .opts .Name ,
258+ SetMigrationConfiguration : func (from metav1.GroupVersionKind , to metav1.GroupVersionKind , config brokerv1alpha1.MigrationConfiguration ) {
259+ b .lock .Lock ()
260+ defer b .lock .Unlock ()
261+ if _ , ok := b .migrationConfigurations [from ]; ! ok {
262+ b .migrationConfigurations [from ] = make (map [metav1.GroupVersionKind ]brokerv1alpha1.MigrationConfiguration )
263+ }
264+ b.migrationConfigurations [from ][to ] = config
265+ },
266+ DeleteMigrationConfiguration : func (from metav1.GroupVersionKind , to metav1.GroupVersionKind ) {
267+ b .lock .Lock ()
268+ defer b .lock .Unlock ()
269+ delete (b .migrationConfigurations [from ], to )
270+ if len (b .migrationConfigurations [from ]) == 0 {
271+ delete (b .migrationConfigurations , from )
272+ }
273+ },
274+ }
275+
276+ if err := migration .SetupConfigurationController (migrationMgr , migrationConfigOptions ); err != nil {
277+ return nil , fmt .Errorf ("failed to create migration reconciler: %w" , err )
278278 }
279279
280280 computeClient , err := client .New (b .opts .ComputeConfig , client.Options {
@@ -284,35 +284,34 @@ func New(opts Options) (*Broker, error) { //nolint:gocyclo
284284 return nil , fmt .Errorf ("error creating compute client: %w" , err )
285285 }
286286
287- if err := mcbuilder .ControllerManagedBy (migrationMgr ).
288- Named (b .opts .Name + "-migration" ).
289- For (& brokerv1alpha1.Migration {}).
290- Complete (
291- migration .MigrationReconcilerFunc (migration.MigrationOptions {
292- Compute : computeClient ,
293- GetCoordinationCluster : migrationMgr .GetCluster ,
294- GetProviderCluster : func (ctx context.Context , clusterName string ) (cluster.Cluster , error ) {
295- if ! strings .HasPrefix (clusterName , broker .ProviderPrefix ) {
296- return nil , fmt .Errorf ("cluster %q is not a provider cluster: %w" , clusterName , multicluster .ErrClusterNotFound )
297- }
298- return multiProvider .Get (ctx , clusterName )
299- },
300- GetMigrationConfiguration : func (fromGVK metav1.GroupVersionKind , toGVK metav1.GroupVersionKind ) (brokerv1alpha1.MigrationConfiguration , bool ) {
301- b .lock .RLock ()
302- defer b .lock .RUnlock ()
303- toMap , ok := b .migrationConfigurations [fromGVK ]
304- if ! ok {
305- return brokerv1alpha1.MigrationConfiguration {}, false
306- }
307- v , ok := toMap [toGVK ]
308- return v , ok
309- },
310- }),
311- ); err != nil {
287+ migrationOptions := migration.MigrationOptions {
288+ Compute : computeClient ,
289+ ControllerNamePrefix : b .opts .Name ,
290+ GetCoordinationCluster : migrationMgr .GetCluster ,
291+ GetProviderCluster : func (ctx context.Context , clusterName string ) (cluster.Cluster , error ) {
292+ if ! strings .HasPrefix (clusterName , broker .ProviderPrefix ) {
293+ return nil , fmt .Errorf ("cluster %q is not a provider cluster: %w" , clusterName , multicluster .ErrClusterNotFound )
294+ }
295+ return multiProvider .Get (ctx , clusterName )
296+ },
297+ GetMigrationConfiguration : func (fromGVK metav1.GroupVersionKind , toGVK metav1.GroupVersionKind ) (brokerv1alpha1.MigrationConfiguration , bool ) {
298+ b .lock .RLock ()
299+ defer b .lock .RUnlock ()
300+ toMap , ok := b .migrationConfigurations [fromGVK ]
301+ if ! ok {
302+ return brokerv1alpha1.MigrationConfiguration {}, false
303+ }
304+ v , ok := toMap [toGVK ]
305+ return v , ok
306+ },
307+ }
308+
309+ if err := migration .SetupController (migrationMgr , migrationOptions ); err != nil {
312310 return nil , fmt .Errorf ("failed to create migration reconciler: %w" , err )
313311 }
314312
315- // ------------- general broker
313+ /////////////////////////////////////////////////////////////////////////////
314+ // Generic Sync Controllers
316315
317316 // The BrokerAPI provider watches the VW of the BrokerAPI export and
318317 // produces consumer clusters.
@@ -340,8 +339,9 @@ func New(opts Options) (*Broker, error) { //nolint:gocyclo
340339 }
341340 b .managers ["general" ] = generalMgr
342341
343- genericOpts := brokergeneric.Options {
344- Coordination : migrationClient ,
342+ genericOpts := genericreconciler.Options {
343+ CoordinationClient : migrationClient ,
344+ ControllerNamePrefix : b .opts .Name ,
345345 GetProviderCluster : func (ctx context.Context , clusterName string ) (cluster.Cluster , error ) {
346346 if ! strings .HasPrefix (clusterName , broker .ProviderPrefix ) {
347347 return nil , fmt .Errorf ("cluster %q is not a provider cluster: %w" , clusterName , multicluster .ErrClusterNotFound )
@@ -386,12 +386,7 @@ func New(opts Options) (*Broker, error) { //nolint:gocyclo
386386 }
387387
388388 for _ , gvk := range broker .ParseKinds (b .opts .WatchKinds ) {
389- obj := & unstructured.Unstructured {}
390- obj .SetGroupVersionKind (gvk )
391- if err := mcbuilder .ControllerManagedBy (generalMgr ).
392- Named (b .opts .Name + "-generic-" + gvk .String ()).
393- For (obj ).
394- Complete (brokergeneric .ReconcileFunc (genericOpts , gvk )); err != nil {
389+ if err := genericreconciler .SetupController (generalMgr , gvk , genericOpts ); err != nil {
395390 return nil , fmt .Errorf ("failed to create generic reconciler for %v: %w" , gvk , err )
396391 }
397392 }
0 commit comments