Skip to content

Commit 88c5261

Browse files
authored
Merge pull request #8243 from sbueringer/pr-ssa-improve-caching
✨ SSA: improve request caching
2 parents ad11058 + 643009b commit 88c5261

File tree

3 files changed

+192
-51
lines changed

3 files changed

+192
-51
lines changed

controlplane/kubeadm/internal/controllers/helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (r *KubeadmControlPlaneReconciler) updateExternalObject(ctx context.Context
306306
// Update annotations
307307
updatedObject.SetAnnotations(kcp.Spec.MachineTemplate.ObjectMeta.Annotations)
308308

309-
if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedObject); err != nil {
309+
if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedObject, ssa.WithCachingProxy{Cache: r.ssaCache, Original: obj}); err != nil {
310310
return errors.Wrapf(err, "failed to update %s", obj.GetObjectKind().GroupVersionKind().Kind)
311311
}
312312
return nil

internal/util/ssa/patch.go

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ import (
2020
"context"
2121

2222
"github.com/pkg/errors"
23+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
24+
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/klog/v2"
2426
"sigs.k8s.io/controller-runtime/pkg/client"
2527
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
28+
29+
"sigs.k8s.io/cluster-api/internal/contract"
2630
)
2731

2832
// Option is the interface for configuration that modifies Options for a patch request.
@@ -64,11 +68,18 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
6468
opt.ApplyToOptions(options)
6569
}
6670

71+
// Convert the object to unstructured and filter out fields we don't
72+
// want to set (e.g. metadata creationTimestamp).
73+
// Note: This is necessary to avoid continuous reconciles.
74+
modifiedUnstructured, err := prepareModified(c.Scheme(), modified)
75+
if err != nil {
76+
return err
77+
}
78+
6779
var requestIdentifier string
68-
var err error
6980
if options.WithCachingProxy {
7081
// Check if the request is cached.
71-
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modified)
82+
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured)
7283
if err != nil {
7384
return errors.Wrapf(err, "failed to apply object")
7485
}
@@ -81,25 +92,63 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
8192
}
8293
}
8394

84-
gvk, err := apiutil.GVKForObject(modified, c.Scheme())
95+
gvk, err := apiutil.GVKForObject(modifiedUnstructured, c.Scheme())
8596
if err != nil {
86-
return errors.Wrapf(err, "failed to apply object: failed to get GroupVersionKind of modified object %s", klog.KObj(modified))
97+
return errors.Wrapf(err, "failed to apply object: failed to get GroupVersionKind of modified object %s", klog.KObj(modifiedUnstructured))
8798
}
8899

