Skip to content

Commit 157403f

Browse files
committed
Improve wait for cache
1 parent f9028d7 commit 157403f

File tree

6 files changed

+167
-28
lines changed

6 files changed

+167
-28
lines changed

internal/controllers/machinedeployment/machinedeployment_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
354354
// Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
355355
// the MachineDeployment to reconcile with an outdated list of MachineSets which could lead to unwanted creation of
356356
// a duplicate MachineSet.
357-
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet creation", ms); err != nil {
357+
if err := clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "MachineSet creation", ms); err != nil {
358358
return err
359359
}
360360

internal/controllers/machinedeployment/machinedeployment_sync.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
3434
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
35+
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
3536
"sigs.k8s.io/cluster-api/util/collections"
3637
v1beta1conditions "sigs.k8s.io/cluster-api/util/conditions/deprecated/v1beta1"
3738
"sigs.k8s.io/cluster-api/util/patch"
@@ -335,6 +336,7 @@ func (r *Reconciler) cleanupDeployment(ctx context.Context, oldMSs []*clusterv1.
335336
sort.Sort(mdutil.MachineSetsByCreationTimestamp(cleanableMSes))
336337
log.V(4).Info("Looking to cleanup old machine sets for deployment")
337338

339+
machineSetsDeleted := []*clusterv1.MachineSet{}
338340
for i := range cleanableMSCount {
339341
ms := cleanableMSes[i]
340342
if ms.Spec.Replicas == nil {
@@ -353,10 +355,12 @@ func (r *Reconciler) cleanupDeployment(ctx context.Context, oldMSs []*clusterv1.
353355
r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedDelete", "Failed to delete MachineSet %q: %v", ms.Name, err)
354356
return err
355357
}
358+
machineSetsDeleted = append(machineSetsDeleted, ms)
359+
356360
// Note: We intentionally log after Delete because we want this log line to show up only after DeletionTimestamp has been set.
357361
log.Info("Deleting MachineSet (cleanup of old MachineSet)", "MachineSet", klog.KObj(ms))
358362
r.recorder.Eventf(deployment, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted MachineSet %q", ms.Name)
359363
}
360364

361-
return nil
365+
return clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "MachineSet deletion (cleanup of old MachineSet)", machineSetsDeleted...)
362366
}

internal/controllers/machineset/machineset_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,7 @@ func (r *Reconciler) createMachines(ctx context.Context, s *scope, machinesToAdd
894894
}
895895

896896
// Wait for cache update to ensure following reconcile gets latest change.
897-
return ctrl.Result{}, clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Machine creation", machinesAdded...)
897+
return ctrl.Result{}, clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "Machine creation", machinesAdded...)
898898
}
899899

900900
func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDelete int) (ctrl.Result, error) {
@@ -942,7 +942,7 @@ func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDel
942942
}
943943

944944
// Wait for cache update to ensure following reconcile gets latest change.
945-
if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion", machinesDeleted...); err != nil {
945+
if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion (scale down)", machinesDeleted...); err != nil {
946946
errs = append(errs, err)
947947
}
948948
if len(errs) > 0 {

internal/controllers/topology/cluster/reconcile_state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope
693693
// Wait until MachineDeployment is visible in the cache.
694694
// Note: We have to do this because otherwise using a cached client in current state could
695695
// miss a newly created MachineDeployment (because the cache might be stale).
696-
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineDeployment creation", md.Object); err != nil {
696+
if err := clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "MachineDeployment creation", md.Object); err != nil {
697697
return err
698698
}
699699

@@ -1016,7 +1016,7 @@ func (r *Reconciler) createMachinePool(ctx context.Context, s *scope.Scope, mp *
10161016
// Wait until MachinePool is visible in the cache.
10171017
// Note: We have to do this because otherwise using a cached client in current state could
10181018
// miss a newly created MachinePool (because the cache might be stale).
1019-
return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachinePool creation", mp.Object)
1019+
return clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "MachinePool creation", mp.Object)
10201020
}
10211021

