Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 203 additions & 86 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/node"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/puller"
Expand All @@ -31,7 +34,6 @@

const (
optionNameValidation = "validate"
optionNameValidationPin = "validate-pin"
optionNameCollectionPin = "pin"
optionNameOutputLocation = "output"
)
Expand Down Expand Up @@ -764,117 +766,232 @@
cmd.AddCommand(c)
}

const (
optionNameForgetOverlay = "forget-overlay"
optionNameForgetStamps = "forget-stamps"
optionNameServiceMode = "service-mode"
optionNameServicePort = "service-port"
)

func dbNukeCmd(cmd *cobra.Command) {
const (
optionNameForgetOverlay = "forget-overlay"
optionNameForgetStamps = "forget-stamps"

localstore = ioutil.DataPathLocalstore

Check failure on line 778 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

const `localstore` is unused (unused)
kademlia = ioutil.DataPathKademlia

Check failure on line 779 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

const `kademlia` is unused (unused)
statestore = "statestore"

Check failure on line 780 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

const `statestore` is unused (unused)
stamperstore = "stamperstore"

Check failure on line 781 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

const `stamperstore` is unused (unused)
)

c := &cobra.Command{
Use: "nuke",
Short: "Nuke the DB so that bee resyncs all data next time it boots up.",
RunE: func(cmd *cobra.Command, args []string) (err error) {
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
serviceMode, err := cmd.Flags().GetBool(optionNameServiceMode)
if err != nil {
return fmt.Errorf("new logger: %w", err)
return fmt.Errorf("get service mode: %w", err)
}
d, err := cmd.Flags().GetDuration(optionNameSleepAfter)
if err != nil {
logger.Error(err, "getting sleep value failed")
}
defer func() {
if d > 0 {
logger.Info("command has finished, sleeping...", "duration", d.String())
time.Sleep(d)
}
}()

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
if serviceMode {
return runNukeService(cmd)
}

logger.Warning("starting to nuke the DB with data-dir", "path", dataDir)
logger.Warning("this process will erase all persisted chunks in your local storage")
logger.Warning("it will NOT discriminate any pinned content, in case you were wondering")
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
logger.Warning("proceeding with database nuke...")
return runNukeCommand(cmd)
},
}

dirsToNuke := []string{localstore, kademlia}
for _, dir := range dirsToNuke {
err = removeContent(filepath.Join(dataDir, dir))
if err != nil {
return fmt.Errorf("delete %s: %w", dir, err)
}
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "trace", "verbosity level")
c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished")
c.Flags().Bool(optionNameForgetOverlay, false, "forget the overlay and deploy a new chequebook on next boot-up")
c.Flags().Bool(optionNameForgetStamps, false, "forget the existing stamps belonging to the node. even when forgotten, they will show up again after a chain resync")
c.Flags().Bool(optionNameServiceMode, false, "run as a long-running HTTP service instead of one-time command")
c.Flags().String(optionNameServicePort, "8080", "port for the nuke service HTTP server")
cmd.AddCommand(c)
}

forgetOverlay, err := cmd.Flags().GetBool(optionNameForgetOverlay)
if err != nil {
return fmt.Errorf("get forget overlay: %w", err)
}
// runNukeCommand runs the nuke command in one-time mode
func runNukeCommand(cmd *cobra.Command) error {
const (
localstore = ioutil.DataPathLocalstore
kademlia = ioutil.DataPathKademlia
statestore = "statestore"
stamperstore = "stamperstore"
)

if forgetOverlay {
err = removeContent(filepath.Join(dataDir, statestore))
if err != nil {
return fmt.Errorf("remove statestore: %w", err)
}
err = removeContent(filepath.Join(dataDir, stamperstore))
if err != nil {
return fmt.Errorf("remove stamperstore: %w", err)
}
return nil
}
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}
d, err := cmd.Flags().GetDuration(optionNameSleepAfter)
if err != nil {
logger.Error(err, "getting sleep value failed")
}
defer func() {
if d > 0 {
logger.Info("command has finished, sleeping...", "duration", d.String())
time.Sleep(d)
}
}()

logger.Info("nuking statestore...")
dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
}

forgetStamps, err := cmd.Flags().GetBool(optionNameForgetStamps)
if err != nil {
return fmt.Errorf("get forget stamps: %w", err)
}
logger.Warning("starting to nuke the DB with data-dir", "path", dataDir)
logger.Warning("this process will erase all persisted chunks in your local storage")
logger.Warning("it will NOT discriminate any pinned content, in case you were wondering")
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
logger.Warning("proceeding with database nuke...")

