diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 62eee2591c2..2b59f561e61 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -9,15 +9,18 @@ import ( "bytes" "context" "encoding/binary" + "encoding/json" "errors" "fmt" "io" + "net/http" "os" "path" "path/filepath" "strings" "time" + "github.com/ethersphere/bee/v2/pkg/api" "github.com/ethersphere/bee/v2/pkg/node" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/puller" @@ -31,7 +34,6 @@ import ( const ( optionNameValidation = "validate" - optionNameValidationPin = "validate-pin" optionNameCollectionPin = "pin" optionNameOutputLocation = "output" ) @@ -764,11 +766,15 @@ func dbImportPinningCmd(cmd *cobra.Command) { cmd.AddCommand(c) } +const ( + optionNameForgetOverlay = "forget-overlay" + optionNameForgetStamps = "forget-stamps" + optionNameServiceMode = "service-mode" + optionNameServicePort = "service-port" +) + func dbNukeCmd(cmd *cobra.Command) { const ( - optionNameForgetOverlay = "forget-overlay" - optionNameForgetStamps = "forget-stamps" - localstore = ioutil.DataPathLocalstore kademlia = ioutil.DataPathKademlia statestore = "statestore" @@ -779,102 +785,213 @@ func dbNukeCmd(cmd *cobra.Command) { Use: "nuke", Short: "Nuke the DB so that bee resyncs all data next time it boots up.", RunE: func(cmd *cobra.Command, args []string) (err error) { - v, err := cmd.Flags().GetString(optionNameVerbosity) - if err != nil { - return fmt.Errorf("get verbosity: %w", err) - } - v = strings.ToLower(v) - logger, err := newLogger(cmd, v) + serviceMode, err := cmd.Flags().GetBool(optionNameServiceMode) if err != nil { - return fmt.Errorf("new logger: %w", err) + return fmt.Errorf("get service mode: %w", err) } - d, err := cmd.Flags().GetDuration(optionNameSleepAfter) - if err != nil { - logger.Error(err, "getting sleep value failed") - } - defer func() { - if d > 0 { - logger.Info("command has finished, sleeping...", "duration", d.String()) - time.Sleep(d) - } - }() - dataDir, err := cmd.Flags().GetString(optionNameDataDir) - if err != nil { - return fmt.Errorf("get data-dir: %w", err) - } - if dataDir == "" { - return errors.New("no data-dir provided") + if serviceMode { + return runNukeService(cmd) } - logger.Warning("starting to nuke the DB with data-dir", "path", dataDir) - logger.Warning("this process will erase all persisted chunks in your local storage") - logger.Warning("it will NOT discriminate any pinned content, in case you were wondering") - logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...") - time.Sleep(10 * time.Second) - logger.Warning("proceeding with database nuke...") + return runNukeCommand(cmd) + }, + } - dirsToNuke := []string{localstore, kademlia} - for _, dir := range dirsToNuke { - err = removeContent(filepath.Join(dataDir, dir)) - if err != nil { - return fmt.Errorf("delete %s: %w", dir, err) - } - } + c.Flags().String(optionNameDataDir, "", "data directory") + c.Flags().String(optionNameVerbosity, "trace", "verbosity level") + c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished") + c.Flags().Bool(optionNameForgetOverlay, false, "forget the overlay and deploy a new chequebook on next boot-up") + c.Flags().Bool(optionNameForgetStamps, false, "forget the existing stamps belonging to the node. even when forgotten, they will show up again after a chain resync") + c.Flags().Bool(optionNameServiceMode, false, "run as a long-running HTTP service instead of one-time command") + c.Flags().String(optionNameServicePort, "8080", "port for the nuke service HTTP server") + cmd.AddCommand(c) +} - forgetOverlay, err := cmd.Flags().GetBool(optionNameForgetOverlay) - if err != nil { - return fmt.Errorf("get forget overlay: %w", err) - } +// runNukeCommand runs the nuke command in one-time mode +func runNukeCommand(cmd *cobra.Command) error { + const ( + localstore = ioutil.DataPathLocalstore + kademlia = ioutil.DataPathKademlia + statestore = "statestore" + stamperstore = "stamperstore" + ) - if forgetOverlay { - err = removeContent(filepath.Join(dataDir, statestore)) - if err != nil { - return fmt.Errorf("remove statestore: %w", err) - } - err = removeContent(filepath.Join(dataDir, stamperstore)) - if err != nil { - return fmt.Errorf("remove stamperstore: %w", err) - } - return nil - } + v, err := cmd.Flags().GetString(optionNameVerbosity) + if err != nil { + return fmt.Errorf("get verbosity: %w", err) + } + v = strings.ToLower(v) + logger, err := newLogger(cmd, v) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + d, err := cmd.Flags().GetDuration(optionNameSleepAfter) + if err != nil { + logger.Error(err, "getting sleep value failed") + } + defer func() { + if d > 0 { + logger.Info("command has finished, sleeping...", "duration", d.String()) + time.Sleep(d) + } + }() - logger.Info("nuking statestore...") + dataDir, err := cmd.Flags().GetString(optionNameDataDir) + if err != nil { + return fmt.Errorf("get data-dir: %w", err) + } + if dataDir == "" { + return errors.New("no data-dir provided") + } - forgetStamps, err := cmd.Flags().GetBool(optionNameForgetStamps) - if err != nil { - return fmt.Errorf("get forget stamps: %w", err) - } + logger.Warning("starting to nuke the DB with data-dir", "path", dataDir) + logger.Warning("this process will erase all persisted chunks in your local storage") + logger.Warning("it will NOT discriminate any pinned content, in case you were wondering") + logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...") + time.Sleep(10 * time.Second) + logger.Warning("proceeding with database nuke...") - stateStore, _, err := node.InitStateStore(logger, dataDir, 1000) - if err != nil { - return fmt.Errorf("new statestore: %w", err) - } - defer stateStore.Close() + dirsToNuke := []string{localstore, kademlia} + for _, dir := range dirsToNuke { + err = removeContent(filepath.Join(dataDir, dir)) + if err != nil { + return fmt.Errorf("delete %s: %w", dir, err) + } + } - stateStoreCleaner, ok := stateStore.(storage.StateStorerCleaner) - if ok { - err = stateStoreCleaner.Nuke() - if err != nil { - return fmt.Errorf("statestore nuke: %w", err) - } - } + forgetOverlay, err := cmd.Flags().GetBool(optionNameForgetOverlay) + if err != nil { + return fmt.Errorf("get forget overlay: %w", err) + } - if forgetStamps { - err = removeContent(filepath.Join(dataDir, stamperstore)) - if err != nil { - return fmt.Errorf("remove stamperstore: %w", err) - } - } + if forgetOverlay { + err = removeContent(filepath.Join(dataDir, statestore)) + if err != nil { + return fmt.Errorf("remove statestore: %w", err) + } + err = removeContent(filepath.Join(dataDir, stamperstore)) + if err != nil { + return fmt.Errorf("remove stamperstore: %w", err) + } + return nil + } - return nil - }} - c.Flags().String(optionNameDataDir, "", "data directory") - c.Flags().String(optionNameVerbosity, "trace", "verbosity level") - c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished") - c.Flags().Bool(optionNameForgetOverlay, false, "forget the overlay and deploy a new chequebook on next boot-up") - c.Flags().Bool(optionNameForgetStamps, false, "forget the existing stamps belonging to the node. even when forgotten, they will show up again after a chain resync") - cmd.AddCommand(c) + logger.Info("nuking statestore...") + + forgetStamps, err := cmd.Flags().GetBool(optionNameForgetStamps) + if err != nil { + return fmt.Errorf("get forget stamps: %w", err) + } + + stateStore, _, err := node.InitStateStore(logger, dataDir, 1000) + if err != nil { + return fmt.Errorf("new statestore: %w", err) + } + defer stateStore.Close() + + stateStoreCleaner, ok := stateStore.(storage.StateStorerCleaner) + if ok { + err = stateStoreCleaner.Nuke() + if err != nil { + return fmt.Errorf("statestore nuke: %w", err) + } + } + + if forgetStamps { + err = removeContent(filepath.Join(dataDir, stamperstore)) + if err != nil { + return fmt.Errorf("remove stamperstore: %w", err) + } + } + + return nil +} + +// runNukeService runs the nuke command as a long-running HTTP service +func runNukeService(cmd *cobra.Command) error { + port, err := cmd.Flags().GetString(optionNameServicePort) + if err != nil { + return fmt.Errorf("get service port: %w", err) + } + + v, err := cmd.Flags().GetString(optionNameVerbosity) + if err != nil { + return fmt.Errorf("get verbosity: %w", err) + } + v = strings.ToLower(v) + logger, err := newLogger(cmd, v) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + logger.Info("starting nuke service", "port", port) + logger.Info("nuke service is running as an HTTP server") + logger.Info("use POST /nuke to start nuke operation") + logger.Info("use GET /nuke/status to check operation status") + logger.Info("use GET /nuke/readiness for readiness probe") + + // Create a simple HTTP server for the nuke service + mux := http.NewServeMux() + + // Create nuke service instance + nukeService := api.NewNukeService(logger) + + // Add handlers + mux.HandleFunc("/nuke", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + var req api.NukeRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + if err := nukeService.StartNuke(req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("nuke operation started")) + }) + + mux.HandleFunc("/nuke/status", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + status := nukeService.GetStatus() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) + }) + + mux.HandleFunc("/nuke/readiness", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + if nukeService.IsReady() { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ready")) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("nuke operation in progress")) + } + }) + + server := &http.Server{ + Addr: ":" + port, + Handler: mux, + } + + logger.Info("nuke service listening", "address", ":"+port) + return server.ListenAndServe() } func removeContent(path string) error { diff --git a/pkg/api/api.go b/pkg/api/api.go index e3419ccdb38..e5f4c4afd0f 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -220,7 +220,9 @@ type Service struct { redistributionAgent *storageincentives.Agent statusService *status.Service - isWarmingUp bool + nukeService *NukeService + + isWarmingUp bool } func (s *Service) SetP2P(p2p p2p.DebugService) { @@ -241,6 +243,12 @@ func (s *Service) SetRedistributionAgent(redistributionAgent *storageincentives. } } +func (s *Service) SetNukeService(nukeService *NukeService) { + if s != nil { + s.nukeService = nukeService + } +} + type Options struct { CORSAllowedOrigins []string WsPingPeriod time.Duration diff --git a/pkg/api/nuke.go b/pkg/api/nuke.go new file mode 100644 index 00000000000..fef7f105d75 --- /dev/null +++ b/pkg/api/nuke.go @@ -0,0 +1,247 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/log" +) + +// NukeService handles the long-running nuke operation +type NukeService struct { + logger log.Logger + mu sync.RWMutex + status NukeStatus + startTime time.Time + endTime *time.Time + error error +} + +// NukeStatus represents the current status of the nuke operation +type NukeStatus string + +const ( + NukeStatusNotStarted NukeStatus = "not_started" + NukeStatusInProgress NukeStatus = "in_progress" + NukeStatusCompleted NukeStatus = "completed" + NukeStatusFailed NukeStatus = "failed" +) + +// NukeStatusResponse represents the response for the status endpoint +type NukeStatusResponse struct { + Status NukeStatus `json:"status"` + StartTime time.Time `json:"start_time,omitempty"` + EndTime *time.Time `json:"end_time,omitempty"` + Error string `json:"error,omitempty"` +} + +// NukeRequest represents the request to start a nuke operation +type NukeRequest struct { + DataDir string `json:"data_dir" validate:"required"` + ForgetOverlay bool `json:"forget_overlay"` + ForgetStamps bool `json:"forget_stamps"` +} + +// NewNukeService creates a new nuke service +func NewNukeService(logger log.Logger) *NukeService { + return &NukeService{ + logger: logger, + status: NukeStatusNotStarted, + } +} + +// StartNuke starts the nuke operation in a goroutine +func (s *NukeService) StartNuke(req NukeRequest) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.status == NukeStatusInProgress { + return fmt.Errorf("nuke operation already in progress") + } + + s.status = NukeStatusInProgress + s.startTime = time.Now() + s.endTime = nil + s.error = nil + + go s.executeNuke(req) + + return nil +} + +// executeNuke performs the actual nuke operation +func (s *NukeService) executeNuke(req NukeRequest) { + defer func() { + s.mu.Lock() + if s.status == NukeStatusInProgress { + if s.error != nil { + s.status = NukeStatusFailed + } else { + s.status = NukeStatusCompleted + } + } + now := time.Now() + s.endTime = &now + s.mu.Unlock() + }() + + s.logger.Warning("starting to nuke the DB with data-dir", "path", req.DataDir) + s.logger.Warning("this process will erase all persisted chunks in your local storage") + s.logger.Warning("it will NOT discriminate any pinned content, in case you were wondering") + + // Give user time to cancel + s.logger.Warning("you have another 10 seconds to change your mind and kill this process...") + time.Sleep(10 * time.Second) + s.logger.Warning("proceeding with database nuke...") + + const ( + localstore = "localstore" + kademlia = "kademlia" + statestore = "statestore" + stamperstore = "stamperstore" + ) + + dirsToNuke := []string{localstore, kademlia} + for _, dir := range dirsToNuke { + err := s.removeContent(filepath.Join(req.DataDir, dir)) + if err != nil { + s.error = fmt.Errorf("delete %s: %w", dir, err) + return + } + } + + if req.ForgetOverlay { + err := s.removeContent(filepath.Join(req.DataDir, statestore)) + if err != nil { + s.error = fmt.Errorf("remove statestore: %w", err) + return + } + err = s.removeContent(filepath.Join(req.DataDir, stamperstore)) + if err != nil { + s.error = fmt.Errorf("remove stamperstore: %w", err) + return + } + return + } + + s.logger.Info("nuking statestore...") + + // For now, we'll just remove the statestore directory content + // The actual statestore nuking will be done when the node restarts + if req.ForgetStamps { + err := s.removeContent(filepath.Join(req.DataDir, stamperstore)) + if err != nil { + s.error = fmt.Errorf("remove stamperstore: %w", err) + return + } + } + + s.logger.Info("nuke operation completed successfully") +} + +// removeContent removes all content from a directory +func (s *NukeService) removeContent(path string) error { + dir, err := os.Open(path) + if os.IsNotExist(err) { + return nil + } + if err != nil { + return err + } + defer dir.Close() + + subpaths, err := dir.Readdirnames(0) + if err != nil { + return err + } + + for _, sub := range subpaths { + err = os.RemoveAll(filepath.Join(path, sub)) + if err != nil { + return err + } + } + return nil +} + +// GetStatus returns the current status of the nuke operation +func (s *NukeService) GetStatus() NukeStatusResponse { + s.mu.RLock() + defer s.mu.RUnlock() + + response := NukeStatusResponse{ + Status: s.status, + StartTime: s.startTime, + EndTime: s.endTime, + } + + if s.error != nil { + response.Error = s.error.Error() + } + + return response +} + +// IsReady returns true if the service is ready to accept new nuke requests +func (s *NukeService) IsReady() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.status != NukeStatusInProgress +} + +// nukeHandler handles the POST request to start a nuke operation +func (s *Service) nukeHandler(w http.ResponseWriter, r *http.Request) { + if s.nukeService == nil { + jsonhttp.BadRequest(w, "nuke service not available") + return + } + + var req NukeRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + jsonhttp.BadRequest(w, "invalid request body") + return + } + + if err := s.nukeService.StartNuke(req); err != nil { + jsonhttp.BadRequest(w, err.Error()) + return + } + + jsonhttp.OK(w, "nuke operation started") +} + +// nukeStatusHandler handles the GET request to get nuke operation status +func (s *Service) nukeStatusHandler(w http.ResponseWriter, r *http.Request) { + if s.nukeService == nil { + jsonhttp.BadRequest(w, "nuke service not available") + return + } + + status := s.nukeService.GetStatus() + jsonhttp.OK(w, status) +} + +// nukeReadinessHandler handles the readiness probe for the nuke service +func (s *Service) nukeReadinessHandler(w http.ResponseWriter, r *http.Request) { + if s.nukeService == nil { + jsonhttp.BadRequest(w, "nuke service not available") + return + } + + if s.nukeService.IsReady() { + jsonhttp.OK(w, "ready") + } else { + jsonhttp.ServiceUnavailable(w, "nuke operation in progress") + } +} diff --git a/pkg/api/router.go b/pkg/api/router.go index 13a782bcd77..9f73e408d13 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -176,6 +176,26 @@ func (s *Service) mountTechnicalDebug() { httpaccess.NewHTTPAccessSuppressLogHandler(), web.FinalHandlerFunc(s.healthHandler), )) + + // Nuke service endpoints + s.router.Handle("/nuke", jsonhttp.MethodHandler{ + "POST": web.ChainHandlers( + httpaccess.NewHTTPAccessSuppressLogHandler(), + web.FinalHandlerFunc(s.nukeHandler), + ), + }) + + s.router.Handle("/nuke/status", jsonhttp.MethodHandler{ + "GET": web.ChainHandlers( + httpaccess.NewHTTPAccessSuppressLogHandler(), + web.FinalHandlerFunc(s.nukeStatusHandler), + ), + }) + + s.router.Handle("/nuke/readiness", web.ChainHandlers( + httpaccess.NewHTTPAccessSuppressLogHandler(), + web.FinalHandlerFunc(s.nukeReadinessHandler), + )) } func (s *Service) checkRouteAvailability(handler http.Handler) http.Handler {