diff --git a/Dockerfile b/Dockerfile index 83dfc4c9..0210c7c5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN make binary FROM debian:12.7-slim -ENV DEBIAN_FRONTEND noninteractive +ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y \ ca-certificates; \ diff --git a/README.md b/README.md index 9f8f95c2..e6e9d49b 100644 --- a/README.md +++ b/README.md @@ -23,15 +23,28 @@ - [version](#version) - [node-funder](#node-funder) - [node-operator](#node-operator) + - [restart](#restart) + - [stamper](#stamper) - [Global flags](#global-flags) +- [Public Testnet Checks](#public-testnet-checks) + - [One by one](#one-by-one) + - [All at once, sequentially](#all-at-once-sequentially) ## Introduction -**Ethereum Swarm Beekeeper** is tool used for orchestrating cluster of [Ethereum Swarm Bee](https://github.com/ethersphere/bee) and running integration tests and simulations against it in the Kubernetes. +**Ethereum Swarm Beekeeper** is an orchestration and testing tool for managing [Ethereum Swarm Bee](https://github.com/ethersphere/bee) clusters. It enables: + +- **Cluster Management**: Automate creation and deletion of Bee clusters in Kubernetes. +- **Integration Checks**: Run tests (e.g., `pingpong`, `pushsync`) to validate network behavior. +- **Static Endpoints Support**: Execute checks without Kubernetes by using static Bee node endpoints. +- **Node Funding**: Automate funding Bee nodes with ETH and BZZ tokens (Kubernetes optional). +- **Dynamic Configuration**: Use flexible YAML-based configs for customizable actions. + +Beekeeper simplifies managing and testing Bee nodes, whether deployed in Kubernetes or standalone environments. ## Requirements -- Kubernetes cluster (v1.19+) +- Kubernetes cluster (v1.31+) - [Geth Swap node](https://github.com/ethersphere/helm/tree/master/charts/geth-swap) ## Installation @@ -92,7 +105,10 @@ If *config-dir* is kept in a Git repo, field *config-git-repo* should point to i Official GitHub repository with Beekeeper's configuration is **** -NOTE: command flags can be also set through the config file +General Notes: + +- command flags can be also set through the config file +- k8s client can be disabled with *enable-k8s* flag (default is true) ## Config directory @@ -190,6 +206,8 @@ This setting means that pushsync check can be executed choosing *pushsync-chunks | version | Print version number | | node-funder | Fund (top up) Bee nodes | | node-operator | Auto-funds (top up) Bee nodes on deployment. | +| restart | Restart Bee nodes in Kubernetes | +| stamper | Manage postage batches for nodes | ### check @@ -361,22 +379,35 @@ Command **node-funder** uses tool t It has following flags: ```console ---addresses strings Comma-separated list of Bee node addresses (must start with 0x). Overrides namespace and cluster name. ---geth-url string Endpoint to chain node. Required. ---cluster-name string Cluster name. Ignored if addresses or namespace are set. ---help help for node-funder ---min-native float Minimum amount of chain native coins (xDAI) nodes should have. ---min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have. ---namespace string Kubernetes namespace. Overrides cluster name if set. ---label-selector string Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected. ---timeout duration Timeout. (default 5m0s) ---wallet-key string Hex-encoded private key for the Bee node wallet. Required. +--addresses strings Comma-separated list of Bee node addresses (must start with 0x). Overrides namespace and cluster name. +--cluster-name string Name of the Beekeeper cluster to target. Ignored if a namespace is specified. +--geth-url string Endpoint to chain node. Required. +--help help for node-funder +--label-selector string Kubernetes label selector for filtering resources within the specified namespace. Use an empty string to select all resources. (default "beekeeper.ethswarm.org/node-funder=true") +--min-native float Minimum amount of chain native coins (xDAI) nodes should have. +--min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have. +--namespace string Kubernetes namespace. Overrides cluster name if set. +--periodic-check duration Periodic execution check interval. +--timeout duration Timeout. (default 5m0s) +--wallet-key string Hex-encoded private key for the Bee node wallet. Required. ``` -example: +#### Fund specific addresses ```bash -beekeeper node-funder --geth-url="http://geth-swap.default.testnet.internal" --wallet-key="4663c222787e30c1994b59044aa5045377a6e79193a8ead88293926b535c722d" --namespace=default --min-swarm=180 --min-native=2.2 --log-verbosity=3 +beekeeper node-funder --geth-url="http://geth-swap.default.testnet.internal" --wallet-key="4663c222787e30c1994b59044aa5045377a6e79193a8ead88293926b535c722d" --addresses=0xf176839c150e52fe30e5c2b5c648465c6fdfa532,0xebe269e07161c68a942a3a7fce6b4ed66867d6f0 --min-swarm=180 --min-native=2.2 +``` + +#### Fund K8S namespace (use label selector to filter nodes) + +```bash +beekeeper node-funder --geth-url="http://geth-swap.default.testnet.internal" --wallet-key="4663c222787e30c1994b59044aa5045377a6e79193a8ead88293926b535c722d" --namespace=default --min-swarm=180 --min-native=2.2 --label-selector="app=bee" +``` + +#### Fund all nodes in the cluster (beekeeper configuration) + +```bash +beekeeper node-funder --geth-url="http://geth-swap.default.testnet.internal" --wallet-key="4663c222787e30c1994b59044aa5045377a6e79193a8ead88293926b535c722d" --cluster-name=default --min-swarm=180 --min-native=2.2 ``` ### node-operator @@ -392,7 +423,7 @@ It has following flags: --min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have. --namespace string Kubernetes namespace to scan for scheduled pods. --label-selector string Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected. ---timeout duration Timeout. Default is infinite. +--timeout duration Operation timeout (e.g., 5s, 10m, 1.5h). Default is 0, which means no timeout. --wallet-key string Hex-encoded private key for the Bee node wallet. Required. ``` @@ -429,6 +460,134 @@ or beekeeper restart -namespace=default --label-selector="app=bee" --timeout=10m ``` +### stamper + +Command **stamper** manage postage batches for nodes. + +General Notes: + +- `namespace` or `cluster-name` must be specified to locate the bee nodes. +- If both are provided, `namespace` takes precedence. +- When `namespace` is set, you can use a `label-selector` to filter specific nodes. +- Use `batch-ids` to target specific postage batches, but this is applied after finding/filtering nodes. If `batch-ids` is not provided, all batches in the filtered nodes are targeted. +- If `timeout` is set to 0 and `periodic-check` is bigger than 0, the operation will run indefinitely with periodic checks. + +It has following subcommands: + +- **create** - creates a postage batch for selected nodes + + It has following flags: + + ```console + --amount uint Amount of BZZ in PLURS added that the postage batch will have. (default 100000000) + --cluster-name string Target Beekeeper cluster name. + --depth uint16 Batch depth which specifies how many chunks can be signed with the batch. It is a logarithm. Must be higher than default bucket depth (16) + --help help for create + --label-selector string Kubernetes label selector for filtering resources (use empty string for all). (default "beekeeper.ethswarm.org/node-funder=true") + --namespace string Kubernetes namespace (overrides cluster name). + --timeout duration Operation timeout (e.g., 5s, 10m, 1.5h). (default 5m0s) + ``` + + example: + + ```bash + beekeeper stamper create --cluster-name=default --amount=1000 --depth=16 --timeout=5m + ``` + + or + + ```bash + beekeeper stamper create --namespace=default --label-selector="app=bee" --amount=1000 --depth=16 --timeout=5m + ``` + +- **topup** - tops up postage batch for selected nodes + + It has following flags: + + ```console + --batch-ids strings Comma separated list of postage batch IDs to top up. If not provided, all batches are topped up. + --cluster-name string Target Beekeeper cluster name. + --geth-url string Geth URL for chain state retrieval. + --help help for topup + --label-selector string Kubernetes label selector for filtering resources (use empty string for all). (default "beekeeper.ethswarm.org/node-funder=true") + --namespace string Kubernetes namespace (overrides cluster name). + --periodic-check duration Periodic check interval. Default is 0, which means no periodic check. + --timeout duration Operation timeout (e.g., 5s, 10m, 1.5h). (default 5m0s) + --topup-to duration Duration to top up the TTL of a stamp to. (default 720h0m0s) + --ttl-threshold duration Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value. (default 120h0m0s) + ``` + + example: + + ```bash + beekeeper stamper topup --cluster-name=default --topup-to=720h --ttl-threshold=120h --periodic-check=1h --timeout=24h + ``` + + or + + ```bash + beekeeper stamper topup --namespace=default --label-selector="app=bee" --topup-to=720h --ttl-threshold=120h --periodic-check=1h --timeout=24h + ``` + +- **dilute** - dilutes postage batch for selected nodes + + It has following flags: + + ```console + --batch-ids strings Comma separated list of postage batch IDs to dilute. If not provided, all batches are diluted. + --cluster-name string Target Beekeeper cluster name. + --dilution-depth uint8 Number of levels by which to increase the depth of a stamp during dilution. (default 1) + --help help for dilute + --label-selector string Kubernetes label selector for filtering resources (use empty string for all). (default "beekeeper.ethswarm.org/node-funder=true") + --namespace string Kubernetes namespace (overrides cluster name). + --periodic-check duration Periodic check interval. Default is 0, which means no periodic check. + --timeout duration Operation timeout (e.g., 5s, 10m, 1.5h). (default 5m0s) + --usage-threshold float Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value. (default 90) + ``` + + example: + + ```bash + beekeeper stamper dilute --cluster-name=default --dilution-depth=1 --usage-threshold=90 --periodic-check=1h --timeout=24h + ``` + + or + + ```bash + beekeeper stamper dilute --namespace=default --label-selector="app=bee" --dilution-depth=1 --usage-threshold=90 --periodic-check=1h --timeout=24h + ``` + +- **set** - sets postage batch for selected nodes + + It has following flags: + + ```console + --batch-ids strings Comma separated list of postage batch IDs to set. If not provided, all batches are set. + --cluster-name string Target Beekeeper cluster name. + --dilution-depth uint16 Number of levels by which to increase the depth of a stamp during dilution. (default 1) + --geth-url string Geth URL for chain state retrieval. + --help help for set + --label-selector string Kubernetes label selector for filtering resources (use empty string for all). (default "beekeeper.ethswarm.org/node-funder=true") + --namespace string Kubernetes namespace (overrides cluster name). + --periodic-check duration Periodic check interval. Default is 0, which means no periodic check. + --timeout duration Operation timeout (e.g., 5s, 10m, 1.5h). (default 5m0s) + --topup-to duration Duration to top up the TTL of a stamp to. (default 720h0m0s) + --ttl-threshold duration Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value. (default 120h0m0s) + --usage-threshold float Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value. (default 90) + ``` + + example: + + ```bash + beekeeper stamper set --cluster-name=default --dilution-depth=1 --usage-threshold=90 --ttl-threshold=120h --topup-to=720h --periodic-check=1h --timeout=24h + ``` + + or + + ```bash + beekeeper stamper set --namespace=default --label-selector="app=bee" --dilution-depth=1 --usage-threshold=90 --ttl-threshold=120h --topup-to=720h --periodic-check=1h --timeout=24h + ``` + ## Global flags Global flags can be used with any command. @@ -436,19 +595,23 @@ Global flags can be used with any command. example: ```console ---config string config file (default is $HOME/.beekeeper.yaml) ---config-dir string config directory (default is $HOME/.beekeeper/) ---config-git-branch string Git branch (default "main") ---config-git-password string Git password or personal access tokens (needed for private repos) ---config-git-repo string Git repository with configurations (uses config directory when Git repo is not specified) (default "") ---config-git-username string Git username (needed for private repos) ---log-verbosity string log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace (default "info") ---loki-endpoint string loki http endpoint for pushing local logs (use http://loki.testnet.internal/loki/api/v1/push) ---tracing-enable enable tracing ---tracing-endpoint string endpoint to send tracing data (default "tempo-tempo-distributed-distributor.observability:6831") ---tracing-host string host to send tracing data ---tracing-port string port to send tracing data ---tracing-service-name string service name identifier for tracing (default "beekeeper") +--config string Path to the configuration file (default is $HOME/.beekeeper.yaml) +--config-dir string Directory for configuration files (default "C:\\Users\\ljubi\\.beekeeper") +--config-git-branch string Git branch to use for configuration files (default "main") +--config-git-dir string Directory within the Git repository containing configuration files. Defaults to the root directory (default ".") +--config-git-password string Git password or personal access token for authentication (required for private repositories) +--config-git-repo string URL of the Git repository containing configuration files (uses the config-dir if not specified) +--config-git-username string Git username for authentication (required for private repositories) +--enable-k8s Enable Kubernetes client functionality (default true) +--in-cluster Use the in-cluster Kubernetes client +--kubeconfig string Path to the kubeconfig file (default "~/.kube/config") +--log-verbosity string Log verbosity level (0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace) (default "info") +--loki-endpoint string HTTP endpoint for sending logs to Loki (e.g., http://loki.testnet.internal/loki/api/v1/push) +--tracing-enable Enable tracing for performance monitoring and debugging +--tracing-endpoint string Endpoint for sending tracing data, specified as host:port (default "127.0.0.1:6831") +--tracing-host string Host address for sending tracing data +--tracing-port string Port for sending tracing data +--tracing-service-name string Service name identifier used in tracing data (default "beekeeper") ``` ## Public Testnet Checks diff --git a/cmd/beekeeper/cmd/check.go b/cmd/beekeeper/cmd/check.go index 055443bc..f1f69ae6 100644 --- a/cmd/beekeeper/cmd/check.go +++ b/cmd/beekeeper/cmd/check.go @@ -14,38 +14,43 @@ import ( "github.com/spf13/cobra" ) -func (c *command) initCheckCmd() (err error) { +var errMissingClusterName = fmt.Errorf("cluster name not provided") + +func (c *command) initCheckCmd() error { const ( - optionNameClusterName = "cluster-name" optionNameCreateCluster = "create-cluster" optionNameChecks = "checks" optionNameMetricsEnabled = "metrics-enabled" optionNameSeed = "seed" optionNameTimeout = "timeout" optionNameMetricsPusherAddress = "metrics-pusher-address" - // TODO: optionNameStages = "stages" ) cmd := &cobra.Command{ Use: "check", Short: "runs integration tests on a Bee cluster", Long: `runs integration tests on a Bee cluster.`, - RunE: func(cmd *cobra.Command, args []string) (err error) { + RunE: func(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) defer cancel() + checks := c.globalConfig.GetStringSlice(optionNameChecks) + if len(checks) == 0 { + return fmt.Errorf("no checks provided") + } + + clusterName := c.globalConfig.GetString(optionNameClusterName) + if clusterName == "" { + return errMissingClusterName + } + // set cluster config - cfgCluster, ok := c.config.Clusters[c.globalConfig.GetString(optionNameClusterName)] + cfgCluster, ok := c.config.Clusters[clusterName] if !ok { - return fmt.Errorf("cluster %s not defined", c.globalConfig.GetString(optionNameClusterName)) + return fmt.Errorf("cluster %s not defined", clusterName) } - // setup cluster - cluster, err := c.setupCluster(ctx, - c.globalConfig.GetString(optionNameClusterName), - c.config, - c.globalConfig.GetBool(optionNameCreateCluster), - ) + cluster, err := c.setupCluster(ctx, clusterName, c.globalConfig.GetBool(optionNameCreateCluster)) if err != nil { return fmt.Errorf("cluster setup: %w", err) } @@ -88,7 +93,7 @@ func (c *command) initCheckCmd() (err error) { } // run checks - for _, checkName := range c.globalConfig.GetStringSlice(optionNameChecks) { + for _, checkName := range checks { checkName = strings.TrimSpace(checkName) // get configuration checkConfig, ok := c.config.Checks[checkName] @@ -145,7 +150,7 @@ func (c *command) initCheckCmd() (err error) { PreRunE: c.preRunE, } - cmd.Flags().String(optionNameClusterName, "default", "cluster name") + cmd.Flags().String(optionNameClusterName, "", "cluster name. Required") cmd.Flags().String(optionNameMetricsPusherAddress, "pushgateway.staging.internal", "prometheus metrics pusher address") cmd.Flags().Bool(optionNameCreateCluster, false, "creates cluster before executing checks") cmd.Flags().StringSlice(optionNameChecks, []string{"pingpong"}, "list of checks to execute") diff --git a/cmd/beekeeper/cmd/cluster.go b/cmd/beekeeper/cmd/cluster.go index 9888f10f..37e4cf6e 100644 --- a/cmd/beekeeper/cmd/cluster.go +++ b/cmd/beekeeper/cmd/cluster.go @@ -19,7 +19,11 @@ type nodeResult struct { err error } -func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *config.Config, deleteStorage bool) (err error) { +func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *config.Config, deleteStorage bool) error { + if clusterName == "" { + return errMissingClusterName + } + clusterConfig, ok := cfg.Clusters[clusterName] if !ok { return fmt.Errorf("cluster %s not defined", clusterName) @@ -70,12 +74,14 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co if err != nil { return err } + if len(v.Nodes) > 0 { for i := 0; i < len(v.Nodes); i++ { nName := fmt.Sprintf("%s-%d", ngName, i) if len(v.Nodes[i].Name) > 0 { nName = v.Nodes[i].Name } + if err := ng.DeleteNode(ctx, nName); err != nil { return fmt.Errorf("deleting node %s from the node group %s: %w", nName, ngName, err) } @@ -105,11 +111,15 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co } } - return + return nil } -func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *config.Config, startCluster bool) (cluster orchestration.Cluster, err error) { - clusterConfig, ok := cfg.Clusters[clusterName] +func (c *command) setupCluster(ctx context.Context, clusterName string, startCluster bool) (cluster orchestration.Cluster, err error) { + if clusterName == "" { + return nil, errMissingClusterName + } + + clusterConfig, ok := c.config.Clusters[clusterName] if !ok { return nil, fmt.Errorf("cluster %s not defined", clusterName) } @@ -139,7 +149,7 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con inCluster := c.globalConfig.GetBool(optionNameInCluster) // setup bootnode node group - fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, cfg, true, cluster, startCluster, inCluster, "", nodeResultChan) + fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, c.config, true, cluster, startCluster, inCluster, "", nodeResultChan) if err != nil { return nil, fmt.Errorf("setup node group bootnode: %w", err) } @@ -153,7 +163,7 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con } // setup other node groups - fundAddresses, _, err = setupNodes(ctx, clusterConfig, cfg, false, cluster, startCluster, inCluster, bootnodes, nodeResultChan) + fundAddresses, _, err = setupNodes(ctx, clusterConfig, c.config, false, cluster, startCluster, inCluster, bootnodes, nodeResultChan) if err != nil { return nil, fmt.Errorf("setup other node groups: %w", err) } @@ -165,7 +175,8 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con } c.log.Infof("node groups funded") } - c.log.Infof("cluster %s setup completed", clusterName) + + c.log.WithField("use-static-endpoints", clusterConfig.IsUsingStaticEndpoints()).Infof("cluster %s setup completed", clusterName) return cluster, nil } diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index f1ae7cf9..08c8221f 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "errors" "fmt" "io" @@ -12,11 +13,12 @@ import ( "github.com/ethersphere/beekeeper/pkg/config" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/scheduler" "github.com/ethersphere/beekeeper/pkg/swap" "github.com/go-git/go-billy/v5/memfs" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" - "github.com/go-git/go-git/v5/plumbing/transport/http" + httptransport "github.com/go-git/go-git/v5/plumbing/transport/http" "github.com/go-git/go-git/v5/storage/memory" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -52,14 +54,10 @@ type command struct { globalConfig *viper.Viper globalConfigFile string homeDir string - // configuration - config *config.Config - // kubernetes client - k8sClient *k8s.Client - // swap client - swapClient swap.Client - // log - log logging.Logger + config *config.Config // beekeeper clusters configuration (config dir) + k8sClient *k8s.Client // kubernetes client + swapClient swap.Client + log logging.Logger } type option func(*command) @@ -72,7 +70,7 @@ func newCommand(opts ...option) (c *command, err error) { SilenceErrors: true, SilenceUsage: true, PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - return c.initConfig() + return c.initConfig(cmd.Flags().Changed(optionNameClusterName)) }, }, } @@ -112,6 +110,10 @@ func newCommand(opts ...option) (c *command, err error) { return nil, err } + if err := c.initStamperCmd(); err != nil { + return nil, err + } + if err := c.initRestartCmd(); err != nil { return nil, err } @@ -139,6 +141,7 @@ func Execute() (err error) { if err != nil { return err } + return c.Execute() } @@ -151,7 +154,7 @@ func (c *command) initGlobalFlags() { globalFlags.String(optionNameConfigGitBranch, "main", "Git branch to use for configuration files") globalFlags.String(optionNameConfigGitUsername, "", "Git username for authentication (required for private repositories)") globalFlags.String(optionNameConfigGitPassword, "", "Git password or personal access token for authentication (required for private repositories)") - globalFlags.String(optionNameLogVerbosity, "info", "Log verbosity level (0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace;") + globalFlags.String(optionNameLogVerbosity, "info", "Log verbosity level (0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace)") globalFlags.String(optionNameLokiEndpoint, "", "HTTP endpoint for sending logs to Loki (e.g., http://loki.testnet.internal/loki/api/v1/push)") globalFlags.Bool(optionNameTracingEnabled, false, "Enable tracing for performance monitoring and debugging") globalFlags.String(optionNameTracingEndpoint, "127.0.0.1:6831", "Endpoint for sending tracing data, specified as host:port") @@ -163,64 +166,96 @@ func (c *command) initGlobalFlags() { globalFlags.String(optionNameKubeconfig, "~/.kube/config", "Path to the kubeconfig file") } -func (c *command) bindGlobalFlags() (err error) { - for _, flag := range []string{optionNameConfigDir, optionNameConfigGitRepo, optionNameConfigGitBranch, optionNameConfigGitDir, optionNameConfigGitUsername, optionNameConfigGitPassword, optionNameLogVerbosity, optionNameLokiEndpoint} { +func (c *command) bindGlobalFlags() error { + for _, flag := range []string{ + optionNameConfigDir, + optionNameConfigGitRepo, + optionNameConfigGitBranch, + optionNameConfigGitDir, + optionNameConfigGitUsername, + optionNameConfigGitPassword, + optionNameLogVerbosity, + optionNameLokiEndpoint, + } { if err := c.globalConfig.BindPFlag(flag, c.root.PersistentFlags().Lookup(flag)); err != nil { - return err + return fmt.Errorf("binding %s flag: %w", flag, err) } } - return + + return nil +} + +func (c *command) initConfig(loadConfigDir bool) error { + if err := c.initGlobalConfig(); err != nil { + return fmt.Errorf("initializing global configuration: %w", err) + } + + if err := c.initLogger(); err != nil { + return fmt.Errorf("initializing logger: %w", err) + } + + if !loadConfigDir { + c.log.Debugf("skpping loading configuration directory as the cluster name is not used") + return nil + } + + if err := c.loadConfigDirectory(); err != nil { + return fmt.Errorf("loading configuration directory: %w", err) + } + + return nil } -func (c *command) initConfig() (err error) { - // set global configuration +func (c *command) initGlobalConfig() error { cfg := viper.New() cfgName := ".beekeeper" + if c.globalConfigFile != "" { - // Use config file from the flag. cfg.SetConfigFile(c.globalConfigFile) } else { - // Search config in home directory with name ".beekeeper" (without extension). cfg.AddConfigPath(c.homeDir) cfg.SetConfigName(cfgName) } - // environment cfg.SetEnvPrefix("beekeeper") - cfg.AutomaticEnv() // read in environment variables that match + cfg.AutomaticEnv() cfg.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) if c.homeDir != "" && c.globalConfigFile == "" { c.globalConfigFile = filepath.Join(c.homeDir, cfgName+".yaml") } - // if a config file is found, read it in. if err := cfg.ReadInConfig(); err != nil { - var e viper.ConfigFileNotFoundError - if !errors.As(err, &e) { + if !errors.As(err, &viper.ConfigFileNotFoundError{}) { return err } } c.globalConfig = cfg - if err := c.bindGlobalFlags(); err != nil { - return err - } - // init logger + return c.bindGlobalFlags() +} + +func (c *command) initLogger() error { verbosity := c.globalConfig.GetString(optionNameLogVerbosity) lokiEndpoint := c.globalConfig.GetString(optionNameLokiEndpoint) - c.log, err = newLogger(c.root, verbosity, lokiEndpoint) + + log, err := newLogger(c.root, verbosity, lokiEndpoint) if err != nil { return fmt.Errorf("new logger: %w", err) } + c.log = log + return nil +} + +func (c *command) loadConfigDirectory() error { if c.globalConfig.GetString(optionNameConfigGitRepo) != "" { c.log.Debugf("using configuration from Git repository %s, branch %s, directory %s", c.globalConfig.GetString(optionNameConfigGitRepo), c.globalConfig.GetString(optionNameConfigGitBranch), c.globalConfig.GetString(optionNameConfigGitDir)) // read configuration from git repo fs := memfs.New() if _, err := git.Clone(memory.NewStorage(), fs, &git.CloneOptions{ - Auth: &http.BasicAuth{ + Auth: &httptransport.BasicAuth{ Username: c.globalConfig.GetString(optionNameConfigGitUsername), Password: c.globalConfig.GetString(optionNameConfigGitPassword), }, @@ -294,17 +329,19 @@ func (c *command) initConfig() (err error) { } } - return + return nil } -func (c *command) setHomeDir() (err error) { +func (c *command) setHomeDir() error { if c.homeDir != "" { - return + return nil } + dir, err := os.UserHomeDir() if err != nil { - return err + return fmt.Errorf("obtaining user's home dir: %w", err) } + c.homeDir = dir return nil } @@ -314,31 +351,64 @@ func (c *command) preRunE(cmd *cobra.Command, args []string) (err error) { return err } - // set Kubernetes client - if err := c.setK8S(); err != nil { + if err := c.setK8sClient(); err != nil { return err } - // set Swap client + if err := c.setSwapClient(); err != nil { return err } + return nil } -func (c *command) setK8S() (err error) { - if c.globalConfig.GetBool(optionNameEnableK8S) { - options := []k8s.ClientOption{ - k8s.WithLogger(c.log), - k8s.WithInCluster(c.globalConfig.GetBool(optionNameInCluster)), - k8s.WithKubeconfigPath(c.globalConfig.GetString(optionNameKubeconfig)), - } +func (c *command) setK8sClient() error { + if !c.globalConfig.GetBool(optionNameEnableK8S) { + c.log.Info("Kubernetes client disabled. Enable it with --enable-k8s=true flag if required") + return nil + } - if c.k8sClient, err = k8s.NewClient(options...); err != nil && !errors.Is(err, k8s.ErrKubeconfigNotSet) { - return fmt.Errorf("creating Kubernetes client: %w", err) - } + c.log.Info("Kubernetes client enabled. Disable it with --enable-k8s=false flag if not required") + + options := []k8s.ClientOption{ + k8s.WithLogger(c.log), + k8s.WithInCluster(c.globalConfig.GetBool(optionNameInCluster)), + k8s.WithKubeconfigPath(c.globalConfig.GetString(optionNameKubeconfig)), } - return + k8sClient, err := k8s.NewClient(options...) + if err != nil && !errors.Is(err, k8s.ErrKubeconfigNotSet) { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + c.k8sClient = k8sClient + return nil +} + +func (c *command) executePeriodically(ctx context.Context, action func(ctx context.Context) error) error { + periodicCheck := c.globalConfig.GetDuration(optionNamePeriodicCheck) + + if periodicCheck == 0 { + return action(ctx) + } + + periodicExecutor := scheduler.NewPeriodicExecutor(periodicCheck, c.log) + + periodicExecutor.Start(ctx, func(ctx context.Context) error { + if err := action(ctx); err != nil { + c.log.Errorf("failed to execute action periodically: %v", err) + } + return nil + }) + defer func() { + if err := periodicExecutor.Close(); err != nil { + c.log.Errorf("failed to close periodic executor: %v", err) + } + }() + + <-ctx.Done() + + return ctx.Err() } func (c *command) setSwapClient() (err error) { @@ -365,6 +435,7 @@ func newLogger(cmd *cobra.Command, verbosity, lokiEndpoint string) (logging.Logg logging.WithLokiOption(lokiEndpoint), logging.WithMetricsOption(), } + switch strings.ToLower(verbosity) { case "0", "silent": logger = logging.New(io.Discard, 0) @@ -381,5 +452,6 @@ func newLogger(cmd *cobra.Command, verbosity, lokiEndpoint string) (logging.Logg default: return nil, fmt.Errorf("unknown %s level %q, use help to check flag usage options", optionNameLogVerbosity, verbosity) } + return logger, nil } diff --git a/cmd/beekeeper/cmd/create.go b/cmd/beekeeper/cmd/create.go index 942bee5d..17f28bf2 100644 --- a/cmd/beekeeper/cmd/create.go +++ b/cmd/beekeeper/cmd/create.go @@ -14,7 +14,7 @@ func (c *command) initCreateCmd() (err error) { }, } - cmd.AddCommand(c.initCreateK8SNamespace()) + cmd.AddCommand(c.initCreateK8sNamespace()) cmd.AddCommand(c.initCreateBeeCluster()) c.root.AddCommand(cmd) diff --git a/cmd/beekeeper/cmd/create_bee_cluster.go b/cmd/beekeeper/cmd/create_bee_cluster.go index abe9566d..2661b064 100644 --- a/cmd/beekeeper/cmd/create_bee_cluster.go +++ b/cmd/beekeeper/cmd/create_bee_cluster.go @@ -8,10 +8,10 @@ import ( ) const ( - optionNameClusterName = "cluster-name" - optionNameChainNodeEndpoint = "geth-url" - optionNameWalletKey = "wallet-key" - optionNameTimeout = "timeout" + optionNameClusterName string = "cluster-name" + optionNameChainNodeEndpoint string = "geth-url" + optionNameWalletKey string = "wallet-key" + optionNameTimeout string = "timeout" ) func (c *command) initCreateBeeCluster() *cobra.Command { @@ -23,14 +23,14 @@ func (c *command) initCreateBeeCluster() *cobra.Command { ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) defer cancel() start := time.Now() - _, err = c.setupCluster(ctx, c.globalConfig.GetString(optionNameClusterName), c.config, true) + _, err = c.setupCluster(ctx, c.globalConfig.GetString(optionNameClusterName), true) c.log.Infof("cluster setup took %s", time.Since(start)) return err }, PreRunE: c.preRunE, } - cmd.Flags().String(optionNameClusterName, "default", "cluster name") + cmd.Flags().String(optionNameClusterName, "", "cluster name") cmd.Flags().String(optionNameChainNodeEndpoint, "", "Endpoint to chain node. Required.") cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.") cmd.Flags().Duration(optionNameTimeout, 30*time.Minute, "timeout") diff --git a/cmd/beekeeper/cmd/create_k8s_namespace.go b/cmd/beekeeper/cmd/create_k8s_namespace.go index 3a8dd080..92b5839b 100644 --- a/cmd/beekeeper/cmd/create_k8s_namespace.go +++ b/cmd/beekeeper/cmd/create_k8s_namespace.go @@ -6,15 +6,18 @@ import ( "github.com/spf13/cobra" ) -func (c *command) initCreateK8SNamespace() *cobra.Command { +const namespaceCmd string = "k8s-namespace" + +func (c *command) initCreateK8sNamespace() *cobra.Command { cmd := &cobra.Command{ - Use: "k8s-namespace", + Use: namespaceCmd, Short: "creates Kubernetes namespace", Long: `creates Kubernetes namespace.`, Args: func(cmd *cobra.Command, args []string) error { if len(args) < 1 { return fmt.Errorf("requires exactly one argument representing name of the Kubernetes namespace") } + return nil }, RunE: func(cmd *cobra.Command, args []string) (err error) { @@ -28,12 +31,14 @@ func (c *command) initCreateK8SNamespace() *cobra.Command { return }, PreRunE: func(cmd *cobra.Command, args []string) error { - if err := c.setK8S(); err != nil { + if err := c.setK8sClient(); err != nil { return err } + if c.k8sClient == nil { return fmt.Errorf("k8s client not set") } + return nil }, } diff --git a/cmd/beekeeper/cmd/delete_bee_cluster.go b/cmd/beekeeper/cmd/delete_bee_cluster.go index 20f8dfd3..274fb7bd 100644 --- a/cmd/beekeeper/cmd/delete_bee_cluster.go +++ b/cmd/beekeeper/cmd/delete_bee_cluster.go @@ -9,7 +9,6 @@ import ( func (c *command) initDeleteBeeCluster() *cobra.Command { const ( - optionNameClusterName = "cluster-name" optionNameWithStorage = "with-storage" optionNameTimeout = "timeout" ) @@ -27,7 +26,7 @@ func (c *command) initDeleteBeeCluster() *cobra.Command { PreRunE: c.preRunE, } - cmd.Flags().String(optionNameClusterName, "default", "cluster name") + cmd.Flags().String(optionNameClusterName, "", "cluster name") cmd.Flags().Bool(optionNameWithStorage, false, "delete storage") cmd.Flags().Duration(optionNameTimeout, 15*time.Minute, "timeout") diff --git a/cmd/beekeeper/cmd/delete_k8s_namespace.go b/cmd/beekeeper/cmd/delete_k8s_namespace.go index eb7c45c7..e09e54c5 100644 --- a/cmd/beekeeper/cmd/delete_k8s_namespace.go +++ b/cmd/beekeeper/cmd/delete_k8s_namespace.go @@ -8,13 +8,14 @@ import ( func (c *command) initDeleteK8SNamespace() *cobra.Command { cmd := &cobra.Command{ - Use: "k8s-namespace", + Use: namespaceCmd, Short: "deletes Kubernetes namespace", Long: `Deletes Kubernetes namespace.`, Args: func(cmd *cobra.Command, args []string) error { if len(args) < 1 { return fmt.Errorf("requires exactly one argument representing name of the Kubernetes namespace") } + return nil }, RunE: func(cmd *cobra.Command, args []string) (err error) { @@ -28,12 +29,14 @@ func (c *command) initDeleteK8SNamespace() *cobra.Command { return }, PreRunE: func(cmd *cobra.Command, args []string) error { - if err := c.setK8S(); err != nil { + if err := c.setK8sClient(); err != nil { return err } + if c.k8sClient == nil { return fmt.Errorf("k8s client not set") } + return nil }, } diff --git a/cmd/beekeeper/cmd/node_funder.go b/cmd/beekeeper/cmd/node_funder.go index 706a1057..e18a1392 100644 --- a/cmd/beekeeper/cmd/node_funder.go +++ b/cmd/beekeeper/cmd/node_funder.go @@ -6,19 +6,20 @@ import ( "fmt" "time" - "github.com/ethersphere/beekeeper/pkg/config" nodefunder "github.com/ethersphere/beekeeper/pkg/funder/node" "github.com/ethersphere/node-funder/pkg/funder" "github.com/spf13/cobra" ) -const nodeFunderLabelSelector string = "beekeeper.ethswarm.org/node-funder=true" +const ( + nodeFunderLabelSelector string = "beekeeper.ethswarm.org/node-funder=true" + nodeFunderCmd string = "node-funder" +) func (c *command) initNodeFunderCmd() (err error) { const ( optionNameAddresses = "addresses" optionNameNamespace = "namespace" - optionClusterName = "cluster-name" optionNameChainNodeEndpoint = "geth-url" optionNameWalletKey = "wallet-key" optionNameMinNative = "min-native" @@ -28,87 +29,95 @@ func (c *command) initNodeFunderCmd() (err error) { ) cmd := &cobra.Command{ - Use: "node-funder", + Use: nodeFunderCmd, Short: "funds bee nodes with ETH and BZZ", Long: `Fund makes BZZ tokens and ETH deposits to given Ethereum addresses. beekeeper node-funder`, RunE: func(cmd *cobra.Command, args []string) (err error) { - cfg := config.NodeFunder{} - - namespace := c.globalConfig.GetString(optionNameNamespace) - addresses := c.globalConfig.GetStringSlice(optionNameAddresses) - clusterName := c.globalConfig.GetString(optionClusterName) - - if len(addresses) > 0 { - cfg.Addresses = addresses - } else if namespace != "" { - cfg.Namespace = namespace - } else if clusterName != "" { - cluster, ok := c.config.Clusters[clusterName] - if !ok { - return fmt.Errorf("cluster %s not found", clusterName) + return c.withTimeoutHandler(cmd, func(ctx context.Context) error { + cfg := funder.Config{ + MinAmounts: funder.MinAmounts{ + NativeCoin: c.globalConfig.GetFloat64(optionNameMinNative), + SwarmToken: c.globalConfig.GetFloat64(optionNameMinSwarm), + }, + } + + if cfg.ChainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); cfg.ChainNodeEndpoint == "" { + return errors.New("chain node endpoint (geth-url) not provided") + } + + if cfg.WalletKey = c.globalConfig.GetString(optionNameWalletKey); cfg.WalletKey == "" { + return errors.New("wallet key not provided") } - if cluster.Namespace == nil || *cluster.Namespace == "" { - return fmt.Errorf("cluster %s namespace not provided", clusterName) + + defer c.log.Infof("node-funder done") + + logger := funder.WithLoggerOption(c.log) + + addresses := c.globalConfig.GetStringSlice(optionNameAddresses) + if len(addresses) > 0 { + cfg.Addresses = addresses + return c.executePeriodically(ctx, func(ctx context.Context) error { + return funder.Fund(ctx, cfg, nil, nil, logger) + }) } - cfg.Namespace = *cluster.Namespace - } else { + + namespace := c.globalConfig.GetString(optionNameNamespace) + if namespace != "" { + label := c.globalConfig.GetString(optionNameLabelSelector) + funderClient := nodefunder.NewClient(c.k8sClient, c.globalConfig.GetBool(optionNameInCluster), label, c.log) + + cfg.Namespace = namespace + return c.executePeriodically(ctx, func(ctx context.Context) error { + return funder.Fund(ctx, cfg, funderClient, nil, logger) + }) + } + + clusterName := c.globalConfig.GetString(optionNameClusterName) + if clusterName != "" { + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return fmt.Errorf("setting up cluster %s: %w", clusterName, err) + } + + clients, err := cluster.NodesClients(ctx) + if err != nil { + return fmt.Errorf("failed to retrieve node clients: %w", err) + } + + for _, node := range clients { + addr, err := node.Addresses(ctx) + if err != nil { + return fmt.Errorf("error fetching addresses for node %s: %w", node.Name(), err) + } + cfg.Addresses = append(cfg.Addresses, addr.Ethereum) + } + + return c.executePeriodically(ctx, func(ctx context.Context) error { + return funder.Fund(ctx, cfg, nil, nil, logger) + }) + } + + // NOTE: Swarm key address is the same as the nodeEndpoint/wallet walletAddress. + // When setting up a bootnode, the swarmkey option is used to specify the existing swarm key. + // However, for other nodes, the beekeeper automatically generates a new swarm key during cluster setup. + // Once the swarm key is generated, beekeeper identifies the addresses that can be funded for each node. + return errors.New("one of addresses, namespace, or valid cluster-name must be provided") - } - - // chain node endpoint check - if cfg.ChainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); cfg.ChainNodeEndpoint == "" { - return errors.New("chain node endpoint (geth-url) not provided") - } - - // wallet key check - if cfg.WalletKey = c.globalConfig.GetString(optionNameWalletKey); cfg.WalletKey == "" { - return errors.New("wallet key not provided") - } - - cfg.MinAmounts.NativeCoin = c.globalConfig.GetFloat64(optionNameMinNative) - cfg.MinAmounts.SwarmToken = c.globalConfig.GetFloat64(optionNameMinSwarm) - - // add timeout to node-funder - ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) - defer cancel() - - defer c.log.Infof("node-funder done") - - // NOTE: Swarm key address is the same as the nodeEndpoint/wallet walletAddress. - // When setting up a bootnode, the swarmkey option is used to specify the existing swarm key. - // However, for other nodes, the beekeeper automatically generates a new swarm key during cluster setup. - // Once the swarm key is generated, beekeeper identifies the addresses that can be funded for each node. - - var nodeLister funder.NodeLister - // if addresses are provided, use them, not k8s client to list nodes - if cfg.Namespace != "" { - label := c.globalConfig.GetString(optionNameLabelSelector) - nodeLister = nodefunder.NewClient(c.k8sClient, c.globalConfig.GetBool(optionNameInCluster), label, c.log) - } - - return funder.Fund(ctx, funder.Config{ - Namespace: cfg.Namespace, - Addresses: cfg.Addresses, - ChainNodeEndpoint: cfg.ChainNodeEndpoint, - WalletKey: cfg.WalletKey, - MinAmounts: funder.MinAmounts{ - NativeCoin: cfg.MinAmounts.NativeCoin, - SwarmToken: cfg.MinAmounts.SwarmToken, - }, - }, nodeLister, nil, funder.WithLoggerOption(c.log)) + }) }, PreRunE: c.preRunE, } cmd.Flags().StringSliceP(optionNameAddresses, "a", nil, "Comma-separated list of Bee node addresses (must start with 0x). Overrides namespace and cluster name.") cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace. Overrides cluster name if set.") - cmd.Flags().String(optionClusterName, "", "Cluster name. Ignored if addresses or namespace are set.") + cmd.Flags().String(optionNameClusterName, "", "Name of the Beekeeper cluster to target. Ignored if a namespace is specified.") cmd.Flags().String(optionNameChainNodeEndpoint, "", "Endpoint to chain node. Required.") cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.") cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.") cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.") - cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.") - cmd.Flags().Duration(optionNameTimeout, 5*time.Minute, "Timeout.") + cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. Use an empty string to select all resources.") + cmd.Flags().Duration(optionNameTimeout, 5*time.Minute, "Operation timeout (e.g., 5s, 10m, 1.5h).") + cmd.Flags().Duration(optionNamePeriodicCheck, 0*time.Minute, "Periodic execution check interval.") c.root.AddCommand(cmd) diff --git a/cmd/beekeeper/cmd/operator.go b/cmd/beekeeper/cmd/operator.go index e663fe45..bf1bcd68 100644 --- a/cmd/beekeeper/cmd/operator.go +++ b/cmd/beekeeper/cmd/operator.go @@ -5,12 +5,11 @@ import ( "errors" "time" - "github.com/ethersphere/beekeeper/pkg/config" "github.com/ethersphere/beekeeper/pkg/funder/operator" "github.com/spf13/cobra" ) -func (c *command) initOperatorCmd() (err error) { +func (c *command) initOperatorCmd() error { const ( optionNameNamespace = "namespace" optionNameChainNodeEndpoint = "geth-url" @@ -23,52 +22,36 @@ func (c *command) initOperatorCmd() (err error) { cmd := &cobra.Command{ Use: "node-operator", - Short: "scans for scheduled pods and funds them", - Long: `Node operator scans for scheduled pods and funds them using node-funder. beekeeper node-operator`, + Short: "scans for scheduled Kubernetes pods and funds them", + Long: `Node operator scans for scheduled Kubernetes pods and funds them using node-funder. beekeeper node-operator`, RunE: func(cmd *cobra.Command, args []string) (err error) { - cfg := config.NodeFunder{} + return c.withTimeoutHandler(cmd, func(ctx context.Context) error { + namespace := c.globalConfig.GetString(optionNameNamespace) + if namespace == "" { + return errors.New("namespace not provided") + } - var namespace string - if namespace = c.globalConfig.GetString(optionNameNamespace); namespace == "" { - return errors.New("namespace not provided") - } + chainNodeEndpoint := c.globalConfig.GetString(optionNameChainNodeEndpoint) + if chainNodeEndpoint == "" { + return errors.New("chain node endpoint (geth-url) not provided") + } - // chain node endpoint check - if cfg.ChainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); cfg.ChainNodeEndpoint == "" { - return errors.New("chain node endpoint (geth-url) not provided") - } + walletKey := c.globalConfig.GetString(optionNameWalletKey) + if walletKey == "" { + return errors.New("wallet key not provided") + } - // wallet key check - if cfg.WalletKey = c.globalConfig.GetString(optionNameWalletKey); cfg.WalletKey == "" { - return errors.New("wallet key not provided") - } - - cfg.MinAmounts.NativeCoin = c.globalConfig.GetFloat64(optionNameMinNative) - cfg.MinAmounts.SwarmToken = c.globalConfig.GetFloat64(optionNameMinSwarm) - - // add timeout to operator - // if timeout is not set, operator will run infinitely - var ctxNew context.Context - var cancel context.CancelFunc - timeout := c.globalConfig.GetDuration(optionNameTimeout) - if timeout > 0 { - ctxNew, cancel = context.WithTimeout(cmd.Context(), timeout) - } else { - ctxNew = context.Background() - } - if cancel != nil { - defer cancel() - } - - return operator.NewClient(&operator.ClientConfig{ - Log: c.log, - Namespace: namespace, - WalletKey: cfg.WalletKey, - ChainNodeEndpoint: cfg.ChainNodeEndpoint, - MinAmounts: cfg.MinAmounts, - K8sClient: c.k8sClient, - LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), - }).Run(ctxNew) + return operator.NewClient(&operator.ClientConfig{ + Log: c.log, + Namespace: namespace, + WalletKey: walletKey, + ChainNodeEndpoint: chainNodeEndpoint, + NativeToken: c.globalConfig.GetFloat64(optionNameMinNative), + SwarmToken: c.globalConfig.GetFloat64(optionNameMinSwarm), + K8sClient: c.k8sClient, + LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), + }).Run(ctx) + }) }, PreRunE: c.preRunE, } @@ -78,8 +61,8 @@ func (c *command) initOperatorCmd() (err error) { cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.") cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.") cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.") - cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.") - cmd.Flags().Duration(optionNameTimeout, 0*time.Minute, "Timeout. Default is infinite.") + cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. Use an empty string to select all resources.") + cmd.Flags().Duration(optionNameTimeout, 0*time.Minute, "Operation timeout (e.g., 5s, 10m, 1.5h). Default is 0, which means no timeout.") c.root.AddCommand(cmd) diff --git a/cmd/beekeeper/cmd/print.go b/cmd/beekeeper/cmd/print.go index edbdfccd..5c7f5e99 100644 --- a/cmd/beekeeper/cmd/print.go +++ b/cmd/beekeeper/cmd/print.go @@ -11,10 +11,7 @@ import ( ) func (c *command) initPrintCmd() (err error) { - const ( - optionNameClusterName = "cluster-name" - optionNameTimeout = "timeout" - ) + const optionNameTimeout = "timeout" cmd := &cobra.Command{ Use: "print", @@ -44,7 +41,7 @@ Requires exactly one argument from the following list: addresses, depths, nodes, ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) defer cancel() - cluster, err := c.setupCluster(ctx, c.globalConfig.GetString(optionNameClusterName), c.config, false) + cluster, err := c.setupCluster(ctx, c.globalConfig.GetString(optionNameClusterName), false) if err != nil { return fmt.Errorf("cluster setup: %w", err) } diff --git a/cmd/beekeeper/cmd/restart.go b/cmd/beekeeper/cmd/restart.go index e8004e80..6c36d9fa 100644 --- a/cmd/beekeeper/cmd/restart.go +++ b/cmd/beekeeper/cmd/restart.go @@ -12,7 +12,6 @@ import ( func (c *command) initRestartCmd() (err error) { const ( - optionNameClusterName = "cluster-name" optionNameLabelSelector = "label-selector" optionNameNamespace = "namespace" optionNameImage = "image" @@ -43,7 +42,7 @@ func (c *command) initRestartCmd() (err error) { return fmt.Errorf("cluster config %s not defined", clusterName) } - cluster, err := c.setupCluster(ctx, clusterName, c.config, false) + cluster, err := c.setupCluster(ctx, clusterName, false) if err != nil { return fmt.Errorf("setting up cluster %s: %w", clusterName, err) } diff --git a/cmd/beekeeper/cmd/simulate.go b/cmd/beekeeper/cmd/simulate.go index 96afba68..bf238f55 100644 --- a/cmd/beekeeper/cmd/simulate.go +++ b/cmd/beekeeper/cmd/simulate.go @@ -16,14 +16,12 @@ import ( func (c *command) initSimulateCmd() (err error) { const ( - optionNameClusterName = "cluster-name" optionNameCreateCluster = "create-cluster" optionNameSimulations = "simulations" optionNameMetricsEnabled = "metrics-enabled" optionNameSeed = "seed" optionNameTimeout = "timeout" optionNameMetricsPusherAddress = "metrics-pusher-address" - // TODO: optionNameStages = "stages" ) cmd := &cobra.Command{ @@ -34,18 +32,18 @@ func (c *command) initSimulateCmd() (err error) { ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) defer cancel() + clusterName := c.globalConfig.GetString(optionNameClusterName) + if clusterName == "" { + return errMissingClusterName + } + // set cluster config - cfgCluster, ok := c.config.Clusters[c.globalConfig.GetString(optionNameClusterName)] + cfgCluster, ok := c.config.Clusters[clusterName] if !ok { - return fmt.Errorf("cluster %s not defined", c.globalConfig.GetString(optionNameClusterName)) + return fmt.Errorf("cluster %s not defined", clusterName) } - // setup cluster - cluster, err := c.setupCluster(ctx, - c.globalConfig.GetString(optionNameClusterName), - c.config, - c.globalConfig.GetBool(optionNameCreateCluster), - ) + cluster, err := c.setupCluster(ctx, clusterName, c.globalConfig.GetBool(optionNameCreateCluster)) if err != nil { return fmt.Errorf("cluster setup: %w", err) } diff --git a/cmd/beekeeper/cmd/stamper.go b/cmd/beekeeper/cmd/stamper.go new file mode 100644 index 00000000..5d068c8f --- /dev/null +++ b/cmd/beekeeper/cmd/stamper.go @@ -0,0 +1,248 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/stamper" + "github.com/spf13/cobra" +) + +const ( + optionNamePeriodicCheck string = "periodic-check" + optionNameNamespace string = "namespace" + optionNameLabelSelector string = "label-selector" +) + +func (c *command) initStamperCmd() (err error) { + cmd := &cobra.Command{ + Use: "stamper", + Short: "Manage postage batches for nodes", + Long: `Use the stamper command to manage postage batches for nodes. Topup, dilution and creation of postage batches are supported.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return cmd.Help() + }, + } + + cmd.AddCommand(initStamperDefaultFlags(c.initStamperTopup())) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperDilute())) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperCreate())) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperSet())) + + c.root.AddCommand(cmd) + + return nil +} + +func initStamperDefaultFlags(cmd *cobra.Command) *cobra.Command { + cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name).") + cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name.") + cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources (use empty string for all).") + cmd.Flags().Duration(optionNameTimeout, 5*time.Minute, "Operation timeout (e.g., 5s, 10m, 1.5h).") + return cmd +} + +func (c *command) initStamperTopup() *cobra.Command { + const ( + optionNameTTLThreshold = "ttl-threshold" + optionNameTopUpTo = "topup-to" + optionNameGethUrl = "geth-url" + optionNameBatchIDs = "batch-ids" + ) + + cmd := &cobra.Command{ + Use: "topup", + Short: "Top up the TTL of postage batches", + Long: `Top up the TTL of postage batches.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return c.withTimeoutHandler(cmd, func(ctx context.Context) error { + stamperClient, err := c.createStamperClient(ctx) + if err != nil { + return fmt.Errorf("failed to create stamper client: %w", err) + } + + return c.executePeriodically(ctx, func(ctx context.Context) error { + return stamperClient.Topup(ctx, + c.globalConfig.GetDuration(optionNameTTLThreshold), + c.globalConfig.GetDuration(optionNameTopUpTo), + stamper.WithBatchIDs(c.globalConfig.GetStringSlice(optionNameBatchIDs)), + ) + }) + }) + }, + PreRunE: c.preRunE, + } + + cmd.Flags().Duration(optionNameTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.") + cmd.Flags().Duration(optionNameTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.") + cmd.Flags().StringSlice(optionNameBatchIDs, nil, "Comma separated list of postage batch IDs to top up. If not provided, all batches are topped up.") + cmd.Flags().String(optionNameGethUrl, "", "Geth URL for chain state retrieval.") + cmd.Flags().Duration(optionNamePeriodicCheck, 0, "Periodic check interval. Default is 0, which means no periodic check.") + + return cmd +} + +func (c *command) initStamperDilute() *cobra.Command { + const ( + optionNameUsageThreshold = "usage-threshold" + optionNameDiutionDepth = "dilution-depth" + optionNameBatchIDs = "batch-ids" + ) + + cmd := &cobra.Command{ + Use: "dilute", + Short: "Dilute postage batches", + Long: `Dilute postage batches.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return c.withTimeoutHandler(cmd, func(ctx context.Context) error { + stamperClient, err := c.createStamperClient(ctx) + if err != nil { + return fmt.Errorf("failed to create stamper client: %w", err) + } + + return c.executePeriodically(ctx, func(ctx context.Context) error { + return stamperClient.Dilute(ctx, + c.globalConfig.GetFloat64(optionNameUsageThreshold), + c.globalConfig.GetUint16(optionNameDiutionDepth), + stamper.WithBatchIDs(c.globalConfig.GetStringSlice(optionNameBatchIDs)), + ) + }) + }) + }, + PreRunE: c.preRunE, + } + + cmd.Flags().Float64(optionNameUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.") + cmd.Flags().Uint8(optionNameDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.") + cmd.Flags().StringSlice(optionNameBatchIDs, nil, "Comma separated list of postage batch IDs to dilute. If not provided, all batches are diluted.") + cmd.Flags().Duration(optionNamePeriodicCheck, 0, "Periodic check interval. Default is 0, which means no periodic check.") + + return cmd +} + +func (c *command) initStamperCreate() *cobra.Command { + const ( + optionNameAmount = "amount" + optionNameDepth = "depth" + ) + + cmd := &cobra.Command{ + Use: "create", + Short: "Create a postage batch for selected nodes", + Long: `Create a postage batch for selected nodes. Nodes are selected by namespace (use label-selector for filtering) or cluster name.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return c.withTimeoutHandler(cmd, func(ctx context.Context) error { + stamperClient, err := c.createStamperClient(ctx) + if err != nil { + return fmt.Errorf("failed to create stamper client: %w", err) + } + + return stamperClient.Create(ctx, + c.globalConfig.GetUint64(optionNameAmount), + c.globalConfig.GetUint16(optionNameDepth), + ) + }) + }, + PreRunE: c.preRunE, + } + + cmd.Flags().Uint64(optionNameAmount, 100000000, "Amount of BZZ in PLURS added that the postage batch will have.") + cmd.Flags().Uint16(optionNameDepth, 17, "Batch depth which specifies how many chunks can be signed with the batch. It is a logarithm. Must be higher than default bucket depth (16)") + + return cmd +} + +func (c *command) initStamperSet() *cobra.Command { + const ( + optionNameTTLThreshold = "ttl-threshold" + optionNameTopUpTo = "topup-to" + optionNameUsageThreshold = "usage-threshold" + optionNameDiutionDepth = "dilution-depth" + optionNameGethUrl = "geth-url" + optionNameBatchIDs = "batch-ids" + ) + + cmd := &cobra.Command{ + Use: "set", + Short: "Set stamper configuration", + Long: `Set stamper configuration.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return c.withTimeoutHandler(cmd, func(ctx context.Context) error { + stamperClient, err := c.createStamperClient(ctx) + if err != nil { + return fmt.Errorf("failed to create stamper client: %w", err) + } + + return c.executePeriodically(ctx, func(ctx context.Context) error { + return stamperClient.Set(ctx, + c.globalConfig.GetDuration(optionNameTTLThreshold), + c.globalConfig.GetDuration(optionNameTopUpTo), + c.globalConfig.GetFloat64(optionNameUsageThreshold), + c.globalConfig.GetUint16(optionNameDiutionDepth), + stamper.WithBatchIDs(c.globalConfig.GetStringSlice(optionNameBatchIDs)), + ) + }) + }) + }, + PreRunE: c.preRunE, + } + + cmd.Flags().Duration(optionNameTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.") + cmd.Flags().Duration(optionNameTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.") + cmd.Flags().Float64(optionNameUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.") + cmd.Flags().Uint16(optionNameDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.") + cmd.Flags().StringSlice(optionNameBatchIDs, nil, "Comma separated list of postage batch IDs to set. If not provided, all batches are set.") + cmd.Flags().String(optionNameGethUrl, "", "Geth URL for chain state retrieval.") + cmd.Flags().Duration(optionNamePeriodicCheck, 0, "Periodic check interval. Default is 0, which means no periodic check.") + + return cmd +} + +func (c *command) createStamperClient(ctx context.Context) (*stamper.Client, error) { + namespace := c.globalConfig.GetString(optionNameNamespace) + clusterName := c.globalConfig.GetString(optionNameClusterName) + + if clusterName == "" && namespace == "" { + return nil, errors.New("either cluster name or namespace must be provided") + } + + var beeClients map[string]*bee.Client + + if clusterName != "" { + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return nil, fmt.Errorf("setting up cluster %s: %w", clusterName, err) + } + + beeClients, err = cluster.NodesClients(ctx) + if err != nil { + return nil, fmt.Errorf("failed to retrieve node clients: %w", err) + } + } + + return stamper.New(&stamper.ClientConfig{ + Log: c.log, + Namespace: namespace, + K8sClient: c.k8sClient, + BeeClients: beeClients, + SwapClient: c.swapClient, + LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), + InCluster: c.globalConfig.GetBool(optionNameInCluster), + }), nil +} + +func (c *command) withTimeoutHandler(cmd *cobra.Command, f func(ctx context.Context) error) error { + timeout := c.globalConfig.GetDuration(optionNameTimeout) + ctx := cmd.Context() + var cancel context.CancelFunc + + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + return f(ctx) +} diff --git a/go.mod b/go.mod index c87b7ebd..57b8b925 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/prometheus/client_golang v1.19.0 github.com/prometheus/common v0.50.0 github.com/sirupsen/logrus v1.9.3 - github.com/spf13/cobra v1.8.0 - github.com/spf13/viper v1.18.2 + github.com/spf13/cobra v1.8.1 + github.com/spf13/viper v1.19.0 github.com/uber/jaeger-client-go v2.30.0+incompatible golang.org/x/crypto v0.24.0 golang.org/x/sync v0.7.0 @@ -27,6 +27,7 @@ require ( k8s.io/api v0.30.3 k8s.io/apimachinery v0.30.3 k8s.io/client-go v0.30.3 + resenje.org/x v0.6.0 ) require ( @@ -65,7 +66,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect @@ -99,7 +100,7 @@ require ( github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.6.0 // indirect @@ -133,7 +134,7 @@ require ( golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect - google.golang.org/appengine v1.6.7 // indirect + google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 32b0208b..b7790119 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/Yj github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= -github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= -github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233 h1:d28BXYi+wUpz1KBmiF9bWrjEMacUEREV6MBi2ODnrfQ= github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233/go.mod h1:geZJZH3SzKCqnz5VT0q/DyIG/tvu/dZk+VIfXicupJs= github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= @@ -137,13 +137,15 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -157,8 +159,9 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= @@ -276,8 +279,8 @@ github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE= github.com/onsi/gomega v1.31.0/go.mod h1:DW9aCi7U6Yi40wNVAvT6kzFnEVEI5n3DloYBiKiT6zk= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= -github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= -github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -322,18 +325,19 @@ github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= -github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= -github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= -github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= +github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= +github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobtDnDzA= github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -342,8 +346,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= @@ -401,7 +406,6 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -453,10 +457,10 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -476,8 +480,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -515,6 +521,8 @@ k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= +resenje.org/x v0.6.0 h1:afn9E4XhglF4y9Kq0VH5tdSyjnsVKxiYgB6HFj7ebss= +resenje.org/x v0.6.0/go.mod h1:qgwe4MCzh57EkkMDurg24ug7HHfZtAjtBkmCihNmOpM= rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU= rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/pkg/bee/api/postage.go b/pkg/bee/api/postage.go index 7b6dd65a..eb3f100e 100644 --- a/pkg/bee/api/postage.go +++ b/pkg/bee/api/postage.go @@ -105,3 +105,25 @@ func (p *PostageService) ReserveState(ctx context.Context) (ReserveState, error) err := p.client.request(ctx, http.MethodGet, "/reservestate", nil, &resp) return resp, err } + +type ChainStateResponse struct { + ChainTip uint64 `json:"chainTip"` // ChainTip (block height). + Block uint64 `json:"block"` // The block number of the last postage event. + TotalAmount *bigint.BigInt `json:"totalAmount"` // Cumulative amount paid per stamp. //*big.Int + CurrentPrice *bigint.BigInt `json:"currentPrice"` // Bzz/chunk/block normalised price. //*big.Int +} + +// GetChainState returns the chain state of the node +func (p *PostageService) GetChainState(ctx context.Context) (ChainStateResponse, error) { + var resp ChainStateResponse + err := p.client.request(ctx, http.MethodGet, "/chainstate", nil, &resp) + if err != nil { + return ChainStateResponse{}, err + } + return resp, nil +} + +func (batch *PostageStampResponse) BatchUsage() float64 { + maxUtilization := 1 << (batch.Depth - batch.BucketDepth) // 2^(depth - bucketDepth) + return (float64(batch.Utilization) / float64(maxUtilization)) * 100 // batch utilization between 0 and 100 percent +} diff --git a/pkg/bee/client.go b/pkg/bee/client.go index f8eff26f..ea1d81fd 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -83,6 +83,10 @@ func (c *Client) Config() ClientOptions { return c.opts } +func (c *Client) API() *api.Client { + return c.api +} + // Addresses returns node's addresses func (c *Client) Addresses(ctx context.Context) (resp Addresses, err error) { a, err := c.api.Node.Addresses(ctx) diff --git a/pkg/check/settlements/settlements.go b/pkg/check/settlements/settlements.go index 98fd55c4..cb87d661 100644 --- a/pkg/check/settlements/settlements.go +++ b/pkg/check/settlements/settlements.go @@ -27,7 +27,7 @@ type Options struct { PostageDepth uint64 PostageLabel string Seed int64 - Threshold int64 // balances treshold + Threshold int64 // balances threshold UploadNodeCount int WaitAfterUpload time.Duration // seconds to wait before downloading a file WaitBeforeDownload time.Duration // seconds to wait before downloading a file diff --git a/pkg/config/node_funder.go b/pkg/config/node_funder.go deleted file mode 100644 index 5fd1ba75..00000000 --- a/pkg/config/node_funder.go +++ /dev/null @@ -1,14 +0,0 @@ -package config - -type NodeFunder struct { - Namespace string - Addresses []string - ChainNodeEndpoint string - WalletKey string // Hex encoded key - MinAmounts MinAmounts -} - -type MinAmounts struct { - NativeCoin float64 // on mainnet this is xDAI - SwarmToken float64 // on mainnet this is xBZZ -} diff --git a/pkg/funder/operator/operator.go b/pkg/funder/operator/operator.go index cb17cd31..d7058e1e 100644 --- a/pkg/funder/operator/operator.go +++ b/pkg/funder/operator/operator.go @@ -6,13 +6,13 @@ import ( "fmt" "io" "net/http" - "net/url" + "time" "github.com/ethersphere/beekeeper/pkg/bee" - "github.com/ethersphere/beekeeper/pkg/config" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/node-funder/pkg/funder" + v1 "k8s.io/api/core/v1" ) type ClientConfig struct { @@ -20,15 +20,16 @@ type ClientConfig struct { Namespace string WalletKey string ChainNodeEndpoint string - MinAmounts config.MinAmounts + NativeToken float64 + SwarmToken float64 K8sClient *k8s.Client - HTTPClient *http.Client // injected HTTP client + HTTPClient *http.Client LabelSelector string } type Client struct { *ClientConfig - httpClient http.Client + httpClient *http.Client } func NewClient(cfg *ClientConfig) *Client { @@ -47,44 +48,66 @@ func NewClient(cfg *ClientConfig) *Client { } return &Client{ - httpClient: *httpClient, + httpClient: httpClient, ClientConfig: cfg, } } func (c *Client) Run(ctx context.Context) error { - c.Log.Infof("operator started") - defer c.Log.Infof("operator done") + c.Log.Infof("operator started for namespace %s", c.Namespace) + defer c.Log.Info("operator done") - newPodIps := make(chan string) + newPods := make(chan *v1.Pod) go func() { for { select { case <-ctx.Done(): c.Log.Error("operator context canceled") return - case podIp, ok := <-newPodIps: + case pod, ok := <-newPods: if !ok { c.Log.Error("operator channel closed") return } - c.Log.Debugf("operator received pod ip: %s", podIp) - addresses, err := c.getAddresses(ctx, podIp) + c.Log.Debugf("operator received pod with ip: %s", pod.Status.PodIP) + + nodeInfo, _, err := c.K8sClient.Service.FindNode(ctx, c.Namespace, pod) if err != nil { - c.Log.Errorf("process pod ip: %v", err) + c.Log.Errorf("find service for pod: %v", err) continue } - c.Log.Infof("ethereum address: %s", addresses.Ethereum) + var addresses bee.Addresses + + maxRetries := 5 + for i := 0; i < maxRetries; i++ { + addresses, err = c.getAddresses(ctx, nodeInfo.Endpoint) + if err != nil { + c.Log.Errorf("get addresses (attempt %d/%d): %v", i+1, maxRetries, err) + if i < maxRetries-1 { // Wait before retrying, except on the last attempt + time.Sleep(1 * time.Second) + } + continue + } + + c.Log.Tracef("Successfully fetched addresses on attempt %d/%d", i+1, maxRetries) + break + } + + if err != nil { + c.Log.Errorf("Failed to fetch addresses after %d attempts: %v", maxRetries, err) + } + + c.Log.Infof("node '%s' ethereum address: %s", nodeInfo.Name, addresses.Ethereum) err = funder.Fund(ctx, funder.Config{ Addresses: []string{addresses.Ethereum}, ChainNodeEndpoint: c.ChainNodeEndpoint, WalletKey: c.WalletKey, MinAmounts: funder.MinAmounts{ - NativeCoin: c.MinAmounts.NativeCoin, - SwarmToken: c.MinAmounts.SwarmToken, + NativeCoin: c.NativeToken, + SwarmToken: c.SwarmToken, }, }, nil, nil, funder.WithLoggerOption(c.Log)) if err != nil { @@ -94,23 +117,17 @@ func (c *Client) Run(ctx context.Context) error { } }() - if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPodIps); err != nil { + if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPods); err != nil { return fmt.Errorf("events watch: %w", err) } return nil } -// getAddresses sends a request to the pod IP and retrieves the Addresses struct, +// getAddresses sends a request to the node to get the addresses of the node, // which includes overlay, underlay addresses, Ethereum address, and public keys. -func (c *Client) getAddresses(ctx context.Context, podIp string) (bee.Addresses, error) { - url := &url.URL{ - Scheme: "http", - Host: podIp + ":1633", // it is possible to extract port from service - Path: "/addresses", - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) +func (c *Client) getAddresses(ctx context.Context, endpoint string) (bee.Addresses, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/addresses", endpoint), nil) if err != nil { return bee.Addresses{}, fmt.Errorf("new request: %s", err.Error()) } diff --git a/pkg/k8s/pod/client.go b/pkg/k8s/pod/client.go index 6d86153e..cf5b9c8d 100644 --- a/pkg/k8s/pod/client.go +++ b/pkg/k8s/pod/client.go @@ -57,7 +57,7 @@ func (c *Client) Set(ctx context.Context, name, namespace string, o Options) (po } } - return + return pod, nil } // Delete deletes Pod @@ -100,14 +100,15 @@ func (c *Client) DeletePods(ctx context.Context, namespace, labelSelector string if len(deletionErrors) > 0 { return deletedCount, fmt.Errorf("some pods failed to delete: %v", deletionErrors) } + return deletedCount, nil } -// WatchNewRunning detects new running Pods in the namespace and sends their IPs to the channel. -func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPodIps chan string) (err error) { +// WatchNewRunning detects new running Pods in the namespace and sends them to the channel. +func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPods chan *v1.Pod) error { c.log.Debugf("starting events watch in namespace %s, label selector %s", namespace, labelSelector) - defer c.log.Infof("events watch done") - defer close(newPodIps) + defer c.log.Debug("events watch done") + defer close(newPods) watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ LabelSelector: labelSelector, @@ -125,13 +126,19 @@ func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector s if !ok { return fmt.Errorf("watch channel closed") } + switch event.Type { // case watch.Added: // already running pods case watch.Modified: pod, ok := event.Object.(*v1.Pod) if ok { if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil && pod.Status.Phase == v1.PodRunning { - newPodIps <- pod.Status.PodIP + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue { + newPods <- pod + break + } + } } } } diff --git a/pkg/k8s/service/client.go b/pkg/k8s/service/client.go index 2e5f96d6..dc842432 100644 --- a/pkg/k8s/service/client.go +++ b/pkg/k8s/service/client.go @@ -103,3 +103,34 @@ func (c *Client) GetNodes(ctx context.Context, namespace, labelSelector string) return } + +func (c *Client) FindNode(ctx context.Context, namespace string, pod *v1.Pod) (*NodeInfo, *v1.Service, error) { + services, err := c.clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, nil, fmt.Errorf("listing services in namespace %s: %w", namespace, err) + } + + for _, svc := range services.Items { + if selector := svc.Spec.Selector; selector != nil { + matches := true + for key, value := range selector { + if pod.Labels[key] != value { + matches = false + break + } + } + if matches { + for _, port := range svc.Spec.Ports { + if port.Name == "api" { + return &NodeInfo{ + Name: svc.Name, + Endpoint: fmt.Sprintf("http://%s:%v", svc.Spec.ClusterIP, port.Port), + }, &svc, nil + } + } + } + } + } + + return nil, nil, fmt.Errorf("no matching service found for pod %s", pod.Name) +} diff --git a/pkg/orchestration/k8s/nodegroup.go b/pkg/orchestration/k8s/nodegroup.go index 0d4d84f9..f68f960f 100644 --- a/pkg/orchestration/k8s/nodegroup.go +++ b/pkg/orchestration/k8s/nodegroup.go @@ -74,8 +74,7 @@ func (g *NodeGroup) AddNode(ctx context.Context, name string, inCluster bool, o } for _, opt := range opts { - err := opt(&beeClientOpts) - if err != nil { + if err := opt(&beeClientOpts); err != nil { return fmt.Errorf("bee client option: %w", err) } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 00000000..0dd1ded1 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,58 @@ +package scheduler + +import ( + "context" + "time" + + "github.com/ethersphere/beekeeper/pkg/logging" + "resenje.org/x/shutdown" +) + +type PeriodicExecutor struct { + ticker *time.Ticker + interval time.Duration + log logging.Logger + shutdown *shutdown.Graceful +} + +func NewPeriodicExecutor(interval time.Duration, log logging.Logger) *PeriodicExecutor { + return &PeriodicExecutor{ + ticker: time.NewTicker(interval), + interval: interval, + log: log, + shutdown: shutdown.NewGraceful(), + } +} + +func (pe *PeriodicExecutor) Start(ctx context.Context, task func(ctx context.Context) error) { + pe.shutdown.Add(1) + go func() { + defer pe.shutdown.Done() + ctx = pe.shutdown.Context(ctx) + + if err := task(ctx); err != nil { + pe.log.Errorf("Task execution failed: %v", err) + } + + for { + select { + case <-pe.ticker.C: + pe.log.Debugf("Executing task after %s interval", pe.interval) + if err := task(ctx); err != nil { + pe.log.Errorf("Task execution failed: %v", err) + } + case <-pe.shutdown.Quit(): + return + case <-ctx.Done(): + return + } + } + }() +} + +func (pe *PeriodicExecutor) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + pe.ticker.Stop() + return pe.shutdown.Shutdown(ctx) +} diff --git a/pkg/stamper/node.go b/pkg/stamper/node.go new file mode 100644 index 00000000..9ce2e4cd --- /dev/null +++ b/pkg/stamper/node.go @@ -0,0 +1,187 @@ +package stamper + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/ethersphere/beekeeper/pkg/bee/api" + "github.com/ethersphere/beekeeper/pkg/logging" +) + +type node struct { + client *api.Client + name string + log logging.Logger +} + +func newNodeInfo(client *api.Client, name string, log logging.Logger) *node { + return &node{ + client: client, + name: name, + log: log, + } +} + +func (n *node) Create(ctx context.Context, amount uint64, depth uint16) error { + batchID, err := n.client.Postage.CreatePostageBatch(ctx, int64(amount), uint64(depth), "beekeeper") + if err != nil { + return fmt.Errorf("node %s: create postage batch: %w", n.name, err) + } + + n.log.Infof("node %s: created postage batch %s", n.name, batchID) + + return nil +} + +func (n *node) Dilute(ctx context.Context, threshold float64, depthIncrement uint16, batchIds []string) error { + batches, _, err := n.getPostageBatches(ctx, false) + if err != nil { + return err + } + + for _, batch := range batches { + if !isValidBatch(&batch, batchIds) { + continue + } + + if batch.BatchUsage() >= threshold { + return n.handleDilution(ctx, batch, depthIncrement) + } + } + + return nil +} + +// Set performs Topup and Dilute operations on postage batches. +// Topup is handled first based on the original depth, followed by Dilute +// which considers the new depth and utilization threshold. +func (n *node) Set( + ctx context.Context, + ttlThreshold time.Duration, + topUpFinalTTL time.Duration, + utilizationThreshold float64, + extraDepth uint16, + secondsPerBlock int64, + batchIds []string, +) error { + batches, price, err := n.getPostageBatches(ctx, true) + if err != nil { + return err + } + + for _, batch := range batches { + if !isValidBatch(&batch, batchIds) { + continue + } + + batchTTL := time.Duration(batch.BatchTTL) * time.Second + + needsDilution := batch.BatchUsage() >= utilizationThreshold + + if needsDilution { + batchTTL = batchTTL / (1 << extraDepth) // reduce batch TTL by 2^extraDepth + } + + if batchTTL > ttlThreshold && !needsDilution { + continue + } + + if err := n.handleTopup(ctx, batch, ttlThreshold, topUpFinalTTL, batchTTL, secondsPerBlock, price); err != nil { + return err + } + + if needsDilution { + return n.handleDilution(ctx, batch, extraDepth) + } + } + + return nil +} + +func (n *node) Topup(ctx context.Context, ttlThreshold time.Duration, topUpFinalTTL time.Duration, secondsPerBlock int64, batchIds []string) error { + batches, price, err := n.getPostageBatches(ctx, true) + if err != nil { + return err + } + + for _, batch := range batches { + if !isValidBatch(&batch, batchIds) { + continue + } + + batchTTL := time.Duration(batch.BatchTTL) * time.Second + + return n.handleTopup(ctx, batch, ttlThreshold, topUpFinalTTL, batchTTL, secondsPerBlock, price) + } + + return nil +} + +func (n *node) handleDilution(ctx context.Context, batch api.PostageStampResponse, extraDepth uint16) error { + newDepth := uint16(batch.Depth) + extraDepth + + n.log.Tracef("node %s: batch %s: usage %.2f%%, diluting to depth %d", n.name, batch.BatchID, batch.BatchUsage(), newDepth) + + if err := n.client.Postage.DilutePostageBatch(ctx, batch.BatchID, uint64(newDepth), ""); err != nil { + return fmt.Errorf("node %s: dilute batch %s: %w", n.name, batch.BatchID, err) + } + + n.log.Infof("node %s: diluted batch %s to depth %d", n.name, batch.BatchID, newDepth) + + return nil +} + +func (n *node) handleTopup(ctx context.Context, batch api.PostageStampResponse, ttlThreshold, topUpFinalTTL, batchTTL time.Duration, secondsPerBlock, price int64) error { + if batchTTL <= ttlThreshold { + topUpTTL := topUpFinalTTL - batchTTL + if topUpTTL > 0 { + amount := (int64(topUpTTL.Seconds()) / secondsPerBlock) * price + + n.log.Tracef("node %s: batch %s: required duration %d, amount %d", n.name, batch.BatchID, topUpTTL, amount) + + if err := n.client.Postage.TopUpPostageBatch(ctx, batch.BatchID, amount, ""); err != nil { + return fmt.Errorf("node %s: top-up batch %s: %w", n.name, batch.BatchID, err) + } + + n.log.Infof("node %s: topped up batch %s with amount %d", n.name, batch.BatchID, amount) + } + } + + return nil +} + +func (n *node) getPostageBatches(ctx context.Context, needPrice bool) (batches []api.PostageStampResponse, price int64, err error) { + if needPrice { + chainState, err := n.client.Postage.GetChainState(ctx) + if err != nil { + return nil, 0, fmt.Errorf("node %s: get chain state: %w", n.name, err) + } + + price = chainState.CurrentPrice.Int64() + if price <= 0 { + return nil, 0, fmt.Errorf("node %s: invalid chain price: %d", n.name, price) + } + } + + batches, err = n.client.Postage.PostageBatches(ctx) + if err != nil { + return nil, 0, fmt.Errorf("node %s: get postage batches: %w", n.name, err) + } + + return batches, price, nil +} + +// isValidBatch checks if a batch should be processed +func isValidBatch(batch *api.PostageStampResponse, batchIDs []string) bool { + if !batch.Usable || batch.Utilization == 0 || batch.BatchTTL <= 0 { + return false + } + + if len(batchIDs) > 0 && !slices.Contains(batchIDs, batch.BatchID) { + return false + } + + return true +} diff --git a/pkg/stamper/stamper.go b/pkg/stamper/stamper.go new file mode 100644 index 00000000..f3184ee1 --- /dev/null +++ b/pkg/stamper/stamper.go @@ -0,0 +1,257 @@ +package stamper + +import ( + "context" + "fmt" + "io" + "net/url" + "time" + + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" + "github.com/ethersphere/beekeeper/pkg/k8s" + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/swap" +) + +type Option func(*options) + +type options struct { + batchIds []string +} + +func WithBatchIDs(batchIds []string) Option { + return func(o *options) { + o.batchIds = batchIds + } +} + +type ClientConfig struct { + Log logging.Logger + Namespace string + K8sClient *k8s.Client + SwapClient swap.Client + BeeClients map[string]*bee.Client + LabelSelector string + InCluster bool +} + +type Client struct { + log logging.Logger + namespace string + k8sClient *k8s.Client + swapClient swap.Client + beeClients map[string]*bee.Client + labelSelector string + inCluster bool +} + +func New(cfg *ClientConfig) *Client { + if cfg == nil { + return nil + } + + if cfg.Log == nil { + cfg.Log = logging.New(io.Discard, 0) + } + + return &Client{ + log: cfg.Log, + namespace: cfg.Namespace, + k8sClient: cfg.K8sClient, + swapClient: cfg.SwapClient, + beeClients: cfg.BeeClients, + labelSelector: cfg.LabelSelector, + inCluster: cfg.InCluster, + } +} + +// Create creates a postage batch. +func (s *Client) Create(ctx context.Context, amount uint64, depth uint16) error { + s.log.WithFields(map[string]interface{}{ + "amount": amount, + "depth": depth, + }).Infof("creating postage batch on nodes") + + nodes, err := s.getNodes(ctx) + if err != nil { + return fmt.Errorf("get nodes: %w", err) + } + + for _, node := range nodes { + if err := node.Create(ctx, amount, depth); err != nil { + s.log.Errorf("node %s create postage batch: %v", node.name, err) + } + } + + return nil +} + +// Dilute dilutes a postage batch. +func (s *Client) Dilute(ctx context.Context, usageThreshold float64, dilutionDepth uint16, opts ...Option) error { + s.log.WithFields(map[string]interface{}{ + "usageThreshold": usageThreshold, + "dilutionDepth": dilutionDepth, + }).Infof("diluting postage batch on nodes") + + nodes, err := s.getNodes(ctx) + if err != nil { + return fmt.Errorf("get nodes: %w", err) + } + + for _, node := range nodes { + if err := node.Dilute(ctx, usageThreshold, dilutionDepth, processOptions(opts...).batchIds); err != nil { + s.log.Errorf("node %s dilute postage batch: %v", node.name, err) + } + } + + return nil +} + +// Set sets the topup and dilution parameters. +func (s *Client) Set(ctx context.Context, ttlThreshold time.Duration, topupTo time.Duration, usageThreshold float64, dilutionDepth uint16, opts ...Option) error { + s.log.WithFields(map[string]interface{}{ + "ttlThreshold": ttlThreshold, + "topupTo": topupTo, + "usageThreshold": usageThreshold, + "dilutionDepth": dilutionDepth, + }).Infof("setting topup and dilution on postage batch on nodes") + + nodes, err := s.getNodes(ctx) + if err != nil { + return fmt.Errorf("get nodes: %w", err) + } + + blockTime, err := s.swapClient.FetchBlockTime(ctx) + if err != nil { + return fmt.Errorf("fetching block time: %w", err) + } + + for _, node := range nodes { + if err := node.Set(ctx, ttlThreshold, topupTo, usageThreshold, dilutionDepth, blockTime, processOptions(opts...).batchIds); err != nil { + s.log.Errorf("node %s set postage batch: %v", node.name, err) + } + } + + return nil +} + +// Topup tops up a postage batch. +func (s *Client) Topup(ctx context.Context, ttlThreshold time.Duration, topupTo time.Duration, opts ...Option) (err error) { + s.log.WithFields(map[string]interface{}{ + "ttlThreshold": ttlThreshold, + "topupTo": topupTo, + }).Infof("topup postage batch on nodes") + + nodes, err := s.getNodes(ctx) + if err != nil { + return fmt.Errorf("get nodes: %w", err) + } + + blockTime, err := s.swapClient.FetchBlockTime(ctx) + if err != nil { + return fmt.Errorf("fetching block time: %w", err) + } + + for _, node := range nodes { + if err := node.Topup(ctx, ttlThreshold, topupTo, blockTime, processOptions(opts...).batchIds); err != nil { + s.log.Errorf("node %s topup postage batch: %v", node.name, err) + } + } + + return nil +} + +func (sc *Client) getNodes(ctx context.Context) (nodes []node, err error) { + if sc.namespace != "" { + return sc.getNamespaceNodes(ctx) + } + + if sc.beeClients == nil { + return nil, fmt.Errorf("bee clients not provided") + } + + nodes = make([]node, 0, len(sc.beeClients)) + for nodeName, beeClient := range sc.beeClients { + nodes = append(nodes, *newNodeInfo(beeClient.API(), nodeName, sc.log)) + } + + return nodes, nil +} + +func (sc *Client) getNamespaceNodes(ctx context.Context) (nodes []node, err error) { + if sc.namespace == "" { + return nil, fmt.Errorf("namespace not provided") + } + + if sc.k8sClient == nil { + return nil, fmt.Errorf("k8s client not provided") + } + + if sc.inCluster { + nodes, err = sc.getServiceNodes(ctx) + } else { + nodes, err = sc.getIngressNodes(ctx) + } + if err != nil { + return nil, fmt.Errorf("get nodes: %w", err) + } + + return nodes, nil +} + +func (sc *Client) getServiceNodes(ctx context.Context) ([]node, error) { + svcNodes, err := sc.k8sClient.Service.GetNodes(ctx, sc.namespace, sc.labelSelector) + if err != nil { + return nil, fmt.Errorf("list api services: %w", err) + } + + nodes := make([]node, len(svcNodes)) + for i, node := range svcNodes { + parsedURL, err := url.Parse(node.Endpoint) + if err != nil { + return nil, fmt.Errorf("extract base URL: %w", err) + } + + apiClient := api.NewClient(parsedURL, nil) + + nodes[i] = *newNodeInfo(apiClient, node.Name, sc.log) + } + + return nodes, nil +} + +func (sc *Client) getIngressNodes(ctx context.Context) ([]node, error) { + ingressNodes, err := sc.k8sClient.Ingress.GetNodes(ctx, sc.namespace, sc.labelSelector) + if err != nil { + return nil, fmt.Errorf("list ingress api nodes hosts: %w", err) + } + + ingressRouteNodes, err := sc.k8sClient.IngressRoute.GetNodes(ctx, sc.namespace, sc.labelSelector) + if err != nil { + return nil, fmt.Errorf("list ingress route api nodes hosts: %w", err) + } + + allNodes := append(ingressNodes, ingressRouteNodes...) + nodes := make([]node, len(allNodes)) + for i, node := range allNodes { + parsedURL, err := url.Parse(fmt.Sprintf("http://%s", node.Host)) + if err != nil { + return nil, fmt.Errorf("extract base URL: %w", err) + } + + apiClient := api.NewClient(parsedURL, nil) + + nodes[i] = *newNodeInfo(apiClient, node.Name, sc.log) + } + + return nodes, nil +} + +func processOptions(opts ...Option) *options { + o := &options{} + for _, opt := range opts { + opt(o) + } + return o +} diff --git a/pkg/swap/block.go b/pkg/swap/block.go new file mode 100644 index 00000000..d90d3f2d --- /dev/null +++ b/pkg/swap/block.go @@ -0,0 +1,102 @@ +package swap + +import ( + "context" + "fmt" + "net/http" + "strconv" +) + +func (g *GethClient) FetchBlockTime(ctx context.Context) (int64, error) { + latestBlockNumber, err := g.fetchLatestBlockNumber(ctx) + if err != nil { + return 0, fmt.Errorf("fetch latest block number: %w", err) + } + + timestampLatest, err := g.fetchBlockTimestamp(ctx, latestBlockNumber) + if err != nil { + return 0, fmt.Errorf("fetch latest block timestamp: %w", err) + } + + timestampPrevious, err := g.fetchBlockTimestamp(ctx, latestBlockNumber-1) + if err != nil { + return 0, fmt.Errorf("fetch previous block timestamp: %w", err) + } + + blockTime := timestampLatest - timestampPrevious + + g.logger.Tracef("block time: %d seconds", blockTime) + + return blockTime, nil +} + +type rpcRequest struct { + ID string `json:"id"` + JsonRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params []interface{} `json:"params"` +} + +func (g *GethClient) fetchLatestBlockNumber(ctx context.Context) (int64, error) { + req := rpcRequest{ + JsonRPC: "2.0", + Method: "eth_blockNumber", + ID: "1", + } + + resp := new(struct { + JsonRPC string `json:"jsonrpc"` + Result string `json:"result"` + ID string `json:"id"` + }) + + if err := requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + return 0, fmt.Errorf("request json: %w", err) + } + + if len(resp.Result) == 0 { + return 0, fmt.Errorf("empty result") + } + + if resp.Result[:2] != "0x" { + return 0, fmt.Errorf("invalid result") + } + + blockNumber, err := strconv.ParseInt(resp.Result[2:], 16, 64) + if err != nil { + return 0, fmt.Errorf("parse int: %w", err) + } + + return blockNumber, nil +} + +func (g *GethClient) fetchBlockTimestamp(ctx context.Context, blockNumber int64) (int64, error) { + req := rpcRequest{ + ID: "1", + JsonRPC: "2.0", + Method: "eth_getBlockByNumber", + Params: []interface{}{fmt.Sprintf("0x%x", blockNumber), false}, + } + + resp := new(struct { + ID string `json:"id"` + JsonRPC string `json:"jsonrpc"` + Result struct { + Timestamp string `json:"timestamp"` + } `json:"result"` + }) + + if err := requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + return 0, fmt.Errorf("request json: %w", err) + } + + if len(resp.Result.Timestamp) == 0 { + return 0, fmt.Errorf("empty timestamp") + } + + if resp.Result.Timestamp[:2] != "0x" { + return 0, fmt.Errorf("invalid timestamp") + } + + return strconv.ParseInt(resp.Result.Timestamp[2:], 16, 64) +} diff --git a/pkg/swap/geth.go b/pkg/swap/geth.go index 910b2c01..f262975f 100644 --- a/pkg/swap/geth.go +++ b/pkg/swap/geth.go @@ -6,6 +6,7 @@ import ( "math/big" "net/http" "net/url" + "slices" "strings" "github.com/ethersphere/beekeeper/pkg/logging" @@ -244,14 +245,11 @@ func (g *GethClient) ethAccounts(ctx context.Context) (a []string, err error) { return resp.Result, nil } -// contains checks if list contains string +// contains checks if list contains string and ignores case func contains(list []string, find string) bool { - for _, s := range list { - if strings.EqualFold(s, find) { - return true - } - } - return false + return slices.ContainsFunc(list, func(s string) bool { + return strings.EqualFold(s, find) + }) } // addPrefix adds prefix to string if it doesn't exist diff --git a/pkg/swap/http.go b/pkg/swap/http.go index b830e864..50a76091 100644 --- a/pkg/swap/http.go +++ b/pkg/swap/http.go @@ -78,11 +78,7 @@ func httpClientWithTransport(baseURL *url.URL, c *http.Client) *http.Client { c.Transport = roundTripperFunc(func(r *http.Request) (resp *http.Response, err error) { r.Header.Set("User-Agent", userAgent) - u, err := baseURL.Parse(r.URL.String()) - if err != nil { - return nil, err - } - r.URL = u + r.URL = baseURL return transport.RoundTrip(r) }) return c diff --git a/pkg/swap/notset.go b/pkg/swap/notset.go index 8ce6ae22..91b6fb63 100644 --- a/pkg/swap/notset.go +++ b/pkg/swap/notset.go @@ -31,3 +31,8 @@ func (n *NotSet) SendGBZZ(ctx context.Context, to string, amount float64) (tx st func (n *NotSet) AttestOverlayEthAddress(ctx context.Context, ethAddr string) (tx string, err error) { return "", ErrNotSet } + +// FetchBlockTime(ctx context.Context) (blockTime int64, err error) +func (n *NotSet) FetchBlockTime(ctx context.Context) (blockTime int64, err error) { + return 0, ErrNotSet +} diff --git a/pkg/swap/swap.go b/pkg/swap/swap.go index c09d48c3..2a589ebf 100644 --- a/pkg/swap/swap.go +++ b/pkg/swap/swap.go @@ -20,4 +20,5 @@ type Client interface { SendBZZ(ctx context.Context, to string, amount float64) (tx string, err error) SendGBZZ(ctx context.Context, to string, amount float64) (tx string, err error) AttestOverlayEthAddress(ctx context.Context, ethAddr string) (tx string, err error) + FetchBlockTime(ctx context.Context) (blockTime int64, err error) } diff --git a/version.go b/version.go index 46f14ca8..aaf61852 100644 --- a/version.go +++ b/version.go @@ -1,8 +1,9 @@ package beekeeper var ( - version = "0.23.2" // manually set semantic version number - commit string // automatically set git commit hash + version = "0.25.0" // manually set semantic version number + + commit string // automatically set git commit hash // Version TODO Version = func() string {