Skip to content

Commit 1cbe864

Browse files
authored
🐛 Correctly handle concurrent updates to ClusterResourceSetBinding (kubernetes-sigs#10656)
* fix: Correctly handle concurrent updates to ClusterResourceSetBinding. The existing code does not do optimistic locking on the CRSBinding via `resourceVersion`, and hence concurrent updates (patches) overwrite each other, leading to races and inconsistent state of the CRSBinding. This commit fixes that by forcing optimistic locking via the controller-runtime client patch options. The downside is that it leads to more reconciles and more log output from failed updates, but this is a much better situation than having inconsistent and inaccurate state stored in the CRSBinding. * fixup! refactor: Add requested comment re patching spec only
1 parent 59a4531 commit 1cbe864

File tree

4 files changed

+73
-29
lines changed

4 files changed

+73
-29
lines changed

exp/addons/internal/controllers/clusterresourceset_controller.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func (r *ClusterResourceSetReconciler) getClustersByClusterResourceSetSelector(c
272272
// In Reconcile strategy, resources are re-applied to a particular cluster when their definition changes. The hash in ClusterResourceSetBinding is used to check
273273
// if a resource has changed or not.
274274
// TODO: If a resource already exists in the cluster but not applied by ClusterResourceSet, the resource will be updated ?
275-
func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Context, cluster *clusterv1.Cluster, clusterResourceSet *addonsv1.ClusterResourceSet) error {
275+
func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Context, cluster *clusterv1.Cluster, clusterResourceSet *addonsv1.ClusterResourceSet) (rerr error) {
276276
log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(cluster))
277277
ctx = ctrl.LoggerInto(ctx, log)
278278

@@ -295,16 +295,14 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
295295
return err
296296
}
297297

298-
// Initialize the patch helper.
299-
patchHelper, err := patch.NewHelper(clusterResourceSetBinding, r.Client)
300-
if err != nil {
301-
return err
302-
}
298+
patch := client.MergeFromWithOptions(clusterResourceSetBinding.DeepCopy(), client.MergeFromWithOptimisticLock{})
303299

304300
defer func() {
305301
// Always attempt to Patch the ClusterResourceSetBinding object after each reconciliation.
306-
if err := patchHelper.Patch(ctx, clusterResourceSetBinding); err != nil {
307-
log.Error(err, "Failed to patch config")
302+
// Note only the ClusterResourceSetBinding spec will be patched as it does not have a status field, and so
303+
// using the patch helper is unnecessary.
304+
if err := r.Client.Patch(ctx, clusterResourceSetBinding, patch); err != nil {
305+
rerr = kerrors.NewAggregate([]error{rerr, errors.Wrapf(err, "failed to patch ClusterResourceSetBinding %s", klog.KObj(clusterResourceSetBinding))})
308306
}
309307
}()
310308

@@ -315,7 +313,7 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
315313
Name: clusterResourceSet.Name,
316314
UID: clusterResourceSet.UID,
317315
}))
318-
errList := []error{}
316+
var errList []error
319317
resourceSetBinding := clusterResourceSetBinding.GetOrCreateBinding(clusterResourceSet)
320318

321319
// Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object.

exp/addons/internal/controllers/clusterresourceset_controller_test.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -990,11 +990,56 @@ metadata:
990990
g.Expect(env.Patch(ctx, clusterResourceSetInstance, client.MergeFrom(clusterResourceSetInstance.DeepCopy()))).To(Succeed())
991991

992992
// Wait until ClusterResourceSetBinding is created for the Cluster
993-
g.Eventually(func() bool {
993+
g.Eventually(func(g Gomega) {
994994
binding := &addonsv1.ClusterResourceSetBinding{}
995-
err := env.Get(ctx, clusterResourceSetBindingKey, binding)
996-
return err == nil
997-
}, timeout).Should(BeTrue())
995+
g.Expect(env.Get(ctx, clusterResourceSetBindingKey, binding)).Should(Succeed())
996+
}, timeout).Should(Succeed())
997+
})
998+
999+
t.Run("Should handle applying multiple ClusterResourceSets concurrently to the same cluster", func(t *testing.T) {
1000+
g := NewWithT(t)
1001+
ns := setup(t, g)
1002+
defer teardown(t, g, ns)
1003+
1004+
t.Log("Creating ClusterResourceSet instances that have same labels as selector")
1005+
for range 10 {
1006+
clusterResourceSetInstance := &addonsv1.ClusterResourceSet{
1007+
ObjectMeta: metav1.ObjectMeta{
1008+
Name: fmt.Sprintf("clusterresourceset-%s", util.RandomString(6)),
1009+
Namespace: ns.Name,
1010+
},
1011+
Spec: addonsv1.ClusterResourceSetSpec{
1012+
ClusterSelector: metav1.LabelSelector{
1013+
MatchLabels: labels,
1014+
},
1015+
Resources: []addonsv1.ResourceRef{{Name: configmapName, Kind: "ConfigMap"}, {Name: secretName, Kind: "Secret"}},
1016+
},
1017+
}
1018+
// Create the ClusterResourceSet.
1019+
g.Expect(env.Create(ctx, clusterResourceSetInstance)).To(Succeed())
1020+
}
1021+
1022+
t.Log("Updating the cluster with labels to trigger cluster resource sets to be applied")
1023+
testCluster.SetLabels(labels)
1024+
g.Expect(env.Update(ctx, testCluster)).To(Succeed())
1025+
1026+
t.Log("Verifying ClusterResourceSetBinding shows that all CRS have been applied")
1027+
g.Eventually(func(g Gomega) {
1028+
clusterResourceSetBindingKey := client.ObjectKey{Namespace: testCluster.Namespace, Name: testCluster.Name}
1029+
binding := &addonsv1.ClusterResourceSetBinding{}
1030+
g.Expect(env.Get(ctx, clusterResourceSetBindingKey, binding)).Should(Succeed())
1031+
g.Expect(binding.Spec.Bindings).To(HaveLen(10))
1032+
for _, b := range binding.Spec.Bindings {
1033+
g.Expect(b.Resources).To(HaveLen(2))
1034+
for _, r := range b.Resources {
1035+
g.Expect(r.Applied).To(BeTrue())
1036+
}
1037+
}
1038+
g.Expect(binding.OwnerReferences).To(HaveLen(10))
1039+
}, 4*timeout).Should(Succeed())
1040+
t.Log("Deleting the created ClusterResourceSet instances")
1041+
g.Expect(env.DeleteAllOf(ctx, &addonsv1.ClusterResourceSet{}, client.InNamespace(ns.Name))).To(Succeed())
1042+
g.Expect(env.DeleteAllOf(ctx, &addonsv1.ClusterResourceSetBinding{}, client.InNamespace(ns.Name))).To(Succeed())
9981043
})
9991044
}
10001045