stateStore, _, err := node.InitStateStore(logger, dataDir, 1000)
if err != nil {
return fmt.Errorf("new statestore: %w", err)
}
defer stateStore.Close()
dirsToNuke := []string{localstore, kademlia}
for _, dir := range dirsToNuke {
err = removeContent(filepath.Join(dataDir, dir))
if err != nil {
return fmt.Errorf("delete %s: %w", dir, err)
}
}

stateStoreCleaner, ok := stateStore.(storage.StateStorerCleaner)
if ok {
err = stateStoreCleaner.Nuke()
if err != nil {
return fmt.Errorf("statestore nuke: %w", err)
}
}
forgetOverlay, err := cmd.Flags().GetBool(optionNameForgetOverlay)
if err != nil {
return fmt.Errorf("get forget overlay: %w", err)
}

if forgetStamps {
err = removeContent(filepath.Join(dataDir, stamperstore))
if err != nil {
return fmt.Errorf("remove stamperstore: %w", err)
}
}
if forgetOverlay {
err = removeContent(filepath.Join(dataDir, statestore))
if err != nil {
return fmt.Errorf("remove statestore: %w", err)
}
err = removeContent(filepath.Join(dataDir, stamperstore))
if err != nil {
return fmt.Errorf("remove stamperstore: %w", err)
}
return nil
}

return nil
}}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "trace", "verbosity level")
c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished")
c.Flags().Bool(optionNameForgetOverlay, false, "forget the overlay and deploy a new chequebook on next boot-up")
c.Flags().Bool(optionNameForgetStamps, false, "forget the existing stamps belonging to the node. even when forgotten, they will show up again after a chain resync")
cmd.AddCommand(c)
logger.Info("nuking statestore...")

forgetStamps, err := cmd.Flags().GetBool(optionNameForgetStamps)
if err != nil {
return fmt.Errorf("get forget stamps: %w", err)
}

stateStore, _, err := node.InitStateStore(logger, dataDir, 1000)
if err != nil {
return fmt.Errorf("new statestore: %w", err)
}
defer stateStore.Close()

stateStoreCleaner, ok := stateStore.(storage.StateStorerCleaner)
if ok {
err = stateStoreCleaner.Nuke()
if err != nil {
return fmt.Errorf("statestore nuke: %w", err)
}
}

if forgetStamps {
err = removeContent(filepath.Join(dataDir, stamperstore))
if err != nil {
return fmt.Errorf("remove stamperstore: %w", err)
}
}

return nil
}

// runNukeService runs the nuke command as a long-running HTTP service
func runNukeService(cmd *cobra.Command) error {
port, err := cmd.Flags().GetString(optionNameServicePort)
if err != nil {
return fmt.Errorf("get service port: %w", err)
}

v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

logger.Info("starting nuke service", "port", port)
logger.Info("nuke service is running as an HTTP server")
logger.Info("use POST /nuke to start nuke operation")
logger.Info("use GET /nuke/status to check operation status")
logger.Info("use GET /nuke/readiness for readiness probe")

// Create a simple HTTP server for the nuke service
mux := http.NewServeMux()

// Create nuke service instance
nukeService := api.NewNukeService(logger)

// Add handlers
mux.HandleFunc("/nuke", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

var req api.NukeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}

if err := nukeService.StartNuke(req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("nuke operation started"))

Check failure on line 959 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `w.Write` is not checked (errcheck)
})

mux.HandleFunc("/nuke/status", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

status := nukeService.GetStatus()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)

Check failure on line 970 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `(*encoding/json.Encoder).Encode` is not checked (errcheck)
})

mux.HandleFunc("/nuke/readiness", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

if nukeService.IsReady() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ready"))

Check failure on line 981 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `w.Write` is not checked (errcheck)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("nuke operation in progress"))

Check failure on line 984 in cmd/bee/cmd/db.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `w.Write` is not checked (errcheck)
}
})

server := &http.Server{
Addr: ":" + port,
Handler: mux,
}

logger.Info("nuke service listening", "address", ":"+port)
return server.ListenAndServe()
}

func removeContent(path string) error {
Expand Down
10 changes: 9 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ type Service struct {
redistributionAgent *storageincentives.Agent

statusService *status.Service
isWarmingUp bool
nukeService *NukeService

isWarmingUp bool
}

func (s *Service) SetP2P(p2p p2p.DebugService) {
Expand All @@ -241,6 +243,12 @@ func (s *Service) SetRedistributionAgent(redistributionAgent *storageincentives.
}
}

func (s *Service) SetNukeService(nukeService *NukeService) {
if s != nil {
s.nukeService = nukeService
}
}

type Options struct {
CORSAllowedOrigins []string
WsPingPeriod time.Duration
Expand Down
Loading
Loading