@@ -20,13 +20,16 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"

 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/storage/names"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -446,11 +449,29 @@ func (r *Reconciler) reconcileMachineDeployments(ctx context.Context, s *scope.S
 	diff := calculateMachineDeploymentDiff(s.Current.MachineDeployments, s.Desired.MachineDeployments)

 	// Create MachineDeployments.
-	for _, mdTopologyName := range diff.toCreate {
-		md := s.Desired.MachineDeployments[mdTopologyName]
-		if err := r.createMachineDeployment(ctx, s, md); err != nil {
+	if len(diff.toCreate) > 0 {
+		// In current state we only got the MD list via a cached call.
+		// As a consequence, in order to prevent the creation of duplicate MDs due to stale reads,
+		// we are now using a live client to double-check here that the MachineDeployment
+		// to be created doesn't exist yet.
+		currentMDTopologyNames, err := r.getCurrentMachineDeployments(ctx, s)
+		if err != nil {
 			return err
 		}
+		for _, mdTopologyName := range diff.toCreate {
+			md := s.Desired.MachineDeployments[mdTopologyName]
+
+			// Skip the MD creation if the MD already exists.
+			if currentMDTopologyNames.Has(mdTopologyName) {
+				log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object)
+				log.V(3).Infof("Skipping creation of MachineDeployment %s because MachineDeployment for topology %s already exists (only considered creation because of stale cache)", tlog.KObj{Obj: md.Object}, mdTopologyName)
+				continue
+			}
+
+			if err := r.createMachineDeployment(ctx, s, md); err != nil {
+				return err
+			}
+		}
 	}

 	// Update MachineDeployments.
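
For context on why the live read matters here: controller-runtime gives a reconciler two distinct read paths, a cached client that may lag behind the API server and an uncached reader that always reflects the current state. A minimal sketch of how a reconciler can hold both, assuming a standard manager setup (the struct and options here are illustrative, not part of this change):

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type Reconciler struct {
	Client    client.Client // served from the manager's cache; reads may be stale
	APIReader client.Reader // talks to the API server directly; always current
}

func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		panic(err)
	}
	_ = &Reconciler{
		Client:    mgr.GetClient(),    // cached read path
		APIReader: mgr.GetAPIReader(), // live read path, used for the duplicate check
	}
	// mgr.Start(ctx) would follow in a real controller.
}
```

The cached client stays the default for performance; the live reader is reserved for the one check where a stale read could cause a duplicate create.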
@@ -472,6 +493,32 @@ func (r *Reconciler) reconcileMachineDeployments(ctx context.Context, s *scope.S
 	return nil
 }

+// getCurrentMachineDeployments gets the current list of MachineDeployments via the APIReader.
+func (r *Reconciler) getCurrentMachineDeployments(ctx context.Context, s *scope.Scope) (sets.Set[string], error) {
+	// TODO: We should consider using PartialObjectMetadataList here. Currently this doesn't work as our
+	// implementation for topology dryrun doesn't support PartialObjectMetadataList.
+	mdList := &clusterv1.MachineDeploymentList{}
+	err := r.APIReader.List(ctx, mdList,
+		client.MatchingLabels{
+			clusterv1.ClusterNameLabel:          s.Current.Cluster.Name,
+			clusterv1.ClusterTopologyOwnedLabel: "",
+		},
+		client.InNamespace(s.Current.Cluster.Namespace),
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to read MachineDeployments for managed topology")
+	}
+
+	currentMDs := sets.Set[string]{}
+	for _, md := range mdList.Items {
+		mdTopologyName, ok := md.ObjectMeta.Labels[clusterv1.ClusterTopologyMachineDeploymentNameLabel]
+		if ok && mdTopologyName != "" {
+			currentMDs.Insert(mdTopologyName)
+		}
+	}
+	return currentMDs, nil
+}
+
 // createMachineDeployment creates a MachineDeployment and the corresponding Templates.
 func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope, md *scope.MachineDeploymentState) error {
 	// Do not create the MachineDeployment if it is marked as pending create.
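
The helper returns a `sets.Set[string]`, the generic set type from apimachinery (available since v0.26). A self-contained sketch of the two operations the reconciler relies on; the topology names are made up:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	current := sets.Set[string]{}          // empty set, as in getCurrentMachineDeployments
	current.Insert("md-workers", "md-gpu") // topology names found via the live List call

	fmt.Println(current.Has("md-workers")) // true  -> creation is skipped
	fmt.Println(current.Has("md-new"))     // false -> MachineDeployment gets created
}
```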
@@ -517,6 +564,23 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope
 	}
 	r.recorder.Eventf(cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: md.Object})

+	// Wait until MachineDeployment is visible in the cache.
+	// Note: We have to do this because otherwise using a cached client in current state could
+	// miss a newly created MachineDeployment (because the cache might be stale).
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		key := client.ObjectKey{Namespace: md.Object.Namespace, Name: md.Object.Name}
+		if err := r.Client.Get(ctx, key, &clusterv1.MachineDeployment{}); err != nil {
+			if apierrors.IsNotFound(err) {
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "failed to create %s: failed waiting for MachineDeployment to be visible in cache", md.Object.Kind)
+	}
+
 	// If the MachineDeployment has defined a MachineHealthCheck reconcile it.
 	if md.MachineHealthCheck != nil {
 		if err := r.reconcileMachineHealthCheck(ctx, nil, md.MachineHealthCheck); err != nil {
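
The poll parameters read as: check every 5ms, give up after 5s, and run the condition once immediately (the `true` argument) before the first sleep. A standalone sketch of the `wait.PollUntilContextTimeout` semantics used above, with a counter standing in for the cache `Get`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx := context.Background()
	attempts := 0
	err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true,
		func(ctx context.Context) (done bool, err error) {
			attempts++
			// Returning (false, nil) retries on the next tick; (true, nil) stops
			// successfully; a non-nil error aborts the poll immediately.
			return attempts >= 3, nil
		})
	fmt.Println(attempts, err) // 3 <nil>
}
```

If the cache never catches up within the timeout, the poll returns a context deadline error, which the code above wraps and surfaces so the reconcile is retried.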
@@ -588,6 +652,24 @@ func (r *Reconciler) updateMachineDeployment(ctx context.Context, s *scope.Scope
 	}
 	r.recorder.Eventf(cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: currentMD.Object}, logMachineDeploymentVersionChange(currentMD.Object, desiredMD.Object))

+	// Wait until MachineDeployment is updated in the cache.
+	// Note: We have to do this because otherwise using a cached client in current state could
+	// return a stale state of a MachineDeployment we just patched (because the cache might be stale).
+	// Note: It is good enough to check that the resource version changed. Other controllers might have updated the
+	// MachineDeployment as well, but the combination of the patch call above without a conflict and a changed resource
+	// version here guarantees that we see the changes of our own update.
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		key := client.ObjectKey{Namespace: currentMD.Object.GetNamespace(), Name: currentMD.Object.GetName()}
+		cachedMD := &clusterv1.MachineDeployment{}
+		if err := r.Client.Get(ctx, key, cachedMD); err != nil {
+			return false, err
+		}
+		return currentMD.Object.GetResourceVersion() != cachedMD.GetResourceVersion(), nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "failed to patch %s: failed waiting for MachineDeployment to be updated in cache", tlog.KObj{Obj: currentMD.Object})
+	}
+
 	// We want to call both cleanup functions even if one of them fails to clean up as much as possible.
 	return nil
 }
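
Because `resourceVersion` is an opaque string, the poll above compares it only for (in)equality, never for ordering. If the pattern were reused elsewhere, it could be extracted roughly as below; `waitForCacheToObserveUpdate` and its parameters are a hypothetical sketch, not part of this change:

```go
package topologyutil

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitForCacheToObserveUpdate blocks until the cached copy of obj no longer
// carries preUpdateRV, the resource version our successful patch was issued against.
func waitForCacheToObserveUpdate(ctx context.Context, c client.Client, obj client.Object, preUpdateRV string) error {
	return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true,
		func(ctx context.Context) (bool, error) {
			cached := obj.DeepCopyObject().(client.Object)
			if err := c.Get(ctx, client.ObjectKeyFromObject(obj), cached); err != nil {
				return false, err
			}
			// A conflict-free patch against preUpdateRV bumped the version, so any
			// cached version other than preUpdateRV proves the cache saw our write.
			return cached.GetResourceVersion() != preUpdateRV, nil
		})
}
```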