exp/addons/internal/controllers/clusterresourceset_helpers.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/apimachinery/pkg/types"
3737
"k8s.io/klog/v2"
3838
"sigs.k8s.io/controller-runtime/pkg/client"
39+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3940

4041
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
4142
addonsv1 "sigs.k8s.io/cluster-api/exp/addons/api/v1beta1"
@@ -108,27 +109,27 @@ func createUnstructured(ctx context.Context, c client.Client, obj *unstructured.
108109

109110
// getOrCreateClusterResourceSetBinding retrieves ClusterResourceSetBinding resource owned by the cluster or create a new one if not found.
110111
func (r *ClusterResourceSetReconciler) getOrCreateClusterResourceSetBinding(ctx context.Context, cluster *clusterv1.Cluster, clusterResourceSet *addonsv1.ClusterResourceSet) (*addonsv1.ClusterResourceSetBinding, error) {
111-
clusterResourceSetBinding := &addonsv1.ClusterResourceSetBinding{}
112-
clusterResourceSetBindingKey := client.ObjectKey{
113-
Namespace: cluster.Namespace,
114-
Name: cluster.Name,
112+
clusterResourceSetBinding := &addonsv1.ClusterResourceSetBinding{
113+
ObjectMeta: metav1.ObjectMeta{
114+
Name: cluster.Name,
115+
Namespace: cluster.Namespace,
116+
},
115117
}
118+
clusterResourceSetBindingKey := client.ObjectKeyFromObject(clusterResourceSetBinding)
116119

117120
if err := r.Client.Get(ctx, clusterResourceSetBindingKey, clusterResourceSetBinding); err != nil {
118121
if !apierrors.IsNotFound(err) {
119122
return nil, err
120123
}
121-
clusterResourceSetBinding.Name = cluster.Name
122-
clusterResourceSetBinding.Namespace = cluster.Namespace
123-
clusterResourceSetBinding.OwnerReferences = []metav1.OwnerReference{
124-
{
125-
APIVersion: addonsv1.GroupVersion.String(),
126-
Kind: "ClusterResourceSet",
127-
Name: clusterResourceSet.Name,
128-
UID: clusterResourceSet.UID,
129-
},
124+
err = controllerutil.SetOwnerReference(
125+
clusterResourceSet,
126+
clusterResourceSetBinding,
127+
r.Client.Scheme(),
128+
)
129+
if err != nil {
130+
return nil, errors.Wrapf(err, "failed to set owner reference for clusterResourceSetBinding %s for cluster %s/%s", clusterResourceSetBindingKey, cluster.Namespace, cluster.Name)
130131
}
131-
clusterResourceSetBinding.Spec.Bindings = []*addonsv1.ResourceSetBinding{}
132+
132133
clusterResourceSetBinding.Spec.ClusterName = cluster.Name
133134
if err := r.Client.Create(ctx, clusterResourceSetBinding); err != nil {
134135
if apierrors.IsAlreadyExists(err) {

exp/addons/internal/controllers/suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ func TestMain(m *testing.M) {
8585
Client: mgr.GetClient(),
8686
Tracker: tracker,
8787
}
88-
if err = reconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}, partialSecretCache); err != nil {
88+
if err = reconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 10}, partialSecretCache); err != nil {
8989
panic(fmt.Sprintf("Failed to set up cluster resource set reconciler: %v", err))
9090
}
9191
bindingReconciler := ClusterResourceSetBindingReconciler{
9292
Client: mgr.GetClient(),
9393
}
94-
if err = bindingReconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
94+
if err = bindingReconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 10}); err != nil {
9595
panic(fmt.Sprintf("Failed to set up cluster resource set binding reconciler: %v", err))
9696
}
9797
}

0 commit comments

Comments
 (0)