Skip to content

Commit 5bff51f

Browse files
authored
feat: ability to upgrade ec manager (#1717)
1 parent 3250267 commit 5bff51f

File tree

11 files changed

+696
-298
lines changed

11 files changed

+696
-298
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ endif
145145
.PHONY: pkg/goods/bins/manager
146146
pkg/goods/bins/manager:
147147
mkdir -p pkg/goods/bins
148-
CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -o output/bins/manager ./cmd/manager
148+
CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -ldflags="-s -w $(LD_FLAGS) -extldflags=-static" -o output/bins/manager ./cmd/manager
149149
cp output/bins/manager $@
150150
touch $@
151151

cmd/manager/start.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77

8+
"github.com/replicatedhq/embedded-cluster/pkg/kubeutils"
89
"github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig"
910
"github.com/replicatedhq/embedded-cluster/pkg/socket"
1011
"github.com/replicatedhq/embedded-cluster/pkg/websocket"
@@ -37,7 +38,13 @@ func StartCmd(ctx context.Context, name string) *cobra.Command {
3738

3839
// connect to the KOTS WebSocket server
3940
if !disableWebsocket {
40-
go websocket.ConnectToKOTSWebSocket(ctx)
41+
go func() {
42+
kcli, err := kubeutils.KubeClient()
43+
if err != nil {
44+
panic(err)
45+
}
46+
websocket.ConnectToKOTSWebSocket(ctx, kcli)
47+
}()
4148
}
4249

4350
<-ctx.Done()

pkg/extensions/util.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func uninstall(ctx context.Context, hcli *helm.Helm, ext ecv1beta1.Chart) error
8282
err := hcli.Uninstall(ctx, helm.UninstallOptions{
8383
ReleaseName: ext.Name,
8484
Namespace: ext.TargetNS,
85+
Wait: true,
8586
})
8687
if err != nil {
8788
return errors.Wrap(err, "helm uninstall")

pkg/manager/upgrade.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package manager
2+
3+
import (
4+
"context"
5+
6+
"github.com/pkg/errors"
7+
"github.com/replicatedhq/embedded-cluster/pkg/helpers/systemd"
8+
"github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig"
9+
)
10+
11+
type UpgradeOptions struct {
12+
AppSlug string `json:"appSlug"`
13+
VersionLabel string `json:"versionLabel"`
14+
LicenseID string `json:"licenseID"`
15+
LicenseEndpoint string `json:"licenseEndpoint"`
16+
}
17+
18+
func Upgrade(ctx context.Context, opts UpgradeOptions) error {
19+
// path to the manager binary on the host
20+
binPath := runtimeconfig.PathToEmbeddedClusterBinary("manager")
21+
22+
// TODO (@salah): airgap
23+
err := DownloadBinaryOnline(ctx, binPath, opts.LicenseID, opts.LicenseEndpoint, opts.VersionLabel)
24+
if err != nil {
25+
return errors.Wrap(err, "download manager binary")
26+
}
27+
28+
// this is hacky but app slug is what determines the service name
29+
SetServiceName(opts.AppSlug)
30+
31+
if err := systemd.Restart(ctx, UnitName()); err != nil {
32+
return errors.Wrap(err, "restart manager service")
33+
}
34+
35+
return nil
36+
}

pkg/websocket/client.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package websocket
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"math/rand"
8+
"net/url"
9+
"os"
10+
"time"
11+
12+
gwebsocket "github.com/gorilla/websocket"
13+
"github.com/pkg/errors"
14+
"github.com/replicatedhq/embedded-cluster/pkg/extensions"
15+
"github.com/replicatedhq/embedded-cluster/pkg/manager"
16+
"github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig"
17+
"github.com/replicatedhq/embedded-cluster/pkg/upgrade"
18+
"github.com/replicatedhq/embedded-cluster/pkg/versions"
19+
"github.com/replicatedhq/embedded-cluster/pkg/websocket/types"
20+
"github.com/sirupsen/logrus"
21+
corev1 "k8s.io/api/core/v1"
22+
k8stypes "k8s.io/apimachinery/pkg/types"
23+
"sigs.k8s.io/controller-runtime/pkg/client"
24+
)
25+
26+
var wsDialer = &gwebsocket.Dialer{
27+
HandshakeTimeout: 10 * time.Second,
28+
}
29+
30+
func ConnectToKOTSWebSocket(ctx context.Context, kcli client.Client) {
31+
for {
32+
if err := attemptConnection(ctx, kcli); err != nil {
33+
logrus.Errorf("Connection attempt to KOTS failed: %v, retrying in 10 seconds...", err)
34+
time.Sleep(10 * time.Second)
35+
continue
36+
}
37+
}
38+
}
39+
40+
func attemptConnection(ctx context.Context, kcli client.Client) error {
41+
endpoint, err := getKOTSEndpoint(ctx, kcli)
42+
if err != nil {
43+
return errors.Wrap(err, "get kots endpoint")
44+
}
45+
46+
hostname, err := os.Hostname()
47+
if err != nil {
48+
return errors.Wrap(err, "get hostname")
49+
}
50+
var node corev1.Node
51+
if err := kcli.Get(ctx, k8stypes.NamespacedName{Name: hostname}, &node); err != nil {
52+
return errors.Wrap(err, "get node")
53+
}
54+
55+
wsURL := fmt.Sprintf("ws://%s/ec-ws?nodeName=%s&version=%s", endpoint, url.QueryEscape(node.Name), url.QueryEscape(versions.Version))
56+
logrus.Infof("connecting to KOTS WebSocket server on %s", wsURL)
57+
u, err := url.Parse(wsURL)
58+
if err != nil {
59+
return fmt.Errorf("parse websocket url: %w", err)
60+
}
61+
62+
conn, _, err := wsDialer.Dial(u.String(), nil)
63+
if err != nil {
64+
return fmt.Errorf("connect to websocket server: %w", err)
65+
}
66+
defer conn.Close()
67+
68+
logrus.Info("Successfully connected to KOTS WebSocket server")
69+
70+
// ping server on a regular interval to make sure it's still connected
71+
go pingWSServer(ctx, conn)
72+
73+
// listen to server messages
74+
return listenToWSServer(ctx, conn, endpoint)
75+
}
76+
77+
func pingWSServer(ctx context.Context, conn *gwebsocket.Conn) error {
78+
for {
79+
select {
80+
case <-ctx.Done():
81+
return nil
82+
case <-time.After(time.Second * time.Duration(5+rand.Intn(16))): // 5-20 seconds
83+
pingMsg := fmt.Sprintf("%x", rand.Int())
84+
if err := conn.WriteControl(gwebsocket.PingMessage, []byte(pingMsg), time.Now().Add(1*time.Second)); err != nil {
85+
return errors.Wrap(err, "send ping message")
86+
}
87+
}
88+
}
89+
}
90+
91+
func listenToWSServer(ctx context.Context, conn *gwebsocket.Conn, endpoint string) error {
92+
for {
93+
_, message, err := conn.ReadMessage() // receive messages, including ping/pong
94+
if err != nil {
95+
return errors.Wrap(err, "read message")
96+
}
97+
98+
var msg types.Message
99+
if err := json.Unmarshal(message, &msg); err != nil {
100+
logrus.Errorf("failed to unmarshal message: %s: %s", err, string(message))
101+
continue
102+
}
103+
if err := msg.Validate(); err != nil {
104+
logrus.Errorf("invalid message: %s", err.Error())
105+
continue
106+
}
107+
108+
logrus.Infof("Processing message with command=%s app=%s version=%s step=%s", msg.Command, msg.AppSlug, msg.VersionLabel, msg.StepID)
109+
110+
// ensure the environment is set up correctly
111+
os.Setenv("KUBECONFIG", runtimeconfig.PathToKubeConfig())
112+
os.Setenv("TMPDIR", runtimeconfig.EmbeddedClusterTmpSubDir())
113+
114+
// create a reporter for this step
115+
stepReporter := NewStepReporter(ctx, endpoint, msg.AppSlug, msg.VersionLabel, msg.StepID)
116+
stepReporter.Started()
117+
118+
if err := processWSMessage(ctx, msg); err != nil {
119+
logrus.Errorf("failed to process message: %s", err.Error())
120+
stepReporter.Failed(fmt.Sprintf("failed to process message: %s", err.Error()))
121+
continue
122+
}
123+
124+
// this doesn't get called for the upgrade manager step because the manager restarts.
125+
// kots marks the step as complete when the new manager connects to it.
126+
logrus.Infof("Successfully proccessed message with command=%s app=%s version=%s step=%s", msg.Command, msg.AppSlug, msg.VersionLabel, msg.StepID)
127+
stepReporter.Complete()
128+
}
129+
}
130+
131+
func processWSMessage(ctx context.Context, msg types.Message) error {
132+
switch msg.Command {
133+
case types.CommandUpgradeManager:
134+
d := types.UpgradeManagerData{}
135+
if err := json.Unmarshal([]byte(msg.Data), &d); err != nil {
136+
return errors.Wrapf(err, "unmarshal data: %s", string(msg.Data))
137+
}
138+
if err := d.Validate(); err != nil {
139+
return errors.Wrap(err, "invalid data")
140+
}
141+
142+
if err := manager.Upgrade(ctx, manager.UpgradeOptions{
143+
AppSlug: msg.AppSlug,
144+
VersionLabel: msg.VersionLabel,
145+
LicenseID: d.LicenseID,
146+
LicenseEndpoint: d.LicenseEndpoint,
147+
}); err != nil {
148+
return errors.Wrap(err, "upgrade manager")
149+
}
150+
151+
case types.CommandUpgradeCluster:
152+
d := types.UpgradeClusterData{}
153+
if err := json.Unmarshal([]byte(msg.Data), &d); err != nil {
154+
return errors.Wrapf(err, "unmarshal data: %s", string(msg.Data))
155+
}
156+
if err := d.Validate(); err != nil {
157+
return errors.Wrap(err, "invalid data")
158+
}
159+
if err := upgrade.Upgrade(ctx, &d.Installation); err != nil {
160+
return errors.Wrap(err, "upgrade cluster")
161+
}
162+
163+
case types.CommandAddExtension:
164+
d := types.ExtensionData{}
165+
if err := json.Unmarshal([]byte(msg.Data), &d); err != nil {
166+
return errors.Wrapf(err, "unmarshal data: %s", string(msg.Data))
167+
}
168+
if err := extensions.Add(ctx, d.Repos, d.Chart); err != nil {
169+
return errors.Wrap(err, "add extension")
170+
}
171+
172+
case types.CommandUpgradeExtension:
173+
d := types.ExtensionData{}
174+
if err := json.Unmarshal([]byte(msg.Data), &d); err != nil {
175+
return errors.Wrapf(err, "unmarshal data: %s", string(msg.Data))
176+
}
177+
if err := extensions.Upgrade(ctx, d.Repos, d.Chart); err != nil {
178+
return errors.Wrap(err, "upgrade extension")
179+
}
180+
181+
case types.CommandRemoveExtension:
182+
d := types.ExtensionData{}
183+
if err := json.Unmarshal([]byte(msg.Data), &d); err != nil {
184+
return errors.Wrapf(err, "unmarshal data: %s", string(msg.Data))
185+
}
186+
if err := extensions.Remove(ctx, d.Repos, d.Chart); err != nil {
187+
return errors.Wrap(err, "remove extension")
188+
}
189+
default:
190+
return errors.Errorf("unknown command: %s", msg.Command)
191+
}
192+
193+
return nil
194+
}

0 commit comments

Comments
 (0)