diff --git a/cmd/api-syncagent/main.go b/cmd/api-syncagent/main.go index c71f07f..c3edd66 100644 --- a/cmd/api-syncagent/main.go +++ b/cmd/api-syncagent/main.go @@ -138,7 +138,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error { return fmt.Errorf("failed to add apiexport controller: %w", err) } - if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace); err != nil { + if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace, opts.AgentName); err != nil { return fmt.Errorf("failed to add syncmanager controller: %w", err) } diff --git a/docs/getting-started.md b/docs/getting-started.md index 128a338..02a461d 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -69,8 +69,12 @@ itself and a reference to the kubeconfig secret we just created. # Required: the name of the APIExport in kcp that this Sync Agent is supposed to serve. apiExportName: test.example.com -# Required: This Agent's public name, purely for informational purposes. -# If not set, defaults to the Helm release name. +# Required: This Agent's public name, used to signal ownership over locally synced objects. +# This value must be a valid Kubernetes label value, see +# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set +# for more information. +# Changing this value after the fact will make the agent ignore previously created objects, +# so beware and relabel if necessary. agentName: unique-test # Required: Name of the Kubernetes Secret that contains a "kubeconfig" key, diff --git a/internal/controller/sync/controller.go b/internal/controller/sync/controller.go index c00d3e9..51f8778 100644 --- a/internal/controller/sync/controller.go +++ b/internal/controller/sync/controller.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/kontext" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -69,6 +70,7 @@ func Create( discoveryClient *discovery.Client, apiExportName string, stateNamespace string, + agentName string, log *zap.SugaredLogger, numWorkers int, ) (controller.Controller, error) { @@ -92,7 +94,7 @@ func Create( // create the syncer that holds the meat&potatoes of the synchronization logic mutator := mutation.NewMutator(nil) // pubRes.Spec.Mutation - syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace) + syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace, agentName) if err != nil { return nil, fmt.Errorf("failed to create syncer: %w", err) } @@ -135,7 +137,12 @@ func Create( return []reconcile.Request{*req} }) - if err := c.Watch(source.Kind(localManager.GetCache(), localDummy, enqueueRemoteObjForLocalObj)); err != nil { + // only watch local objects that we own + nameFilter := predicate.NewTypedPredicateFuncs(func(u *unstructured.Unstructured) bool { + return sync.OwnedBy(u, agentName) + }) + + if err := c.Watch(source.Kind(localManager.GetCache(), localDummy, enqueueRemoteObjForLocalObj, nameFilter)); err != nil { return nil, err } diff --git a/internal/controller/syncmanager/controller.go b/internal/controller/syncmanager/controller.go index c590422..a1d963d 100644 --- a/internal/controller/syncmanager/controller.go +++ b/internal/controller/syncmanager/controller.go @@ -70,6 +70,7 @@ type Reconciler struct { discoveryClient *discovery.Client prFilter labels.Selector stateNamespace string + agentName string apiExport *kcpdevv1alpha1.APIExport @@ -95,6 +96,7 @@ func Add( apiExport *kcpdevv1alpha1.APIExport, prFilter labels.Selector, stateNamespace string, + agentName string, ) error { reconciler := &Reconciler{ ctx: ctx, @@ -108,6 +110,7 @@ func Add( discoveryClient: discovery.NewClient(localManager.GetClient()), prFilter: prFilter, stateNamespace: stateNamespace, + agentName: agentName, } _, err := builder.ControllerManagedBy(localManager). @@ -280,6 +283,7 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared r.discoveryClient, r.apiExport.Name, r.stateNamespace, + r.agentName, r.log, numSyncWorkers, ) diff --git a/internal/sync/object_syncer.go b/internal/sync/object_syncer.go index 34630c9..5b43720 100644 --- a/internal/sync/object_syncer.go +++ b/internal/sync/object_syncer.go @@ -38,6 +38,10 @@ import ( type objectCreatorFunc func(source *unstructured.Unstructured) *unstructured.Unstructured type objectSyncer struct { + // When set, the syncer will create a label on the destination object that contains + // this value; used to allow multiple agents syncing *the same* API from one + // service cluster onto multiple different kcp's. + agentName string // creates a new destination object; does not need to perform cleanup like // removing unwanted metadata, that's done by the syncer automatically destCreator objectCreatorFunc @@ -281,6 +285,9 @@ func (s *objectSyncer) ensureDestinationObject(log *zap.SugaredLogger, source, d sourceObjKey := newObjectKey(source.object, source.clusterName, source.workspacePath) ensureLabels(destObj, sourceObjKey.Labels()) + // remember what agent synced this object + s.labelWithAgent(destObj) + // put optional additional annotations on the new object ensureAnnotations(destObj, sourceObjKey.Annotations()) @@ -326,6 +333,7 @@ func (s *objectSyncer) adoptExistingDestinationObject(log *zap.SugaredLogger, de // the destination object from another source object, which would then lead to the two source objects // "fighting" about the one destination object. ensureLabels(existingDestObj, sourceKey.Labels()) + s.labelWithAgent(existingDestObj) ensureAnnotations(existingDestObj, sourceKey.Annotations()) if err := dest.client.Update(dest.ctx, existingDestObj); err != nil { @@ -425,3 +433,9 @@ func (s *objectSyncer) createMergePatch(base, revision *unstructured.Unstructure func (s *objectSyncer) isIrrelevantTopLevelField(fieldName string) bool { return fieldName == "kind" || fieldName == "apiVersion" || fieldName == "metadata" || slices.Contains(s.subresources, fieldName) } + +func (s *objectSyncer) labelWithAgent(obj *unstructured.Unstructured) { + if s.agentName != "" { + ensureLabels(obj, map[string]string{agentNameLabel: s.agentName}) + } +} diff --git a/internal/sync/syncer.go b/internal/sync/syncer.go index 3287fb6..13217c9 100644 --- a/internal/sync/syncer.go +++ b/internal/sync/syncer.go @@ -46,6 +46,8 @@ type ResourceSyncer struct { mutator mutation.Mutator + agentName string + // newObjectStateStore is used for testing purposes newObjectStateStore newObjectStateStoreFunc } @@ -59,6 +61,7 @@ func NewResourceSyncer( remoteAPIGroup string, mutator mutation.Mutator, stateNamespace string, + agentName string, ) (*ResourceSyncer, error) { // create a dummy that represents the type used on the local service cluster localGVK := projection.PublishedResourceSourceGVK(pubRes) @@ -100,6 +103,7 @@ func NewResourceSyncer( subresources: subresources, destDummy: localDummy, mutator: mutator, + agentName: agentName, newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace), }, nil } @@ -145,6 +149,8 @@ func (s *ResourceSyncer) Process(ctx Context, remoteObj *unstructured.Unstructur stateStore := s.newObjectStateStore(sourceSide, destSide) syncer := objectSyncer{ + // The primary object should be labelled with the agent name. + agentName: s.agentName, subresources: s.subresources, // use the projection and renaming rules configured in the PublishedResource destCreator: s.createLocalObjectCreator(ctx), diff --git a/internal/sync/syncer_related.go b/internal/sync/syncer_related.go index 320f9d5..87d3978 100644 --- a/internal/sync/syncer_related.go +++ b/internal/sync/syncer_related.go @@ -127,6 +127,8 @@ func (s *ResourceSyncer) processRelatedResource(log *zap.SugaredLogger, stateSto } syncer := objectSyncer{ + // Related objects within kcp are not labelled with the agent name because it's unnecessary. + // agentName: "", // use the same state store as we used for the main resource, to keep everything contained // in one place, on the service cluster side stateStore: stateStore, diff --git a/internal/sync/syncer_test.go b/internal/sync/syncer_test.go index 79e3e7a..80b46ec 100644 --- a/internal/sync/syncer_test.go +++ b/internal/sync/syncer_test.go @@ -182,6 +182,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -208,6 +209,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -255,6 +257,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -312,6 +315,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -348,6 +352,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -374,6 +379,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -410,6 +416,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -436,6 +443,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -484,6 +492,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { "existing-annotation": "annotation-value", }, Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -525,6 +534,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { "new-annotation": "hei-verden", }, Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -564,6 +574,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -591,6 +602,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -636,6 +648,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { "prevent-instant-deletion-in-tests", }, Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -667,6 +680,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { }, DeletionTimestamp: &nonEmptyTime, Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -748,6 +762,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { }, DeletionTimestamp: &nonEmptyTime, Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -778,6 +793,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { }, DeletionTimestamp: &nonEmptyTime, Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -809,6 +825,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { testcase.remoteAPIGroup, nil, stateNamespace, + "textor-the-doctor", ) if err != nil { t.Fatalf("Failed to create syncer: %v", err) @@ -970,6 +987,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -1002,6 +1020,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -1041,6 +1060,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -1073,6 +1093,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "testcluster-my-test-thing", Labels: map[string]string{ + agentNameLabel: "textor-the-doctor", remoteObjectClusterLabel: "testcluster", remoteObjectNamespaceLabel: "", remoteObjectNameLabel: "my-test-thing", @@ -1106,6 +1127,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { testcase.remoteAPIGroup, nil, stateNamespace, + "textor-the-doctor", ) if err != nil { t.Fatalf("Failed to create syncer: %v", err) diff --git a/internal/sync/types.go b/internal/sync/types.go index cd08049..076f0cf 100644 --- a/internal/sync/types.go +++ b/internal/sync/types.go @@ -16,6 +16,8 @@ limitations under the License. package sync +import ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" + const ( // deletionFinalizer is the finalizer put on remote objects to prevent // them from being deleted before the local objects can be cleaned up. @@ -31,6 +33,10 @@ const ( remoteObjectWorkspacePathAnnotation = "syncagent.kcp.io/remote-object-workspace-path" + // agentNameLabel contains the Sync Agent's name and is used to allow multiple Sync Agents + // on the same service cluster, syncing *the same* API to different kcp's. + agentNameLabel = "syncagent.kcp.io/agent-name" + // objectStateLabelName is put on object state Secrets to allow for easier mass deletions // if ever necessary. objectStateLabelName = "syncagent.kcp.io/object-state" @@ -45,3 +51,7 @@ const ( // metadata of the related object. relatedObjectAnnotationPrefix = "related-resources.syncagent.kcp.io/" ) + +func OwnedBy(obj ctrlruntimeclient.Object, agentName string) bool { + return obj.GetLabels()[agentNameLabel] == agentName +}