89100
patchOptions := []client.PatchOption{
90101
client.ForceOwnership,
91102
client.FieldOwner(fieldManager),
92103
}
93-
if err := c.Patch(ctx, modified, client.Apply, patchOptions...); err != nil {
94-
return errors.Wrapf(err, "failed to apply %s %s", gvk.Kind, klog.KObj(modified))
104+
if err := c.Patch(ctx, modifiedUnstructured, client.Apply, patchOptions...); err != nil {
105+
return errors.Wrapf(err, "failed to apply %s %s", gvk.Kind, klog.KObj(modifiedUnstructured))
106+
}
107+
108+
// Write back the modified object so callers can access the patched object.
109+
if err := c.Scheme().Convert(modifiedUnstructured, modified, ctx); err != nil {
110+
return errors.Wrapf(err, "failed to write modified object")
95111
}
96112

97113
if options.WithCachingProxy {
98114
// If the SSA call did not update the object, add the request to the cache.
99-
if options.Original.GetResourceVersion() == modified.GetResourceVersion() {
115+
if options.Original.GetResourceVersion() == modifiedUnstructured.GetResourceVersion() {
100116
options.Cache.Add(requestIdentifier)
101117
}
102118
}
103119

104120
return nil
105121
}
122+
123+
// prepareModified converts obj into an Unstructured and filters out undesired fields.
124+
func prepareModified(scheme *runtime.Scheme, obj client.Object) (*unstructured.Unstructured, error) {
125+
u := &unstructured.Unstructured{}
126+
switch obj.(type) {
127+
case *unstructured.Unstructured:
128+
u = obj.DeepCopyObject().(*unstructured.Unstructured)
129+
default:
130+
if err := scheme.Convert(obj, u, nil); err != nil {
131+
return nil, errors.Wrap(err, "failed to convert object to Unstructured")
132+
}
133+
}
134+
135+
// Only keep the paths that we have opinions on.
136+
FilterObject(u, &FilterObjectInput{
137+
AllowedPaths: []contract.Path{
138+
// apiVersion, kind, name and namespace are required field for a server side apply intent.
139+
{"apiVersion"},
140+
{"kind"},
141+
{"metadata", "name"},
142+
{"metadata", "namespace"},
143+
// uid is optional for a server side apply intent but sets the expectation of an object getting created or a specific one updated.
144+
{"metadata", "uid"},
145+
// our controllers only have an opinion on labels, annotation, finalizers ownerReferences and spec.
146+
{"metadata", "labels"},
147+
{"metadata", "annotations"},
148+
{"metadata", "finalizers"},
149+
{"metadata", "ownerReferences"},
150+
{"spec"},
151+
},
152+
})
153+
return u, nil
154+
}

internal/util/ssa/patch_test.go

Lines changed: 135 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@ package ssa
1818

1919
import (
2020
"testing"
21+
"time"
2122

2223
. "github.com/onsi/gomega"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2325
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26+
"k8s.io/utils/pointer"
2427
"sigs.k8s.io/controller-runtime/pkg/client"
2528

29+
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
2630
"sigs.k8s.io/cluster-api/internal/test/builder"
2731
)
2832

@@ -33,47 +37,135 @@ func TestPatch(t *testing.T) {
3337
ns, err := env.CreateNamespace(ctx, "ssa")
3438
g.Expect(err).ToNot(HaveOccurred())
3539

36-
// Build the test object to work with.
37-
initialObject := builder.TestInfrastructureCluster(ns.Name, "obj1").WithSpecFields(map[string]interface{}{
38-
"spec.controlPlaneEndpoint.host": "1.2.3.4",
39-
"spec.controlPlaneEndpoint.port": int64(1234),
40-
"spec.foo": "bar",
41-
}).Build()
42-
43-
fieldManager := "test-manager"
44-
ssaCache := NewCache()
45-
46-
// Create the object
47-
createObject := initialObject.DeepCopy()
48-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
49-
50-
// Update the object and verify that the request was not cached as the object was changed.
51-
// Get the original object.
52-
originalObject := initialObject.DeepCopy()
53-
g.Expect(env.Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject))
54-
// Modify the object
55-
modifiedObject := initialObject.DeepCopy()
56-
g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed())
57-
// Compute request identifier, so we can later verify that the update call was not cached.
58-
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedObject)
59-
g.Expect(err).ToNot(HaveOccurred())
60-
// Update the object
61-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
62-
// Verify that request was not cached (as it changed the object)
63-
g.Expect(ssaCache.Has(requestIdentifier)).To(BeFalse())
64-
65-
// Repeat the same update and verify that the request was cached as the object was not changed.
66-
// Get the original object.
67-
originalObject = initialObject.DeepCopy()
68-
g.Expect(env.Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject))
69-
// Modify the object
70-
modifiedObject = initialObject.DeepCopy()
71-
g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed())
72-
// Compute request identifier, so we can later verify that the update call was cached.
73-
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedObject)
74-
g.Expect(err).ToNot(HaveOccurred())
75-
// Update the object
76-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
77-
// Verify that request was cached (as it did not change the object)
78-
g.Expect(ssaCache.Has(requestIdentifier)).To(BeTrue())
40+
t.Run("Test patch with unstructured", func(t *testing.T) {
41+
// Build the test object to work with.
42+
initialObject := builder.TestInfrastructureCluster(ns.Name, "obj1").WithSpecFields(map[string]interface{}{
43+
"spec.controlPlaneEndpoint.host": "1.2.3.4",
44+
"spec.controlPlaneEndpoint.port": int64(1234),
45+
"spec.foo": "bar",
46+
}).Build()
47+
48+
fieldManager := "test-manager"
49+
ssaCache := NewCache()
50+
51+
// 1. Create the object
52+
createObject := initialObject.DeepCopy()
53+
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
54+
55+
// 2. Update the object and verify that the request was not cached as the object was changed.
56+
// Get the original object.
57+
originalObject := initialObject.DeepCopy()
58+
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject))
59+
// Modify the object
60+
modifiedObject := initialObject.DeepCopy()
61+
g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed())
62+
// Compute request identifier, so we can later verify that the update call was not cached.
63+
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
64+
g.Expect(err).ToNot(HaveOccurred())
65+
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
66+
g.Expect(err).ToNot(HaveOccurred())
67+
// Update the object
68+
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
69+
// Verify that request was not cached (as it changed the object)
70+
g.Expect(ssaCache.Has(requestIdentifier)).To(BeFalse())
71+
72+
// 3. Repeat the same update and verify that the request was cached as the object was not changed.
73+
// Get the original object.
74+
originalObject = initialObject.DeepCopy()
75+
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject))
76+
// Modify the object
77+
modifiedObject = initialObject.DeepCopy()
78+
g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed())
79+
// Compute request identifier, so we can later verify that the update call was cached.
80+
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
81+
g.Expect(err).ToNot(HaveOccurred())
82+
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
83+
g.Expect(err).ToNot(HaveOccurred())
84+
// Update the object
85+
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
86+
// Verify that request was cached (as it did not change the object)
87+
g.Expect(ssaCache.Has(requestIdentifier)).To(BeTrue())
88+
})
89+
90+
t.Run("Test patch with Machine", func(t *testing.T) {
91+
// Build the test object to work with.
92+
initialObject := &clusterv1.Machine{
93+
TypeMeta: metav1.TypeMeta{
94+
APIVersion: clusterv1.GroupVersion.String(),
95+
Kind: "Machine",
96+
},
97+
ObjectMeta: metav1.ObjectMeta{
98+
Name: "machine-1",
99+
Namespace: ns.Name,
100+
Labels: map[string]string{
101+
"label": "labelValue",
102+
},
103+
Annotations: map[string]string{
104+
"annotation": "annotationValue",
105+
},
106+
},
107+
Spec: clusterv1.MachineSpec{
108+
ClusterName: "cluster-1",
109+
Version: pointer.String("v1.25.0"),
110+
NodeDrainTimeout: &metav1.Duration{Duration: 10 * time.Second},
111+
Bootstrap: clusterv1.Bootstrap{
112+
DataSecretName: pointer.String("data-secret"),
113+
},
114+
},
115+
}
116+
fieldManager := "test-manager"
117+
ssaCache := NewCache()
118+
119+
// 1. Create the object
120+
createObject := initialObject.DeepCopy()
121+
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
122+
// Note: We have to patch the status here to explicitly set these two status fields.
123+
// If we don't do it the Machine defaulting webhook will try to set the two fields to false.
124+
// For an unknown reason this will happen with the 2nd update call (3.) below and not before.
125+
// This means that this call would unexpectedly not cache the object because the resourceVersion
126+
// is changed because the fields are set.
127+
// It's unclear why those status fields are not already set during create (1.) or the first update (2.)
128+
// (the webhook is returning patches for the two fields in those requests as well).
129+
// To further investigate this behavior it would be necessary to debug the kube-apiserver.
130+
// Fortunately, in reality this is not an issue as the fields will be set sooner or later and then
131+
// the requests are cached.
132+
createObjectWithStatus := createObject.DeepCopy()
133+
createObjectWithStatus.Status.BootstrapReady = false
134+
createObjectWithStatus.Status.InfrastructureReady = false
135+
g.Expect(env.Status().Patch(ctx, createObjectWithStatus, client.MergeFrom(createObject)))
136+
137+
// 2. Update the object and verify that the request was not cached as the object was changed.
138+
// Get the original object.
139+
originalObject := initialObject.DeepCopy()
140+
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject))
141+
// Modify the object
142+
modifiedObject := initialObject.DeepCopy()
143+
modifiedObject.Spec.NodeDrainTimeout = &metav1.Duration{Duration: 5 * time.Second}
144+
// Compute request identifier, so we can later verify that the update call was not cached.
145+
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
146+
g.Expect(err).ToNot(HaveOccurred())
147+
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
148+
g.Expect(err).ToNot(HaveOccurred())
149+
// Update the object
150+
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
151+
// Verify that request was not cached (as it changed the object)
152+
g.Expect(ssaCache.Has(requestIdentifier)).To(BeFalse())
153+
154+
// 3. Repeat the same update and verify that the request was cached as the object was not changed.
155+
// Get the original object.
156+
originalObject = initialObject.DeepCopy()
157+
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject))
158+
// Modify the object
159+
modifiedObject = initialObject.DeepCopy()
160+
modifiedObject.Spec.NodeDrainTimeout = &metav1.Duration{Duration: 5 * time.Second}
161+
// Compute request identifier, so we can later verify that the update call was cached.
162+
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
163+
g.Expect(err).ToNot(HaveOccurred())
164+
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
165+
g.Expect(err).ToNot(HaveOccurred())
166+
// Update the object
167+
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
168+
// Verify that request was cached (as it did not change the object)
169+
g.Expect(ssaCache.Has(requestIdentifier)).To(BeTrue())
170+
})
79171
}

0 commit comments

Comments
 (0)