@@ -18,19 +18,25 @@ package scope
 
 import (
 	"context"
+	"fmt"
 	"reflect"
+	"time"
 
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
 	"k8s.io/klog/v2/klogr"
 	"k8s.io/utils/pointer"
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
 	"sigs.k8s.io/cluster-api/controllers/noderefutil"
+	"sigs.k8s.io/cluster-api/controllers/remote"
 	capierrors "sigs.k8s.io/cluster-api/errors"
 	capiv1exp "sigs.k8s.io/cluster-api/exp/api/v1alpha4"
-	utilkubeconfig "sigs.k8s.io/cluster-api/util/kubeconfig"
+	drain "sigs.k8s.io/cluster-api/third_party/kubernetes-drain"
+	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/patch"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -40,6 +46,11 @@ import (
 	"sigs.k8s.io/cluster-api-provider-azure/util/tele"
 )
 
+const (
+	// MachinePoolMachineScopeName is the sourceName, or more specifically the UserAgent, of the client used to cordon and drain.
+	MachinePoolMachineScopeName = "azuremachinepoolmachine-scope"
+)
+
 type (
 	nodeGetter interface {
 		GetNodeByProviderID(ctx context.Context, providerID string) (*corev1.Node, error)
@@ -261,6 +272,158 @@ func (s *MachinePoolMachineScope) UpdateStatus(ctx context.Context) error {
 	return nil
 }
 
+// CordonAndDrain will cordon and drain the Kubernetes node associated with this AzureMachinePoolMachine.
+func (s *MachinePoolMachineScope) CordonAndDrain(ctx context.Context) error {
+	ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolMachineScope.CordonAndDrain")
+	defer span.End()
+
+	var (
+		nodeRef = s.AzureMachinePoolMachine.Status.NodeRef
+		node    *corev1.Node
+		err     error
+	)
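+	// Prefer the NodeRef recorded in the status; if it has not been set yet, fall back to looking the node up by provider ID.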
+	if nodeRef == nil || nodeRef.Name == "" {
+		node, err = s.workloadNodeGetter.GetNodeByProviderID(ctx, s.ProviderID())
+	} else {
+		node, err = s.workloadNodeGetter.GetNodeByObjectReference(ctx, *nodeRef)
+	}
+
+	if err != nil && apierrors.IsNotFound(err) {
+		return nil // node was already gone, so no need to cordon and drain
+	} else if err != nil {
+		return errors.Wrap(err, "failed to find node")
+	}
+
+	// Drain the node before deletion and issue a patch in order to make this operation visible to the users.
+	if s.isNodeDrainAllowed() {
+		patchHelper, err := patch.NewHelper(s.AzureMachinePoolMachine, s.client)
+		if err != nil {
+			return errors.Wrap(err, "failed to build a patchHelper when draining node")
+		}
+
+		s.V(4).Info("Draining node", "node", node.Name)
+		// The DrainingSucceededCondition never exists before the node is drained for the first time,
+		// so its transition time can be used to record the first time draining started.
+		// This `if` condition prevents the transition time from being changed more than once.
+		if conditions.Get(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition) == nil {
+			conditions.MarkFalse(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition, clusterv1.DrainingReason, clusterv1.ConditionSeverityInfo, "Draining the node before deletion")
+		}
+
+		if err := patchHelper.Patch(ctx, s.AzureMachinePoolMachine); err != nil {
+			return errors.Wrap(err, "failed to patch AzureMachinePoolMachine")
+		}
+
+		if err := s.drainNode(ctx, node); err != nil {
+			// Check for condition existence. If the condition exists, it may have a different severity or message, which
+			// would cause the last transition time to be updated. The last transition time is used to determine how
+			// long to wait to time out the node drain operation. If we were to keep updating the last transition time,
+			// a drain operation may never time out.
+			if conditions.Get(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition) == nil {
+				conditions.MarkFalse(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition, clusterv1.DrainingFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
+			}
+			return err
+		}
+
+		conditions.MarkTrue(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition)
+	}
+
+	return nil
+}
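+
+// A minimal usage sketch (hypothetical caller, not part of this change): a reconciler handling
+// AzureMachinePoolMachine deletion would typically cordon and drain before removing the
+// underlying VMSS instance, e.g.:
+//
+//	if err := machineScope.CordonAndDrain(ctx); err != nil {
+//		return reconcile.Result{}, errors.Wrap(err, "failed to cordon and drain node before deletion")
+//	}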
+
+func (s *MachinePoolMachineScope) drainNode(ctx context.Context, node *corev1.Node) error {
+	ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolMachineScope.drainNode")
+	defer span.End()
+
+	restConfig, err := remote.RESTConfig(ctx, MachinePoolMachineScopeName, s.client, client.ObjectKey{
+		Name:      s.ClusterName(),
+		Namespace: s.AzureMachinePoolMachine.Namespace,
+	})
+
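+	// Note: failures to build a client for the workload cluster are logged and swallowed
+	// ("won't retry") so that an unreachable workload cluster does not block machine deletion.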
+	if err != nil {
+		s.Error(err, "Error creating a remote client while deleting Machine, won't retry")
+		return nil
+	}
+
+	kubeClient, err := kubernetes.NewForConfig(restConfig)
+	if err != nil {
+		s.Error(err, "Error creating a remote client while deleting Machine, won't retry")
+		return nil
+	}
+
+	drainer := &drain.Helper{
+		Client:              kubeClient,
+		Force:               true,
+		IgnoreAllDaemonSets: true,
+		DeleteLocalData:     true,
+		GracePeriodSeconds:  -1,
+		// If a pod is not evicted in 20 seconds, retry the eviction next time the
+		// machine gets reconciled (to allow other machines to be reconciled in the meantime).
+		Timeout: 20 * time.Second,
+		OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
+			verbStr := "Deleted"
+			if usingEviction {
+				verbStr = "Evicted"
+			}
+			s.V(4).Info(fmt.Sprintf("%s pod from Node", verbStr),
+				"pod", fmt.Sprintf("%s/%s", pod.Name, pod.Namespace))
+		},
+		Out:    writer{klog.Info},
+		ErrOut: writer{klog.Error},
+		DryRun: false,
+	}
+
+	if noderefutil.IsNodeUnreachable(node) {
+		// When the node is unreachable, ignore pods that have not been evicted after this timeout.
+		drainer.SkipWaitForDeleteTimeoutSeconds = 60 * 5 // 5 minutes
+	}
+
+	if err := drain.RunCordonOrUncordon(ctx, drainer, node, true); err != nil {
+		// Machine will be re-reconciled after a cordon failure.
+		return azure.WithTransientError(errors.Errorf("unable to cordon node %s: %v", node.Name, err), 20*time.Second)
+	}
+
+	if err := drain.RunNodeDrain(ctx, drainer, node.Name); err != nil {
+		// Machine will be re-reconciled after a drain failure.
+		return azure.WithTransientError(errors.Wrap(err, "Drain failed, retry in 20s"), 20*time.Second)
+	}
+
+	s.V(4).Info("Drain successful")
+	return nil
+}
+
+// isNodeDrainAllowed checks to see if the node is excluded from draining or if the NodeDrainTimeout has expired.
+func (s *MachinePoolMachineScope) isNodeDrainAllowed() bool {
+	if _, exists := s.AzureMachinePoolMachine.ObjectMeta.Annotations[clusterv1.ExcludeNodeDrainingAnnotation]; exists {
+		return false
+	}
+
+	if s.nodeDrainTimeoutExceeded() {
+		return false
+	}
+
+	return true
+}
+
+// nodeDrainTimeoutExceeded checks whether the AzureMachinePool's NodeDrainTimeout has been exceeded for this
+// AzureMachinePoolMachine.
+func (s *MachinePoolMachineScope) nodeDrainTimeoutExceeded() bool {
+	// if the NodeDrainTimeout is not set by the user
+	pool := s.AzureMachinePool
+	if pool == nil || pool.Spec.NodeDrainTimeout == nil || pool.Spec.NodeDrainTimeout.Seconds() <= 0 {
+		return false
+	}
+
+	// if the draining succeeded condition does not exist, draining has not started yet
+	if conditions.Get(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition) == nil {
+		return false
+	}
+
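+	// The timeout is measured from the DrainingSucceeded condition's last transition time, which
+	// records when draining first started (see CordonAndDrain above).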
+	now := time.Now()
+	firstTimeDrain := conditions.GetLastTransitionTime(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition)
+	diff := now.Sub(firstTimeDrain.Time)
+	return diff.Seconds() >= s.AzureMachinePool.Spec.NodeDrainTimeout.Seconds()
+}
+
 func (s *MachinePoolMachineScope) hasLatestModelApplied() (bool, error) {
 	if s.instance == nil {
 		return false, errors.New("instance must not be nil")
@@ -344,24 +507,16 @@ func getWorkloadClient(ctx context.Context, c client.Client, cluster client.Obje
 	ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolMachineScope.getWorkloadClient")
 	defer span.End()
 
-	obj := client.ObjectKey{
-		Namespace: cluster.Namespace,
-		Name:      cluster.Name,
-	}
-	dataBytes, err := utilkubeconfig.FromSecret(ctx, c, obj)
-	if err != nil {
-		return nil, errors.Wrapf(err, "\"%s-kubeconfig\" not found in namespace %q", obj.Name, obj.Namespace)
-	}
-
-	config, err := clientcmd.Load(dataBytes)
-	if err != nil {
-		return nil, errors.Wrapf(err, "failed to load \"%s-kubeconfig\" in namespace %q", obj.Name, obj.Namespace)
-	}
+	return remote.NewClusterClient(ctx, MachinePoolMachineScopeName, c, cluster)
+}
 
-	restConfig, err := clientcmd.NewDefaultClientConfig(*config, &clientcmd.ConfigOverrides{}).ClientConfig()
-	if err != nil {
-		return nil, errors.Wrapf(err, "failed transform config \"%s-kubeconfig\" in namespace %q", obj.Name, obj.Namespace)
-	}
+// writer implements the io.Writer interface as a pass-through for klog.
+type writer struct {
+	logFunc func(args ...interface{})
+}
 
-	return client.New(restConfig, client.Options{})
+// Write passes string(p) into the writer's logFunc and always returns len(p).
+func (w writer) Write(p []byte) (n int, err error) {
+	w.logFunc(string(p))
+	return len(p), nil
 }