Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 3 additions & 43 deletions pkg/cli/create_helm.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package cli

import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
Expand Down Expand Up @@ -924,50 +919,15 @@ func (cmd *createHelm) getVClusterConfigFromSnapshot(ctx context.Context) (strin
return "", fmt.Errorf("parse snapshot: %w", err)
}

objectStore, err := snapshot.CreateStore(ctx, snapshotOptions)
if err != nil {
return "", fmt.Errorf("create snapshot store: %w", err)
}

reader, err := objectStore.GetObject(ctx)
if err != nil {
return "", fmt.Errorf("get snapshot object: %w", err)
}

// read the first tar entry
gzipReader, err := gzip.NewReader(reader)
if err != nil {
return "", fmt.Errorf("create gzip reader: %w", err)
}
defer gzipReader.Close()

// create a new tar reader
tarReader := tar.NewReader(gzipReader)

// read the vCluster config
header, err := tarReader.Next()
if err != nil {
release, err := snapshot.GetVClusterReleaseFromSnapshot(ctx, snapshotOptions)
if err != nil && !errors.Is(err, snapshot.ErrVClusterReleaseNotFound) {
return "", err
}

buf := &bytes.Buffer{}
_, err = io.Copy(buf, tarReader)
if err != nil {
return "", err
}

// no vCluster config in the snapshot
if header.Name != snapshot.SnapshotReleaseKey {
if release == nil {
return "", nil
}

// unmarshal the release
release := &snapshot.HelmRelease{}
err = json.Unmarshal(buf.Bytes(), release)
if err != nil {
return "", fmt.Errorf("unmarshal vCluster release: %w", err)
}

// set chart version
if release.ChartVersion != "" && (cmd.ChartVersion == "" || cmd.ChartVersion == upgrade.GetVersion()) {
cmd.ChartVersion = release.ChartVersion
Expand Down
187 changes: 164 additions & 23 deletions pkg/snapshot/restoreclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"time"

"github.com/ghodss/yaml"
vclusterconfig "github.com/loft-sh/vcluster/config"
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/loft-sh/vcluster/pkg/scheme"
setupconfig "github.com/loft-sh/vcluster/pkg/setup/config"
"github.com/loft-sh/vcluster/pkg/snapshot/volumes"
"github.com/loft-sh/vcluster/pkg/strvals"
"github.com/loft-sh/vcluster/pkg/util/translate"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
Expand Down Expand Up @@ -57,6 +59,8 @@ var (

// bump revision to make sure we invalidate caches. See https://github.com/kubernetes/kubernetes/issues/118501 for more details
BumpRevision = int64(1000)

ErrVClusterReleaseNotFound = errors.New("vCluster release not found in snapshot")
)

func (o *RestoreClient) GetSnapshotRequest(ctx context.Context) (*Request, error) {
Expand Down Expand Up @@ -113,13 +117,79 @@ func (o *RestoreClient) GetSnapshotRequest(ctx context.Context) (*Request, error
return nil, ErrSnapshotRequestNotFound
}

// GetVClusterReleaseFromSnapshot extracts the vCluster helm release from a snapshot
func GetVClusterReleaseFromSnapshot(ctx context.Context, snapshotOptions *Options) (*HelmRelease, error) {
objectStore, err := CreateStore(ctx, snapshotOptions)
if err != nil {
return nil, fmt.Errorf("create snapshot store: %w", err)
}

reader, err := objectStore.GetObject(ctx)
if err != nil {
return nil, fmt.Errorf("get snapshot object: %w", err)
}
defer reader.Close()

// read the first tar entry
gzipReader, err := gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("create gzip reader: %w", err)
}
defer gzipReader.Close()

// create a new tar reader
tarReader := tar.NewReader(gzipReader)

// read the vCluster config
header, err := tarReader.Next()
if err != nil {
return nil, err
}

buf := &bytes.Buffer{}
_, err = io.Copy(buf, tarReader)
if err != nil {
return nil, err
}

// no vCluster config in the snapshot
if header.Name != SnapshotReleaseKey {
return nil, ErrVClusterReleaseNotFound
}

// unmarshal the release
release := &HelmRelease{}
err = json.Unmarshal(buf.Bytes(), release)
if err != nil {
return nil, fmt.Errorf("unmarshal vCluster release: %w", err)
}

return release, nil
}

func (o *RestoreClient) Run(ctx context.Context) (retErr error) {
// create decoder and encoder
decoder := serializer.NewCodecFactory(scheme.Scheme).UniversalDeserializer()
encoder := protobuf.NewSerializer(scheme.Scheme, scheme.Scheme)

// get vCluster release from snapshot before restore
release, err := GetVClusterReleaseFromSnapshot(ctx, &o.Snapshot)
if err != nil {
return fmt.Errorf("failed to get vCluster release from snapshot: %w", err)
}

if release == nil || len(release.Values) == 0 {
return fmt.Errorf("no release values found in the snapshot")
}

// merge release values with default config values
mergedValues, err := mergeWithDefaults(release.Values)
if err != nil {
return fmt.Errorf("failed to merge release values with defaults: %w", err)
}

// parse vCluster config
vConfig, err := config.ParseConfig(constants.DefaultVClusterConfigLocation, os.Getenv("VCLUSTER_NAME"), nil)
vConfig, err := config.ParseConfigBytes(mergedValues, os.Getenv("VCLUSTER_NAME"), nil)
if err != nil {
return err
}
Expand All @@ -129,6 +199,12 @@ func (o *RestoreClient) Run(ctx context.Context) (retErr error) {
if err != nil {
return err
}

err = InitClients(vConfig)
if err != nil {
return err
}

o.vConfig = vConfig

// set global vCluster name
Expand Down Expand Up @@ -203,7 +279,7 @@ func (o *RestoreClient) Run(ctx context.Context) (retErr error) {
// check snapshot request
if strings.HasPrefix(string(key), RequestStoreKey) {
if o.RestoreVolumes {
err = o.createRestoreRequest(ctx, vConfig, value)
err = o.createRestoreRequest(ctx, value)
if err != nil {
return fmt.Errorf("failed to create restore request: %w", err)
}
Expand Down Expand Up @@ -258,41 +334,34 @@ func (o *RestoreClient) Run(ctx context.Context) (retErr error) {

klog.Infof("Successfully restored %d etcd keys from snapshot", restoredKeys)
klog.Infof("Successfully restored snapshot from %s", objectStore.Target())

// write release.values to vcluster config secret
err = o.writeReleaseValuesToConfigSecret(ctx, mergedValues)
if err != nil {
return fmt.Errorf("failed to write release values to vcluster config secret: %w", err)
}

return nil
}

func (o *RestoreClient) createRestoreRequest(ctx context.Context, vConfig *config.VirtualClusterConfig, value []byte) error {
func (o *RestoreClient) createRestoreRequest(ctx context.Context, value []byte) error {
klog.V(1).Infof("Found snapshot request object %s", string(value))
var err error
if vConfig.HostConfig == nil || vConfig.HostNamespace == "" {
// init the clients
vConfig.HostConfig, vConfig.HostNamespace, err = setupconfig.InitClientConfig()
if err != nil {
return fmt.Errorf("failed to init client config: %w", err)
}
}
if vConfig.HostClient == nil {
err = setupconfig.InitClients(vConfig)
if err != nil {
return fmt.Errorf("failed to init clients: %w", err)
}
}

var snapshotRequest Request
err = json.Unmarshal(value, &snapshotRequest)
err := json.Unmarshal(value, &snapshotRequest)
if err != nil {
return fmt.Errorf("failed to unmarshal snapshot request: %w", err)
}
klog.Infof("Found snapshot request: %s", snapshotRequest.Name)
o.snapshotRequest = snapshotRequest

// first create the snapshot options Secret
secret, err := CreateSnapshotOptionsSecret(constants.RestoreRequestLabel, vConfig.HostNamespace, vConfig.Name, &o.Snapshot)
secret, err := CreateSnapshotOptionsSecret(constants.RestoreRequestLabel, o.vConfig.HostNamespace, o.vConfig.Name, &o.Snapshot)
if err != nil {
return fmt.Errorf("failed to create snapshot options Secret: %w", err)
}
secret.GenerateName = fmt.Sprintf("%s-restore-request-", vConfig.Name)
secret, err = vConfig.HostClient.CoreV1().Secrets(vConfig.HostNamespace).Create(ctx, secret, metav1.CreateOptions{})
secret.GenerateName = fmt.Sprintf("%s-restore-request-", o.vConfig.Name)
secret, err = o.vConfig.HostClient.CoreV1().Secrets(o.vConfig.HostNamespace).Create(ctx, secret, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create snapshot options Secret: %w", err)
}
Expand All @@ -303,12 +372,12 @@ func (o *RestoreClient) createRestoreRequest(ctx context.Context, vConfig *confi
return fmt.Errorf("failed to create restore request: %w", err)
}
restoreRequest.Name = secret.Name
configMap, err := CreateRestoreRequestConfigMap(vConfig.HostNamespace, vConfig.Name, restoreRequest)
configMap, err := CreateRestoreRequestConfigMap(o.vConfig.HostNamespace, o.vConfig.Name, restoreRequest)
if err != nil {
return fmt.Errorf("failed to create snapshot request ConfigMap: %w", err)
}
configMap.Name = secret.Name
_, err = vConfig.HostClient.CoreV1().ConfigMaps(vConfig.HostNamespace).Create(ctx, configMap, metav1.CreateOptions{})
_, err = o.vConfig.HostClient.CoreV1().ConfigMaps(o.vConfig.HostNamespace).Create(ctx, configMap, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create snapshot request ConfigMap: %w", err)
}
Expand Down Expand Up @@ -392,6 +461,50 @@ func (o *RestoreClient) isPVCThatShouldBeRestoredInHost(key string) bool {
return status.Phase == volumes.RequestPhaseCompleted
}

func (o *RestoreClient) writeReleaseValuesToConfigSecret(ctx context.Context, releaseValues []byte) error {
// get the secret
configSecretName := "vc-config-" + o.vConfig.Name
configSecret, err := o.vConfig.HostClient.CoreV1().Secrets(o.vConfig.HostNamespace).Get(ctx, configSecretName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get vCluster config secret %s: %w", configSecretName, err)
}

// update the secret with release values
if configSecret.Data == nil {
configSecret.Data = make(map[string][]byte)
}
configSecret.Data["config.yaml"] = releaseValues

// update the secret
_, err = o.vConfig.HostClient.CoreV1().Secrets(o.vConfig.HostNamespace).Update(ctx, configSecret, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update vCluster config secret %s: %w", configSecretName, err)
}

klog.Infof("Successfully wrote config values to vCluster config secret %s/%s", o.vConfig.HostNamespace, configSecretName)
return nil
}

func InitClients(vConfig *config.VirtualClusterConfig) error {
var err error

if vConfig.HostConfig == nil || vConfig.HostNamespace == "" {
// init the clients
vConfig.HostConfig, vConfig.HostNamespace, err = setupconfig.InitClientConfig()
if err != nil {
return fmt.Errorf("failed to init client config: %w", err)
}
}
if vConfig.HostClient == nil {
err = setupconfig.InitClients(vConfig)
if err != nil {
return fmt.Errorf("failed to init clients: %w", err)
}
}

return nil
}

func transformPod(value []byte, decoder runtime.Decoder, encoder runtime.Encoder) ([]byte, error) {
// decode value
obj := &corev1.Pod{}
Expand Down Expand Up @@ -778,3 +891,31 @@ func getTranslatedPVCName(pvcName string) string {
hostName := translate.Default.HostName(nil, vName, vNamespace)
return hostName.Namespace + "/" + hostName.Name
}

// mergeWithDefaults merges the provided values with default vCluster config values
func mergeWithDefaults(values []byte) ([]byte, error) {
// get default config values
defaultMap := map[string]interface{}{}
err := yaml.Unmarshal([]byte(vclusterconfig.Values), &defaultMap)
if err != nil {
return nil, fmt.Errorf("unmarshal default config: %w", err)
}

// unmarshal release values
releaseMap := map[string]interface{}{}
err = yaml.Unmarshal(values, &releaseMap)
if err != nil {
return nil, fmt.Errorf("unmarshal release values: %w", err)
}

// merge: defaults first, then release values override
merged := strvals.MergeMaps(defaultMap, releaseMap)

// marshal back to bytes
mergedBytes, err := yaml.Marshal(merged)
if err != nil {
return nil, fmt.Errorf("marshal merged config: %w", err)
}

return mergedBytes, nil
}