Skip to content

Commit 63ffbd8

Browse files
authored
Merge branch 'master' into peter/7830-scheduled-tx-index
2 parents 1f6925d + 78c4a13 commit 63ffbd8

File tree

26 files changed

+187
-96
lines changed

26 files changed

+187
-96
lines changed

cmd/collection/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func main() {
636636
node.Logger,
637637
node.Me,
638638
node.ProtocolDB,
639+
node.StorageLockMgr,
639640
node.State,
640641
node.Metrics.Engine,
641642
node.Metrics.Mempool,

cmd/consensus/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ func main() {
396396
multipleReceiptsFilterMempool,
397397
consensusMempools.LogForkAndCrash(node.Logger),
398398
node.ProtocolDB,
399+
node.StorageLockMgr,
399400
node.Logger,
400401
)
401402
if err != nil {
@@ -621,7 +622,7 @@ func main() {
621622
notifier.AddFollowerConsumer(followerDistributor)
622623

623624
// initialize the persister
624-
persist, err := persister.New(node.ProtocolDB, node.RootChainID)
625+
persist, err := persister.New(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
625626
if err != nil {
626627
return nil, err
627628
}

cmd/scaffold.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func (fnb *FlowNodeBuilder) EnqueuePingService() {
303303
var hotstuffViewFunc func() (uint64, error)
304304
// Setup consensus nodes to report their HotStuff view
305305
if fnb.BaseConfig.NodeRole == flow.RoleConsensus.String() {
306-
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID)
306+
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
307307
if err != nil {
308308
return nil, err
309309
}

cmd/util/cmd/read-hotstuff/cmd/get_liveness.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ func init() {
2222

2323
func runGetLivenessData(*cobra.Command, []string) {
2424
err := common.WithStorage(flagDatadir, func(db storage.DB) error {
25+
lockManager := storage.NewTestingLockManager()
26+
2527
chainID := flow.ChainID(flagChain)
26-
reader, err := persister.NewReader(db, chainID)
28+
reader, err := persister.NewReader(db, chainID, lockManager)
2729
if err != nil {
2830
log.Fatal().Err(err).Msg("could not create reader from db")
2931
}

cmd/util/cmd/read-hotstuff/cmd/get_safety.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ func init() {
2222

2323
func runGetSafetyData(*cobra.Command, []string) {
2424
err := common.WithStorage(flagDatadir, func(db storage.DB) error {
25+
lockManager := storage.NewTestingLockManager()
2526

2627
chainID := flow.ChainID(flagChain)
27-
reader, err := persister.NewReader(db, chainID)
28+
reader, err := persister.NewReader(db, chainID, lockManager)
2829
if err != nil {
2930
log.Fatal().Err(err).Msg("could not create reader from db")
3031
}

consensus/hotstuff/persister/persister.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package persister
33
import (
44
"fmt"
55

6+
"github.com/jordanschalm/lockctx"
7+
68
"github.com/onflow/flow-go/consensus/hotstuff"
79
"github.com/onflow/flow-go/model/flow"
810
"github.com/onflow/flow-go/storage"
@@ -15,8 +17,9 @@ import (
1517
// SafetyData and LivenessData, for each distinct chain ID. This bootstrapping must be complete
1618
// before constructing a Persister instance with New (otherwise it will return an error).
1719
type Persister struct {
18-
db storage.DB
19-
chainID flow.ChainID
20+
db storage.DB
21+
chainID flow.ChainID
22+
lockManager lockctx.Manager
2023
}
2124

2225
var _ hotstuff.Persister = (*Persister)(nil)
@@ -26,15 +29,16 @@ var _ hotstuff.PersisterReader = (*Persister)(nil)
2629
// Persister depends on protocol.State and cluster.State bootstrapping to set initial values for
2730
// SafetyData and LivenessData, for each distinct chain ID. This bootstrapping must be completed
2831
// before first using a Persister instance.
29-
func New(db storage.DB, chainID flow.ChainID) (*Persister, error) {
32+
func New(db storage.DB, chainID flow.ChainID, lockManager lockctx.Manager) (*Persister, error) {
3033
err := ensureSafetyDataAndLivenessDataAreBootstrapped(db, chainID)
3134
if err != nil {
3235
return nil, fmt.Errorf("fail to check persister was properly bootstrapped: %w", err)
3336
}
3437

3538
p := &Persister{
36-
db: db,
37-
chainID: chainID,
39+
db: db,
40+
chainID: chainID,
41+
lockManager: lockManager,
3842
}
3943
return p, nil
4044
}
@@ -62,8 +66,8 @@ func ensureSafetyDataAndLivenessDataAreBootstrapped(db storage.DB, chainID flow.
6266
}
6367

6468
// NewReader returns a new Persister as a PersisterReader type (only read methods accessible).
65-
func NewReader(db storage.DB, chainID flow.ChainID) (hotstuff.PersisterReader, error) {
66-
return New(db, chainID)
69+
func NewReader(db storage.DB, chainID flow.ChainID, lockManager lockctx.Manager) (hotstuff.PersisterReader, error) {
70+
return New(db, chainID, lockManager)
6771
}
6872

6973
// GetSafetyData will retrieve last persisted safety data.
@@ -85,15 +89,19 @@ func (p *Persister) GetLivenessData() (*hotstuff.LivenessData, error) {
8589
// PutSafetyData persists the last safety data.
8690
// During normal operations, no errors are expected.
8791
func (p *Persister) PutSafetyData(safetyData *hotstuff.SafetyData) error {
88-
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
89-
return operation.UpsertSafetyData(rw.Writer(), p.chainID, safetyData)
92+
return storage.WithLock(p.lockManager, storage.LockInsertSafetyData, func(lctx lockctx.Context) error {
93+
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
94+
return operation.UpsertSafetyData(lctx, rw, p.chainID, safetyData)
95+
})
9096
})
9197
}
9298

9399
// PutLivenessData persists the last liveness data.
94100
// During normal operations, no errors are expected.
95101
func (p *Persister) PutLivenessData(livenessData *hotstuff.LivenessData) error {
96-
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
97-
return operation.UpsertLivenessData(rw.Writer(), p.chainID, livenessData)
102+
return storage.WithLock(p.lockManager, storage.LockInsertLivenessData, func(lctx lockctx.Context) error {
103+
return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
104+
return operation.UpsertLivenessData(lctx, rw, p.chainID, livenessData)
105+
})
98106
})
99107
}

consensus/integration/nodes_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ func createNode(
563563

564564
signer := verification.NewCombinedSigner(me, beaconKeyStore)
565565

566-
persist, err := persister.New(db, rootHeader.ChainID)
566+
persist, err := persister.New(db, rootHeader.ChainID, lockManager)
567567
require.NoError(t, err)
568568

569569
livenessData, err := persist.GetLivenessData()

engine/collection/epochmgr/factories/hotstuff.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type HotStuffFactory struct {
3232
baseLogger zerolog.Logger
3333
me module.Local
3434
db storage.DB
35+
lockManager storage.LockManager
3536
protoState protocol.State
3637
engineMetrics module.EngineMetrics
3738
mempoolMetrics module.MempoolMetrics
@@ -43,6 +44,7 @@ func NewHotStuffFactory(
4344
log zerolog.Logger,
4445
me module.Local,
4546
db storage.DB,
47+
lockManager storage.LockManager,
4648
protoState protocol.State,
4749
engineMetrics module.EngineMetrics,
4850
mempoolMetrics module.MempoolMetrics,
@@ -54,6 +56,7 @@ func NewHotStuffFactory(
5456
baseLogger: log,
5557
me: me,
5658
db: db,
59+
lockManager: lockManager,
5760
protoState: protoState,
5861
engineMetrics: engineMetrics,
5962
mempoolMetrics: mempoolMetrics,
@@ -155,7 +158,7 @@ func (f *HotStuffFactory) CreateModules(
155158
return nil, nil, err
156159
}
157160

158-
persist, err := persister.New(f.db, cluster.ChainID())
161+
persist, err := persister.New(f.db, cluster.ChainID(), f.lockManager)
159162
if err != nil {
160163
return nil, nil, err
161164
}

engine/testutil/nodes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro
377377
node.Log,
378378
node.Me,
379379
db,
380+
node.LockManager,
380381
node.State,
381382
node.Metrics,
382383
node.Metrics,

module/mempool/consensus/exec_fork_suppressor.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"sync"
77

8+
"github.com/jordanschalm/lockctx"
89
"github.com/rs/zerolog"
910
"github.com/rs/zerolog/log"
1011
"go.uber.org/atomic"
@@ -46,6 +47,7 @@ type ExecForkSuppressor struct {
4647
execForkDetected atomic.Bool
4748
onExecFork ExecForkActor
4849
execForkEvidenceStore storage.ExecutionForkEvidence
50+
lockManager storage.LockManager
4951
log zerolog.Logger
5052
}
5153

@@ -61,6 +63,7 @@ func NewExecStateForkSuppressor(
6163
seals mempool.IncorporatedResultSeals,
6264
onExecFork ExecForkActor,
6365
db storage.DB,
66+
lockManager storage.LockManager,
6467
log zerolog.Logger,
6568
) (*ExecForkSuppressor, error) {
6669
executionForkEvidenceStore := store.NewExecutionForkEvidence(db)
@@ -83,6 +86,7 @@ func NewExecStateForkSuppressor(
8386
execForkDetected: *atomic.NewBool(execForkDetectedFlag),
8487
onExecFork: onExecFork,
8588
execForkEvidenceStore: executionForkEvidenceStore,
89+
lockManager: lockManager,
8690
log: log.With().Str("mempool", "ExecForkSuppressor").Logger(),
8791
}
8892

@@ -365,9 +369,13 @@ func (s *ExecForkSuppressor) filterConflictingSeals(sealsByBlockID map[flow.Iden
365369
s.execForkDetected.Store(true)
366370
s.Clear()
367371
conflictingSeals = append(sealsList{candidateSeal}, conflictingSeals...)
368-
err := s.execForkEvidenceStore.StoreIfNotExists(conflictingSeals)
372+
373+
// Acquire lock and store execution fork evidence
374+
err := storage.WithLock(s.lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
375+
return s.execForkEvidenceStore.StoreIfNotExists(lctx, conflictingSeals)
376+
})
369377
if err != nil {
370-
panic("failed to store execution fork evidence")
378+
s.log.Fatal().Msg("failed to store execution fork evidence")
371379
}
372380
s.onExecFork(conflictingSeals)
373381
return nil

0 commit comments

Comments
 (0)