Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/api-syncagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
return fmt.Errorf("failed to add apiexport controller: %w", err)
}

if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector); err != nil {
if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace); err != nil {
return fmt.Errorf("failed to add syncmanager controller: %w", err)
}

Expand Down
3 changes: 2 additions & 1 deletion internal/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func Create(
pubRes *syncagentv1alpha1.PublishedResource,
discoveryClient *discovery.Client,
apiExportName string,
stateNamespace string,
log *zap.SugaredLogger,
numWorkers int,
) (controller.Controller, error) {
Expand All @@ -86,7 +87,7 @@ func Create(

// create the syncer that holds the meat&potatoes of the synchronization logic
mutator := mutation.NewMutator(nil) // pubRes.Spec.Mutation
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator)
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace)
if err != nil {
return nil, fmt.Errorf("failed to create syncer: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/syncmanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Reconciler struct {
recorder record.EventRecorder
discoveryClient *discovery.Client
prFilter labels.Selector
stateNamespace string

apiExport *kcpdevv1alpha1.APIExport

Expand All @@ -93,6 +94,7 @@ func Add(
log *zap.SugaredLogger,
apiExport *kcpdevv1alpha1.APIExport,
prFilter labels.Selector,
stateNamespace string,
) error {
reconciler := &Reconciler{
ctx: ctx,
Expand All @@ -105,6 +107,7 @@ func Add(
syncWorkers: map[string]lifecycle.Controller{},
discoveryClient: discovery.NewClient(localManager.GetClient()),
prFilter: prFilter,
stateNamespace: stateNamespace,
}

_, err := builder.ControllerManagedBy(localManager).
Expand Down Expand Up @@ -276,6 +279,7 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
&pubRes,
r.discoveryClient,
r.apiExport.Name,
r.stateNamespace,
r.log,
numSyncWorkers,
)
Expand Down
16 changes: 10 additions & 6 deletions internal/sync/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ type objectStateStore struct {
backend backend
}

func newObjectStateStore(primaryObject, stateCluster syncSide) ObjectStateStore {
kubernetes := newKubernetesBackend(primaryObject, stateCluster)

func newObjectStateStore(backend backend) ObjectStateStore {
return &objectStateStore{
backend: kubernetes,
backend: backend,
}
}

func newKubernetesStateStoreCreator(namespace string) newObjectStateStoreFunc {
return func(primaryObject, stateCluster syncSide) ObjectStateStore {
return newObjectStateStore(newKubernetesBackend(namespace, primaryObject, stateCluster))
}
}

Expand Down Expand Up @@ -124,7 +128,7 @@ func hashObject(obj *unstructured.Unstructured) string {
return hex.EncodeToString(hash.Sum(nil))
}

func newKubernetesBackend(primaryObject, stateCluster syncSide) *kubernetesBackend {
func newKubernetesBackend(namespace string, primaryObject, stateCluster syncSide) *kubernetesBackend {
keyHash := hashObject(primaryObject.object)

secretLabels := newObjectKey(primaryObject.object, primaryObject.clusterName).Labels()
Expand All @@ -134,7 +138,7 @@ func newKubernetesBackend(primaryObject, stateCluster syncSide) *kubernetesBacke
secretName: types.NamespacedName{
// trim hash down; 20 was chosen at random
Name: fmt.Sprintf("obj-state-%s-%s", primaryObject.clusterName, keyHash[:20]),
Namespace: "kcp-system",
Namespace: namespace,
},
labels: secretLabels,
stateCluster: stateCluster,
Expand Down
4 changes: 3 additions & 1 deletion internal/sync/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestStateStoreBasics(t *testing.T) {

serviceClusterClient := buildFakeClient()
ctx := context.Background()
stateNamespace := "kcp-system"

primaryObjectSide := syncSide{
object: primaryObject,
Expand All @@ -48,7 +49,8 @@ func TestStateStoreBasics(t *testing.T) {
client: serviceClusterClient,
}

store := newObjectStateStore(primaryObjectSide, stateSide)
storeCreator := newKubernetesStateStoreCreator(stateNamespace)
store := storeCreator(primaryObjectSide, stateSide)

///////////////////////////////////////
// get nil from empty store
Expand Down
3 changes: 2 additions & 1 deletion internal/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewResourceSyncer(
localCRD *apiextensionsv1.CustomResourceDefinition,
remoteAPIGroup string,
mutator mutation.Mutator,
stateNamespace string,
) (*ResourceSyncer, error) {
// create a dummy that represents the type used on the local service cluster
localGVK := projection.PublishedResourceSourceGVK(pubRes)
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewResourceSyncer(
subresources: subresources,
destDummy: localDummy,
mutator: mutator,
newObjectStateStore: newObjectStateStore,
newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace),
}, nil
}

Expand Down
10 changes: 8 additions & 2 deletions internal/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,8 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
},
}

const stateNamespace = "kcp-system"

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
localClient := buildFakeClient(testcase.localObject)
Expand All @@ -806,6 +808,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
testcase.localCRD,
testcase.remoteAPIGroup,
nil,
stateNamespace,
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand All @@ -820,7 +823,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
syncer.newObjectStateStore = func(primaryObject, stateCluster syncSide) ObjectStateStore {
// .Process() is called multiple times, but we want the state to persist between reconciles.
if backend == nil {
backend = newKubernetesBackend(primaryObject, stateCluster)
backend = newKubernetesBackend(stateNamespace, primaryObject, stateCluster)
if testcase.existingState != "" {
if err := backend.Put(testcase.remoteObject, clusterName.String(), []byte(testcase.existingState)); err != nil {
t.Fatalf("Failed to prime state store: %v", err)
Expand Down Expand Up @@ -1086,6 +1089,8 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
},
}

const stateNamespace = "kcp-system"

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
localClient := buildFakeClientWithStatus(testcase.localObject)
Expand All @@ -1100,6 +1105,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
testcase.localCRD,
testcase.remoteAPIGroup,
nil,
stateNamespace,
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand All @@ -1114,7 +1120,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
syncer.newObjectStateStore = func(primaryObject, stateCluster syncSide) ObjectStateStore {
// .Process() is called multiple times, but we want the state to persist between reconciles.
if backend == nil {
backend = newKubernetesBackend(primaryObject, stateCluster)
backend = newKubernetesBackend(stateNamespace, primaryObject, stateCluster)
if testcase.existingState != "" {
if err := backend.Put(testcase.remoteObject, clusterName.String(), []byte(testcase.existingState)); err != nil {
t.Fatalf("Failed to prime state store: %v", err)
Expand Down
Loading