10221022
// updateMachinePool updates a MachinePool. Also updates the corresponding objects if necessary.

internal/util/client/client.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,20 @@ var (
4949
// all passed in objects with at least the passed in resourceVersion.
5050
// This is done by retrieving objects from the cache via the client and then comparing resourceVersions.
5151
// Note: This func will update the passed in objects while polling.
52+
// Note: resourceVersion must be set on the passed in objects.
5253
// Note: The generic parameter enforces that all objects have the same type.
5354
func WaitForCacheToBeUpToDate[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
5455
return waitFor(ctx, c, action, checkIfObjectUpToDate, objs...)
5556
}
5657

58+
// WaitForObjectsToBeAddedToTheCache waits until the cache is up-to-date in the sense that the
59+
// passed in objects exist in the cache.
// Note: Only the existence of the objects is checked (see checkIfObjectAdded), resourceVersions are not compared.
60+
// Note: This func will update the passed in objects while polling.
61+
// Note: The generic parameter enforces that all objects have the same type.
62+
func WaitForObjectsToBeAddedToTheCache[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
63+
return waitFor(ctx, c, action, checkIfObjectAdded, objs...)
64+
}
65+
5766
// WaitForObjectsToBeDeletedFromTheCache waits until the cache is up-to-date in the sense that the
5867
// passed in objects have been either removed from the cache or they have a deletionTimestamp set.
5968
// Note: This func will update the passed in objects while polling.
@@ -64,22 +73,21 @@ func WaitForObjectsToBeDeletedFromTheCache[T client.Object](ctx context.Context,
6473

6574
// checkIfObjectUpToDate checks if an object is up-to-date and returns an error if it is not.
6675
func checkIfObjectUpToDate(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
76+
if desiredObj.MinimumResourceVersion == "" {
77+
// Unexpected error occurred: resourceVersion not set on passed in object (not retryable).
78+
return false, errors.Errorf("%s: cannot compare with invalid resourceVersion: resourceVersion not set",
79+
klog.KObj(desiredObj.Object))
80+
}
81+
6782
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
6883
if apierrors.IsNotFound(err) {
69-
// Object is not yet in the cache (retryable).
70-
return true, err
84+
// Done, object was deleted in the meantime.
85+
return false, nil
7186
}
7287
// Unexpected error occurred (not retryable).
7388
return false, err
7489
}
7590

76-
if desiredObj.MinimumResourceVersion == "" {
77-
// Done, if MinimumResourceVersion is empty, as it is enough if the object exists in the cache.
78-
// Note: This can happen when the ServerSidePatchHelper is used to create an object as the ServerSidePatchHelper
79-
// does not update the object after Apply and accordingly resourceVersion remains empty.
80-
return false, nil
81-
}
82-
8391
cmp, err := compareResourceVersion(desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion)
8492
if err != nil {
8593
// Unexpected error occurred: invalid resourceVersion (not retryable).
@@ -96,6 +104,20 @@ func checkIfObjectUpToDate(ctx context.Context, c client.Client, desiredObj desi
96104
return false, nil
97105
}
98106

107+
// checkIfObjectAdded checks if an object exists in the cache and returns an error if it does not.
// The object is read via c.Get into desiredObj.Object. A NotFound error is returned as retryable,
// any other error is returned as not retryable. resourceVersions are not compared.
func checkIfObjectAdded(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
108+
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
109+
if apierrors.IsNotFound(err) {
110+
// Object is not yet in the cache (retryable).
111+
return true, err
112+
}
113+
// Unexpected error occurred (not retryable).
114+
return false, err
115+
}
116+
117+
// Done, object exists in the cache.
118+
return false, nil
119+
}
120+
99121
func checkIfObjectDeleted(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
100122
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
101123
if apierrors.IsNotFound(err) {

internal/util/client/client_test.go

Lines changed: 126 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,23 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
5656
{
5757
name: "no-op if no objects are passed in",
5858
},
59+
{
60+
name: "error if passed in objects have no resourceVersion set",
61+
objs: []client.Object{
62+
machine("machine-1", "", nil),
63+
machine("machine-2", "", nil),
64+
},
65+
clientResponses: map[client.ObjectKey][]client.Object{
66+
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
67+
machine("machine-1", "1", nil),
68+
},
69+
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
70+
machine("machine-2", "2", nil),
71+
},
72+
},
73+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine update: " +
74+
"default/machine-1: cannot compare with invalid resourceVersion: resourceVersion not set",
75+
},
5976
{
6077
name: "error if passed in objects have invalid resourceVersion",
6178
objs: []client.Object{
@@ -70,7 +87,7 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
7087
machine("machine-2", "2", nil),
7188
},
7289
},
73-
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: " +
90+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine update: " +
7491
"default/machine-1: cannot compare with invalid resourceVersion: current: 1, expected to be >= invalidResourceVersion: resource version is not well formed: invalidResourceVersion",
7592
},
7693
{
@@ -87,35 +104,30 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
87104
machine("machine-2", "invalidResourceVersion", nil),
88105
},
89106
},
90-
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: " +
107+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine update: " +
91108
"default/machine-1: cannot compare with invalid resourceVersion: current: invalidResourceVersion, expected to be >= 1: resource version is not well formed: invalidResourceVersion",
92109
},
93110
{
94-
name: "error if objects never show up in the cache",
111+
name: "success if objects are never visible in the cache (deleted before WaitForCacheToBeUpToDate is called)",
95112
objs: []client.Object{
96113
machine("machine-1", "1", nil),
97114
machine("machine-2", "2", nil),
98115
machine("machine-3", "3", nil),
99116
machine("machine-4", "4", nil),
100117
},
101118
clientResponses: map[client.ObjectKey][]client.Object{},
102-
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out: [" +
103-
"machines.cluster.x-k8s.io \"machine-1\" not found, " +
104-
"machines.cluster.x-k8s.io \"machine-2\" not found, " +
105-
"machines.cluster.x-k8s.io \"machine-3\" not found, " +
106-
"machines.cluster.x-k8s.io \"machine-4\" not found]",
107119
},
108120
{
109121
name: "success if objects are instantly up-to-date",
110122
objs: []client.Object{
111-
machine("machine-1", "", nil),
123+
machine("machine-1", "1", nil),
112124
machine("machine-2", "2", nil),
113125
machine("machine-3", "3", nil),
114126
machine("machine-4", "4", nil),
115127
},
116128
clientResponses: map[client.ObjectKey][]client.Object{
117129
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
118-
// For this object it's enough if it shows up, exact resourceVersion doesn't matter.
130+
// This object has an even newer resourceVersion.
119131
machine("machine-1", "5", nil),
120132
},
121133
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
@@ -133,14 +145,13 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
133145
{
134146
name: "success if objects are up-to-date after a few tries",
135147
objs: []client.Object{
136-
machine("machine-1", "", nil),
148+
machine("machine-1", "1", nil),
137149
machine("machine-2", "10", nil),
138150
machine("machine-3", "11", nil),
139151
machine("machine-4", "12", nil),
140152
},
141153
clientResponses: map[client.ObjectKey][]client.Object{
142154
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
143-
// For this object it's enough if it shows up, exact resourceVersion doesn't matter.
144155
machine("machine-1", "4", nil),
145156
},
146157
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
@@ -195,7 +206,109 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
195206
},
196207
})
197208

