From 3682cfb4da046479b6f4d6ad447cc8fd26f91b6d Mon Sep 17 00:00:00 2001 From: Christoph Mewes Date: Mon, 17 Feb 2025 14:37:19 +0100 Subject: [PATCH 1/4] add first basic tests for basic object synchronization On-behalf-of: @SAP christoph.mewes@sap.com --- hack/ci/run-e2e-tests.sh | 3 +- test/e2e/sync/primary_test.go | 334 ++++++++++++++++++++++++++++++++++ test/utils/process.go | 1 + test/utils/wait.go | 38 ++++ 4 files changed, 375 insertions(+), 1 deletion(-) create mode 100644 test/e2e/sync/primary_test.go diff --git a/hack/ci/run-e2e-tests.sh b/hack/ci/run-e2e-tests.sh index 9d0640f..b185304 100755 --- a/hack/ci/run-e2e-tests.sh +++ b/hack/ci/run-e2e-tests.sh @@ -80,6 +80,7 @@ export AGENT_BINARY="$(realpath _build/api-syncagent)" # time to run the tests echodate "Running e2e tests…" -(set -x; go test -tags e2e -timeout 2h -v ./test/e2e/...) +WHAT="${WHAT:-./test/e2e/...}" +(set -x; go test -tags e2e -timeout 2h -v $WHAT) echodate "Done. :-)" diff --git a/test/e2e/sync/primary_test.go b/test/e2e/sync/primary_test.go new file mode 100644 index 0000000..a409bd0 --- /dev/null +++ b/test/e2e/sync/primary_test.go @@ -0,0 +1,334 @@ +//go:build e2e + +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/kcp-dev/logicalcluster/v3" + + syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1" + "github.com/kcp-dev/api-syncagent/test/utils" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/yaml" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + yamlutil "k8s.io/apimachinery/pkg/util/yaml" + ctrlruntime "sigs.k8s.io/controller-runtime" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/kontext" +) + +func TestSyncSimpleObject(t *testing.T) { + const ( + apiExportName = "kcp.example.com" + orgWorkspace = "sync-simple" + ) + + ctx := context.Background() + ctrlruntime.SetLogger(logr.Discard()) + + // setup a test environment in kcp + orgKubconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName) + + // start a service cluster + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + }) + + // publish Crontabs and Backups + t.Logf("Publishing CRDs…") + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "publish-crontabs", + }, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + // These rules make finding the local object easier, but should not be used in production. + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "$remoteName", + Namespace: "synced-$remoteNamespace", + }, + }, + } + + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create PublishedResource: %v", err) + } + + // start the agent in the background to update the APIExport with the CronTabs API + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName) + + // wait until the API is available + teamCtx := kontext.WithCluster(ctx, logicalcluster.Name(fmt.Sprintf("root:%s:team-1", orgWorkspace))) + kcpClient := utils.GetKcpAdminClusterClient(t) + utils.WaitForBoundAPI(t, teamCtx, kcpClient, schema.GroupVersionResource{ + Group: apiExportName, + Version: "v1", + Resource: "crontabs", + }) + + // create a Crontab object in a team workspace + t.Log("Creating CronTab in kcp…") + crontab := yamlToUnstructured(t, ` +apiVersion: kcp.example.com/v1 +kind: CronTab +metadata: + namespace: default + name: my-crontab +spec: + cronSpec: '* * *' + image: ubuntu:latest +`) + + if err := kcpClient.Create(teamCtx, crontab); err != nil { + t.Fatalf("Failed to create CronTab in kcp: %v", err) + } + + // wait for the agent to sync the object down into the service cluster + + t.Logf("Wait for CronTab to be synced…") + copy := &unstructured.Unstructured{} + copy.SetAPIVersion("example.com/v1") + copy.SetKind("CronTab") + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"} + return envtestClient.Get(ctx, copyKey, copy) == nil, nil + }) + if err != nil { + t.Fatalf("Failed to wait for object to be synced down: %v", err) + } +} + +func TestLocalChangesAreKept(t *testing.T) { + const ( + apiExportName = "kcp.example.com" + orgWorkspace = "sync-undo-local-changes" + ) + + ctx := context.Background() + ctrlruntime.SetLogger(logr.Discard()) + + // setup a test environment in kcp + orgKubconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName) + + // start a service cluster + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + }) + + // publish Crontabs and Backups + t.Logf("Publishing CRDs…") + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "publish-crontabs", + }, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + // These rules make finding the local object easier, but should not be used in production. + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "$remoteName", + Namespace: "synced-$remoteNamespace", + }, + }, + } + + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create PublishedResource: %v", err) + } + + // start the agent in the background to update the APIExport with the CronTabs API + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName) + + // wait until the API is available + teamCtx := kontext.WithCluster(ctx, logicalcluster.Name(fmt.Sprintf("root:%s:team-1", orgWorkspace))) + kcpClient := utils.GetKcpAdminClusterClient(t) + utils.WaitForBoundAPI(t, teamCtx, kcpClient, schema.GroupVersionResource{ + Group: apiExportName, + Version: "v1", + Resource: "crontabs", + }) + + // create a Crontab object in a team workspace + t.Log("Creating CronTab in kcp…") + crontab := yamlToUnstructured(t, ` +apiVersion: kcp.example.com/v1 +kind: CronTab +metadata: + namespace: default + name: my-crontab +spec: + cronSpec: '* * *' + image: ubuntu:latest +`) + + if err := kcpClient.Create(teamCtx, crontab); err != nil { + t.Fatalf("Failed to create CronTab in kcp: %v", err) + } + + // wait for the agent to sync the object down into the service cluster + + t.Logf("Wait for CronTab to be synced…") + copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"} + + copy := &unstructured.Unstructured{} + copy.SetAPIVersion("example.com/v1") + copy.SetKind("CronTab") + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + return envtestClient.Get(ctx, copyKey, copy) == nil, nil + }) + if err != nil { + t.Fatalf("Failed to wait for object to be synced down: %v", err) + } + + // make some changes on the service cluster; this is usually an external operator doing some + // defaulting, maybe even a mutation webhook + t.Logf("Modifying local object…") + newCronSpec := "this-should-not-be-reverted" + unstructured.SetNestedField(copy.Object, newCronSpec, "spec", "cronSpec") + + if err := envtestClient.Update(ctx, copy); err != nil { + t.Fatalf("Failed to update synced object in service cluster: %v", err) + } + + // make some changes in kcp, these should be applied to the local object without overwriting the cronSpec + + // refresh the current object state + if err := kcpClient.Get(teamCtx, ctrlruntimeclient.ObjectKeyFromObject(crontab), crontab); err != nil { + t.Fatalf("Failed to create CronTab in kcp: %v", err) + } + + newImage := "new-value" + unstructured.SetNestedField(crontab.Object, newImage, "spec", "image") + + t.Logf("Modifying object in kcp…") + if err := kcpClient.Update(teamCtx, crontab); err != nil { + t.Fatalf("Failed to update source object in kcp: %v", err) + } + + // wait for the agent to sync again + t.Logf("Waiting for the agent to sync again…") + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + if err := envtestClient.Get(ctx, copyKey, copy); err != nil { + return false, err + } + + value, existing, err := unstructured.NestedString(copy.Object, "spec", "cronSpec") + if err != nil { + return false, err + } + + if !existing { + return false, errors.New("field does not exist in object anymore, this should not have happened") + } + + if value != newCronSpec { + return false, fmt.Errorf("cronSpec was reverted back to %q, should still be %q", value, newCronSpec) + } + + value, existing, err = unstructured.NestedString(copy.Object, "spec", "image") + if err != nil { + return false, err + } + + if !existing { + return false, errors.New("field does not exist in object anymore, this should not have happened") + } + + return value == newImage, nil + }) + if err != nil { + t.Fatalf("Failed to wait for object to be synced: %v", err) + } + + // Now we actually change the cronSpec in kcp, and this change _must_ make it to the service cluster. + t.Logf("Modify object in kcp again…") + + if err := kcpClient.Get(teamCtx, ctrlruntimeclient.ObjectKeyFromObject(crontab), crontab); err != nil { + t.Fatalf("Failed to create CronTab in kcp: %v", err) + } + + kcpNewCronSpec := "users-new-desired-cronspec" + unstructured.SetNestedField(crontab.Object, kcpNewCronSpec, "spec", "cronSpec") + + if err := kcpClient.Update(teamCtx, crontab); err != nil { + t.Fatalf("Failed to update source object in kcp: %v", err) + } + + // wait for the agent to sync again + t.Logf("Waiting for the agent to sync again…") + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + if err := envtestClient.Get(ctx, copyKey, copy); err != nil { + return false, err + } + + value, existing, err := unstructured.NestedString(copy.Object, "spec", "cronSpec") + if err != nil { + return false, err + } + + if !existing { + return false, errors.New("field does not exist in object anymore, this should not have happened") + } + + return value == kcpNewCronSpec, nil + }) + if err != nil { + t.Fatalf("Failed to wait for object to be synced: %v", err) + } +} + +func yamlToUnstructured(t *testing.T, data string) *unstructured.Unstructured { + t.Helper() + + decoder := yamlutil.NewYAMLOrJSONDecoder(strings.NewReader(data), 100) + + var rawObj runtime.RawExtension + if err := decoder.Decode(&rawObj); err != nil { + t.Fatalf("Failed to decode: %v", err) + } + + obj, _, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil) + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + t.Fatal(err) + } + + return &unstructured.Unstructured{Object: unstructuredMap} +} diff --git a/test/utils/process.go b/test/utils/process.go index df4a4da..7d00e94 100644 --- a/test/utils/process.go +++ b/test/utils/process.go @@ -88,6 +88,7 @@ func RunAgent( "--kcp-kubeconfig", kcpKubeconfig, "--namespace", "kube-system", "--log-format", "Console", + "--log-debug=true", "--health-address", "0", "--metrics-address", "0", } diff --git a/test/utils/wait.go b/test/utils/wait.go index 4094b8d..161bcad 100644 --- a/test/utils/wait.go +++ b/test/utils/wait.go @@ -18,9 +18,13 @@ package utils import ( "context" + "slices" "testing" "time" + kcpapisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,3 +44,37 @@ func WaitForObject(t *testing.T, ctx context.Context, client ctrlruntimeclient.C t.Logf("%T is ready.", obj) } + +func WaitForBoundAPI(t *testing.T, ctx context.Context, client ctrlruntimeclient.Client, gvr schema.GroupVersionResource) { + t.Helper() + + t.Log("Waiting for API to be bound in kcp…") + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (bool, error) { + apiBindings := &kcpapisv1alpha1.APIBindingList{} + err := client.List(ctx, apiBindings) + if err != nil { + return false, err + } + + for _, binding := range apiBindings.Items { + if bindingHasGVR(binding, gvr) { + return true, nil + } + } + + return false, nil + }) + if err != nil { + t.Fatalf("Failed to wait for API %v to become available: %v", gvr, err) + } +} + +func bindingHasGVR(binding kcpapisv1alpha1.APIBinding, gvr schema.GroupVersionResource) bool { + for _, bound := range binding.Status.BoundResources { + if bound.Group == gvr.Group && bound.Resource == gvr.Resource && slices.Contains(bound.StorageVersions, gvr.Version) { + return true + } + } + + return false +} From 316bb1d2e4a5246daa2591cac87cb43edb314c20 Mon Sep 17 00:00:00 2001 From: Christoph Mewes Date: Mon, 17 Feb 2025 16:36:24 +0100 Subject: [PATCH 2/4] actually implement support for resource filtering On-behalf-of: @SAP christoph.mewes@sap.com --- internal/controller/apiexport/controller.go | 5 ++ internal/controller/sync/controller.go | 55 +++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/internal/controller/apiexport/controller.go b/internal/controller/apiexport/controller.go index 05c90d3..ecd5453 100644 --- a/internal/controller/apiexport/controller.go +++ b/internal/controller/apiexport/controller.go @@ -139,6 +139,11 @@ func (r *Reconciler) reconcile(ctx context.Context) error { for _, pubResource := range filteredPubResources { arsList.Insert(pubResource.Status.ResourceSchemaName) + // to evaluate the namespace filter, the agent needs to fetch the namespace + if filter := pubResource.Spec.Filter; filter != nil && filter.Namespace != nil { + claimedResources.Insert("namespaces") + } + for _, rr := range pubResource.Spec.Related { resource, err := mapper.ResourceFor(schema.GroupVersionResource{ Resource: rr.Kind, diff --git a/internal/controller/sync/controller.go b/internal/controller/sync/controller.go index 4c9d5b4..ecd13e1 100644 --- a/internal/controller/sync/controller.go +++ b/internal/controller/sync/controller.go @@ -33,7 +33,10 @@ import ( kcpcore "github.com/kcp-dev/kcp/sdk/apis/core" kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -165,6 +168,27 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{}, nil } + // if there is a namespace, get it if a namespace filter is also configured + var namespace *corev1.Namespace + if filter := r.pubRes.Spec.Filter; filter != nil && filter.Namespace != nil && remoteObj.GetNamespace() != "" { + namespace = &corev1.Namespace{} + key := types.NamespacedName{Name: remoteObj.GetNamespace()} + + if err := r.vwClient.Get(wsCtx, key, namespace); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to retrieve remote object's namespace: %w", err) + } + } + + // apply filtering rules to scope down the number of objects we sync + include, err := r.objectMatchesFilter(remoteObj, namespace) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to apply filtering rules: %w", err) + } + + if !include { + return reconcile.Result{}, nil + } + syncContext := sync.NewContext(ctx, wsCtx) // if desired, fetch the cluster path as well (some downstream service providers might make use of it, @@ -193,3 +217,34 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return result, nil } + +func (r *Reconciler) objectMatchesFilter(remoteObj *unstructured.Unstructured, namespace *corev1.Namespace) (bool, error) { + if r.pubRes.Spec.Filter == nil { + return true, nil + } + + objMatches, err := r.matchesFilter(remoteObj, r.pubRes.Spec.Filter.Resource) + if err != nil || !objMatches { + return false, err + } + + nsMatches, err := r.matchesFilter(namespace, r.pubRes.Spec.Filter.Namespace) + if err != nil || !nsMatches { + return false, err + } + + return true, nil +} + +func (r *Reconciler) matchesFilter(obj metav1.Object, selector *metav1.LabelSelector) (bool, error) { + if selector == nil { + return true, nil + } + + s, err := metav1.LabelSelectorAsSelector(selector) + if err != nil { + return false, err + } + + return s.Matches(labels.Set(obj.GetLabels())), nil +} From 73ad15e9f528ff30c9bb32325255ee0c86096693 Mon Sep 17 00:00:00 2001 From: Christoph Mewes Date: Mon, 17 Feb 2025 16:37:08 +0100 Subject: [PATCH 3/4] add test for the resource filtering On-behalf-of: @SAP christoph.mewes@sap.com --- test/e2e/sync/primary_test.go | 125 ++++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 4 deletions(-) diff --git a/test/e2e/sync/primary_test.go b/test/e2e/sync/primary_test.go index a409bd0..af2c0c6 100644 --- a/test/e2e/sync/primary_test.go +++ b/test/e2e/sync/primary_test.go @@ -122,7 +122,7 @@ spec: copy.SetAPIVersion("example.com/v1") copy.SetKind("CronTab") - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) { copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"} return envtestClient.Get(ctx, copyKey, copy) == nil, nil }) @@ -210,7 +210,7 @@ spec: copy.SetAPIVersion("example.com/v1") copy.SetKind("CronTab") - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) { return envtestClient.Get(ctx, copyKey, copy) == nil, nil }) if err != nil { @@ -244,7 +244,7 @@ spec: // wait for the agent to sync again t.Logf("Waiting for the agent to sync again…") - err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) { if err := envtestClient.Get(ctx, copyKey, copy); err != nil { return false, err } @@ -293,7 +293,7 @@ spec: // wait for the agent to sync again t.Logf("Waiting for the agent to sync again…") - err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) { if err := envtestClient.Get(ctx, copyKey, copy); err != nil { return false, err } @@ -332,3 +332,120 @@ func yamlToUnstructured(t *testing.T, data string) *unstructured.Unstructured { return &unstructured.Unstructured{Object: unstructuredMap} } + +func TestResourceFilter(t *testing.T) { + const ( + apiExportName = "kcp.example.com" + orgWorkspace = "sync-resource-filter" + ) + + ctx := context.Background() + ctrlruntime.SetLogger(logr.Discard()) + + // setup a test environment in kcp + orgKubconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName) + + // start a service cluster + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + }) + + // publish Crontabs and Backups + t.Logf("Publishing CRDs…") + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "publish-crontabs", + }, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + // These rules make finding the local object easier, but should not be used in production. + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "$remoteName", + Namespace: "synced-$remoteNamespace", + }, + Filter: &syncagentv1alpha1.ResourceFilter{ + Resource: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "include": "me", + }, + }, + }, + }, + } + + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create PublishedResource: %v", err) + } + + // start the agent in the background to update the APIExport with the CronTabs API + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName) + + // wait until the API is available + teamCtx := kontext.WithCluster(ctx, logicalcluster.Name(fmt.Sprintf("root:%s:team-1", orgWorkspace))) + kcpClient := utils.GetKcpAdminClusterClient(t) + utils.WaitForBoundAPI(t, teamCtx, kcpClient, schema.GroupVersionResource{ + Group: apiExportName, + Version: "v1", + Resource: "crontabs", + }) + + // create two Crontab objects in a team workspace + t.Log("Creating CronTab in kcp…") + ignoredCrontab := yamlToUnstructured(t, ` +apiVersion: kcp.example.com/v1 +kind: CronTab +metadata: + namespace: default + name: ignored +spec: + image: ubuntu:latest +`) + + if err := kcpClient.Create(teamCtx, ignoredCrontab); err != nil { + t.Fatalf("Failed to create CronTab in kcp: %v", err) + } + + includedCrontab := yamlToUnstructured(t, ` +apiVersion: kcp.example.com/v1 +kind: CronTab +metadata: + namespace: default + name: included + labels: + include: me +spec: + image: debian:12 +`) + + if err := kcpClient.Create(teamCtx, includedCrontab); err != nil { + t.Fatalf("Failed to create CronTab in kcp: %v", err) + } + + // wait for the agent to sync only one of the objects down into the service cluster + + t.Logf("Wait for CronTab to be synced…") + copy := &unstructured.Unstructured{} + copy.SetAPIVersion("example.com/v1") + copy.SetKind("CronTab") + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) { + copyKey := types.NamespacedName{Namespace: "synced-default", Name: "included"} + return envtestClient.Get(ctx, copyKey, copy) == nil, nil + }) + if err != nil { + t.Fatalf("Failed to wait for object to be synced down: %v", err) + } + + // the only good negative check is to wait for a timeout + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) { + copyKey := types.NamespacedName{Namespace: "synced-default", Name: "ignored"} + return envtestClient.Get(ctx, copyKey, copy) == nil, nil + }) + if err == nil { + t.Fatal("Expected no ignored object to be found on the service cluster, but did.") + } +} From 50f7cd0b2fa896cbffe6ae444fdbae2b422edc40 Mon Sep 17 00:00:00 2001 From: Christoph Mewes Date: Mon, 17 Feb 2025 16:52:17 +0100 Subject: [PATCH 4/4] handle race condition when creating namespaces in the service cluster On-behalf-of: @SAP christoph.mewes@sap.com --- internal/sync/object_syncer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/sync/object_syncer.go b/internal/sync/object_syncer.go index 5b43720..480f76a 100644 --- a/internal/sync/object_syncer.go +++ b/internal/sync/object_syncer.go @@ -349,6 +349,10 @@ func (s *objectSyncer) ensureNamespace(ctx context.Context, log *zap.SugaredLogg return nil } + // Use a get-then-create approach to benefit from having a cache; otherwise if we always + // send a create request, we're needlessly spamming the kube apiserver. Yes, this approach + // is a race condition and we have to check for AlreadyExists later down the line, but that + // only occurs on cold caches. During normal operations this should be more efficient. ns := &corev1.Namespace{} if err := client.Get(ctx, types.NamespacedName{Name: namespace}, ns); ctrlruntimeclient.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to check: %w", err) @@ -358,7 +362,7 @@ func (s *objectSyncer) ensureNamespace(ctx context.Context, log *zap.SugaredLogg ns.Name = namespace log.Debugw("Creating namespace…", "namespace", namespace) - if err := client.Create(ctx, ns); err != nil { + if err := client.Create(ctx, ns); err != nil && !apierrors.IsAlreadyExists(err) { return fmt.Errorf("failed to create: %w", err) } }