Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions backend/internal/core/workflows/deployer_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func ensureClient(state ewf.State) {

func DeployNetworkStep() ewf.StepFn {
return func(ctx context.Context, state ewf.State) error {
log := logger.ForOperation("deployer_activities", "deploy_network")
ensureClient(state)

config, err := getConfig(state)
Expand All @@ -91,7 +92,9 @@ func DeployNetworkStep() ewf.StepFn {
statemanager.StoreCluster(state, cluster)
err = kubeClient.DeployNetwork(ctx, &cluster)
// Save GridClient state after network deployment
statemanager.SaveGridClientState(state, kubeClient)
if err := statemanager.SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after network deployment")
}
statemanager.StoreCluster(state, cluster)
if err != nil {
nodeIDs := make([]uint32, 0, len(cluster.Nodes))
Expand All @@ -114,6 +117,7 @@ func DeployNetworkStep() ewf.StepFn {

func UpdateNetworkStep() ewf.StepFn {
return func(ctx context.Context, state ewf.State) error {
log := logger.ForOperation("deployer_activities", "update_network")
ensureClient(state)

config, err := getConfig(state)
Expand Down Expand Up @@ -144,7 +148,9 @@ func UpdateNetworkStep() ewf.StepFn {
}

// Save GridClient state after network update
statemanager.SaveGridClientState(state, kubeClient)
if err := statemanager.SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after network update")
}
statemanager.StoreCluster(state, cluster)
state["node"] = node
return nil
Expand All @@ -153,6 +159,7 @@ func UpdateNetworkStep() ewf.StepFn {

func AddNodeStep() ewf.StepFn {
return func(ctx context.Context, state ewf.State) error {
log := logger.ForOperation("deployer_activities", "add_node")
ensureClient(state)

config, err := getConfig(state)
Expand Down Expand Up @@ -184,7 +191,9 @@ func AddNodeStep() ewf.StepFn {
}

// Save GridClient state after node deployment
statemanager.SaveGridClientState(state, kubeClient)
if err := statemanager.SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after node deployment")
}
statemanager.StoreCluster(state, cluster)

// Store the deployed node in state for verification step
Expand Down Expand Up @@ -239,7 +248,9 @@ func DeployLeaderNodeStep() ewf.StepFn {

log.Debug().Str("node_name", leaderNode.Name).Msg("Leader node deployed successfully")

statemanager.SaveGridClientState(state, kubeClient)
if err := statemanager.SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after leader node deployment")
}
statemanager.StoreCluster(state, cluster)
return nil
}
Expand Down Expand Up @@ -291,7 +302,9 @@ func BatchDeployAllNodesStep(metrics *metricsLib.Metrics) ewf.StepFn {

batchErr := kubeClient.BatchDeployNodes(ctx, &cluster, nodesToDeploy, config.SSHPublicKey)

statemanager.SaveGridClientState(state, kubeClient)
if err := statemanager.SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after batch node deployment")
}
statemanager.StoreCluster(state, cluster)

if batchErr != nil {
Expand Down Expand Up @@ -524,6 +537,7 @@ func DeleteAllUserClustersStep(clusterRepo models.ClusterRepository, metrics *me

func RemoveDeploymentNodeStep() ewf.StepFn {
return func(ctx context.Context, state ewf.State) error {
log := logger.ForOperation("deployer_activities", "remove_deployment_node")
ensureClient(state)

config, err := getConfig(state)
Expand Down Expand Up @@ -553,7 +567,10 @@ func RemoveDeploymentNodeStep() ewf.StepFn {
}

// Save GridClient state after node removal
statemanager.SaveGridClientState(state, kubeClient)
if err := statemanager.SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after node removal")
}

statemanager.StoreCluster(state, existingCluster)
return nil
}
Expand All @@ -563,7 +580,9 @@ func closeClient(ctx context.Context, wf *ewf.Workflow, err error) {
log := logger.ForOperation("deployer_activities", "close_client").With().Str("workflow_name", wf.Name).Logger()
if kubeClient, ok := wf.State["kubeclient"].(*kubedeployer.Client); ok {
// Save final GridClient state before closing
statemanager.SaveGridClientState(wf.State, kubeClient)
if err := statemanager.SaveGridClientState(wf.State, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state before closing client")
}

kubeClient.Close()
delete(wf.State, "kubeclient")
Expand Down
34 changes: 16 additions & 18 deletions backend/internal/deployment/kubedeployer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,41 @@ package kubedeployer

import (
"fmt"
"kubecloud/internal/infrastructure/gridclient"

"github.com/threefoldtech/tfgrid-sdk-go/grid-client/deployer"
"go.opentelemetry.io/otel/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type Client struct {
GridClient deployer.TFPluginClient
mnemonic string
GridClient gridclient.GridClient
}

func NewClient(mnemonic, gridNet string, debug bool, tp trace.TracerProvider) (*Client, error) {
pluginOpts := []deployer.PluginOpt{
deployer.WithNetwork(gridNet),
deployer.WithDisableSentry(),
// NewClient creates a new Client instance
func NewClient(mnemonic, gridNet string, debug bool, tp *sdktrace.TracerProvider) (*Client, error) {
if gridNet == "" {
return nil, fmt.Errorf("gridNet is required and cannot be empty")
}
var opts []gridclient.ClientOpts
opts = append(opts, gridclient.WithNetwork(gridNet))
if debug {
pluginOpts = append(pluginOpts, deployer.WithLogs())
opts = append(opts, gridclient.WithDebug())
}

if tp != nil {
pluginOpts = append(pluginOpts, deployer.WithTraceProvider(tp))
opts = append(opts, gridclient.WithTracerProvider(tp))
}
opts = append(opts, gridclient.WithDisableSentry())

tfplugin, err := deployer.NewTFPluginClient(
mnemonic,
pluginOpts...,
)
gridCl, err := gridclient.NewGridClient(mnemonic, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create TFPluginClient: %v", err)
return nil, err
}

return &Client{
GridClient: tfplugin,
mnemonic: mnemonic,
GridClient: gridCl,
}, nil
}

// Close closes the underlying GridClient
func (c *Client) Close() {
c.GridClient.Close()
}
22 changes: 11 additions & 11 deletions backend/internal/deployment/kubedeployer/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"slices"

"kubecloud/internal/infrastructure/gridclient"
"kubecloud/internal/infrastructure/logger"
"kubecloud/internal/infrastructure/telemetry"

"github.com/threefoldtech/tfgrid-sdk-go/grid-client/deployer"
"github.com/threefoldtech/tfgrid-sdk-go/grid-client/workloads"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -24,7 +24,7 @@ func (c *Cluster) GetLeaderNode() (Node, error) {
return c.Nodes[0], nil
}

func (n *Node) AssignNodeIP(ctx context.Context, gridClient deployer.TFPluginClient, networkName string) error {
func (n *Node) AssignNodeIP(ctx context.Context, gridClient gridclient.GridClient, networkName string) error {
ctx, span := getTracer().Start(ctx, "Node.AssignNodeIP",
trace.WithAttributes(
attribute.String("node.name", n.Name),
Expand Down Expand Up @@ -109,8 +109,8 @@ func (c *Client) DeployNode(ctx context.Context, cluster *Cluster, node Node, ma
leaderIP,
cluster.Token,
masterPubKey,
c.mnemonic,
c.GridClient.Network,
c.GridClient.GetMnemonic(),
c.GridClient.GetGridNetwork(),
)
if err != nil {
telemetry.RecordError(span, err)
Expand All @@ -128,7 +128,7 @@ func (c *Client) DeployNode(ctx context.Context, cluster *Cluster, node Node, ma
Str("deployment_name", depl.Name).
Msg("Deploying to grid")

if err := c.GridClient.DeploymentDeployer.Deploy(ctx, &depl); err != nil {
if err := c.GridClient.Deploy(ctx, &depl); err != nil {
log.Error().
Err(err).
Str("node_name", node.Name).
Expand All @@ -144,7 +144,7 @@ func (c *Client) DeployNode(ctx context.Context, cluster *Cluster, node Node, ma
Uint32("node_id", node.NodeID).
Msg("Loading deployment result from grid")

result, err := c.GridClient.State.LoadDeploymentFromGrid(ctx, node.NodeID, node.Name)
result, err := c.GridClient.LoadDeploymentFromGrid(ctx, node.NodeID, node.Name)
if err != nil {
telemetry.RecordError(span, err)
return fmt.Errorf("failed to load deployment for node %s: %v", node.Name, err)
Expand Down Expand Up @@ -241,8 +241,8 @@ func (c *Client) BatchDeployNodes(ctx context.Context, cluster *Cluster, nodes [
leaderIP,
cluster.Token,
masterPubKey,
c.mnemonic,
c.GridClient.Network,
c.GridClient.GetMnemonic(),
c.GridClient.GetGridNetwork(),
)
if err != nil {
telemetry.RecordError(span, err)
Expand All @@ -259,7 +259,7 @@ func (c *Client) BatchDeployNodes(ctx context.Context, cluster *Cluster, nodes [
log.Debug().
Int("deployment_count", len(deployments)).
Msg("Starting batch deployment to grid")
batchErr := c.GridClient.DeploymentDeployer.BatchDeploy(ctx, deployments)
batchErr := c.GridClient.BatchDeploy(ctx, deployments)

var successCount int
var failedNodes []string
Expand All @@ -278,7 +278,7 @@ func (c *Client) BatchDeployNodes(ctx context.Context, cluster *Cluster, nodes [
),
)

result, err := c.GridClient.State.LoadDeploymentFromGrid(ctx, node.NodeID, node.Name)
result, err := c.GridClient.LoadDeploymentFromGrid(ctx, node.NodeID, node.Name)
if err != nil {
log.Warn().Err(err).Str("node_name", node.Name).Msg("Failed to load deployment for node")
failedNodes = append(failedNodes, node.Name)
Expand Down Expand Up @@ -418,7 +418,7 @@ func (c *Client) DeployNetwork(ctx context.Context, cluster *Cluster) error {
Msg("Deploying network")

span.AddEvent("Deploying network")
err = c.GridClient.NetworkDeployer.Deploy(ctx, &net)
err = c.GridClient.DeployNetwork(ctx, &net)
cluster.Network = net
if err != nil {
telemetry.RecordError(span, err)
Expand Down
12 changes: 6 additions & 6 deletions backend/internal/deployment/kubedeployer/netutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"fmt"
"net"

"kubecloud/internal/infrastructure/gridclient"

"github.com/pkg/errors"
"github.com/threefoldtech/tfgrid-sdk-go/grid-client/deployer"
"github.com/threefoldtech/tfgrid-sdk-go/grid-client/workloads"
"github.com/threefoldtech/tfgrid-sdk-go/grid-client/zos"
)

func getIpForVm(ctx context.Context, tfPluginClient deployer.TFPluginClient, networkName string, nodeID uint32) (string, error) {
network := tfPluginClient.State.Networks.GetNetwork(networkName)
ipRange := network.GetNodeSubnet(nodeID)
func getIpForVm(ctx context.Context, tfPluginClient gridclient.GridClient, networkName string, nodeID uint32) (string, error) {
ipRange := tfPluginClient.GetNodeSubnet(networkName, nodeID)

ip, ipRangeCIDR, err := net.ParseCIDR(ipRange)
if err != nil {
Expand Down Expand Up @@ -46,8 +46,8 @@ func getIpForVm(ctx context.Context, tfPluginClient deployer.TFPluginClient, net
return "", fmt.Errorf("all IPs are exhausted for network %s on node %d", networkName, nodeID)
}

func getUsedHostIDsFromGrid(ctx context.Context, tfPluginClient deployer.TFPluginClient, nodeID uint32, networkName string, ipRangeCIDR *net.IPNet) ([]byte, error) {
nodeClient, err := tfPluginClient.NcPool.GetNodeClient(tfPluginClient.SubstrateConn, nodeID)
func getUsedHostIDsFromGrid(ctx context.Context, tfPluginClient gridclient.GridClient, nodeID uint32, networkName string, ipRangeCIDR *net.IPNet) ([]byte, error) {
nodeClient, err := tfPluginClient.GetNodeClient(nodeID)
if err != nil {
return nil, errors.Wrapf(err, "could not get node client for node %d", nodeID)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/deployment/kubedeployer/noderemove.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Client) isContractActive(ctx context.Context, contractID uint64) bool {

log := logger.ForOperation("kubedeployer", "check_contract_active")
log.Debug().Uint64("contract_id", contractID).Msg("Checking if contract is active")
_, err := c.GridClient.SubstrateConn.GetContract(contractID)
_, err := c.GridClient.GetContract(contractID)
isActive := err == nil

span.SetAttributes(attribute.Bool("contract.is_active", isActive))
Expand Down
17 changes: 14 additions & 3 deletions backend/internal/deployment/statemanager/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/xmonader/ewf"
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace/noop"
)

Expand Down Expand Up @@ -65,8 +66,14 @@ func GetKubeClient(state ewf.State, config ClientConfig) (*kubedeployer.Client,
log.Warn().Msg("Tracer provider is no-op, tracing will not work")
}

// Extract SDK trace provider if available
var tp *sdktrace.TracerProvider
if globalTp != nil {
tp, _ = globalTp.(*sdktrace.TracerProvider)
}

// Create new client
kubeClient, err := kubedeployer.NewClient(config.Mnemonic, config.Network, config.Debug, globalTp)
kubeClient, err := kubedeployer.NewClient(config.Mnemonic, config.Network, config.Debug, tp)
if err != nil {
return nil, fmt.Errorf("failed to create kubeclient: %w", err)
}
Expand All @@ -78,7 +85,9 @@ func GetKubeClient(state ewf.State, config ClientConfig) (*kubedeployer.Client,

// Store the new client in state for reuse
state["kubeclient"] = kubeClient
SaveGridClientState(state, kubeClient)
if err := SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state after creating kubeclient")
}

log.Debug().Msg("Created and stored fresh kubeclient")
return kubeClient, nil
Expand Down Expand Up @@ -107,7 +116,9 @@ func CloseClient(state ewf.State, kubeClient *kubedeployer.Client) error {
return nil
}

SaveGridClientState(state, kubeClient)
if err := SaveGridClientState(state, kubeClient); err != nil {
log.Warn().Err(err).Msg("failed to save GridClient state before closing client")
}
kubeClient.Close()
delete(state, "kubeclient")

Expand Down
Loading
Loading