198-
err := WaitForCacheToBeUpToDate(t.Context(), fakeClient, "Machine creation", tt.objs...)
209+
err := WaitForCacheToBeUpToDate(t.Context(), fakeClient, "Machine update", tt.objs...)
210+
if tt.wantErr != "" {
211+
g.Expect(err).To(HaveOccurred())
212+
g.Expect(err.Error()).To(Equal(tt.wantErr))
213+
} else {
214+
g.Expect(err).ToNot(HaveOccurred())
215+
}
216+
})
217+
}
218+
}
219+
220+
func Test_WaitForObjectsToBeAddedToTheCache(t *testing.T) {
221+
// Modify timeout to speed up test
222+
waitBackoff = wait.Backoff{
223+
Duration: 25 * time.Microsecond,
224+
Cap: 2 * time.Second,
225+
Factor: 1.2,
226+
Steps: 5,
227+
}
228+
229+
tests := []struct {
230+
name string
231+
objs []client.Object
232+
clientResponses map[client.ObjectKey][]client.Object
233+
wantErr string
234+
}{
235+
{
236+
name: "no-op if no objects are passed in",
237+
},
238+
{
239+
name: "error if objects never show up in the cache",
240+
objs: []client.Object{
241+
machine("machine-1", "1", nil),
242+
machine("machine-2", "2", nil),
243+
machine("machine-3", "3", nil),
244+
machine("machine-4", "4", nil),
245+
},
246+
clientResponses: map[client.ObjectKey][]client.Object{},
247+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out: [" +
248+
"machines.cluster.x-k8s.io \"machine-1\" not found, " +
249+
"machines.cluster.x-k8s.io \"machine-2\" not found, " +
250+
"machines.cluster.x-k8s.io \"machine-3\" not found, " +
251+
"machines.cluster.x-k8s.io \"machine-4\" not found]",
252+
},
253+
{
254+
name: "success if objects instantly show up in the cache",
255+
objs: []client.Object{
256+
machine("machine-1", "1", nil),
257+
machine("machine-2", "2", nil),
258+
machine("machine-3", "3", nil),
259+
machine("machine-4", "4", nil),
260+
},
261+
clientResponses: map[client.ObjectKey][]client.Object{
262+
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
263+
// This object has an even newer resourceVersion.
264+
machine("machine-1", "5", nil),
265+
},
266+
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
267+
machine("machine-2", "2", nil),
268+
},
269+
{Namespace: metav1.NamespaceDefault, Name: "machine-3"}: {
270+
machine("machine-3", "3", nil),
271+
},
272+
{Namespace: metav1.NamespaceDefault, Name: "machine-4"}: {
273+
// This object has an even newer resourceVersion.
274+
machine("machine-4", "6", nil),
275+
},
276+
},
277+
},
278+
}
279+
280+
for _, tt := range tests {
281+
t.Run(tt.name, func(t *testing.T) {
282+
g := NewWithT(t)
283+
284+
scheme := runtime.NewScheme()
285+
_ = clusterv1.AddToScheme(scheme)
286+
287+
callCounter := map[client.ObjectKey]int{}
288+
fakeClient := interceptor.NewClient(fake.NewClientBuilder().WithScheme(scheme).Build(), interceptor.Funcs{
289+
Get: func(ctx context.Context, _ client.WithWatch, key client.ObjectKey, obj client.Object, _ ...client.GetOption) error {
290+
if len(tt.clientResponses) == 0 || len(tt.clientResponses[key]) == 0 {
291+
return apierrors.NewNotFound(schema.GroupResource{
292+
Group: clusterv1.GroupVersion.Group,
293+
Resource: "machines",
294+
}, key.Name)
295+
}
296+
297+
currentCall := callCounter[key]
298+
currentCall = min(currentCall, len(tt.clientResponses[key])-1)
299+
300+
// Write back the modified object so callers can access the patched object.
301+
if err := scheme.Convert(tt.clientResponses[key][currentCall], obj, ctx); err != nil {
302+
return errors.Wrapf(err, "unexpected error: failed to get")
303+
}
304+
305+
callCounter[key]++
306+
307+
return nil
308+
},
309+
})
310+
311+
err := WaitForObjectsToBeAddedToTheCache(t.Context(), fakeClient, "Machine creation", tt.objs...)
199312
if tt.wantErr != "" {
200313
g.Expect(err).To(HaveOccurred())
201314
g.Expect(err.Error()).To(Equal(tt.wantErr))

0 commit comments

Comments
 (0)