@@ -20,13 +20,16 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/storage/names"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -446,11 +449,29 @@ func (r *Reconciler) reconcileMachineDeployments(ctx context.Context, s *scope.S
 	diff := calculateMachineDeploymentDiff(s.Current.MachineDeployments, s.Desired.MachineDeployments)
 
 	// Create MachineDeployments.
-	for _, mdTopologyName := range diff.toCreate {
-		md := s.Desired.MachineDeployments[mdTopologyName]
-		if err := r.createMachineDeployment(ctx, s, md); err != nil {
+	if len(diff.toCreate) > 0 {
+		// In current state we only got the MD list via a cached call.
+		// As a consequence, in order to prevent the creation of duplicate MDs due to stale reads,
+		// we are now using a live client to double-check here that the MachineDeployment
+		// to be created doesn't exist yet.
+		currentMDTopologyNames, err := r.getCurrentMachineDeployments(ctx, s)
+		if err != nil {
 			return err
 		}
+		for _, mdTopologyName := range diff.toCreate {
+			md := s.Desired.MachineDeployments[mdTopologyName]
+
+			// Skip the MD creation if the MD already exists.
+			if currentMDTopologyNames.Has(mdTopologyName) {
+				log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object)
+				log.V(3).Infof(fmt.Sprintf("Skipping creation of MachineDeployment %s because MachineDeployment for topology %s already exists (only considered creation because of stale cache)", tlog.KObj{Obj: md.Object}, mdTopologyName))
+				continue
+			}
+
+			if err := r.createMachineDeployment(ctx, s, md); err != nil {
+				return err
+			}
+		}
 	}
 
 	// Update MachineDeployments.
@@ -472,6 +493,32 @@ func (r *Reconciler) reconcileMachineDeployments(ctx context.Context, s *scope.S
 	return nil
 }
 
+// getCurrentMachineDeployments gets the current list of MachineDeployments via the APIReader.
+func (r *Reconciler) getCurrentMachineDeployments(ctx context.Context, s *scope.Scope) (sets.Set[string], error) {
+	// TODO: We should consider using PartialObjectMetadataList here. Currently this doesn't work as our
+	// implementation for topology dryrun doesn't support PartialObjectMetadataList.
+	mdList := &clusterv1.MachineDeploymentList{}
+	err := r.APIReader.List(ctx, mdList,
+		client.MatchingLabels{
+			clusterv1.ClusterNameLabel:          s.Current.Cluster.Name,
+			clusterv1.ClusterTopologyOwnedLabel: "",
+		},
+		client.InNamespace(s.Current.Cluster.Namespace),
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to read MachineDeployments for managed topology")
+	}
+
+	currentMDs := sets.Set[string]{}
+	for _, md := range mdList.Items {
+		mdTopologyName, ok := md.ObjectMeta.Labels[clusterv1.ClusterTopologyMachineDeploymentNameLabel]
+		if ok && mdTopologyName != "" {
+			currentMDs.Insert(mdTopologyName)
+		}
+	}
+	return currentMDs, nil
+}
+
 // createMachineDeployment creates a MachineDeployment and the corresponding Templates.
 func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope, md *scope.MachineDeploymentState) error {
 	// Do not create the MachineDeployment if it is marked as pending create.
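The duplicate-creation check above only helps because `APIReader` bypasses the informer cache that backs `Client`. Below is a minimal wiring sketch assuming a standard controller-runtime manager setup; the `SetupReconciler` helper, the package name, and the exact field set are illustrative assumptions, not part of this change:

```go
package topology

import (
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Reconciler is a reduced stand-in for the real reconciler: it carries both a
// cached client (reads served from the manager's informer cache) and an
// uncached reader (reads go straight to the API server).
type Reconciler struct {
	Client    client.Client // cached; may lag behind recent writes
	APIReader client.Reader // uncached; used for the stale-read double-check above
	recorder  record.EventRecorder
}

// SetupReconciler is a hypothetical constructor showing where the two readers
// come from on a controller-runtime manager.
func SetupReconciler(mgr ctrl.Manager) *Reconciler {
	return &Reconciler{
		Client:    mgr.GetClient(),
		APIReader: mgr.GetAPIReader(),
		recorder:  mgr.GetEventRecorderFor("topology/cluster"),
	}
}
```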
@@ -517,6 +564,23 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope
 	}
 	r.recorder.Eventf(cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: md.Object})
 
+	// Wait until MachineDeployment is visible in the cache.
+	// Note: We have to do this because otherwise using a cached client in current state could
+	// miss a newly created MachineDeployment (because the cache might be stale).
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		key := client.ObjectKey{Namespace: md.Object.Namespace, Name: md.Object.Name}
+		if err := r.Client.Get(ctx, key, &clusterv1.MachineDeployment{}); err != nil {
+			if apierrors.IsNotFound(err) {
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "failed to create %s: failed waiting for MachineDeployment to be visible in cache", md.Object.Kind)
+	}
+
 	// If the MachineDeployment has defined a MachineHealthCheck reconcile it.
 	if md.MachineHealthCheck != nil {
 		if err := r.reconcileMachineHealthCheck(ctx, nil, md.MachineHealthCheck); err != nil {
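The create path polls the cached client until the newly created MachineDeployment shows up. Factored out of the inline closure, the same wait-for-cache pattern looks roughly like the sketch below; `waitForObjectInCache` and its intervals are illustrative assumptions, not an existing Cluster API or controller-runtime helper:

```go
import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitForObjectInCache re-reads an object through the cached client until it is
// observed: NotFound means "cache not caught up yet", any other error aborts.
func waitForObjectInCache(ctx context.Context, c client.Client, key client.ObjectKey, obj client.Object) error {
	return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		if err := c.Get(ctx, key, obj); err != nil {
			if apierrors.IsNotFound(err) {
				return false, nil // not in the cache yet, keep polling
			}
			return false, err
		}
		return true, nil
	})
}
```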
@@ -588,6 +652,24 @@ func (r *Reconciler) updateMachineDeployment(ctx context.Context, s *scope.Scope
 	}
 	r.recorder.Eventf(cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: currentMD.Object}, logMachineDeploymentVersionChange(currentMD.Object, desiredMD.Object))
 
+	// Wait until MachineDeployment is updated in the cache.
+	// Note: We have to do this because otherwise using a cached client in current state could
+	// return a stale state of a MachineDeployment we just patched (because the cache might be stale).
+	// Note: It is good enough to check that the resource version changed. Other controllers might have updated the
+	// MachineDeployment as well, but the combination of the patch call above without a conflict and a changed resource
+	// version here guarantees that we see the changes of our own update.
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		key := client.ObjectKey{Namespace: currentMD.Object.GetNamespace(), Name: currentMD.Object.GetName()}
+		cachedMD := &clusterv1.MachineDeployment{}
+		if err := r.Client.Get(ctx, key, cachedMD); err != nil {
+			return false, err
+		}
+		return currentMD.Object.GetResourceVersion() != cachedMD.GetResourceVersion(), nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "failed to patch %s: failed waiting for MachineDeployment to be updated in cache", tlog.KObj{Obj: currentMD.Object})
+	}
+
 	// We want to call both cleanup functions even if one of them fails to clean up as much as possible.
 	return nil
 }
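The update path cannot rely on NotFound, so it compares resource versions instead: after a conflict-free patch, a changed resourceVersion in the cache means the cache has observed at least our own write. A standalone sketch of that check, under the same illustrative assumptions (and imports) as the previous sketch:

```go
// waitForCacheToObserveUpdate polls the cached client until the cached copy has
// a resourceVersion different from preUpdate, where preUpdate is the object as
// it was read before the patch was applied.
func waitForCacheToObserveUpdate(ctx context.Context, c client.Client, preUpdate client.Object, fresh client.Object) error {
	key := client.ObjectKeyFromObject(preUpdate)
	return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		if err := c.Get(ctx, key, fresh); err != nil {
			return false, err
		}
		return preUpdate.GetResourceVersion() != fresh.GetResourceVersion(), nil
	})
}
```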