1 change: 1 addition & 0 deletions Dockerfile
@@ -15,6 +15,7 @@ RUN go mod download
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY pkg/ pkg/

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=$arch go build -a -ldflags '-s -w' -o manager main.go
88 changes: 87 additions & 1 deletion controllers/reconcile.go
@@ -4,18 +4,22 @@ import (
"context"
"fmt"
"math/rand"
"net"
"sort"
"strings"
"time"

clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token"
"golang.org/x/mod/semver"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
@@ -28,6 +32,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
defaultClusterAgentPort string = "25000"
defaultDqlitePort string = "19001"
defaultClusterAgentClientTimeout time.Duration = 10 * time.Second
)

type errServiceUnhealthy struct {
service string
reason string
@@ -466,7 +476,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cl
}

// clean up MicroK8s cluster secrets
for _, secretName := range []string{"kubeconfig", "ca", "jointoken"} {
for _, secretName := range []string{"kubeconfig", "ca", "jointoken", token.AuthTokenNameSuffix} {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
@@ -578,6 +588,28 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
node := deleteMachine.Status.NodeRef

logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name)

logger.Info("deleting node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name)

// NOTE(Hue): This step is best effort. It guards against a latent, not-yet-reported bug:
// previously, we did not remove a machine's endpoint from dqlite when deleting the machine.
// A node joining later could then fail to join because its endpoint was already in the dqlite cluster:
// the IP assigned to the joining (new) node previously belonged to a deleted node, but that IP was still registered in dqlite.
// If only 2 machines are left, deleting one is not safe because it might be the dqlite leader, and
// the cluster-agent does not handle leadership transfers for now. Maybe something for later (TODO).
// If 3 or more machines are left, get a cluster agent client and delete the node from dqlite.
if len(machines) > 2 {
portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap

if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil {
if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, cluster, deleteMachine, portRemap); err != nil {
logger.Error(err, "failed to remove node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name)
}
} else {
logger.Error(err, "failed to get cluster agent client")
}
}

logger.Info("deleting machine")

err = r.Client.Delete(ctx, &deleteMachine)
@@ -595,6 +627,60 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
return ctrl.Result{Requeue: true}, nil
}

func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine, portRemap bool) (*clusteragent.Client, error) {
opts := clusteragent.Options{
// NOTE(hue): We pick one of the machines' IPs to call POST /dqlite/remove on its cluster agent endpoint.
// This machine should preferably not be <delMachine> itself, although MicroK8s does not enforce this.
IgnoreMachineNames: sets.NewString(delMachine.Name),
}

port := defaultClusterAgentPort
if portRemap {
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
port = "30000"
}

clusterAgentClient, err := clusteragent.NewClient(machines, port, defaultClusterAgentClientTimeout, opts)
if err != nil {
return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err)
}

return clusterAgentClient, nil
}

// removeNodeFromDqlite removes the node that corresponds to delMachine from the dqlite cluster through the cluster agent.
func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client,
clusterKey client.ObjectKey, delMachine clusterv1.Machine, portRemap bool) error {
dqlitePort := defaultDqlitePort
if portRemap {
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
dqlitePort = "2379"
}

var removeEp string
for _, addr := range delMachine.Status.Addresses {
if net.ParseIP(addr.Address) != nil {
removeEp = fmt.Sprintf("%s:%s", addr.Address, dqlitePort)
break
}
}

if removeEp == "" {
return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name)
}

authToken, err := token.Lookup(ctx, r.Client, clusterKey)
if err != nil {
return fmt.Errorf("failed to lookup token: %w", err)
}

if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, authToken, removeEp); err != nil {
return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err)
}

return nil
}

func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")

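For reference, the two remap branches above follow the same port-selection pattern. A possible consolidation, as a sketch only (not part of this PR), with the values taken from the constants and remap comments in this diff:

// portFor returns the port to use for a service, honoring the
// PortCompatibilityRemap flag. The remapped values mirror
// control-plane-components.yaml: cluster agent 25000 -> 30000, dqlite 19001 -> 2379.
func portFor(defaultPort, remappedPort string, portRemap bool) string {
	if portRemap {
		return remappedPort
	}
	return defaultPort
}

// Usage in the code paths above would reduce to:
//   port := portFor(defaultClusterAgentPort, "30000", portRemap)
//   dqlitePort := portFor(defaultDqlitePort, "2379", portRemap)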
1 change: 1 addition & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
cloud.google.com/go/compute v1.10.0 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
115 changes: 115 additions & 0 deletions pkg/clusteragent/clusteragent.go
@@ -0,0 +1,115 @@
package clusteragent

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"time"

"k8s.io/apimachinery/pkg/util/sets"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

// Options should be used when initializing a new client.
type Options struct {
// IgnoreMachineNames is a set of machine names whose IPs should not be picked for the cluster agent endpoint.
IgnoreMachineNames sets.String
}

type Client struct {
ip, port string
client *http.Client
}

// NewClient picks an IP from one of the given machines and creates a new client for the cluster agent
// with that IP.
func NewClient(machines []clusterv1.Machine, port string, timeout time.Duration, opts Options) (*Client, error) {
var ip string
for _, m := range machines {
if opts.IgnoreMachineNames.Has(m.Name) {
continue
}

for _, addr := range m.Status.Addresses {
if net.ParseIP(addr.Address) != nil {
ip = addr.Address
break
}
}

if ip != "" {
break
}
}

if ip == "" {
return nil, errors.New("failed to find an IP for cluster agent")
}

return &Client{
ip: ip,
port: port,
client: &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
// TODO(Hue): Workaround for now, address later on
// get the certificate fingerprint from the matching node through a resource in the cluster (TBD),
// and validate it in the TLSClientConfig
InsecureSkipVerify: true,
},
},
},
}, nil
}

func (c *Client) Endpoint() string {
return fmt.Sprintf("https://%s:%s", c.ip, c.port)
}

// do makes a request to the given endpoint with the given method. It marshals the request body and,
// if the provided response is not nil, unmarshals the server response body into it.
// The endpoint should _not_ have a leading slash.
func (c *Client) do(ctx context.Context, method, endpoint string, request any, header map[string][]string, response any) error {
url := fmt.Sprintf("%s/%s", c.Endpoint(), endpoint)

requestBody, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("failed to prepare worker info request: %w", err)
}

req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header = http.Header(header)

res, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("failed to call cluster agent: %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// NOTE(hue): Decode and include any response body we got, since it might contain valuable information
// about why the request failed.
// Ignore JSON errors to avoid unnecessarily complicated error handling.
anyResp := make(map[string]any)
_ = json.NewDecoder(res.Body).Decode(&anyResp)
b, _ := json.Marshal(anyResp)
resStr := string(b)

return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr)
}

if response != nil {
if err := json.NewDecoder(res.Body).Decode(response); err != nil {
return fmt.Errorf("failed to decode response: %w", err)
}
}

return nil
}
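RemoveNodeFromDqlite, which reconcile.go calls on this client, is defined outside this file and not shown in the diff. A minimal sketch of how it could sit on top of do, assuming the cluster agent endpoint is dqlite/remove (per the NOTE in reconcile.go); the request field and header names here are illustrative assumptions, not the provider's actual wire format:

// RemoveNodeFromDqlite asks the cluster agent to drop removeEp (an "ip:port"
// dqlite address) from the dqlite cluster, authenticating with token.
func (c *Client) RemoveNodeFromDqlite(ctx context.Context, token, removeEp string) error {
	request := map[string]string{"remove_endpoint": removeEp} // hypothetical field name
	header := map[string][]string{"token": {token}}           // hypothetical header name
	return c.do(ctx, http.MethodPost, "dqlite/remove", request, header, nil)
}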