Skip to content

Commit 5ed7ddf

Browse files
Call node removal endpoint on machine deletion (#66)
1 parent 4f3a678 commit 5ed7ddf

File tree

5 files changed

+460
-0
lines changed

5 files changed

+460
-0
lines changed

controllers/reconcile.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@ import (
44
"context"
55
"fmt"
66
"math/rand"
7+
"net"
78
"sort"
89
"strings"
910
"time"
1011

1112
clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1"
13+
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
1214
"golang.org/x/mod/semver"
1315

1416
"github.com/pkg/errors"
1517
corev1 "k8s.io/api/core/v1"
1618
apierrors "k8s.io/apimachinery/pkg/api/errors"
1719
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1820
kerrors "k8s.io/apimachinery/pkg/util/errors"
21+
"k8s.io/apimachinery/pkg/util/sets"
1922
"k8s.io/apiserver/pkg/storage/names"
2023
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
2124
"sigs.k8s.io/cluster-api/controllers/external"
@@ -28,6 +31,12 @@ import (
2831
"sigs.k8s.io/controller-runtime/pkg/log"
2932
)
3033

34+
const (
35+
defaultClusterAgentPort string = "25000"
36+
defaultDqlitePort string = "19001"
37+
defaultClusterAgentClientTimeout time.Duration = 10 * time.Second
38+
)
39+
3140
type errServiceUnhealthy struct {
3241
service string
3342
reason string
@@ -578,6 +587,26 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
578587
node := deleteMachine.Status.NodeRef
579588

580589
logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name)
590+
591+
logger.Info("deleting node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name)
592+
593+
// NOTE(Hue): We do this step as a best effort since this whole logic is implemented to prevent a not-yet-reported bug.
594+
// The issue is that we were not removing the endpoint from dqlite when we were deleting a machine.
595+
// This would cause a situation were a joining node failed to join because the endpoint was already in the dqlite cluster.
596+
// How? The IP assigned to the joining (new) node, previously belonged to a node that was deleted, but the IP is still there in dqlite.
597+
// If we have 2 or more machines left, get cluster agent client and delete node from dqlite
598+
if len(machines) > 1 {
599+
portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap
600+
601+
if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil {
602+
if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine, portRemap); err != nil {
603+
logger.Error(err, "failed to remove node from dqlite: %w", "machineName", deleteMachine.Name, "nodeName", node.Name)
604+
}
605+
} else {
606+
logger.Error(err, "failed to get cluster agent client")
607+
}
608+
}
609+
581610
logger.Info("deleting machine")
582611

583612
err = r.Client.Delete(ctx, &deleteMachine)
@@ -595,6 +624,54 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
595624
return ctrl.Result{Requeue: true}, nil
596625
}
597626

627+
func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine, portRemap bool) (*clusteragent.Client, error) {
628+
opts := clusteragent.Options{
629+
// NOTE(hue): We want to pick a random machine's IP to call POST /dqlite/remove on its cluster agent endpoint.
630+
// This machine should preferably not be the <delMachine> itself, although this is not forced by Microk8s.
631+
IgnoreMachineNames: sets.NewString(delMachine.Name),
632+
}
633+
634+
port := defaultClusterAgentPort
635+
if portRemap {
636+
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
637+
port = "30000"
638+
}
639+
640+
clusterAgentClient, err := clusteragent.NewClient(machines, port, defaultClusterAgentClientTimeout, opts)
641+
if err != nil {
642+
return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err)
643+
}
644+
645+
return clusterAgentClient, nil
646+
}
647+
648+
// removeMicrok8sNode removes the node from
649+
func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine, portRemap bool) error {
650+
dqlitePort := defaultDqlitePort
651+
if portRemap {
652+
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
653+
dqlitePort = "2379"
654+
}
655+
656+
var removeEp string
657+
for _, addr := range delMachine.Status.Addresses {
658+
if net.ParseIP(addr.Address) != nil {
659+
removeEp = fmt.Sprintf("%s:%s", addr.Address, dqlitePort)
660+
break
661+
}
662+
}
663+
664+
if removeEp == "" {
665+
return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name)
666+
}
667+
668+
if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil {
669+
return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err)
670+
}
671+
672+
return nil
673+
}
674+
598675
func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
599676
nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")
600677

