diff --git a/cmd/api-syncagent/main.go b/cmd/api-syncagent/main.go index 8fbca5c..c71f07f 100644 --- a/cmd/api-syncagent/main.go +++ b/cmd/api-syncagent/main.go @@ -138,7 +138,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error { return fmt.Errorf("failed to add apiexport controller: %w", err) } - if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector); err != nil { + if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace); err != nil { return fmt.Errorf("failed to add syncmanager controller: %w", err) } diff --git a/internal/controller/sync/controller.go b/internal/controller/sync/controller.go index 049dfc1..e788ea5 100644 --- a/internal/controller/sync/controller.go +++ b/internal/controller/sync/controller.go @@ -63,6 +63,7 @@ func Create( pubRes *syncagentv1alpha1.PublishedResource, discoveryClient *discovery.Client, apiExportName string, + stateNamespace string, log *zap.SugaredLogger, numWorkers int, ) (controller.Controller, error) { @@ -86,7 +87,7 @@ func Create( // create the syncer that holds the meat&potatoes of the synchronization logic mutator := mutation.NewMutator(nil) // pubRes.Spec.Mutation - syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator) + syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace) if err != nil { return nil, fmt.Errorf("failed to create syncer: %w", err) } diff --git a/internal/controller/syncmanager/controller.go b/internal/controller/syncmanager/controller.go index e99d3b4..c590422 100644 --- a/internal/controller/syncmanager/controller.go +++ b/internal/controller/syncmanager/controller.go @@ -69,6 +69,7 @@ type Reconciler struct { recorder record.EventRecorder discoveryClient *discovery.Client prFilter labels.Selector + stateNamespace string apiExport *kcpdevv1alpha1.APIExport @@ -93,6 +94,7 @@ func Add( log *zap.SugaredLogger, apiExport *kcpdevv1alpha1.APIExport, prFilter labels.Selector, + stateNamespace string, ) error { reconciler := &Reconciler{ ctx: ctx, @@ -105,6 +107,7 @@ func Add( syncWorkers: map[string]lifecycle.Controller{}, discoveryClient: discovery.NewClient(localManager.GetClient()), prFilter: prFilter, + stateNamespace: stateNamespace, } _, err := builder.ControllerManagedBy(localManager). @@ -276,6 +279,7 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared &pubRes, r.discoveryClient, r.apiExport.Name, + r.stateNamespace, r.log, numSyncWorkers, ) diff --git a/internal/sync/state_store.go b/internal/sync/state_store.go index 680115f..32a5c6e 100644 --- a/internal/sync/state_store.go +++ b/internal/sync/state_store.go @@ -43,11 +43,15 @@ type objectStateStore struct { backend backend } -func newObjectStateStore(primaryObject, stateCluster syncSide) ObjectStateStore { - kubernetes := newKubernetesBackend(primaryObject, stateCluster) - +func newObjectStateStore(backend backend) ObjectStateStore { return &objectStateStore{ - backend: kubernetes, + backend: backend, + } +} + +func newKubernetesStateStoreCreator(namespace string) newObjectStateStoreFunc { + return func(primaryObject, stateCluster syncSide) ObjectStateStore { + return newObjectStateStore(newKubernetesBackend(namespace, primaryObject, stateCluster)) } } @@ -124,7 +128,7 @@ func hashObject(obj *unstructured.Unstructured) string { return hex.EncodeToString(hash.Sum(nil)) } -func newKubernetesBackend(primaryObject, stateCluster syncSide) *kubernetesBackend { +func newKubernetesBackend(namespace string, primaryObject, stateCluster syncSide) *kubernetesBackend { keyHash := hashObject(primaryObject.object) secretLabels := newObjectKey(primaryObject.object, primaryObject.clusterName).Labels() @@ -134,7 +138,7 @@ func newKubernetesBackend(primaryObject, stateCluster syncSide) *kubernetesBacke secretName: types.NamespacedName{ // trim hash down; 20 was chosen at random Name: fmt.Sprintf("obj-state-%s-%s", primaryObject.clusterName, keyHash[:20]), - Namespace: "kcp-system", + Namespace: namespace, }, labels: secretLabels, stateCluster: stateCluster, diff --git a/internal/sync/state_store_test.go b/internal/sync/state_store_test.go index e6d6493..0291808 100644 --- a/internal/sync/state_store_test.go +++ b/internal/sync/state_store_test.go @@ -38,6 +38,7 @@ func TestStateStoreBasics(t *testing.T) { serviceClusterClient := buildFakeClient() ctx := context.Background() + stateNamespace := "kcp-system" primaryObjectSide := syncSide{ object: primaryObject, @@ -48,7 +49,8 @@ func TestStateStoreBasics(t *testing.T) { client: serviceClusterClient, } - store := newObjectStateStore(primaryObjectSide, stateSide) + storeCreator := newKubernetesStateStoreCreator(stateNamespace) + store := storeCreator(primaryObjectSide, stateSide) /////////////////////////////////////// // get nil from empty store diff --git a/internal/sync/syncer.go b/internal/sync/syncer.go index 17ef97f..361c224 100644 --- a/internal/sync/syncer.go +++ b/internal/sync/syncer.go @@ -59,6 +59,7 @@ func NewResourceSyncer( localCRD *apiextensionsv1.CustomResourceDefinition, remoteAPIGroup string, mutator mutation.Mutator, + stateNamespace string, ) (*ResourceSyncer, error) { // create a dummy that represents the type used on the local service cluster localGVK := projection.PublishedResourceSourceGVK(pubRes) @@ -100,7 +101,7 @@ func NewResourceSyncer( subresources: subresources, destDummy: localDummy, mutator: mutator, - newObjectStateStore: newObjectStateStore, + newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace), }, nil } diff --git a/internal/sync/syncer_test.go b/internal/sync/syncer_test.go index 56a92ea..375b5a3 100644 --- a/internal/sync/syncer_test.go +++ b/internal/sync/syncer_test.go @@ -792,6 +792,8 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { }, } + const stateNamespace = "kcp-system" + for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { localClient := buildFakeClient(testcase.localObject) @@ -806,6 +808,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { testcase.localCRD, testcase.remoteAPIGroup, nil, + stateNamespace, ) if err != nil { t.Fatalf("Failed to create syncer: %v", err) @@ -820,7 +823,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { syncer.newObjectStateStore = func(primaryObject, stateCluster syncSide) ObjectStateStore { // .Process() is called multiple times, but we want the state to persist between reconciles. if backend == nil { - backend = newKubernetesBackend(primaryObject, stateCluster) + backend = newKubernetesBackend(stateNamespace, primaryObject, stateCluster) if testcase.existingState != "" { if err := backend.Put(testcase.remoteObject, clusterName.String(), []byte(testcase.existingState)); err != nil { t.Fatalf("Failed to prime state store: %v", err) @@ -1086,6 +1089,8 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { }, } + const stateNamespace = "kcp-system" + for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { localClient := buildFakeClientWithStatus(testcase.localObject) @@ -1100,6 +1105,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { testcase.localCRD, testcase.remoteAPIGroup, nil, + stateNamespace, ) if err != nil { t.Fatalf("Failed to create syncer: %v", err) @@ -1114,7 +1120,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { syncer.newObjectStateStore = func(primaryObject, stateCluster syncSide) ObjectStateStore { // .Process() is called multiple times, but we want the state to persist between reconciles. if backend == nil { - backend = newKubernetesBackend(primaryObject, stateCluster) + backend = newKubernetesBackend(stateNamespace, primaryObject, stateCluster) if testcase.existingState != "" { if err := backend.Put(testcase.remoteObject, clusterName.String(), []byte(testcase.existingState)); err != nil { t.Fatalf("Failed to prime state store: %v", err)