Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ spec:
PublishedResourceSpec describes the desired resource publication from a service
cluster to kcp.
properties:
enableClusterPaths:
description: |-
EnableClusterPaths toggles whether the Sync Agent will not just store the kcp
cluster name as a label on each locally synced object, but also the full cluster
path. This is optional because it requires additional requests to kcp and
should only be used if the cluster path is of interest on the
service cluster side.
type: boolean
filter:
description: |-
If specified, the filter will be applied to the resources in a workspace
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.3.3 h1:SzB1nHZ2Xi+17FP0zVQBHIZqvwRN9408fJO8h+eeNA8=
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
22 changes: 21 additions & 1 deletion internal/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import (
"github.com/kcp-dev/api-syncagent/internal/sync"
syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1"

kcpcore "github.com/kcp-dev/kcp/sdk/apis/core"
kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand All @@ -52,6 +56,7 @@ type Reconciler struct {
log *zap.SugaredLogger
syncer *sync.ResourceSyncer
remoteDummy *unstructured.Unstructured
pubRes *syncagentv1alpha1.PublishedResource
}

// Create creates a new controller and importantly does *not* add it to the manager,
Expand Down Expand Up @@ -99,6 +104,7 @@ func Create(
log: log,
remoteDummy: remoteDummy,
syncer: syncer,
pubRes: pubRes,
}

ctrlOptions := controller.Options{
Expand Down Expand Up @@ -152,8 +158,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
return reconcile.Result{}, nil
}

syncContext := sync.NewContext(ctx, wsCtx)

// if desired, fetch the cluster path as well (some downstream service providers might make use of it,
// but since it requires an additional permission claim, it's optional)
if r.pubRes.Spec.EnableClusterPaths {
lc := &kcpdevcorev1alpha1.LogicalCluster{}
if err := r.vwClient.Get(wsCtx, types.NamespacedName{Name: kcpdevcorev1alpha1.LogicalClusterName}, lc); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to retrieve remote logicalcluster: %w", err)
}

path := lc.Annotations[kcpcore.LogicalClusterPathAnnotationKey]
syncContext = syncContext.WithClusterPath(logicalcluster.NewPath(path))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we handle the case that this annotation is empty and throw an error? Is this a valid or invalid scenario?

}

// sync main object
requeue, err := r.syncer.Process(sync.NewContext(ctx, wsCtx), remoteObj)
requeue, err := r.syncer.Process(syncContext, remoteObj)
if err != nil {
return reconcile.Result{}, err
}
Expand Down
10 changes: 10 additions & 0 deletions internal/controller/syncmanager/lifecycle/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
"github.com/kcp-dev/logicalcluster/v3"
"go.uber.org/zap"

kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -130,7 +133,14 @@ func NewCluster(address string, baseRestConfig *rest.Config) (*Cluster, error) {
return newClusterAwareRoundTripper(rt)
})

scheme := runtime.NewScheme()

if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
}
Comment on lines +138 to +140
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remind me, does adding the scheme require it to be available in the endpoint? Will this fail on setups that don't enable this optional flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my knowledge, this simply prepares the local client to work with types from the given scheme. This does not change anything about the wire protocol between us and the apiserver. The client will not suddenly do an API discovery and fail if the type doesn't exist.


