Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions internal/controllers/topology/cluster/reconcile_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (r *Reconciler) reconcileMachineHealthCheck(ctx context.Context, current, d
if err != nil {
return errors.Wrapf(err, "failed to create patch helper for MachineHealthCheck %s", klog.KObj(desired))
}
if err := helper.Patch(ctx); err != nil {
if _, err := helper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to create MachineHealthCheck %s", klog.KObj(desired))
}
r.recorder.Eventf(desired, corev1.EventTypeNormal, createEventReason, "Created MachineHealthCheck %q", klog.KObj(desired))
Expand Down Expand Up @@ -500,7 +500,7 @@ func (r *Reconciler) reconcileMachineHealthCheck(ctx context.Context, current, d
}

log.Info("Patching MachineHealthCheck")
if err := patchHelper.Patch(ctx); err != nil {
if _, err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch MachineHealthCheck %s", klog.KObj(current))
}
r.recorder.Eventf(current, corev1.EventTypeNormal, updateEventReason, "Updated MachineHealthCheck %q", klog.KObj(current))
Expand Down Expand Up @@ -530,19 +530,19 @@ func (r *Reconciler) reconcileCluster(ctx context.Context, s *scope.Scope) error
} else {
log.Info("Patching Cluster", "diff", string(changes))
}
if err := patchHelper.Patch(ctx); err != nil {
modifiedResourceVersion, err := patchHelper.Patch(ctx)
if err != nil {
return errors.Wrapf(err, "failed to patch Cluster %s", klog.KObj(s.Current.Cluster))
}
r.recorder.Eventf(s.Current.Cluster, corev1.EventTypeNormal, updateEventReason, "Updated Cluster %q", klog.KObj(s.Current.Cluster))

// Wait until Cluster is updated in the cache.
// Note: We have to do this because otherwise using a cached client in the Reconcile func could
// return a stale state of the Cluster we just patched (because the cache might be stale).
// Note: It is good enough to check that the resource version changed. Other controllers might have updated the
// Cluster as well, but the combination of the patch call above without a conflict and a changed resource
// version here guarantees that we see the changes of our own update.
// Note: Using DeepCopy to not modify s.Current.Cluster as it's not trivial to figure out what impact that would have.
return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Cluster update", s.Current.Cluster.DeepCopy())
cluster := s.Current.Cluster.DeepCopy()
cluster.ResourceVersion = modifiedResourceVersion
return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Cluster update", cluster)
Copy link
Member Author

@sbueringer sbueringer Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wait was a no-op previously because patchHelper.Patch did not update s.Current.Cluster during the Patch call so it always had the resourceVersion that we got at the beginning of the reconcile from the apiserver

}

