@@ -6,11 +6,15 @@ import (
66 "errors"
77 "fmt"
88 "io/fs"
9+ "math"
910 "os"
1011 "path/filepath"
1112 "strings"
1213 "time"
1314
15+ cmtState "github.com/cometbft/cometbft/state"
16+ cmtBlockstore "github.com/cometbft/cometbft/store"
17+
1418 badgerDB "github.com/dgraph-io/badger/v4"
1519 "github.com/spf13/cobra"
1620
@@ -20,6 +24,7 @@ import (
2024 "github.com/oasisprotocol/oasis-core/go/config"
2125 "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/abci"
2226 cmtCommon "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/common"
27+ cmtConfig "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/config"
2328 cmtDBProvider "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/db/badger"
2429 cmdCommon "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common"
2530 roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
@@ -70,6 +75,13 @@ WARNING: Ensure you have at least as much of a free disk as your largest databas
7075 RunE : doDBCompactions ,
7176 }
7277
78+ pruneCmd = & cobra.Command {
79+ Use : "prune-experimental" ,
80+ Args : cobra .NoArgs ,
81+ Short : "EXPERIMENTAL: trigger pruning for all consensus databases" ,
82+ RunE : doPrune ,
83+ }
84+
7385 logger = logging .GetLogger ("cmd/storage" )
7486
7587 pretty = cmdCommon .Isatty (1 )
@@ -385,7 +397,17 @@ func flattenBadgerDB(db *badgerDB.DB, logger *logging.Logger) error {
385397}
386398
387399func compactConsensusNodeDB (dataDir string ) error {
388- ldb , ndb , _ , err := abci .InitStateStorage (
400+ ndb , err := openConsensusStateNodeDB (dataDir )
401+ if err != nil {
402+ return fmt .Errorf ("failed to initialize ABCI storage backend: %w" , err )
403+ }
404+ defer ndb .Close ()
405+
406+ return ndb .Compact ()
407+ }
408+
409+ func openConsensusStateNodeDB (dataDir string ) (db.NodeDB , error ) {
410+ _ , ndb , _ , err := abci .InitStateStorage (
389411 & abci.ApplicationConfig {
390412 DataDir : filepath .Join (dataDir , cmtCommon .StateDir ),
391413 StorageBackend : config .GlobalConfig .Storage .Backend ,
@@ -394,16 +416,189 @@ func compactConsensusNodeDB(dataDir string) error {
394416 DisableCheckpointer : true ,
395417 },
396418 )
397- if err != nil {
398- return fmt .Errorf ("failed to initialize ABCI storage backend: %w" , err )
419+
420+ return ndb , err
421+ }
422+
423+ func doPrune (_ * cobra.Command , args []string ) error {
424+ if err := cmdCommon .Init (); err != nil {
425+ cmdCommon .EarlyLogAndExit (err )
426+ }
427+
428+ if config .GlobalConfig .Consensus .Prune .Strategy == cmtConfig .PruneStrategyNone {
429+ logger .Info ("skipping consensus pruning since disabled in the config" )
430+ return nil
431+ }
432+
433+ dataDir := cmdCommon .DataDir ()
434+
435+ if err := pruneConsensusDBs (dataDir , config .GlobalConfig .Consensus .Prune .NumKept , configuredRuntimes ()); err != nil {
436+ return fmt .Errorf ("failed to prune consensus databases: %w" , err )
399437 }
400438
401- // Close the resources. Both Close and Cleanup only close NodeDB.
402- // Closing both here, to prevent resource leaks if things change in the future.
439+ return nil
440+ }
441+
442+ func configuredRuntimes () []common.Namespace {
443+ // TODO handle path based configuration
444+ var runtimes []common.Namespace
445+ for _ , rt := range config .GlobalConfig .Runtime .Runtimes {
446+ runtimes = append (runtimes , rt .ID )
447+ }
448+ return runtimes
449+ }
450+
451+ func pruneConsensusDBs (dataDir string , numKept uint64 , runtimes []common.Namespace ) error {
452+ ndb , err := openConsensusStateNodeDB (dataDir )
453+ if err != nil {
454+ return fmt .Errorf ("failed to open NodeDB: %w" , err )
455+ }
403456 defer ndb .Close ()
404- defer ldb .Cleanup ()
405457
406- return ndb .Compact ()
458+ latest , ok := ndb .GetLatestVersion ()
459+ if ! ok {
460+ logger .Info ("skipping pruning as state db is empty" )
461+ return nil
462+ }
463+
464+ if latest < numKept {
465+ logger .Info ("skipping pruning as the latest version is smaller than the number of versions to keep" )
466+ return nil
467+ }
468+
469+ // In case of configured runtimes, do not prune past the earliest reindexed
470+ // consensus height, so that light history can be populated correctly.
471+ minReindexed , err := minReindexedHeight (dataDir , runtimes )
472+ if err != nil {
473+ return fmt .Errorf ("failed to fetch earliest reindexed consensus height: %w" , err )
474+ }
475+
476+ retainHeight := min (
477+ latest - numKept , // underflow not possible due to if above.
478+ uint64 (minReindexed ),
479+ )
480+
481+ if err := pruneConsensusNodeDB (ndb , retainHeight ); err != nil {
482+ return fmt .Errorf ("failed to prune application state: %w" , err )
483+ }
484+
485+ if err := pruneCometDBs (dataDir , int64 (retainHeight )); err != nil {
486+ return fmt .Errorf ("failed to prune CometBFT managed databases: %w" , err )
487+ }
488+
489+ return nil
490+ }
491+
492+ func pruneConsensusNodeDB (ndb db.NodeDB , retainHeight uint64 ) error {
493+ startHeight := ndb .GetEarliestVersion ()
494+
495+ if retainHeight <= startHeight {
496+ logger .Info ("consensus state already pruned" , "retain_height" , retainHeight , "start_height" , startHeight )
497+ return nil
498+ }
499+
500+ logger .Info ("pruning consensus state" , "start_height" , startHeight , "retain_height" , retainHeight )
501+ for h := startHeight ; h < retainHeight ; h ++ {
502+ if err := ndb .Prune (h ); err != nil {
503+ return fmt .Errorf ("failed to prune version %d: %w" , h , err )
504+ }
505+
506+ if h % 10_000 == 0 { // periodically sync to disk
507+ if err := ndb .Sync (); err != nil {
508+ return fmt .Errorf ("failed to sync NodeDB: %w" , err )
509+ }
510+ logger .Debug ("forcing NodeDB disk sync during pruning" , "version" , h )
511+ }
512+ }
513+
514+ if err := ndb .Sync (); err != nil {
515+ return fmt .Errorf ("failed to sync NodeDB: %w" , err )
516+ }
517+
518+ return nil
519+ }
520+
521+ // minReindexedHeight returns the smallest consensus height reindexed by any
522+ // of the configured runtimes.
523+ //
524+ // In case of no configured runtimes it returns max int64.
525+ func minReindexedHeight (dataDir string , runtimes []common.Namespace ) (int64 , error ) {
526+ fetchLastReindexedHeight := func (runtimeID common.Namespace ) (int64 , error ) {
527+ rtDir := runtimeConfig .GetRuntimeStateDir (dataDir , runtimeID )
528+
529+ history , err := history .New (runtimeID , rtDir , history .NewNonePrunerFactory (), true )
530+ if err != nil {
531+ return 0 , fmt .Errorf ("failed to open new light history: %w" , err )
532+ }
533+ defer history .Close ()
534+
535+ h , err := history .LastConsensusHeight ()
536+ if err != nil {
537+ return 0 , fmt .Errorf ("failed to get last consensus height: %w" , err )
538+ }
539+
540+ return h , nil
541+ }
542+
543+ var minH int64 = math .MaxInt64
544+ for _ , rt := range runtimes {
545+ h , err := fetchLastReindexedHeight (rt )
546+ if err != nil {
547+ return 0 , fmt .Errorf ("failed to fetch last reindexed height for %s: %w" , rt , err )
548+ }
549+
550+ if h < minH {
551+ minH = h
552+ }
553+ }
554+
555+ return minH , nil
556+ }
557+
558+ func pruneCometDBs (dataDir string , retainHeight int64 ) error {
559+ // Hardcoding the path is not ideal.
560+ blockstorePath := fmt .Sprintf ("%s/consensus/data/blockstore.badger.db" , dataDir )
561+ statePath := fmt .Sprintf ("%s/consensus/data/state.badger.db" , dataDir )
562+
563+ blockDB , err := cmtDBProvider .New (blockstorePath , false )
564+ if err != nil {
565+ return fmt .Errorf ("failed to open blockstore: %w" , err )
566+ }
567+ blockstore := cmtBlockstore .NewBlockStore (blockDB )
568+ defer blockstore .Close ()
569+
570+ // First store the base, then prune blockstore and finally state db.
571+ // This is not ideal since it could happen that we only prune blockstore, internally
572+ // updating the base. Repeating the pruning would left part of the state db not pruned.
573+ // Upstream CometBFT implementation suffer from the same issue:
574+ // - https://github.com/oasisprotocol/cometbft/blob/653c9a0c95ac0f91a0c8c11efb9aa21c98407af6/state/execution.go#L655
575+ base := blockstore .Base ()
576+ if retainHeight <= base {
577+ logger .Info ("blockstore and state db already pruned" )
578+ return nil
579+ }
580+
581+ logger .Info ("pruning consensus blockstore" , "base" , base , "retain_height" , retainHeight )
582+ n , err := blockstore .PruneBlocks (retainHeight )
583+ if err != nil {
584+ return fmt .Errorf ("failed to prune blocks (retain height: %d): %w" , retainHeight , err )
585+ }
586+ logger .Info ("blockstore pruning finished" , "pruned" , n )
587+
588+ stateDB , err := cmtDBProvider .New (statePath , false )
589+ if err != nil {
590+ return fmt .Errorf ("failed to open state db: %w" , err )
591+ }
592+ state := cmtState .NewStore (stateDB , cmtState.StoreOptions {})
593+ defer state .Close ()
594+
595+ logger .Info ("pruning consensus states" , "base" , base , "retain_height" , retainHeight )
596+ if err := state .PruneStates (base , retainHeight ); err != nil {
597+ return fmt .Errorf ("failed to prune state db (start: %d, end: %d)" , base , retainHeight )
598+ }
599+ logger .Info ("state db pruning finished" )
600+
601+ return nil
407602}
408603
409604// Register registers the client sub-command and all of its children.
@@ -414,5 +609,6 @@ func Register(parentCmd *cobra.Command) {
414609 storageCmd .AddCommand (storageCheckCmd )
415610 storageCmd .AddCommand (storageRenameNsCmd )
416611 storageCmd .AddCommand (storageCompactCmd )
612+ storageCmd .AddCommand (pruneCmd )
417613 parentCmd .AddCommand (storageCmd )
418614}
0 commit comments