-
Notifications
You must be signed in to change notification settings - Fork 229
feat(store): support pruning store #2208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
291aea5
e5e8080
6b0cc41
7ca9b67
82c76be
cd92e3e
9420944
21c0b13
45acc3a
685dbe7
d5b16c2
350b21d
7a04418
a4d2fc1
29656c9
2422d00
8ed80b4
b16db8c
8f70c51
32be97e
1b3ffcf
da2dfa1
220f802
ab0c7df
7fcf6a8
4639bc9
34d79c2
99c5b39
99dd5fe
cda2d67
8839aad
9585715
fe40d10
b1b2e72
561349e
cca398e
8eb6588
0c295a1
a6541d0
086904a
680c1ba
2a61d47
9429c9f
6cc1f55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package config | ||
|
||
const ( | ||
// Pruning configuration flags | ||
|
||
// FlagPruningStrategy is a flag for specifying strategy for pruning block store | ||
FlagPruningStrategy = "rollkit.node.pruning.strategy" | ||
// FlagPruningKeepRecent is a flag for specifying how many blocks need to keep in store | ||
FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent" | ||
// FlagPruningInterval is a flag for specifying how offen prune blocks store | ||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
FlagPruningInterval = "rollkit.node.pruning.interval" | ||
) | ||
|
||
const ( | ||
PruningConfigStrategyNone = "none" | ||
PruningConfigStrategyDefault = "default" | ||
PruningConfigStrategyEverything = "everything" | ||
PruningConfigStrategyCustom = "custom" | ||
) | ||
|
||
var ( | ||
PruningConfigNone = PruningConfig{ | ||
Strategy: PruningConfigStrategyNone, | ||
KeepRecent: 0, | ||
Interval: 0, | ||
} | ||
PruningConfigDefault = PruningConfig{ | ||
Strategy: PruningConfigStrategyDefault, | ||
KeepRecent: 362880, | ||
Interval: 10, | ||
} | ||
PruningConfigEverything = PruningConfig{ | ||
Strategy: PruningConfigStrategyEverything, | ||
KeepRecent: 2, | ||
Interval: 10, | ||
} | ||
PruningConfigCustom = PruningConfig{ | ||
Strategy: PruningConfigStrategyCustom, | ||
KeepRecent: 100, | ||
Interval: 100, | ||
} | ||
) | ||
|
||
// PruningConfig allows node operators to manage storage | ||
type PruningConfig struct { | ||
// todo: support volume-based strategy | ||
Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"` | ||
KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy"` | ||
Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run, used in \"custom\" strategy"` | ||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
// todo: support volume-based strategy | ||
// VolumeConfig specifies configuration for volume-based storage | ||
// VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"` | ||
} | ||
|
||
func GetPruningConfigFromStrategy(strategy string) PruningConfig { | ||
switch strategy { | ||
case PruningConfigStrategyDefault: | ||
return PruningConfigDefault | ||
case PruningConfigStrategyEverything: | ||
return PruningConfigEverything | ||
case PruningConfigStrategyCustom: | ||
return PruningConfigCustom | ||
} | ||
|
||
// Return strategy "none" if unknown. | ||
return PruningConfigNone | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package store | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/rollkit/rollkit/pkg/config" | ||
"github.com/rollkit/rollkit/types" | ||
) | ||
|
||
type DefaultPruningStore struct { | ||
Store | ||
|
||
Config config.PruningConfig | ||
} | ||
|
||
var _ PruningStore = &DefaultPruningStore{} | ||
|
||
// NewDefaultPruningStore returns default pruning store. | ||
func NewDefaultPruningStore(store Store, config config.PruningConfig) PruningStore { | ||
return &DefaultPruningStore{ | ||
Store: store, | ||
Config: config, | ||
} | ||
} | ||
|
||
// Close safely closes underlying data storage, to ensure that data is actually saved. | ||
func (s *DefaultPruningStore) Close() error { | ||
return s.Store.Close() | ||
} | ||
|
||
// SetHeight sets the height saved in the Store if it is higher than the existing height | ||
func (s *DefaultPruningStore) SetHeight(ctx context.Context, height uint64) error { | ||
return s.Store.SetHeight(ctx, height) | ||
} | ||
|
||
// Height returns height of the highest block saved in the Store. | ||
func (s *DefaultPruningStore) Height(ctx context.Context) (uint64, error) { | ||
return s.Store.Height(ctx) | ||
} | ||
|
||
// SaveBlockData adds block header and data to the store along with corresponding signature. | ||
// Stored height is updated if block height is greater than stored value. | ||
func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { | ||
if err := s.PruneBlockData(ctx); err != nil { | ||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return fmt.Errorf("failed to prune block data: %w", err) | ||
} | ||
|
||
return s.Store.SaveBlockData(ctx, header, data, signature) | ||
} | ||
|
||
// DeleteBlockData deletes block at given height. | ||
func (s *DefaultPruningStore) DeleteBlockData(ctx context.Context, height uint64) error { | ||
return s.Store.DeleteBlockData(ctx, height) | ||
} | ||
|
||
// GetBlockData returns block header and data at given height, or error if it's not found in Store. | ||
func (s *DefaultPruningStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { | ||
return s.Store.GetBlockData(ctx, height) | ||
} | ||
|
||
// GetBlockByHash returns block with given block header hash, or error if it's not found in Store. | ||
func (s *DefaultPruningStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { | ||
return s.Store.GetBlockByHash(ctx, hash) | ||
} | ||
|
||
// GetSignatureByHash returns signature for a block at given height, or error if it's not found in Store. | ||
func (s *DefaultPruningStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { | ||
return s.Store.GetSignatureByHash(ctx, hash) | ||
} | ||
|
||
// GetSignature returns signature for a block with given block header hash, or error if it's not found in Store. | ||
func (s *DefaultPruningStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { | ||
return s.Store.GetSignature(ctx, height) | ||
} | ||
|
||
// UpdateState updates state saved in Store. Only one State is stored. | ||
// If there is no State in Store, state will be saved. | ||
func (s *DefaultPruningStore) UpdateState(ctx context.Context, state types.State) error { | ||
return s.Store.UpdateState(ctx, state) | ||
} | ||
|
||
// GetState returns last state saved with UpdateState. | ||
func (s *DefaultPruningStore) GetState(ctx context.Context) (types.State, error) { | ||
return s.Store.GetState(ctx) | ||
} | ||
|
||
// SetMetadata saves arbitrary value in the store. | ||
// | ||
// Metadata is separated from other data by using prefix in KV. | ||
func (s *DefaultPruningStore) SetMetadata(ctx context.Context, key string, value []byte) error { | ||
return s.Store.SetMetadata(ctx, key, value) | ||
} | ||
|
||
// GetMetadata returns values stored for given key with SetMetadata. | ||
func (s *DefaultPruningStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { | ||
return s.Store.GetMetadata(ctx, key) | ||
} | ||
|
||
func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { | ||
var ( | ||
err error | ||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
) | ||
|
||
// Skip if strategy is none. | ||
if s.Config.Strategy == config.PruningConfigStrategyNone { | ||
return nil | ||
} | ||
|
||
height, err := s.Height(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep. | ||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent { | ||
return nil | ||
} | ||
|
||
// Must keep at least 2 blocks(while strategy is everything). | ||
endHeight := height - s.Config.KeepRecent | ||
startHeight := min(0, endHeight-s.Config.KeepRecent) | ||
|
||
for i := startHeight; i <= endHeight; i++ { | ||
err = s.DeleteBlockData(ctx, i) | ||
|
||
if err != nil { | ||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,7 +92,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
|
||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch: %w", err) | ||
return fmt.Errorf("failed to create a new batch for saving block data: %w", err) | ||
} | ||
|
||
if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { | ||
|
@@ -114,6 +114,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
return nil | ||
} | ||
|
||
// DeleteBlockData deletes block at given height. | ||
func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error { | ||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err) | ||
} | ||
|
||
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete header blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete data blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err) | ||
} | ||
if err := batch.Commit(ctx); err != nil { | ||
return fmt.Errorf("failed to commit batch: %w", err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how do we ensure that we delete everything? there might be more prefix stores in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Structures like block and header may need to refactor in future I think currently it's enough to be same with |
||
|
||
return nil | ||
} | ||
Eoous marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+108
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation of To fix this, I've also added handling for func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
// Get the header to retrieve the block hash for index deletion.
header, err := s.GetHeader(ctx, height)
if err != nil {
// If block not found, it might have been already pruned.
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to get header for block %d: %w", height, err)
}
hash := header.Hash()
batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
return fmt.Errorf("failed to delete header blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
return fmt.Errorf("failed to delete data blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil {
return fmt.Errorf("failed to delete index key in batch: %w", err)
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
return nil
} |
||
|
||
// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed. | ||
|
||
// GetBlockData returns block header and data at given height, or error if it's not found in Store. | ||
func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { | ||
headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(height))) | ||
|
Uh oh!
There was an error while loading. Please reload this page.