// reconcileMachineDeployments reconciles the desired state of the MachineDeployment objects.
Expand Down Expand Up @@ -693,7 +693,7 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope
bootstrapCleanupFunc()
return createErrorWithoutObjectName(ctx, err, md.Object)
}
if err := helper.Patch(ctx); err != nil {
if _, err := helper.Patch(ctx); err != nil {
// Best effort cleanup of the InfrastructureMachineTemplate & BootstrapTemplate (only on creation).
infrastructureMachineCleanupFunc()
bootstrapCleanupFunc()
Expand Down Expand Up @@ -814,7 +814,8 @@ func (r *Reconciler) updateMachineDeployment(ctx context.Context, s *scope.Scope
} else {
log.Info("Patching MachineDeployment", "diff", string(changes))
}
if err := patchHelper.Patch(ctx); err != nil {
modifiedResourceVersion, err := patchHelper.Patch(ctx)
if err != nil {
// Best effort cleanup of the InfrastructureMachineTemplate & BootstrapTemplate (only on template rotation).
infrastructureMachineCleanupFunc()
bootstrapCleanupFunc()
Expand All @@ -825,10 +826,10 @@ func (r *Reconciler) updateMachineDeployment(ctx context.Context, s *scope.Scope
// Wait until MachineDeployment is updated in the cache.
// Note: We have to do this because otherwise using a cached client in current state could
// return a stale state of a MachineDeployment we just patched (because the cache might be stale).
// Note: It is good enough to check that the resource version changed. Other controllers might have updated the
// MachineDeployment as well, but the combination of the patch call above without a conflict and a changed resource
// version here guarantees that we see the changes of our own update.
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineDeployment update", currentMD.Object); err != nil {
// Note: Using DeepCopy to not modify currentMD.Object as it's not trivial to figure out what impact that would have.
md := currentMD.Object.DeepCopy()
md.ResourceVersion = modifiedResourceVersion
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineDeployment update", md); err != nil {
return err
}

Expand Down Expand Up @@ -1016,7 +1017,7 @@ func (r *Reconciler) createMachinePool(ctx context.Context, s *scope.Scope, mp *
bootstrapCleanupFunc()
return createErrorWithoutObjectName(ctx, err, mp.Object)
}
if err := helper.Patch(ctx); err != nil {
if _, err := helper.Patch(ctx); err != nil {
// Best effort cleanup of the InfrastructureMachinePool & BootstrapConfig (only on creation).
infrastructureMachineMachinePoolCleanupFunc()
bootstrapCleanupFunc()
Expand Down Expand Up @@ -1077,7 +1078,8 @@ func (r *Reconciler) updateMachinePool(ctx context.Context, s *scope.Scope, mpTo
} else {
log.Info("Patching MachinePool", "diff", string(changes))
}
if err := patchHelper.Patch(ctx); err != nil {
modifiedResourceVersion, err := patchHelper.Patch(ctx)
if err != nil {
return errors.Wrapf(err, "failed to patch MachinePool %s", klog.KObj(currentMP.Object))
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, updateEventReason, "Updated MachinePool %q%s", klog.KObj(currentMP.Object), logMachinePoolVersionChange(currentMP.Object, desiredMP.Object))
Expand All @@ -1088,7 +1090,9 @@ func (r *Reconciler) updateMachinePool(ctx context.Context, s *scope.Scope, mpTo
// Note: It is good enough to check that the resource version changed. Other controllers might have updated the
// MachinePool as well, but the combination of the patch call above without a conflict and a changed resource
// version here guarantees that we see the changes of our own update.
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachinePool update", currentMP.Object); err != nil {
mp := currentMP.Object.DeepCopy()
mp.ResourceVersion = modifiedResourceVersion
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachinePool update", mp); err != nil {
return err
}

Expand Down Expand Up @@ -1193,7 +1197,7 @@ func (r *Reconciler) reconcileReferencedObject(ctx context.Context, in reconcile
if err != nil {
return false, errors.Wrap(createErrorWithoutObjectName(ctx, err, in.desired), "failed to create patch helper")
}
if err := helper.Patch(ctx); err != nil {
if _, err := helper.Patch(ctx); err != nil {
return false, createErrorWithoutObjectName(ctx, err, in.desired)
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %s %q", in.desired.GetKind(), klog.KObj(in.desired))
Expand Down Expand Up @@ -1224,7 +1228,7 @@ func (r *Reconciler) reconcileReferencedObject(ctx context.Context, in reconcile
} else {
log.Info(fmt.Sprintf("Patching %s", in.desired.GetKind()), "diff", string(changes))
}
if err := patchHelper.Patch(ctx); err != nil {
if _, err := patchHelper.Patch(ctx); err != nil {
return false, errors.Wrapf(err, "failed to patch %s %s", in.current.GetKind(), klog.KObj(in.current))
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, updateEventReason, "Updated %s %q%s", in.desired.GetKind(), klog.KObj(in.desired), logUnstructuredVersionChange(in.current, in.desired, in.versionGetter))
Expand Down Expand Up @@ -1279,7 +1283,7 @@ func (r *Reconciler) reconcileReferencedTemplate(ctx context.Context, in reconci
if err != nil {
return false, errors.Wrap(createErrorWithoutObjectName(ctx, err, in.desired), "failed to create patch helper")
}
if err := helper.Patch(ctx); err != nil {
if _, err := helper.Patch(ctx); err != nil {
return false, createErrorWithoutObjectName(ctx, err, in.desired)
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %s %q", in.desired.GetKind(), klog.KObj(in.desired))
Expand Down Expand Up @@ -1319,7 +1323,7 @@ func (r *Reconciler) reconcileReferencedTemplate(ctx context.Context, in reconci
} else {
log.Info(fmt.Sprintf("Patching %s", in.desired.GetKind()), "diff", string(changes))
}
if err := patchHelper.Patch(ctx); err != nil {
if _, err := patchHelper.Patch(ctx); err != nil {
return false, errors.Wrapf(err, "failed to patch %s %s", in.desired.GetKind(), klog.KObj(in.desired))
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, updateEventReason, "Updated %s %q (metadata changes)", in.desired.GetKind(), klog.KObj(in.desired))
Expand All @@ -1344,7 +1348,7 @@ func (r *Reconciler) reconcileReferencedTemplate(ctx context.Context, in reconci
if err != nil {
return false, errors.Wrap(createErrorWithoutObjectName(ctx, err, in.desired), "failed to create patch helper")
}
if err := helper.Patch(ctx); err != nil {
if _, err := helper.Patch(ctx); err != nil {
return false, createErrorWithoutObjectName(ctx, err, in.desired)
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %s %q as a replacement for %q (template rotation)", in.desired.GetKind(), klog.KObj(in.desired), in.ref.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ type PatchHelper interface {
Changes() []byte

// Patch patches the given obj in the Kubernetes cluster.
Patch(ctx context.Context) error
Patch(ctx context.Context) (modifiedResourceVersion string, err error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ func (h *serverSidePatchHelper) HasChanges() bool {
}

// Patch will server side apply the current intent (the modified object.
func (h *serverSidePatchHelper) Patch(ctx context.Context) error {
func (h *serverSidePatchHelper) Patch(ctx context.Context) (string, error) {
if !h.HasChanges() {
return nil
return "", nil
}

log := ctrl.LoggerFrom(ctx)
Expand All @@ -147,5 +147,8 @@ func (h *serverSidePatchHelper) Patch(ctx context.Context) error {
// overwrite values and become sole manager.
client.ForceOwnership,
}
return h.client.Apply(ctx, client.ApplyConfigurationFromUnstructured(h.modified), options...)
if err := h.client.Apply(ctx, client.ApplyConfigurationFromUnstructured(h.modified), options...); err != nil {
return "", err
}
return h.modified.GetResourceVersion(), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func TestServerSideApply(t *testing.T) {
g.Expect(p0.Changes()).To(BeNil()) // changes are expected to be nil on create.

// Create the object using server side apply
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Check the object and verify managed field are properly set.
got := obj.DeepCopy()
Expand Down Expand Up @@ -306,7 +307,8 @@ func TestServerSideApply(t *testing.T) {
g.Expect(p0.Changes()).To(BeNil())

// Change the object using server side apply
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Check the object and verify fields set by the other controller are preserved.
got := obj.DeepCopy()
Expand Down Expand Up @@ -354,7 +356,8 @@ func TestServerSideApply(t *testing.T) {
g.Expect(p0.Changes()).To(Equal([]byte(`{"spec":{"controlPlaneEndpoint":{"host":"changed"}}}`)))

// Create the object using server side apply
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Check the object and verify the change is applied as well as the fields set by the other controller are still preserved.
got := obj.DeepCopy()
Expand Down Expand Up @@ -393,7 +396,8 @@ func TestServerSideApply(t *testing.T) {
g.Expect(p0.Changes()).To(BeEmpty()) // Note: metadata.managedFields have been removed from the diff to reduce log verbosity.

// Create the object using server side apply
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Check the object and verify the change is applied as well as managed field updated accordingly.
got := obj.DeepCopy()
Expand Down Expand Up @@ -434,7 +438,8 @@ func TestServerSideApply(t *testing.T) {
g.Expect(p0.Changes()).To(Equal([]byte(`{"spec":{"bar":"changed-by-topology-controller"}}`)))

// Create the object using server side apply
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Check the object and verify the change is applied as well as managed field updated accordingly.
got := obj.DeepCopy()
Expand Down Expand Up @@ -667,7 +672,8 @@ func TestServerSideApplyWithDefaulting(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(p0.HasChanges()).To(BeTrue())
g.Expect(p0.HasSpecChanges()).To(BeTrue())
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())
defer func() {
g.Expect(env.CleanupAndWait(ctx, kct.DeepCopy())).To(Succeed())
}()
Expand Down Expand Up @@ -731,7 +737,8 @@ func TestServerSideApplyWithDefaulting(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(p0.HasChanges()).To(Equal(tt.expectChanges), fmt.Sprintf("changes: %s", string(p0.Changes())))
g.Expect(p0.HasSpecChanges()).To(Equal(tt.expectSpecChanges))
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Verify field ownership
// Note: It might take a bit for the cache to be up-to-date.
Expand Down Expand Up @@ -779,7 +786,8 @@ func TestServerSideApplyWithDefaulting(t *testing.T) {
// Expect no changes.
g.Expect(p0.HasChanges()).To(BeFalse())
g.Expect(p0.HasSpecChanges()).To(BeFalse())
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err := p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Expect webhook to be called.
g.Expect(defaulter.Counter).To(Equal(countBefore+2),
Expand All @@ -806,7 +814,8 @@ func TestServerSideApplyWithDefaulting(t *testing.T) {
// Expect no changes.
g.Expect(p0.HasChanges()).To(BeFalse())
g.Expect(p0.HasSpecChanges()).To(BeFalse())
g.Expect(p0.Patch(ctx)).To(Succeed())
_, err = p0.Patch(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Expect webhook to not be called.
g.Expect(defaulter.Counter).To(Equal(countBefore),
Expand Down
Loading