-
Notifications
You must be signed in to change notification settings - Fork 229
feat(store): support pruning store #2208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 28 commits
291aea5
e5e8080
6b0cc41
7ca9b67
82c76be
cd92e3e
9420944
21c0b13
45acc3a
685dbe7
d5b16c2
350b21d
7a04418
a4d2fc1
29656c9
2422d00
8ed80b4
b16db8c
8f70c51
32be97e
1b3ffcf
da2dfa1
220f802
ab0c7df
7fcf6a8
4639bc9
34d79c2
99c5b39
99dd5fe
cda2d67
8839aad
9585715
fe40d10
b1b2e72
561349e
cca398e
8eb6588
0c295a1
a6541d0
086904a
680c1ba
2a61d47
9429c9f
6cc1f55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package config | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
const (
	// Names of the pruning configuration flags.

	// FlagPruningStrategy is the flag that selects the block store pruning strategy.
	FlagPruningStrategy = "rollkit.node.pruning.strategy"
	// FlagPruningKeepRecent is the flag that sets how many recent blocks are kept in the store.
	FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent"
	// FlagPruningInterval is the flag that sets how often the block store is pruned.
	FlagPruningInterval = "rollkit.node.pruning.interval"
)
|
||
// Names of the supported pruning strategies, as accepted in
// PruningConfig.Strategy.
const (
	// PruningConfigStrategyNone is the name of the "none" strategy.
	PruningConfigStrategyNone = "none"
	// PruningConfigStrategyDefault is the name of the "default" strategy.
	PruningConfigStrategyDefault = "default"
	// PruningConfigStrategyEverything is the name of the "everything" strategy.
	PruningConfigStrategyEverything = "everything"
	// PruningConfigStrategyCustom is the name of the "custom" strategy, the
	// only one whose KeepRecent value is checked by PruningConfig.Validate.
	PruningConfigStrategyCustom = "custom"
)
|
||
var ( | ||
PruningConfigNone = PruningConfig{ | ||
Strategy: PruningConfigStrategyNone, | ||
KeepRecent: 0, | ||
Interval: 0, | ||
} | ||
PruningConfigDefault = PruningConfig{ | ||
Strategy: PruningConfigStrategyDefault, | ||
KeepRecent: 362880, | ||
Interval: 10, | ||
} | ||
PruningConfigEverything = PruningConfig{ | ||
Strategy: PruningConfigStrategyEverything, | ||
KeepRecent: 2, | ||
Interval: 10, | ||
} | ||
PruningConfigCustom = PruningConfig{ | ||
Strategy: PruningConfigStrategyCustom, | ||
KeepRecent: 100, | ||
Interval: 100, | ||
} | ||
) | ||
|
||
// PruningConfig holds the block store pruning settings that let node
// operators manage storage.
//
// TODO: support a volume-based strategy, for example through a field such as
//
//	VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"`
type PruningConfig struct {
	// Strategy names the pruning approach: none, default, everything or custom.
	Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"`
	// KeepRecent is the number of most recent blocks to keep. Validate
	// requires it to be at least 2 when Strategy is "custom".
	KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy, must be greater or equal than 2"`
	// Interval is how often the pruning process runs.
	Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"How often the pruning process should run, used in \"custom\" strategy"`
}
|
||
func (p PruningConfig) Validate() error { | ||
// Only Custom strategy requires validation. | ||
if p.Strategy != PruningConfigStrategyCustom { | ||
return nil | ||
} | ||
|
||
if p.KeepRecent < 2 { | ||
return errors.New("keep_recent must be greater or equal than 2 for custom pruning strategy") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func GetPruningConfigFromStrategy(strategy string) PruningConfig { | ||
switch strategy { | ||
case PruningConfigStrategyDefault: | ||
return PruningConfigDefault | ||
case PruningConfigStrategyEverything: | ||
return PruningConfigEverything | ||
case PruningConfigStrategyCustom: | ||
return PruningConfigCustom | ||
} | ||
|
||
// Return strategy "none" if unknown. | ||
return PruningConfigNone | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,102 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
package store | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"time" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ds "github.com/ipfs/go-datastore" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/rollkit/rollkit/pkg/config" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/rollkit/rollkit/types" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// DefaultFlushInterval is the default flush interval, one second.
// NOTE(review): presumably the tick period of the asynchronous pruning loop;
// confirm against the caller.
const DefaultFlushInterval = time.Second
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type defaultPruningStore struct { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Store | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Config config.PruningConfig | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var _ PruningStore = &defaultPruningStore{} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// newDefaultPruningStore returns default pruning store. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func newDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return &defaultPruningStore{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Store: &DefaultStore{db: ds}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Config: config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (s *defaultPruningStore) PruneBlockData(ctx context.Context, height uint64) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Skip if strategy is none. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if s.Config.Strategy == config.PruningConfigStrategyNone { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Skip if not the correct interval or latest height is less or equal than number of blocks need to keep. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { | |
if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent { |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This calculation could result in a uint64 underflow if `height+1 < s.Config.KeepRecent`. Since uint64 underflow wraps around to a very large number, this would cause incorrect pruning behavior. Consider adding a safety check before this calculation:
var endHeight uint64
if height+1 <= s.Config.KeepRecent {
// Nothing to prune yet
return nil
}
endHeight = height + 1 - s.Config.KeepRecent
This ensures the pruning logic only executes when there are actually blocks that can be safely pruned.
endHeight := height + 1 - s.Config.KeepRecent | |
var endHeight uint64 | |
if height+1 <= s.Config.KeepRecent { | |
// Nothing to prune yet | |
return nil | |
} | |
endHeight = height + 1 - s.Config.KeepRecent | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pruning range calculation logic needs adjustment. Currently, it may not prune the intended number of blocks:
endHeight := height + 1 - s.Config.KeepRecent
startHeight := uint64(0)
if endHeight > s.Config.Interval {
startHeight = endHeight - s.Config.Interval
}
For correct behavior, consider:
startHeight := uint64(0)
if height > s.Config.KeepRecent {
endHeight := height + 1 - s.Config.KeepRecent
startHeight = endHeight - s.Config.Interval
if startHeight < 0 {
startHeight = 0
}
}
This ensures exactly `s.Config.Interval` blocks are pruned in each operation while maintaining the `s.Config.KeepRecent` most recent blocks. The current implementation might prune blocks that should be kept when `endHeight` is close to `s.Config.Interval`.
endHeight := height + 1 - s.Config.KeepRecent | |
startHeight := uint64(0) | |
if endHeight > s.Config.Interval { | |
startHeight = endHeight - s.Config.Interval | |
} | |
startHeight := uint64(0) | |
if height > s.Config.KeepRecent { | |
endHeight := height + 1 - s.Config.KeepRecent | |
startHeight = endHeight - s.Config.Interval | |
if startHeight < 0 { | |
startHeight = 0 | |
} | |
} | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to protect against concurrent executions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this is a public method, it can be called by other threads as well. It would not cost much to exit early when pruning is not completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other threads shouldn't call this method; it was added only for the pruning thread.
So maybe there is no need to add any protection for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errors from `s.DeleteBlockData` are being ignored. The comment suggests that `ErrNotFound` could be ignored, but other errors (like issues with the underlying datastore) should be handled.
Ignoring these errors can hide serious problems and leave the pruning process in an incomplete state without any notification. The error should be propagated up to the `AsyncPruner`, which will log it.
With the recommended changes to `DeleteBlockData` in `pkg/store/store.go`, it will correctly handle `ErrNotFound` by returning `nil`, so any error returned from it will be significant.
if err := s.DeleteBlockData(ctx, i); err != nil {
return err
}
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `AsyncPruningStore` implementation has a potential thread safety issue. The `latestBlockDataHeight` field is updated in `SaveBlockData()` and read in a separate goroutine started by `Start()`, but there's no synchronization mechanism (such as a mutex) to protect this shared state. This could lead to race conditions where the pruning goroutine might operate on an inconsistent view of the latest height.
Consider adding a mutex to protect access to `latestBlockDataHeight` or using atomic operations to ensure thread-safe updates and reads of this value:
import "sync/atomic"
// Use atomic.Uint64 for thread-safe access
var latestHeight atomic.Uint64
// When updating
latestHeight.Store(newValue)
// When reading
currentHeight := latestHeight.Load()
type AsyncPruningStore struct { | |
PruningStore | |
latestBlockDataHeight uint64 | |
flushInterval time.Duration | |
} | |
type AsyncPruningStore struct { | |
PruningStore | |
latestBlockDataHeight atomic.Uint64 | |
flushInterval time.Duration | |
} | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `latestBlockDataHeight` field is not initialized from persistent storage on startup, which creates a potential issue: if the node restarts, it will lose track of the current height and pruning operations may not function correctly until new blocks are saved.
Consider retrieving and setting the latest height during initialization by:
// In NewAsyncPruningStore:
latestHeight, err := getLatestHeightFromStore(ds)
if err != nil {
// Handle error or set to 0
}
return &AsyncPruningStore{
PruningStore: pruningStore,
latestBlockDataHeight: latestHeight,
flushInterval: DefaultFlushInterval,
}
This would ensure pruning operations resume from the correct position after node restarts.
// todo: initialize latestBlockDataHeight from the store | |
latestHeight, err := getLatestHeightFromStore(ds) | |
if err != nil { | |
// If we can't get the latest height, default to 0 | |
latestHeight = 0 | |
} | |
latestBlockDataHeight = latestHeight |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `Start` method should check if the context is already canceled before entering the loop. If `ctx.Done()` is triggered immediately after calling `Start()`, the ticker will be created but the deferred `ticker.Stop()` won't execute until the function returns, which could lead to a goroutine leak.
Consider adding a check at the beginning:
func (s *AsyncPruningStore) Start(ctx context.Context) {
if ctx.Err() != nil {
return
}
ticker := time.NewTicker(s.flushInterval)
defer ticker.Stop()
// rest of the function...
}
This ensures resources are properly cleaned up when the context is already canceled.
func (s *AsyncPruningStore) Start(ctx context.Context) { | |
// todo: use ctx for cancellation | |
ticker := time.NewTicker(s.flushInterval) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case <-ticker.C: | |
// Currently PruneBlockData only returns nil. | |
_ = s.PruneBlockData(ctx, s.latestBlockDataHeight) | |
} | |
} | |
} | |
func (s *AsyncPruningStore) Start(ctx context.Context) { | |
if ctx.Err() != nil { | |
return | |
} | |
ticker := time.NewTicker(s.flushInterval) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case <-ticker.C: | |
// Currently PruneBlockData only returns nil. | |
_ = s.PruneBlockData(ctx, s.latestBlockDataHeight) | |
} | |
} | |
} | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,7 +92,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
|
||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch: %w", err) | ||
return fmt.Errorf("failed to create a new batch for saving block data: %w", err) | ||
} | ||
|
||
if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { | ||
|
@@ -114,6 +114,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
return nil | ||
} | ||
|
||
// DeleteBlockData deletes block at given height. | ||
func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error { | ||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err) | ||
} | ||
|
||
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete header blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete data blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err) | ||
} | ||
if err := batch.Commit(ctx); err != nil { | ||
return fmt.Errorf("failed to commit batch: %w", err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How do we ensure that we delete everything? There might be more prefix stores in the future. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Structures like block and header may need to be refactored in the future. I think for now it's enough to be the same as |
||
|
||
return nil | ||
} | ||
Eoous marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+108
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation of To fix this, I've also added handling for func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
// Get the header to retrieve the block hash for index deletion.
header, err := s.GetHeader(ctx, height)
if err != nil {
// If block not found, it might have been already pruned.
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to get header for block %d: %w", height, err)
}
hash := header.Hash()
batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
return fmt.Errorf("failed to delete header blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
return fmt.Errorf("failed to delete data blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil {
return fmt.Errorf("failed to delete index key in batch: %w", err)
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
return nil
} |
||
|
||
// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed. | ||
|
||
// GetBlockData returns block header and data at given height, or error if it's not found in Store. | ||
func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { | ||
headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(height))) | ||
|
Uh oh!
There was an error while loading. Please reload this page.