Skip to content

Commit dc4f4d3

Browse files
committed
update safety data operation
1 parent 7a3c1ee commit dc4f4d3

File tree

9 files changed

+68
-24
lines changed

9 files changed

+68
-24
lines changed

cmd/collection/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func main() {
636636
node.Logger,
637637
node.Me,
638638
node.ProtocolDB,
639+
node.StorageLockMgr,
639640
node.State,
640641
node.Metrics.Engine,
641642
node.Metrics.Mempool,

cmd/scaffold.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func (fnb *FlowNodeBuilder) EnqueuePingService() {
303303
var hotstuffViewFunc func() (uint64, error)
304304
// Setup consensus nodes to report their HotStuff view
305305
if fnb.BaseConfig.NodeRole == flow.RoleConsensus.String() {
306-
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID)
306+
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
307307
if err != nil {
308308
return nil, err
309309
}

consensus/hotstuff/persister/persister.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package persister
33
import (
44
"fmt"
55

6+
"github.com/jordanschalm/lockctx"
67
"github.com/onflow/flow-go/consensus/hotstuff"
78
"github.com/onflow/flow-go/model/flow"
89
"github.com/onflow/flow-go/storage"
@@ -15,8 +16,9 @@ import (
1516
// SafetyData and LivenessData, for each distinct chain ID. This bootstrapping must be complete
1617
// before constructing a Persister instance with New (otherwise it will return an error).
1718
type Persister struct {
18-
db storage.DB
19-
chainID flow.ChainID
19+
db storage.DB
20+
chainID flow.ChainID
21+
lockManager lockctx.Manager
2022
}
2123

2224
var _ hotstuff.Persister = (*Persister)(nil)
@@ -26,15 +28,16 @@ var _ hotstuff.PersisterReader = (*Persister)(nil)
2628
// Persister depends on protocol.State and cluster.State bootstrapping to set initial values for
2729
// SafetyData and LivenessData, for each distinct chain ID. This bootstrapping must be completed
2830
// before first using a Persister instance.
29-
func New(db storage.DB, chainID flow.ChainID) (*Persister, error) {
31+
func New(db storage.DB, chainID flow.ChainID, lockManager lockctx.Manager) (*Persister, error) {
3032
err := ensureSafetyDataAndLivenessDataAreBootstrapped(db, chainID)
3133
if err != nil {
3234
return nil, fmt.Errorf("fail to check persister was properly bootstrapped: %w", err)
3335
}
3436

3537
p := &Persister{
36-
db: db,
37-
chainID: chainID,
38+
db: db,
39+
chainID: chainID,
40+
lockManager: lockManager,
3841
}
3942
return p, nil
4043
}
@@ -62,8 +65,8 @@ func ensureSafetyDataAndLivenessDataAreBootstrapped(db storage.DB, chainID flow.
6265
}
6366

6467
// NewReader returns a new Persister as a PersisterReader type (only read methods accessible).
65-
func NewReader(db storage.DB, chainID flow.ChainID) (hotstuff.PersisterReader, error) {
66-
return New(db, chainID)
68+
func NewReader(db storage.DB, chainID flow.ChainID, lockManager lockctx.Manager) (hotstuff.PersisterReader, error) {
69+
return New(db, chainID, lockManager)
6770
}
6871

6972
// GetSafetyData will retrieve last persisted safety data.
@@ -85,15 +88,19 @@ func (p *Persister) GetLivenessData() (*hotstuff.LivenessData, error) {
8588
// PutSafetyData persists the last safety data.
8689
// During normal operations, no errors are expected.
8790
func (p *Persister) PutSafetyData(safetyData *hotstuff.SafetyData) error {
88-
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
89-
return operation.UpsertSafetyData(rw.Writer(), p.chainID, safetyData)
91+
return storage.WithLock(p.lockManager, storage.LockInsertSafetyData, func(lctx lockctx.Context) error {
92+
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
93+
return operation.UpsertSafetyData(lctx, rw, p.chainID, safetyData)
94+
})
9095
})
9196
}
9297

9398
// PutLivenessData persists the last liveness data.
9499
// During normal operations, no errors are expected.
95100
func (p *Persister) PutLivenessData(livenessData *hotstuff.LivenessData) error {
96-
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
97-
return operation.UpsertLivenessData(rw.Writer(), p.chainID, livenessData)
101+
return storage.WithLock(p.lockManager, storage.LockInsertLivenessData, func(lctx lockctx.Context) error {
102+
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
103+
return operation.UpsertLivenessData(lctx, rw, p.chainID, livenessData)
104+
})
98105
})
99106
}

engine/collection/epochmgr/factories/hotstuff.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type HotStuffFactory struct {
3232
baseLogger zerolog.Logger
3333
me module.Local
3434
db storage.DB
35+
lockManager storage.LockManager
3536
protoState protocol.State
3637
engineMetrics module.EngineMetrics
3738
mempoolMetrics module.MempoolMetrics
@@ -43,6 +44,7 @@ func NewHotStuffFactory(
4344
log zerolog.Logger,
4445
me module.Local,
4546
db storage.DB,
47+
lockManager storage.LockManager,
4648
protoState protocol.State,
4749
engineMetrics module.EngineMetrics,
4850
mempoolMetrics module.MempoolMetrics,
@@ -54,6 +56,7 @@ func NewHotStuffFactory(
5456
baseLogger: log,
5557
me: me,
5658
db: db,
59+
lockManager: lockManager,
5760
protoState: protoState,
5861
engineMetrics: engineMetrics,
5962
mempoolMetrics: mempoolMetrics,
@@ -155,7 +158,7 @@ func (f *HotStuffFactory) CreateModules(
155158
return nil, nil, err
156159
}
157160

158-
persist, err := persister.New(f.db, cluster.ChainID())
161+
persist, err := persister.New(f.db, cluster.ChainID(), f.lockManager)
159162
if err != nil {
160163
return nil, nil, err
161164
}

engine/testutil/nodes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro
377377
node.Log,
378378
node.Me,
379379
db,
380+
node.LockManager,
380381
node.State,
381382
node.Metrics,
382383
node.Metrics,

state/cluster/badger/state.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
3636
if err != nil {
3737
return nil, fmt.Errorf("failed to acquire lock `storage.LockInsertOrFinalizeClusterBlock` for inserting cluster block: %w", err)
3838
}
39+
err = lctx.AcquireLock(storage.LockInsertSafetyData)
40+
if err != nil {
41+
return nil, fmt.Errorf("failed to acquire lock `storage.LockInsertSafetyData` for inserting safety data: %w", err)
42+
}
43+
err = lctx.AcquireLock(storage.LockInsertLivenessData)
44+
if err != nil {
45+
return nil, fmt.Errorf("failed to acquire lock `storage.LockInsertLivenessData` for inserting liveness data: %w", err)
46+
}
3947
isBootstrapped, err := IsBootstrapped(db, stateRoot.ClusterID())
4048
if err != nil {
4149
return nil, fmt.Errorf("failed to determine whether database contains bootstrapped state: %w", err)
@@ -86,12 +94,12 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
8694
NewestQC: rootQC,
8795
}
8896
// insert safety data
89-
err = operation.UpsertSafetyData(rw.Writer(), chainID, safetyData)
97+
err = operation.UpsertSafetyData(lctx, rw, chainID, safetyData)
9098
if err != nil {
9199
return fmt.Errorf("could not insert safety data: %w", err)
92100
}
93101
// insert liveness data
94-
err = operation.UpsertLivenessData(rw.Writer(), chainID, livenessData)
102+
err = operation.UpsertLivenessData(lctx, rw, chainID, livenessData)
95103
if err != nil {
96104
return fmt.Errorf("could not insert liveness data: %w", err)
97105
}

state/protocol/badger/state.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ func Bootstrap(
124124
if err != nil {
125125
return nil, err
126126
}
127+
err = lctx.AcquireLock(storage.LockInsertSafetyData)
128+
if err != nil {
129+
return nil, err
130+
}
131+
err = lctx.AcquireLock(storage.LockInsertLivenessData)
132+
if err != nil {
133+
return nil, err
134+
}
127135

128136
config := defaultBootstrapConfig()
129137
for _, opt := range options {
@@ -585,13 +593,12 @@ func bootstrapStatePointers(lctx lockctx.Proof, rw storage.ReaderBatchWriter, ro
585593
NewestQC: qcForLatestFinalizedBlock,
586594
}
587595

588-
w := rw.Writer()
589596
// persist safety and liveness data plus the QuorumCertificate for the latest finalized block for HotStuff/Jolteon consensus
590-
err = operation.UpsertSafetyData(w, lastFinalized.ChainID, safetyData)
597+
err = operation.UpsertSafetyData(lctx, rw, lastFinalized.ChainID, safetyData)
591598
if err != nil {
592599
return fmt.Errorf("could not insert safety data: %w", err)
593600
}
594-
err = operation.UpsertLivenessData(w, lastFinalized.ChainID, livenessData)
601+
err = operation.UpsertLivenessData(lctx, rw, lastFinalized.ChainID, livenessData)
595602
if err != nil {
596603
return fmt.Errorf("could not insert liveness data: %w", err)
597604
}
@@ -600,6 +607,7 @@ func bootstrapStatePointers(lctx lockctx.Proof, rw storage.ReaderBatchWriter, ro
600607
return fmt.Errorf("could not insert quorum certificate for the latest finalized block: %w", err)
601608
}
602609

610+
w := rw.Writer()
603611
// insert height pointers
604612
err = operation.UpsertFinalizedHeight(lctx, w, lastFinalized.Height)
605613
if err != nil {

storage/locks.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const (
2929
LockBootstrapping = "lock_bootstrapping"
3030
// LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere
3131
LockInsertChunkDataPack = "lock_insert_chunk_data_pack"
32+
LockInsertSafetyData = "lock_insert_safety_data"
33+
LockInsertLivenessData = "lock_insert_liveness_data"
3234
)
3335

3436
// Locks returns a list of all named locks used by the storage layer.
@@ -42,6 +44,8 @@ func Locks() []string {
4244
LockInsertCollection,
4345
LockBootstrapping,
4446
LockInsertChunkDataPack,
47+
LockInsertSafetyData,
48+
LockInsertLivenessData,
4549
}
4650
}
4751

@@ -65,6 +69,9 @@ func makeLockPolicy() lockctx.Policy {
6569
return lockctx.NewDAGPolicyBuilder().
6670
Add(LockInsertBlock, LockFinalizeBlock).
6771
Add(LockFinalizeBlock, LockBootstrapping).
72+
Add(LockBootstrapping, LockInsertSafetyData).
73+
Add(LockInsertSafetyData, LockInsertLivenessData).
74+
Add(LockInsertOrFinalizeClusterBlock, LockInsertSafetyData).
6875
Add(LockInsertOwnReceipt, LockInsertChunkDataPack).
6976
Build()
7077
}

storage/operation/views.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
47
"github.com/onflow/flow-go/consensus/hotstuff"
58
"github.com/onflow/flow-go/model/flow"
69
"github.com/onflow/flow-go/storage"
@@ -9,11 +12,14 @@ import (
912
// UpsertSafetyData inserts or updates the given safety data for this node.
1013
// Intended for consensus participants only (consensus and collector nodes).
1114
// Here, `chainID` specifies which consensus instance specifically the node participates in.
12-
// CAUTION: OVERWRITES existing data (potential for data corruption).
1315
//
1416
// No errors are expected during normal operation.
15-
func UpsertSafetyData(w storage.Writer, chainID flow.ChainID, safetyData *hotstuff.SafetyData) error {
16-
return UpsertByKey(w, MakePrefix(codeSafetyData, chainID), safetyData)
17+
func UpsertSafetyData(lctx lockctx.Proof, rw storage.ReaderBatchWriter, chainID flow.ChainID, safetyData *hotstuff.SafetyData) error {
18+
if !lctx.HoldsLock(storage.LockInsertSafetyData) {
19+
return fmt.Errorf("missing required lock: storage.LockInsertSafetyData")
20+
}
21+
22+
return UpsertByKey(rw.Writer(), MakePrefix(codeSafetyData, chainID), safetyData)
1723
}
1824

1925
// RetrieveSafetyData retrieves the safety data for this node.
@@ -28,11 +34,14 @@ func RetrieveSafetyData(r storage.Reader, chainID flow.ChainID, safetyData *hots
2834
// UpsertLivenessData inserts or updates the given liveness data for this node.
2935
// Intended for consensus participants only (consensus and collector nodes).
3036
// Here, `chainID` specifies which consensus instance specifically the node participates in.
31-
// CAUTION: OVERWRITES existing data (potential for data corruption).
3237
//
3338
// No errors are expected during normal operation.
34-
func UpsertLivenessData(w storage.Writer, chainID flow.ChainID, livenessData *hotstuff.LivenessData) error {
35-
return UpsertByKey(w, MakePrefix(codeLivenessData, chainID), livenessData)
39+
func UpsertLivenessData(lctx lockctx.Proof, rw storage.ReaderBatchWriter, chainID flow.ChainID, livenessData *hotstuff.LivenessData) error {
40+
if !lctx.HoldsLock(storage.LockInsertLivenessData) {
41+
return fmt.Errorf("missing required lock: storage.LockInsertLivenessData")
42+
}
43+
44+
return UpsertByKey(rw.Writer(), MakePrefix(codeLivenessData, chainID), livenessData)
3645
}
3746

3847
// RetrieveSafetyData retrieves the safety data for this node.

0 commit comments

Comments
 (0)