Skip to content

Commit 3f08166

Browse files
authored
Return custom datastore endpoints if supported by client (#45)
1 parent 5712bda commit 3f08166

File tree

13 files changed

+305
-100
lines changed

13 files changed

+305
-100
lines changed

pkg/api/v1/configure.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66

7-
snaputil "github.com/canonical/microk8s-cluster-agent/pkg/snap/util"
7+
"github.com/canonical/microk8s-cluster-agent/pkg/snap"
88
)
99

1010
// ConfigureServiceRequest is a configuration request for MicroK8s.
@@ -47,7 +47,7 @@ func (a *API) Configure(ctx context.Context, req ConfigureRequest) error {
4747
return fmt.Errorf("invalid token")
4848
}
4949
for _, service := range req.ConfigureServices {
50-
if _, err := snaputil.UpdateServiceArguments(a.Snap, service.Name, service.UpdateArguments, service.RemoveArguments); err != nil {
50+
if _, err := snap.UpdateServiceArguments(a.Snap, service.Name, service.UpdateArguments, service.RemoveArguments); err != nil {
5151
return fmt.Errorf("failed to update arguments of service %q: %w", service.Name, err)
5252
}
5353
if service.Restart {

pkg/api/v1/configure_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"testing"
77

88
v1 "github.com/canonical/microk8s-cluster-agent/pkg/api/v1"
9+
"github.com/canonical/microk8s-cluster-agent/pkg/snap"
910
"github.com/canonical/microk8s-cluster-agent/pkg/snap/mock"
10-
snaputil "github.com/canonical/microk8s-cluster-agent/pkg/snap/util"
1111
)
1212

1313
func TestConfigure(t *testing.T) {
@@ -73,7 +73,7 @@ func TestConfigure(t *testing.T) {
7373
}
7474
for serviceName, expectedArguments := range tc.expectedArguments {
7575
for key, expectedValue := range expectedArguments {
76-
if value := snaputil.GetServiceArgument(s, serviceName, key); value != expectedValue {
76+
if value := snap.GetServiceArgument(s, serviceName, key); value != expectedValue {
7777
t.Fatalf("Expected argument %q of service %q to be %q, but it is %q", key, serviceName, expectedValue, value)
7878
}
7979
}

pkg/api/v1/join.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66
"net"
77

8-
snaputil "github.com/canonical/microk8s-cluster-agent/pkg/snap/util"
8+
"github.com/canonical/microk8s-cluster-agent/pkg/snap"
99
"github.com/canonical/microk8s-cluster-agent/pkg/util"
1010
)
1111

@@ -61,9 +61,9 @@ type JoinResponse struct {
6161
// Join implements "POST /CLUSTER_API_V1/join".
6262
func (a *API) Join(ctx context.Context, request JoinRequest) (*JoinResponse, error) {
6363
response := &JoinResponse{
64-
EtcdEndpoint: snaputil.GetServiceArgument(a.Snap, "etcd", "--listen-client-urls"),
65-
APIServerPort: snaputil.GetServiceArgument(a.Snap, "kube-apiserver", "--secure-port"),
66-
ClusterCIDR: snaputil.GetServiceArgument(a.Snap, "kube-proxy", "--cluster-cidr"),
64+
EtcdEndpoint: snap.GetServiceArgument(a.Snap, "etcd", "--listen-client-urls"),
65+
APIServerPort: snap.GetServiceArgument(a.Snap, "kube-apiserver", "--secure-port"),
66+
ClusterCIDR: snap.GetServiceArgument(a.Snap, "kube-proxy", "--cluster-cidr"),
6767
}
6868

6969
if !a.Snap.ConsumeClusterToken(request.ClusterToken) {
@@ -102,7 +102,7 @@ func (a *API) Join(ctx context.Context, request JoinRequest) (*JoinResponse, err
102102
if err := a.Snap.AddCertificateRequestToken(fmt.Sprintf("%s-kubelet", request.ClusterToken)); err != nil {
103103
return nil, fmt.Errorf("failed adding certificate request token for kubelet: %w", err)
104104
}
105-
case snaputil.GetServiceArgument(a.Snap, "kube-apiserver", "--token-auth-file") != "":
105+
case snap.GetServiceArgument(a.Snap, "kube-apiserver", "--token-auth-file") != "":
106106
// client does not know how to handle certificate auth, but we have a tokens file
107107
response.APIServerAuthMode = APIServerAuthModeToken
108108
response.KubeProxyToken, err = a.Snap.GetKnownToken("system:kube-proxy")

pkg/api/v2/api_util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"net"
77
"strings"
88

9-
snaputil "github.com/canonical/microk8s-cluster-agent/pkg/snap/util"
9+
"github.com/canonical/microk8s-cluster-agent/pkg/snap"
1010
)
1111

1212
// findMatchingBindAddress attempts to find the bind address for dqlite from the 'host:port' of the join request.
@@ -70,7 +70,7 @@ nextAddr:
7070

7171
// kubeAPIServerPrefersInternalIPForKubelet checks whether the --kubelet-preferred-address-types of kube-apiserver includes 'InternalIP' with higher preference over 'Hostname'
7272
func (a *API) kubeAPIServerPrefersInternalIPForKubelet() bool {
73-
order := snaputil.GetServiceArgument(a.Snap, "kube-apiserver", "--kubelet-preferred-address-types")
73+
order := snap.GetServiceArgument(a.Snap, "kube-apiserver", "--kubelet-preferred-address-types")
7474

7575
// 'Hostname' has precedence by default, argument must contain 'InternalIP'
7676
if ipIndex := strings.Index(order, "InternalIP"); ipIndex != -1 {

pkg/api/v2/join.go

Lines changed: 97 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"strings"
1111

12+
"github.com/canonical/microk8s-cluster-agent/pkg/snap"
1213
snaputil "github.com/canonical/microk8s-cluster-agent/pkg/snap/util"
1314
"github.com/canonical/microk8s-cluster-agent/pkg/util"
1415
)
@@ -33,6 +34,8 @@ type JoinRequest struct {
3334
ClusterAgentPort string `json:"port"`
3435
// WorkerOnly is true when joining a worker-only node.
3536
WorkerOnly WorkerOnlyField `json:"worker"`
37+
// CanHandleCustomEtcd is set by joining nodes that know how to deal with custom etcd endpoints being used by the kube-apiserver.
38+
CanHandleCustomEtcd bool `json:"can_handle_custom_etcd"`
3639
// HostPort is the hostname and port that accepted the request. This is retrieved directly from the *http.Request object.
3740
HostPort string `json:"-"`
3841
// RemoteAddress is the remote address from which the join request originates. This is retrieved directly from the *http.Request object.
@@ -79,6 +82,18 @@ type JoinResponse struct {
7982
ControlPlaneNodes []string `json:"control_plane_nodes"`
8083
// ClusterCIDR is the cidr that is used by the cluster, defined in kube-proxy args.
8184
ClusterCIDR string `json:"cluster_cidr,omitempty"`
85+
// EtcdServers is the value of the kube-apiserver '--etcd-servers' argument, containing the list of etcd endpoints to use.
86+
// This is only included in the response when a custom data store is configured.
87+
EtcdServers string `json:"etcd_servers,omitempty"`
88+
// EtcdCertificateAuthority is the contents of the file from the kube-apiserver '--etcd-cafile' argument, containing a CA for connecting to the etcd servers. Will be empty if not using TLS.
89+
// This is only included in the response when a custom data store is configured.
90+
EtcdCertificateAuthority string `json:"etcd_ca,omitempty"`
91+
// EtcdClientCertificate is the contents of the file from the kube-apiserver '--etcd-certfile' argument, containing a certificate for connecting to the etcd servers. Will be empty if not using TLS.
92+
// This is only included in the response when a custom data store is configured.
93+
EtcdClientCertificate string `json:"etcd_cert,omitempty"`
94+
// EtcdClientKey is the contents of the file from the kube-apiserver '--etcd-keyfile' argument, containing a private key for connecting to the etcd servers. Will be empty if not using TLS.
95+
// This is only included in the response when a custom data store is configured.
96+
EtcdClientKey string `json:"etcd_key,omitempty"`
8297
}
8398

8499
// Join implements "POST v2/join".
@@ -92,7 +107,7 @@ func (a *API) Join(ctx context.Context, req JoinRequest) (*JoinResponse, int, er
92107
}
93108

94109
// Check cluster agent ports.
95-
clusterAgentBind := snaputil.GetServiceArgument(a.Snap, "cluster-agent", "--bind")
110+
clusterAgentBind := snap.GetServiceArgument(a.Snap, "cluster-agent", "--bind")
96111
_, port, _ := net.SplitHostPort(clusterAgentBind)
97112
if port != req.ClusterAgentPort {
98113
return nil, http.StatusBadGateway, fmt.Errorf("the port of the cluster agent port has to be set to %s", port)
@@ -110,45 +125,57 @@ func (a *API) Join(ctx context.Context, req JoinRequest) (*JoinResponse, int, er
110125
return nil, http.StatusBadRequest, fmt.Errorf("the hostname (%s) of the joining node does not resolve to the IP %q. Refusing join", req.RemoteHostName, remoteIP)
111126
}
112127

113-
// Check node is not in cluster already.
114-
a.dqliteMu.Lock()
115-
dqliteCluster, err := snaputil.WaitForDqliteCluster(ctx, a.Snap, func(c snaputil.DqliteCluster) (bool, error) {
116-
return len(c) >= 1, nil
117-
})
118-
if err != nil {
119-
a.dqliteMu.Unlock()
120-
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster nodes: %w", err)
121-
}
122-
for _, node := range dqliteCluster {
123-
if strings.HasPrefix(node.Address, remoteIP+":") {
124-
a.dqliteMu.Unlock()
125-
return nil, http.StatusGatewayTimeout, fmt.Errorf("the joining node (%s) is already known to dqlite", remoteIP)
126-
}
127-
}
128-
129-
// Update dqlite cluster if needed
130-
if len(dqliteCluster) == 1 && strings.HasPrefix(dqliteCluster[0].Address, "127.0.0.1:") {
131-
newDqliteBindAddress, err := a.findMatchingBindAddress(req.HostPort)
132-
if err != nil {
133-
a.dqliteMu.Unlock()
134-
return nil, http.StatusInternalServerError, fmt.Errorf("failed to find matching dqlite bind address for %v: %w", req.HostPort, err)
135-
}
128+
kubeAPIServerUsesDqlite := strings.Contains(snap.GetServiceArgument(a.Snap, "kube-apiserver", "--etcd-servers"), "/var/kubernetes/backend/kine.sock:12379")
136129

137-
if err := snaputil.UpdateDqliteIP(ctx, a.Snap, newDqliteBindAddress); err != nil {
138-
a.dqliteMu.Unlock()
139-
return nil, http.StatusInternalServerError, fmt.Errorf("failed to update dqlite address to %q: %w", newDqliteBindAddress, err)
140-
}
130+
// Handle datastore updates
131+
switch {
132+
case kubeAPIServerUsesDqlite:
133+
// FIXME(neoaggelos): move this logic into a snaputil.MaybeUpdateDqliteBindAddress() to cleanup the code a little bit
141134

142-
// Wait for dqlite cluster to come up with new address
143-
dqliteCluster, err = snaputil.WaitForDqliteCluster(ctx, a.Snap, func(c snaputil.DqliteCluster) (bool, error) {
144-
return len(c) >= 1 && !strings.HasPrefix(c[0].Address, "127.0.0.1:"), nil
135+
// Check node is not in cluster already.
136+
a.dqliteMu.Lock()
137+
dqliteCluster, err := snaputil.WaitForDqliteCluster(ctx, a.Snap, func(c snaputil.DqliteCluster) (bool, error) {
138+
return len(c) >= 1, nil
145139
})
146140
if err != nil {
147141
a.dqliteMu.Unlock()
148-
return nil, http.StatusInternalServerError, fmt.Errorf("failed waiting for dqlite cluster to come up: %w", err)
142+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster nodes: %w", err)
143+
}
144+
for _, node := range dqliteCluster {
145+
if strings.HasPrefix(node.Address, remoteIP+":") {
146+
a.dqliteMu.Unlock()
147+
return nil, http.StatusGatewayTimeout, fmt.Errorf("the joining node (%s) is already known to dqlite", remoteIP)
148+
}
149149
}
150+
// Update dqlite cluster if needed
151+
if len(dqliteCluster) == 1 && strings.HasPrefix(dqliteCluster[0].Address, "127.0.0.1:") {
152+
newDqliteBindAddress, err := a.findMatchingBindAddress(req.HostPort)
153+
if err != nil {
154+
a.dqliteMu.Unlock()
155+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to find matching dqlite bind address for %v: %w", req.HostPort, err)
156+
}
157+
if err := snaputil.UpdateDqliteIP(ctx, a.Snap, newDqliteBindAddress); err != nil {
158+
a.dqliteMu.Unlock()
159+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to update dqlite address to %q: %w", newDqliteBindAddress, err)
160+
}
161+
// Wait for dqlite cluster to come up with new address
162+
_, err = snaputil.WaitForDqliteCluster(ctx, a.Snap, func(c snaputil.DqliteCluster) (bool, error) {
163+
return len(c) >= 1 && !strings.HasPrefix(c[0].Address, "127.0.0.1:"), nil
164+
})
165+
if err != nil {
166+
a.dqliteMu.Unlock()
167+
return nil, http.StatusInternalServerError, fmt.Errorf("failed waiting for dqlite cluster to come up: %w", err)
168+
}
169+
}
170+
a.dqliteMu.Unlock()
171+
172+
case req.CanHandleCustomEtcd:
173+
// no-op
174+
175+
default:
176+
// fail since this node is using a custom datastore and the client cannot handle it properly.
177+
return nil, http.StatusInternalServerError, fmt.Errorf("this MicroK8s cluster uses a custom etcd endpoint. update MicroK8s to version 1.28 or newer and retry the join operation")
150178
}
151-
a.dqliteMu.Unlock()
152179

153180
callbackToken, err := a.Snap.GetOrCreateSelfCallbackToken()
154181
if err != nil {
@@ -176,11 +203,11 @@ func (a *API) Join(ctx context.Context, req JoinRequest) (*JoinResponse, int, er
176203
response := &JoinResponse{
177204
CertificateAuthority: ca,
178205
CallbackToken: callbackToken,
179-
APIServerPort: snaputil.GetServiceArgument(a.Snap, "kube-apiserver", "--secure-port"),
180-
APIServerAuthorizationMode: snaputil.GetServiceArgument(a.Snap, "kube-apiserver", "--authorization-mode"),
206+
APIServerPort: snap.GetServiceArgument(a.Snap, "kube-apiserver", "--secure-port"),
207+
APIServerAuthorizationMode: snap.GetServiceArgument(a.Snap, "kube-apiserver", "--authorization-mode"),
181208
HostNameOverride: remoteIP,
182209
KubeletArgs: kubeletArgs,
183-
ClusterCIDR: snaputil.GetServiceArgument(a.Snap, "kube-proxy", "--cluster-cidr"),
210+
ClusterCIDR: snap.GetServiceArgument(a.Snap, "kube-proxy", "--cluster-cidr"),
184211
}
185212

186213
if req.WorkerOnly {
@@ -208,29 +235,49 @@ func (a *API) Join(ctx context.Context, req JoinRequest) (*JoinResponse, int, er
208235
}
209236

210237
switch {
211-
case snaputil.GetServiceArgument(a.Snap, "kube-apiserver", "--token-auth-file") != "":
238+
case snap.GetServiceArgument(a.Snap, "kube-apiserver", "--token-auth-file") != "":
212239
response.AdminToken, err = a.Snap.GetKnownToken("admin")
213240
if err != nil {
214241
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve token for admin user: %w", err)
215242
}
216243
case !req.CanHandleCertificateAuth:
217244
return nil, http.StatusInternalServerError, fmt.Errorf("joining this MicroK8s cluster requires x509 authentication. update MicroK8s to version 1.28 or newer and retry the join operation")
218245
}
219-
response.DqliteClusterCertificate, err = a.Snap.ReadDqliteCert()
220-
if err != nil {
221-
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster certificate: %w", err)
222-
}
223-
response.DqliteClusterKey, err = a.Snap.ReadDqliteKey()
224-
if err != nil {
225-
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster key: %w", err)
226-
}
227-
voters := make([]string, 0, len(dqliteCluster))
228-
for _, node := range dqliteCluster {
229-
if node.NodeRole == 0 {
230-
voters = append(voters, node.Address)
246+
247+
// add datastore arguments
248+
switch {
249+
case kubeAPIServerUsesDqlite:
250+
dqliteCluster, err := snaputil.WaitForDqliteCluster(ctx, a.Snap, func(c snaputil.DqliteCluster) (bool, error) {
251+
return len(c) >= 1, nil
252+
})
253+
if err != nil {
254+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster nodes: %w", err)
255+
}
256+
response.DqliteClusterCertificate, err = a.Snap.ReadDqliteCert()
257+
if err != nil {
258+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster certificate: %w", err)
259+
}
260+
response.DqliteClusterKey, err = a.Snap.ReadDqliteKey()
261+
if err != nil {
262+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to retrieve dqlite cluster key: %w", err)
263+
}
264+
voters := make([]string, 0, len(dqliteCluster))
265+
for _, node := range dqliteCluster {
266+
if node.NodeRole == 0 {
267+
voters = append(voters, node.Address)
268+
}
269+
}
270+
response.DqliteVoterNodes = voters
271+
case req.CanHandleCustomEtcd:
272+
response.EtcdServers = snap.GetServiceArgument(a.Snap, "kube-apiserver", "--etcd-servers")
273+
response.EtcdCertificateAuthority, response.EtcdClientCertificate, response.EtcdClientKey, err = a.Snap.ReadEtcdCertificates()
274+
if err != nil {
275+
return nil, http.StatusInternalServerError, fmt.Errorf("failed to read etcd certificates: %w", err)
231276
}
277+
default:
278+
// fail since this node is using a custom datastore and the client cannot handle it properly.
279+
return nil, http.StatusInternalServerError, fmt.Errorf("this MicroK8s cluster uses a custom etcd endpoint. update MicroK8s to version 1.28 or newer and retry the join operation")
232280
}
233-
response.DqliteVoterNodes = voters
234281
}
235282

236283
return response, http.StatusOK, nil

0 commit comments

Comments
 (0)