Skip to content

Commit da3c9e1

Browse files
authored
Merge pull request #8018 from onflow/leo/refactor-safety-data
[Storage] Refactor safety data operations
2 parents 96a0627 + 2e2b9a1 commit da3c9e1

File tree

13 files changed

+77
-28
lines changed

13 files changed

+77
-28
lines changed

cmd/collection/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func main() {
636636
node.Logger,
637637
node.Me,
638638
node.ProtocolDB,
639+
node.StorageLockMgr,
639640
node.State,
640641
node.Metrics.Engine,
641642
node.Metrics.Mempool,

cmd/consensus/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ func main() {
621621
notifier.AddFollowerConsumer(followerDistributor)
622622

623623
// initialize the persister
624-
persist, err := persister.New(node.ProtocolDB, node.RootChainID)
624+
persist, err := persister.New(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
625625
if err != nil {
626626
return nil, err
627627
}

cmd/scaffold.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func (fnb *FlowNodeBuilder) EnqueuePingService() {
303303
var hotstuffViewFunc func() (uint64, error)
304304
// Setup consensus nodes to report their HotStuff view
305305
if fnb.BaseConfig.NodeRole == flow.RoleConsensus.String() {
306-
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID)
306+
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
307307
if err != nil {
308308
return nil, err
309309
}

cmd/util/cmd/read-hotstuff/cmd/get_liveness.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ func init() {
2222

2323
func runGetLivenessData(*cobra.Command, []string) {
2424
err := common.WithStorage(flagDatadir, func(db storage.DB) error {
25+
lockManager := storage.NewTestingLockManager()
26+
2527
chainID := flow.ChainID(flagChain)
26-
reader, err := persister.NewReader(db, chainID)
28+
reader, err := persister.NewReader(db, chainID, lockManager)
2729
if err != nil {
2830
log.Fatal().Err(err).Msg("could not create reader from db")
2931
}

cmd/util/cmd/read-hotstuff/cmd/get_safety.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ func init() {
2222

2323
func runGetSafetyData(*cobra.Command, []string) {
2424
err := common.WithStorage(flagDatadir, func(db storage.DB) error {
25+
lockManager := storage.NewTestingLockManager()
2526

2627
chainID := flow.ChainID(flagChain)
27-
reader, err := persister.NewReader(db, chainID)
28+
reader, err := persister.NewReader(db, chainID, lockManager)
2829
if err != nil {
2930
log.Fatal().Err(err).Msg("could not create reader from db")
3031
}

consensus/hotstuff/persister/persister.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package persister
33
import (
44
"fmt"
55

6+
"github.com/jordanschalm/lockctx"
7+
68
"github.com/onflow/flow-go/consensus/hotstuff"
79
"github.com/onflow/flow-go/model/flow"
810
"github.com/onflow/flow-go/storage"
@@ -15,8 +17,9 @@ import (
1517
// SafetyData and LivenessData, for each distinct chain ID. This bootstrapping must be complete
1618
// before constructing a Persister instance with New (otherwise it will return an error).
1719
type Persister struct {
18-
db storage.DB
19-
chainID flow.ChainID
20+
db storage.DB
21+
chainID flow.ChainID
22+
lockManager lockctx.Manager
2023
}
2124

2225
var _ hotstuff.Persister = (*Persister)(nil)
@@ -26,15 +29,16 @@ var _ hotstuff.PersisterReader = (*Persister)(nil)
2629
// Persister depends on protocol.State and cluster.State bootstrapping to set initial values for
2730
// SafetyData and LivenessData, for each distinct chain ID. This bootstrapping must be completed
2831
// before first using a Persister instance.
29-
func New(db storage.DB, chainID flow.ChainID) (*Persister, error) {
32+
func New(db storage.DB, chainID flow.ChainID, lockManager lockctx.Manager) (*Persister, error) {
3033
err := ensureSafetyDataAndLivenessDataAreBootstrapped(db, chainID)
3134
if err != nil {
3235
return nil, fmt.Errorf("fail to check persister was properly bootstrapped: %w", err)
3336
}
3437

3538
p := &Persister{
36-
db: db,
37-
chainID: chainID,
39+
db: db,
40+
chainID: chainID,
41+
lockManager: lockManager,
3842
}
3943
return p, nil
4044
}
@@ -62,8 +66,8 @@ func ensureSafetyDataAndLivenessDataAreBootstrapped(db storage.DB, chainID flow.
6266
}
6367

6468
// NewReader returns a new Persister as a PersisterReader type (only read methods accessible).
65-
func NewReader(db storage.DB, chainID flow.ChainID) (hotstuff.PersisterReader, error) {
66-
return New(db, chainID)
69+
func NewReader(db storage.DB, chainID flow.ChainID, lockManager lockctx.Manager) (hotstuff.PersisterReader, error) {
70+
return New(db, chainID, lockManager)
6771
}
6872

6973
// GetSafetyData will retrieve last persisted safety data.
@@ -85,15 +89,19 @@ func (p *Persister) GetLivenessData() (*hotstuff.LivenessData, error) {
8589
// PutSafetyData persists the last safety data.
8690
// During normal operations, no errors are expected.
8791
func (p *Persister) PutSafetyData(safetyData *hotstuff.SafetyData) error {
88-
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
89-
return operation.UpsertSafetyData(rw.Writer(), p.chainID, safetyData)
92+
return storage.WithLock(p.lockManager, storage.LockInsertSafetyData, func(lctx lockctx.Context) error {
93+
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
94+
return operation.UpsertSafetyData(lctx, rw, p.chainID, safetyData)
95+
})
9096
})
9197
}
9298

9399
// PutLivenessData persists the last liveness data.
94100
// During normal operations, no errors are expected.
95101
func (p *Persister) PutLivenessData(livenessData *hotstuff.LivenessData) error {
96-
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
97-
return operation.UpsertLivenessData(rw.Writer(), p.chainID, livenessData)
102+
return storage.WithLock(p.lockManager, storage.LockInsertLivenessData, func(lctx lockctx.Context) error {
103+
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
104+
return operation.UpsertLivenessData(lctx, rw, p.chainID, livenessData)
105+
})
98106
})
99107
}

consensus/integration/nodes_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ func createNode(
563563

564564
signer := verification.NewCombinedSigner(me, beaconKeyStore)
565565

566-
persist, err := persister.New(db, rootHeader.ChainID)
566+
persist, err := persister.New(db, rootHeader.ChainID, lockManager)
567567
require.NoError(t, err)
568568

569569
livenessData, err := persist.GetLivenessData()

engine/collection/epochmgr/factories/hotstuff.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type HotStuffFactory struct {
3232
baseLogger zerolog.Logger
3333
me module.Local
3434
db storage.DB
35+
lockManager storage.LockManager
3536
protoState protocol.State
3637
engineMetrics module.EngineMetrics
3738
mempoolMetrics module.MempoolMetrics
@@ -43,6 +44,7 @@ func NewHotStuffFactory(
4344
log zerolog.Logger,
4445
me module.Local,
4546
db storage.DB,
47+
lockManager storage.LockManager,
4648
protoState protocol.State,
4749
engineMetrics module.EngineMetrics,
4850
mempoolMetrics module.MempoolMetrics,
@@ -54,6 +56,7 @@ func NewHotStuffFactory(
5456
baseLogger: log,
5557
me: me,
5658
db: db,
59+
lockManager: lockManager,
5760
protoState: protoState,
5861
engineMetrics: engineMetrics,
5962
mempoolMetrics: mempoolMetrics,
@@ -155,7 +158,7 @@ func (f *HotStuffFactory) CreateModules(
155158
return nil, nil, err
156159
}
157160

158-
persist, err := persister.New(f.db, cluster.ChainID())
161+
persist, err := persister.New(f.db, cluster.ChainID(), f.lockManager)
159162
if err != nil {
160163
return nil, nil, err
161164
}

engine/testutil/nodes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro
377377
node.Log,
378378
node.Me,
379379
db,
380+
node.LockManager,
380381
node.State,
381382
node.Metrics,
382383
node.Metrics,

state/cluster/badger/state.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
3535
if err != nil {
3636
return nil, fmt.Errorf("failed to acquire lock `storage.LockInsertOrFinalizeClusterBlock` for inserting cluster block: %w", err)
3737
}
38+
err = lctx.AcquireLock(storage.LockInsertSafetyData)
39+
if err != nil {
40+
return nil, fmt.Errorf("failed to acquire lock `storage.LockInsertSafetyData` for inserting safety data: %w", err)
41+
}
42+
err = lctx.AcquireLock(storage.LockInsertLivenessData)
43+
if err != nil {
44+
return nil, fmt.Errorf("failed to acquire lock `storage.LockInsertLivenessData` for inserting liveness data: %w", err)
45+
}
3846
isBootstrapped, err := IsBootstrapped(db, stateRoot.ClusterID())
3947
if err != nil {
4048
return nil, fmt.Errorf("failed to determine whether database contains bootstrapped state: %w", err)
@@ -85,12 +93,12 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
8593
NewestQC: rootQC,
8694
}
8795
// insert safety data
88-
err = operation.UpsertSafetyData(rw.Writer(), chainID, safetyData)
96+
err = operation.UpsertSafetyData(lctx, rw, chainID, safetyData)
8997
if err != nil {
9098
return fmt.Errorf("could not insert safety data: %w", err)
9199
}
92100
// insert liveness data
93-
err = operation.UpsertLivenessData(rw.Writer(), chainID, livenessData)
101+
err = operation.UpsertLivenessData(lctx, rw, chainID, livenessData)
94102
if err != nil {
95103
return fmt.Errorf("could not insert liveness data: %w", err)
96104
}

0 commit comments

Comments
 (0)