clusterObj, err := cluster.New(config, func(o *cluster.Options) {
o.Scheme = scheme
o.NewCache = kcp.NewClusterAwareCache
o.NewAPIReader = kcp.NewClusterAwareAPIReader
o.NewClient = kcp.NewClusterAwareClient
Expand Down
16 changes: 14 additions & 2 deletions internal/sync/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package sync
import (
"context"

"github.com/kcp-dev/logicalcluster/v3"

"sigs.k8s.io/controller-runtime/pkg/kontext"
)

type Context struct {
clusterName string
clusterName logicalcluster.Name
clusterPath logicalcluster.Path
local context.Context
remote context.Context
}
Expand All @@ -35,8 +38,17 @@ func NewContext(local, remote context.Context) Context {
}

return Context{
clusterName: string(clusterName),
clusterName: clusterName,
local: local,
remote: remote,
}
}

func (c *Context) WithClusterPath(path logicalcluster.Path) Context {
return Context{
clusterName: c.clusterName,
clusterPath: path,
local: c.local,
remote: c.remote,
}
}
2 changes: 1 addition & 1 deletion internal/sync/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestNewContext(t *testing.T) {

combinedCtx := NewContext(context.Background(), ctx)

if combinedCtx.clusterName != clusterName.String() {
if combinedCtx.clusterName != clusterName {
t.Fatalf("Expected function to recognize the cluster name in the context, but got %q", combinedCtx.clusterName)
}
}
41 changes: 27 additions & 14 deletions internal/sync/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sync
import (
"context"

"github.com/kcp-dev/logicalcluster/v3"
"go.uber.org/zap"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,7 +66,7 @@ func ensureFinalizer(ctx context.Context, log *zap.SugaredLogger, client ctrlrun
finalizers.Insert(deletionFinalizer)
obj.SetFinalizers(sets.List(finalizers))

log.Debugw("Adding finalizer…", "on", newObjectKey(obj, ""), "finalizer", finalizer)
log.Debugw("Adding finalizer…", "on", newObjectKey(obj, "", logicalcluster.None), "finalizer", finalizer)
if err := client.Patch(ctx, obj, ctrlruntimeclient.MergeFrom(original)); err != nil {
return false, err
}
Expand All @@ -84,7 +85,7 @@ func removeFinalizer(ctx context.Context, log *zap.SugaredLogger, client ctrlrun
finalizers.Delete(deletionFinalizer)
obj.SetFinalizers(sets.List(finalizers))

log.Debugw("Removing finalizer…", "on", newObjectKey(obj, ""), "finalizer", finalizer)
log.Debugw("Removing finalizer…", "on", newObjectKey(obj, "", logicalcluster.None), "finalizer", finalizer)
if err := client.Patch(ctx, obj, ctrlruntimeclient.MergeFrom(original)); err != nil {
return false, err
}
Expand All @@ -93,16 +94,18 @@ func removeFinalizer(ctx context.Context, log *zap.SugaredLogger, client ctrlrun
}

type objectKey struct {
Cluster string
Namespace string
Name string
ClusterName logicalcluster.Name
ClusterPath logicalcluster.Path
Namespace string
Name string
}

func newObjectKey(obj metav1.Object, clusterName string) objectKey {
func newObjectKey(obj metav1.Object, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) objectKey {
return objectKey{
Cluster: clusterName,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
ClusterName: clusterName,
ClusterPath: clusterPath,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
}

Expand All @@ -111,8 +114,8 @@ func (k objectKey) String() string {
if k.Namespace != "" {
result = k.Namespace + "/" + result
}
if k.Cluster != "" {
result = k.Cluster + "|" + result
if k.ClusterName != "" {
result = string(k.ClusterName) + "|" + result
}

return result
Expand All @@ -123,17 +126,27 @@ func (k objectKey) Key() string {
if k.Namespace != "" {
result = k.Namespace + "_" + result
}
if k.Cluster != "" {
result = k.Cluster + "_" + result
if k.ClusterName != "" {
result = string(k.ClusterName) + "_" + result
}

return result
}

func (k objectKey) Labels() labels.Set {
return labels.Set{
remoteObjectClusterLabel: k.Cluster,
remoteObjectClusterLabel: string(k.ClusterName),
remoteObjectNamespaceLabel: k.Namespace,
remoteObjectNameLabel: k.Name,
}
}

func (k objectKey) Annotations() labels.Set {
s := labels.Set{}

if !k.ClusterPath.Empty() {
s[remoteObjectClusterPathAnnotation] = k.ClusterPath.String()
}

return s
}
13 changes: 11 additions & 2 deletions internal/sync/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package sync
import (
"testing"

"github.com/kcp-dev/logicalcluster/v3"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
Expand All @@ -34,7 +36,8 @@ func createNewObject(name, namespace string) metav1.Object {
func TestObjectKey(t *testing.T) {
testcases := []struct {
object metav1.Object
clusterName string
clusterName logicalcluster.Name
clusterPath logicalcluster.Path
expected string
}{
{
Expand All @@ -57,11 +60,17 @@ func TestObjectKey(t *testing.T) {
clusterName: "abc123",
expected: "abc123|namespace/test",
},
{
object: createNewObject("test", "namespace"),
clusterName: "abc123",
clusterPath: logicalcluster.NewPath("this:should:not:appear:in:the:key"),
expected: "abc123|namespace/test",
},
}

for _, testcase := range testcases {
t.Run("", func(t *testing.T) {
key := newObjectKey(testcase.object, testcase.clusterName)
key := newObjectKey(testcase.object, testcase.clusterName, testcase.clusterPath)

if stringified := key.String(); stringified != testcase.expected {
t.Fatalf("Expected %q but got %q.", testcase.expected, stringified)
Expand Down
23 changes: 17 additions & 6 deletions internal/sync/object_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"slices"

jsonpatch "github.com/evanphx/json-patch/v5"
"github.com/kcp-dev/logicalcluster/v3"
"go.uber.org/zap"
"k8c.io/reconciler/pkg/equality"

Expand Down Expand Up @@ -54,7 +55,8 @@ type objectSyncer struct {

type syncSide struct {
ctx context.Context
clusterName string
clusterName logicalcluster.Name
clusterPath logicalcluster.Path
client ctrlruntimeclient.Client
object *unstructured.Unstructured
}
Expand Down Expand Up @@ -104,7 +106,7 @@ func (s *objectSyncer) Sync(log *zap.SugaredLogger, source, dest syncSide) (requ
// do not try to update a destination object that is in deletion
// (this should only happen if a service admin manually deletes something on the service cluster)
if dest.object.GetDeletionTimestamp() != nil {
log.Debugw("Destination object is in deletion, skipping any further synchronization", "dest-object", newObjectKey(dest.object, dest.clusterName))
log.Debugw("Destination object is in deletion, skipping any further synchronization", "dest-object", newObjectKey(dest.object, dest.clusterName, logicalcluster.None))
return false, nil
}

Expand Down Expand Up @@ -173,14 +175,19 @@ func (s *objectSyncer) syncObjectSpec(log *zap.SugaredLogger, source, dest syncS
sourceObjCopy := source.object.DeepCopy()
stripMetadata(sourceObjCopy)

log = log.With("dest-object", newObjectKey(dest.object, dest.clusterName))
log = log.With("dest-object", newObjectKey(dest.object, dest.clusterName, logicalcluster.None))

// calculate the patch to go from the last known state to the current source object's state
if lastKnownSourceState != nil {
// ignore difference in GVK
lastKnownSourceState.SetAPIVersion(sourceObjCopy.GetAPIVersion())
lastKnownSourceState.SetKind(sourceObjCopy.GetKind())

// update annotations (this is important if the admin later flipped the enableClusterPaths
// option in the PublishedResource)
sourceKey := newObjectKey(source.object, source.clusterName, source.clusterPath)
ensureAnnotations(sourceObjCopy, sourceKey.Annotations())

// now we can diff the two versions and create a patch
rawPatch, err := s.createMergePatch(lastKnownSourceState, sourceObjCopy)
if err != nil {
Expand Down Expand Up @@ -271,11 +278,14 @@ func (s *objectSyncer) ensureDestinationObject(log *zap.SugaredLogger, source, d
stripMetadata(destObj)

// remember the connection between the source and destination object
sourceObjKey := newObjectKey(source.object, source.clusterName)
sourceObjKey := newObjectKey(source.object, source.clusterName, source.clusterPath)
ensureLabels(destObj, sourceObjKey.Labels())

// put optional additional annotations on the new object
ensureAnnotations(destObj, sourceObjKey.Annotations())

// finally, we can create the destination object
objectLog := log.With("dest-object", newObjectKey(destObj, dest.clusterName))
objectLog := log.With("dest-object", newObjectKey(destObj, dest.clusterName, logicalcluster.None))
objectLog.Debugw("Creating destination object…")

if err := dest.client.Create(dest.ctx, destObj); err != nil {
Expand Down Expand Up @@ -316,6 +326,7 @@ func (s *objectSyncer) adoptExistingDestinationObject(log *zap.SugaredLogger, de
// the destination object from another source object, which would then lead to the two source objects
// "fighting" about the one destination object.
ensureLabels(existingDestObj, sourceKey.Labels())
ensureAnnotations(existingDestObj, sourceKey.Annotations())

if err := dest.client.Update(dest.ctx, existingDestObj); err != nil {
return fmt.Errorf("failed to upsert current destination object labels: %w", err)
Expand Down Expand Up @@ -356,7 +367,7 @@ func (s *objectSyncer) handleDeletion(log *zap.SugaredLogger, source, dest syncS
// if the destination object still exists, delete it and wait for it to be cleaned up
if dest.object != nil {
if dest.object.GetDeletionTimestamp() == nil {
log.Debugw("Deleting destination object…", "dest-object", newObjectKey(dest.object, dest.clusterName))
log.Debugw("Deleting destination object…", "dest-object", newObjectKey(dest.object, dest.clusterName, logicalcluster.None))
if err := dest.client.Delete(dest.ctx, dest.object); err != nil {
return false, fmt.Errorf("failed to delete destination object: %w", err)
}
Expand Down
Loading