pkg/clusteragent/clusteragent.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package clusteragent
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/tls"
7+
"encoding/json"
8+
"errors"
9+
"fmt"
10+
"net"
11+
"net/http"
12+
"time"
13+
14+
"k8s.io/apimachinery/pkg/util/sets"
15+
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
16+
)
17+
18+
// Options should be used when initializing a new client.
19+
type Options struct {
20+
// IgnoreMachineNames is a set of ignored machine names that we don't want to pick their IP for the cluster agent endpoint.
21+
IgnoreMachineNames sets.String
22+
// InsecureSkipVerify skips the verification of the server's certificate chain and host name.
23+
// This is mostly used for testing purposes.
24+
InsecureSkipVerify bool
25+
}
26+
27+
type Client struct {
28+
ip, port string
29+
client *http.Client
30+
}
31+
32+
// NewClient picks an IP from one of the given machines and creates a new client for the cluster agent
33+
// with that IP.
34+
func NewClient(machines []clusterv1.Machine, port string, timeout time.Duration, opts Options) (*Client, error) {
35+
var ip string
36+
for _, m := range machines {
37+
if opts.IgnoreMachineNames.Has(m.Name) {
38+
continue
39+
}
40+
41+
for _, addr := range m.Status.Addresses {
42+
if net.ParseIP(addr.Address) != nil {
43+
ip = addr.Address
44+
break
45+
}
46+
}
47+
break
48+
}
49+
50+
if ip == "" {
51+
return nil, errors.New("failed to find an IP for cluster agent")
52+
}
53+
54+
transport := &http.Transport{
55+
TLSClientConfig: &tls.Config{
56+
InsecureSkipVerify: opts.InsecureSkipVerify,
57+
},
58+
}
59+
60+
return &Client{
61+
ip: ip,
62+
port: port,
63+
client: &http.Client{
64+
Timeout: timeout,
65+
Transport: transport,
66+
},
67+
}, nil
68+
}
69+
70+
func (c *Client) Endpoint() string {
71+
return fmt.Sprintf("https://%s:%s", c.ip, c.port)
72+
}
73+
74+
// Do makes a request to the given endpoint with the given method. It marshals the request and unmarshals
75+
// server response body if the provided response is not nil.
76+
// The endpoint should _not_ have a leading slash.
77+
func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error {
78+
url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint)
79+
80+
requestBody, err := json.Marshal(request)
81+
if err != nil {
82+
return fmt.Errorf("failed to prepare worker info request: %w", err)
83+
}
84+
85+
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody))
86+
if err != nil {
87+
return fmt.Errorf("failed to create request: %w", err)
88+
}
89+
90+
res, err := c.client.Do(req)
91+
if err != nil {
92+
return fmt.Errorf("failed to call cluster agent: %w", err)
93+
}
94+
defer res.Body.Close()
95+
96+
if res.StatusCode != http.StatusOK {
97+
// NOTE(hue): Marshal and print any response that we got since it might contain valuable information
98+
// on why the request failed.
99+
// Ignore JSON errors to prevent unnecessarily complicated error handling.
100+
anyResp := make(map[string]any)
101+
_ = json.NewDecoder(res.Body).Decode(&anyResp)
102+
b, _ := json.Marshal(anyResp)
103+
resStr := string(b)
104+
105+
return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr)
106+
}
107+
108+
if response != nil {
109+
if err := json.NewDecoder(res.Body).Decode(response); err != nil {
110+
return fmt.Errorf("failed to decode response: %w", err)
111+
}
112+
}
113+
114+
return nil
115+
}

0 commit comments

Comments
 (0)