Commit bd3145d

andrewrynhard authored and talos-bot committed
fix: leave etcd on scale down
Signed-off-by: Andrew Rynhard <[email protected]>
1 parent 9c3b076 commit bd3145d

File tree

3 files changed: +458 −45

controllers/taloscontrolplane_controller.go

Lines changed: 199 additions & 25 deletions
@@ -11,9 +11,13 @@ import (
     "time"
 
     "github.com/go-logr/logr"
+    "github.com/golang/protobuf/ptypes/empty"
     "github.com/pkg/errors"
     cabptv1 "github.com/talos-systems/cluster-api-bootstrap-provider-talos/api/v1alpha3"
     controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3"
+    "github.com/talos-systems/talos/pkg/machinery/api/machine"
+    talosclient "github.com/talos-systems/talos/pkg/machinery/client"
+    talosconfig "github.com/talos-systems/talos/pkg/machinery/client/config"
     corev1 "k8s.io/api/core/v1"
     v1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -25,6 +29,7 @@ import (
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/utils/pointer"
+    "sigs.k8s.io/cluster-api/api/v1alpha3"
     capiv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
     clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
     "sigs.k8s.io/cluster-api/controllers/external"
@@ -102,9 +107,15 @@ func (r *TalosControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Resu
     // Fetch the Cluster.
     cluster, err := util.GetOwnerCluster(ctx, r.Client, tcp.ObjectMeta)
     if err != nil {
-        logger.Error(err, "Failed to retrieve owner Cluster from the API Server")
-        return ctrl.Result{}, err
+        if !apierrors.IsNotFound(err) {
+            logger.Error(err, "Failed to retrieve owner Cluster from the API Server")
+
+            return ctrl.Result{}, err
+        }
+
+        return ctrl.Result{RequeueAfter: 20 * time.Second}, nil
     }
+
     if cluster == nil {
         logger.Info("Cluster Controller has not yet set OwnerRef")
         return ctrl.Result{Requeue: true}, nil
@@ -123,10 +134,36 @@ func (r *TalosControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Resu
         return ctrl.Result{Requeue: true}, nil
     }
 
-    // If object doesn't have a finalizer, add one.
-    controllerutil.AddFinalizer(tcp, controlplanev1.TalosControlPlaneFinalizer)
+    // Initialize the patch helper.
+    patchHelper, err := patch.NewHelper(tcp, r.Client)
+    if err != nil {
+        logger.Error(err, "Failed to configure the patch helper")
+        return ctrl.Result{Requeue: true}, nil
+    }
+
+    // Add finalizer first if not exist to avoid the race condition between init and delete
+    if !controllerutil.ContainsFinalizer(tcp, controlplanev1.TalosControlPlaneFinalizer) {
+        controllerutil.AddFinalizer(tcp, controlplanev1.TalosControlPlaneFinalizer)
+
+        // patch and return right away instead of reusing the main defer,
+        // because the main defer may take too much time to get cluster status
+        // Patch ObservedGeneration only if the reconciliation completed successfully
+        patchOpts := []patch.Option{}
+        if reterr == nil {
+            patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
+        }
+
+        if err := patchHelper.Patch(ctx, tcp, patchOpts...); err != nil {
+            logger.Error(err, "Failed to add finalizer to TalosControlPlane")
+            return ctrl.Result{}, err
+        }
+
+        return ctrl.Result{}, nil
+    }
 
     defer func() {
+        r.Log.Info("Attempting to set control plane status")
+
         if requeueErr, ok := errors.Cause(reterr).(capierrors.HasRequeueAfterError); ok {
             if res.RequeueAfter == 0 {
                 res.RequeueAfter = requeueErr.GetRequeueAfter()
@@ -137,6 +174,7 @@ func (r *TalosControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Resu
         // Always attempt to update status.
         if err := r.updateStatus(ctx, tcp, cluster); err != nil {
             logger.Error(err, "Failed to update TalosControlPlane Status")
+
             reterr = kerrors.NewAggregate([]error{reterr, err})
         }
 
@@ -150,10 +188,12 @@ func (r *TalosControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Resu
         // Make TCP to requeue in case status is not ready, so we can check for node status without waiting for a full resync (by default 10 minutes).
         // Only requeue if we are not going in exponential backoff due to error, or if we are not already re-queueing, or if the object has a deletion timestamp.
         if reterr == nil && !res.Requeue && !(res.RequeueAfter > 0) && tcp.ObjectMeta.DeletionTimestamp.IsZero() {
-            if !tcp.Status.Ready {
+            if !tcp.Status.Ready || tcp.Status.UnavailableReplicas > 0 {
                 res = ctrl.Result{RequeueAfter: 20 * time.Second}
             }
         }
+
+        r.Log.Info("Successfully updated control plane status")
     }()
 
     if !tcp.ObjectMeta.DeletionTimestamp.IsZero() {
@@ -232,7 +272,8 @@ func (r *TalosControlPlaneReconciler) reconcileDelete(ctx context.Context, clust
     // Get list of all control plane machines
     ownedMachines, err := r.getControlPlaneMachinesForCluster(ctx, util.ObjectKey(cluster), tcp.Name)
     if err != nil {
-        r.Log.Error(err, "failed to retrieve control plane machines for cluster")
+        r.Log.Error(err, "Failed to retrieve control plane machines for cluster")
+
         return ctrl.Result{}, err
     }
 
@@ -281,18 +322,121 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
 
     oldest := machines[0]
     for _, machine := range machines {
+        if !machine.ObjectMeta.DeletionTimestamp.IsZero() {
+            r.Log.Info("Machine is in process of deletion", "machine", machine.Name)
+
+            return ctrl.Result{RequeueAfter: 20 * time.Second}, nil
+        }
+
         if machine.CreationTimestamp.Before(&oldest.CreationTimestamp) {
             oldest = machine
         }
     }
 
-    r.Log.Info("Deleting control plane machine", "machine", oldest.Name)
+    if oldest.Status.NodeRef == nil {
+        return ctrl.Result{}, fmt.Errorf("%q machine does not have a nodeRef", oldest.Name)
+    }
+
+    kubeconfigSecret := &v1.Secret{}
+
+    err = r.Client.Get(ctx,
+        types.NamespacedName{
+            Namespace: cluster.Namespace,
+            Name:      cluster.Name + "-kubeconfig",
+        },
+        kubeconfigSecret,
+    )
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigSecret.Data["value"])
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    clientset, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    var address string
+
+    node, err := clientset.CoreV1().Nodes().Get(ctx, oldest.Status.NodeRef.Name, metav1.GetOptions{})
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    for _, addr := range node.Status.Addresses {
+        if addr.Type == v1.NodeInternalIP {
+            address = addr.Address
+            break
+        }
+    }
+
+    if address == "" {
+        return ctrl.Result{Requeue: true}, fmt.Errorf("no address was found for node %q", node.Name)
+    }
+
+    var (
+        cfgs  cabptv1.TalosConfigList
+        found *cabptv1.TalosConfig
+    )
+
+    err = r.Client.List(ctx, &cfgs)
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    for _, cfg := range cfgs.Items {
+        for _, ref := range cfg.OwnerReferences {
+            if ref.Kind == "Machine" && ref.Name == oldest.Name {
+                found = &cfg
+                break
+            }
+        }
+    }
+
+    if found == nil {
+        return ctrl.Result{Requeue: true}, fmt.Errorf("failed to find TalosConfig for %q", oldest.Name)
+    }
+
+    t, err := talosconfig.FromString(found.Status.TalosConfig)
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    c, err := talosclient.New(ctx, talosclient.WithEndpoints(address), talosclient.WithConfig(t))
+    if err != nil {
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    r.Log.Info("Forfeiting leadership", "machine", oldest.Status.NodeRef.Name)
+
+    _, err = c.MachineClient.EtcdForfeitLeadership(ctx, &machine.EtcdForfeitLeadershipRequest{})
+    if err != nil {
+        return ctrl.Result{Requeue: true}, nil
+    }
+
+    r.Log.Info("Leaving etcd", "machine", oldest.Name, "node", node.Name, "address", address)
+
+    _, err = c.MachineClient.EtcdLeaveCluster(ctx, &machine.EtcdLeaveClusterRequest{})
+    if err != nil {
+        return ctrl.Result{Requeue: true}, nil
+    }
+
+    // NB: We shutdown the node here so that a loadbalancer will drop the backend.
+    // The Kubernetes API server is configured to talk to etcd on localhost, but
+    // at this point etcd has been stopped.
+    r.Log.Info("Shutting down node", "machine", oldest.Name, "node", node.Name, "address", address)
+
+    _, err = c.MachineClient.Shutdown(ctx, &empty.Empty{})
+    if err != nil {
+        return ctrl.Result{Requeue: true}, nil
+    }
+
+    r.Log.Info("Deleting control plane node", "machine", oldest.Name, "node", node.Name)
 
-    // TODO: We need to remove the etcd member. This can be done by calling the
-    // Talos reset API.
-    // TODO: We should use the control plane ready count to know if we can safely
-    // remove a node.
-    // TODO: Delete the node from the workload cluster.
     err = r.Client.Delete(ctx, &oldest)
     if err != nil {
         return ctrl.Result{}, err
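
Taken together, the hunk above replaces the old "just delete the Machine" behaviour with an explicit teardown of the etcd member before the Machine object is removed: forfeit etcd leadership, leave the etcd cluster, shut the node down, then delete the Machine. The following is a minimal sketch of that sequence against the Talos machinery client, distilled from the diff rather than lifted from the controller; the package and helper names are illustrative, error handling is reduced to early returns, and it assumes the node address and Talos client configuration have already been looked up (as the controller does via the node's InternalIP and the Machine's TalosConfig).

package scaledown // illustrative package name, not part of the repository

import (
    "context"

    "github.com/golang/protobuf/ptypes/empty"
    "github.com/talos-systems/talos/pkg/machinery/api/machine"
    talosclient "github.com/talos-systems/talos/pkg/machinery/client"
    talosconfig "github.com/talos-systems/talos/pkg/machinery/client/config"
)

// leaveEtcdAndShutdown is a hypothetical helper mirroring the scale-down
// sequence in the diff above: forfeit etcd leadership, leave the etcd
// cluster, then power the node off so load balancers drop the backend.
func leaveEtcdAndShutdown(ctx context.Context, talosConfig, nodeIP string) error {
    t, err := talosconfig.FromString(talosConfig)
    if err != nil {
        return err
    }

    c, err := talosclient.New(ctx, talosclient.WithEndpoints(nodeIP), talosclient.WithConfig(t))
    if err != nil {
        return err
    }

    // Hand etcd leadership to another member before leaving the cluster.
    if _, err := c.MachineClient.EtcdForfeitLeadership(ctx, &machine.EtcdForfeitLeadershipRequest{}); err != nil {
        return err
    }

    // Remove this member from the etcd cluster.
    if _, err := c.MachineClient.EtcdLeaveCluster(ctx, &machine.EtcdLeaveClusterRequest{}); err != nil {
        return err
    }

    // Shut the node down: its API server talks to etcd on localhost, which is
    // now stopped, so the backend has to drop out of the load balancer.
    _, err = c.MachineClient.Shutdown(ctx, &empty.Empty{})

    return err
}

Note that the controller itself requeues instead of failing hard on these calls; the sketch returns errors directly only for brevity.
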
@@ -493,29 +637,59 @@ func (r *TalosControlPlaneReconciler) updateStatus(ctx context.Context, tcp *con
         return err
     }
 
+    errChan := make(chan error)
+
     for _, ownedMachine := range ownedMachines {
-        if ownedMachine.Status.NodeRef == nil {
-            r.Log.Info("owned machine does not yet have noderef", "machine", ownedMachine.Name)
-            continue
-        }
+        ownedMachine := ownedMachine
 
-        // If this fails for whatever reason, we can't accurately set the status
-        // of the control plane.
-        node, err := clientset.CoreV1().Nodes().Get(ownedMachine.Status.NodeRef.Name, metav1.GetOptions{})
-        if err != nil {
-            return err
-        }
+        go func() {
+            e := func() error {
+                if v1alpha3.MachinePhase(ownedMachine.Status.Phase) == v1alpha3.MachinePhaseDeleting {
+                    return fmt.Errorf("machine is deleting")
+                }
+
+                if ownedMachine.Status.NodeRef == nil {
+                    return fmt.Errorf("machine %q does not have a noderef", ownedMachine.Name)
+                }
+
+                ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
+                defer cancel()
 
-        for _, condition := range node.Status.Conditions {
-            if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
-                tcp.Status.ReadyReplicas++
+                node, err := clientset.CoreV1().Nodes().Get(ctx, ownedMachine.Status.NodeRef.Name, metav1.GetOptions{})
+                if err != nil {
+                    return fmt.Errorf("failed to get node %q: %w", node.Name, err)
+                }
+
+                for _, condition := range node.Status.Conditions {
+                    if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
+                        return nil
+                    }
+                }
+
+                return fmt.Errorf("node ready condition not found")
+            }()
+
+            if e != nil {
+                e = fmt.Errorf("failed to get status for %q: %w", ownedMachine.Name, e)
             }
+
+            errChan <- e
+        }()
+    }
+
+    for range ownedMachines {
+        err = <-errChan
+        if err == nil {
+            tcp.Status.ReadyReplicas++
+        } else {
+            r.Log.Info("Failed to get readiness of machine", "err", err)
         }
     }
 
     tcp.Status.UnavailableReplicas = replicas - tcp.Status.ReadyReplicas
 
     if tcp.Status.ReadyReplicas > 0 {
+        r.Log.Info("Ready replicas", "count", tcp.Status.ReadyReplicas)
         tcp.Status.Ready = true
     }
 
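
The updateStatus hunk above also changes how readiness is counted: instead of a serial node lookup per machine, where a single failure aborted the whole status update, each machine is now checked in its own goroutine with a 15-second timeout and the results are collected over a channel. Below is a small sketch of that fan-out/fan-in pattern; countReady and checkReady are illustrative names standing in for the controller's per-machine node lookup, not identifiers from the source.

package status // illustrative package name

import (
    "context"
    "time"
)

// countReady fans out one readiness check per machine and tallies the
// successes; a failure or slow lookup for one machine no longer blocks
// the others or aborts the status update.
func countReady(ctx context.Context, machines []string, checkReady func(context.Context, string) error) int {
    errChan := make(chan error)

    for _, m := range machines {
        m := m // capture the loop variable for the goroutine (pre-Go 1.22 semantics)

        go func() {
            // Bound each check so a hung API call cannot stall the whole pass.
            ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
            defer cancel()

            errChan <- checkReady(ctx, m)
        }()
    }

    ready := 0

    // Receive exactly one result per machine so no goroutine is left blocked.
    for range machines {
        if err := <-errChan; err == nil {
            ready++
        }
    }

    return ready
}

In the controller, a non-nil result is only logged, and that machine simply does not count toward ReadyReplicas.
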

go.mod

Lines changed: 11 additions & 9 deletions
@@ -4,15 +4,17 @@ go 1.13
 
 require (
     github.com/go-logr/logr v0.1.0
-    github.com/onsi/ginkgo v1.12.0
-    github.com/onsi/gomega v1.9.0
+    github.com/golang/protobuf v1.4.2
+    github.com/onsi/ginkgo v1.12.1
+    github.com/onsi/gomega v1.10.1
     github.com/pkg/errors v0.9.1
     github.com/talos-systems/cluster-api-bootstrap-provider-talos v0.2.0-alpha.0
-    k8s.io/api v0.17.2
-    k8s.io/apimachinery v0.17.2
-    k8s.io/apiserver v0.17.2
-    k8s.io/client-go v0.17.2
-    k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab
-    sigs.k8s.io/cluster-api v0.3.5
-    sigs.k8s.io/controller-runtime v0.5.3
+    github.com/talos-systems/talos/pkg/machinery v0.0.0-20201006184949-3961f835f502
+    k8s.io/api v0.18.6
+    k8s.io/apimachinery v0.18.6
+    k8s.io/apiserver v0.18.6
+    k8s.io/client-go v0.18.6
+    k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19
+    sigs.k8s.io/cluster-api v0.3.9
+    sigs.k8s.io/controller-runtime v0.6.3
 )
