Skip to content

Commit 26f6919

Browse files
authored
Merge pull request #7794 from onflow/leo/pebble-merge-master
[Feature] Pebble merge into master
2 parents f67f354 + 10979ae commit 26f6919

File tree

375 files changed

+11802
-11874
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

375 files changed

+11802
-11874
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ generate-mocks: install-mock-generators
174174
mockery --name '.*' --dir=module/component --case=underscore --output="./module/component/mock" --outpkg="component"
175175
mockery --name '.*' --dir=network --case=underscore --output="./network/mocknetwork" --outpkg="mocknetwork"
176176
mockery --name '.*' --dir=storage --case=underscore --output="./storage/mock" --outpkg="mock"
177-
mockery --name 'DeferredDBUpdate' --dir=storage/badger/transaction --case=underscore --output="storage/mock" --outpkg="mock"
178177
mockery --name '.*' --dir="state/protocol" --case=underscore --output="state/protocol/mock" --outpkg="mock"
179178
mockery --name '.*' --dir="state/protocol/events" --case=underscore --output="./state/protocol/events/mock" --outpkg="mock"
180179
mockery --name '.*' --dir="state/protocol/protocol_state" --case=underscore --output="state/protocol/protocol_state/mock" --outpkg="mock"

admin/commands/storage/read_range_cluster_blocks.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/dgraph-io/badger/v2"
87
"github.com/rs/zerolog/log"
98

109
"github.com/onflow/flow-go/admin"
1110
"github.com/onflow/flow-go/admin/commands"
1211
"github.com/onflow/flow-go/cmd/util/cmd/read-light-block"
1312
"github.com/onflow/flow-go/model/flow"
14-
storage "github.com/onflow/flow-go/storage/badger"
13+
"github.com/onflow/flow-go/storage"
14+
"github.com/onflow/flow-go/storage/store"
1515
)
1616

1717
var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
@@ -21,12 +21,12 @@ var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
2121
const Max_Range_Cluster_Block_Limit = uint64(10001)
2222

2323
type ReadRangeClusterBlocksCommand struct {
24-
db *badger.DB
25-
headers *storage.Headers
26-
payloads *storage.ClusterPayloads
24+
db storage.DB
25+
headers *store.Headers
26+
payloads *store.ClusterPayloads
2727
}
2828

29-
func NewReadRangeClusterBlocksCommand(db *badger.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
29+
func NewReadRangeClusterBlocksCommand(db storage.DB, headers *store.Headers, payloads *store.ClusterPayloads) commands.AdminCommand {
3030
return &ReadRangeClusterBlocksCommand{
3131
db: db,
3232
headers: headers,
@@ -51,7 +51,7 @@ func (c *ReadRangeClusterBlocksCommand) Handler(ctx context.Context, req *admin.
5151
return nil, admin.NewInvalidAdminReqErrorf("getting for more than %v blocks at a time might have an impact to node's performance and is not allowed", Max_Range_Cluster_Block_Limit)
5252
}
5353

54-
clusterBlocks := storage.NewClusterBlocks(
54+
clusterBlocks := store.NewClusterBlocks(
5555
c.db, flow.ChainID(chainID), c.headers, c.payloads,
5656
)
5757

cmd/access/node_builder/access_node_builder.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ import (
116116
"github.com/onflow/flow-go/state/protocol"
117117
badgerState "github.com/onflow/flow-go/state/protocol/badger"
118118
"github.com/onflow/flow-go/state/protocol/blocktimer"
119+
statedatastore "github.com/onflow/flow-go/state/protocol/datastore"
119120
"github.com/onflow/flow-go/storage"
120121
bstorage "github.com/onflow/flow-go/storage/badger"
121122
pstorage "github.com/onflow/flow-go/storage/pebble"
@@ -445,7 +446,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
445446
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
446447
// create a finalizer that will handle updating the protocol
447448
// state when the follower detects newly finalized blocks
448-
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
449+
final := finalizer.NewFinalizer(node.ProtocolDB.Reader(), node.Storage.Headers, builder.FollowerState, node.Tracer)
449450

450451
packer := signature.NewConsensusSigDataPacker(builder.Committee)
451452
// initialize the verifier for the protocol consensus
@@ -955,6 +956,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
955956
builder.RootChainID.Chain(),
956957
indexerDerivedChainData,
957958
notNil(builder.collectionExecutedMetric),
959+
node.StorageLockMgr,
958960
)
959961
if err != nil {
960962
return nil, err
@@ -1633,7 +1635,7 @@ func (builder *FlowAccessNodeBuilder) Initialize() error {
16331635

16341636
builder.EnqueueTracer()
16351637
builder.PreInit(cmd.DynamicStartPreInit)
1636-
builder.ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange)
1638+
builder.ValidateRootSnapshot(statedatastore.ValidRootSnapshotContainsEntityExpiryRange)
16371639

16381640
return nil
16391641
}
@@ -2135,6 +2137,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
21352137
notNil(builder.collections),
21362138
notNil(builder.transactions),
21372139
lastFullBlockHeight,
2140+
node.StorageLockMgr,
21382141
)
21392142
builder.RequestEng.WithHandle(collectionSyncer.OnCollectionDownloaded)
21402143

cmd/bootstrap/cmd/finalize.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/onflow/flow-go/model/dkg"
2424
"github.com/onflow/flow-go/model/flow"
2525
"github.com/onflow/flow-go/module/epochs"
26-
"github.com/onflow/flow-go/state/protocol/badger"
26+
"github.com/onflow/flow-go/state/protocol/datastore"
2727
"github.com/onflow/flow-go/state/protocol/inmem"
2828
"github.com/onflow/flow-go/state/protocol/protocol_state"
2929
"github.com/onflow/flow-go/state/protocol/protocol_state/kvstore"
@@ -203,13 +203,13 @@ func finalize(cmd *cobra.Command, args []string) {
203203

204204
// validate the generated root snapshot is valid
205205
verifyResultID := true
206-
err = badger.IsValidRootSnapshot(snapshot, verifyResultID)
206+
err = datastore.IsValidRootSnapshot(snapshot, verifyResultID)
207207
if err != nil {
208208
log.Fatal().Err(err).Msg("the generated root snapshot is invalid")
209209
}
210210

211211
// validate the generated root snapshot QCs
212-
err = badger.IsValidRootSnapshotQCs(snapshot)
212+
err = datastore.IsValidRootSnapshotQCs(snapshot)
213213
if err != nil {
214214
log.Fatal().Err(err).Msg("root snapshot contains invalid QCs")
215215
}
@@ -247,13 +247,13 @@ func finalize(cmd *cobra.Command, args []string) {
247247

248248
log.Info().Msg("saved result and seal are matching")
249249

250-
err = badger.IsValidRootSnapshot(rootSnapshot, verifyResultID)
250+
err = datastore.IsValidRootSnapshot(rootSnapshot, verifyResultID)
251251
if err != nil {
252252
log.Fatal().Err(err).Msg("saved snapshot is invalid")
253253
}
254254

255255
// validate the generated root snapshot QCs
256-
err = badger.IsValidRootSnapshotQCs(snapshot)
256+
err = datastore.IsValidRootSnapshotQCs(snapshot)
257257
if err != nil {
258258
log.Fatal().Err(err).Msg("root snapshot contains invalid QCs")
259259
}

cmd/collection/main.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ import (
5757
badgerState "github.com/onflow/flow-go/state/protocol/badger"
5858
"github.com/onflow/flow-go/state/protocol/blocktimer"
5959
"github.com/onflow/flow-go/state/protocol/events/gadgets"
60-
"github.com/onflow/flow-go/storage/badger"
60+
"github.com/onflow/flow-go/storage/store"
6161
"github.com/onflow/flow-go/utils/grpcutils"
6262
)
6363

@@ -223,12 +223,9 @@ func main() {
223223
return collectionCommands.NewTxRateLimitCommand(addressRateLimiter)
224224
}).
225225
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand {
226-
clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
227-
headers, ok := conf.Storage.Headers.(*badger.Headers)
228-
if !ok {
229-
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as badger headers")
230-
}
231-
return storageCommands.NewReadRangeClusterBlocksCommand(conf.DB, headers, clusterPayloads)
226+
clusterPayloads := store.NewClusterPayloads(&metrics.NoopCollector{}, conf.ProtocolDB)
227+
headers := store.NewHeaders(&metrics.NoopCollector{}, conf.ProtocolDB)
228+
return storageCommands.NewReadRangeClusterBlocksCommand(conf.ProtocolDB, headers, clusterPayloads)
232229
}).
233230
Module("follower distributor", func(node *cmd.NodeConfig) error {
234231
followerDistributor = pubsub.NewFollowerDistributor()
@@ -370,7 +367,7 @@ func main() {
370367
Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
371368
// create a finalizer for updating the protocol
372369
// state when the follower detects newly finalized blocks
373-
finalizer := confinalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)
370+
finalizer := confinalizer.NewFinalizer(node.ProtocolDB.Reader(), node.Storage.Headers, followerState, node.Tracer)
374371
finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
375372
if err != nil {
376373
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
@@ -531,7 +528,7 @@ func main() {
531528
// Epoch manager encapsulates and manages epoch-dependent engines as we
532529
// transition between epochs
533530
Component("epoch manager", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
534-
clusterStateFactory, err := factories.NewClusterStateFactory(node.DB, node.Metrics.Cache, node.Tracer)
531+
clusterStateFactory, err := factories.NewClusterStateFactory(node.ProtocolDB, node.StorageLockMgr, node.Metrics.Cache, node.Tracer)
535532
if err != nil {
536533
return nil, err
537534
}
@@ -551,8 +548,9 @@ func main() {
551548
}
552549

553550
builderFactory, err := factories.NewBuilderFactory(
554-
node.DB,
551+
node.ProtocolDB,
555552
node.State,
553+
node.StorageLockMgr,
556554
node.Storage.Headers,
557555
node.Tracer,
558556
colMetrics,

cmd/consensus/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import (
6565
"github.com/onflow/flow-go/state/protocol"
6666
badgerState "github.com/onflow/flow-go/state/protocol/badger"
6767
"github.com/onflow/flow-go/state/protocol/blocktimer"
68+
"github.com/onflow/flow-go/state/protocol/datastore"
6869
"github.com/onflow/flow-go/state/protocol/events/gadgets"
6970
protocol_state "github.com/onflow/flow-go/state/protocol/protocol_state/state"
7071
bstorage "github.com/onflow/flow-go/storage/badger"
@@ -200,7 +201,7 @@ func main() {
200201

201202
nodeBuilder.
202203
PreInit(cmd.DynamicStartPreInit).
203-
ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange).
204+
ValidateRootSnapshot(datastore.ValidRootSnapshotContainsEntityExpiryRange).
204205
Module("machine account config", func(node *cmd.NodeConfig) error {
205206
machineAccountInfo, err = cmd.LoadNodeMachineAccountInfoFile(node.BootstrapDir, node.NodeID)
206207
return err
@@ -577,7 +578,7 @@ func main() {
577578
Component("hotstuff modules", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
578579
// initialize the block finalizer
579580
finalize := finalizer.NewFinalizer(
580-
node.DB,
581+
node.ProtocolDB.Reader(),
581582
node.Storage.Headers,
582583
mutableState,
583584
node.Tracer,
@@ -740,6 +741,7 @@ func main() {
740741
return ctl, nil
741742
}).
742743
Component("consensus participant", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
744+
// create different epochs setups
743745
mutableProtocolState := protocol_state.NewMutableProtocolState(
744746
node.Logger,
745747
node.Storage.EpochProtocolStateEntries,

cmd/dynamic_startup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func DynamicStartPreInit(nodeConfig *NodeConfig) error {
5959
log := nodeConfig.Logger.With().Str("component", "dynamic-startup").Logger()
6060

6161
// CASE 1: The state is already bootstrapped - nothing to do
62-
isBootstrapped, err := badgerstate.IsBootstrapped(nodeConfig.DB)
62+
isBootstrapped, err := badgerstate.IsBootstrapped(nodeConfig.ProtocolDB)
6363
if err != nil {
6464
return fmt.Errorf("could not check if state is boostrapped: %w", err)
6565
}

cmd/execution_builder.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ func (exeNode *ExecutionNode) LoadExecutionState(
745745
error,
746746
) {
747747

748-
chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(
748+
chunkDataPackDB, err := storagepebble.SafeOpen(
749749
node.Logger.With().Str("pebbledb", "cdp").Logger(),
750750
exeNode.exeConf.chunkDataPackDir,
751751
)
@@ -791,6 +791,7 @@ func (exeNode *ExecutionNode) LoadExecutionState(
791791
node.Tracer,
792792
exeNode.registerStore,
793793
exeNode.exeConf.enableStorehouse,
794+
node.StorageLockMgr,
794795
)
795796

796797
height, _, err := exeNode.executionState.GetLastExecutedBlockID(context.Background())
@@ -1193,7 +1194,7 @@ func (exeNode *ExecutionNode) LoadFollowerCore(
11931194
) {
11941195
// create a finalizer that handles updating the protocol
11951196
// state when the follower detects newly finalized blocks
1196-
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, exeNode.followerState, node.Tracer)
1197+
final := finalizer.NewFinalizer(node.ProtocolDB.Reader(), node.Storage.Headers, exeNode.followerState, node.Tracer)
11971198

11981199
finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
11991200
if err != nil {
@@ -1383,7 +1384,7 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
13831384
// in order to support switching from badger to pebble in the middle of the spork,
13841385
// we will check if the execution database has been bootstrapped by reading the state from badger db.
13851386
// and if not, bootstrap both badger and pebble db.
1386-
commit, bootstrapped, err := bootstrapper.IsBootstrapped(badgerimpl.ToDB(node.DB))
1387+
commit, bootstrapped, err := bootstrapper.IsBootstrapped(node.ProtocolDB)
13871388
if err != nil {
13881389
return fmt.Errorf("could not query database to know whether database has been bootstrapped: %w", err)
13891390
}
@@ -1410,12 +1411,12 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
14101411
return fmt.Errorf("could not load bootstrap state from checkpoint file: %w", err)
14111412
}
14121413

1413-
err = bootstrapper.BootstrapExecutionDatabase(badgerimpl.ToDB(node.DB), node.RootSeal)
1414+
err = bootstrapper.BootstrapExecutionDatabase(node.StorageLockMgr, badgerimpl.ToDB(node.DB), node.RootSeal)
14141415
if err != nil {
14151416
return fmt.Errorf("could not bootstrap execution database: %w", err)
14161417
}
14171418

1418-
err = bootstrapper.BootstrapExecutionDatabase(pebbleimpl.ToDB(node.PebbleDB), node.RootSeal)
1419+
err = bootstrapper.BootstrapExecutionDatabase(node.StorageLockMgr, pebbleimpl.ToDB(node.PebbleDB), node.RootSeal)
14191420
if err != nil {
14201421
return fmt.Errorf("could not bootstrap execution database: %w", err)
14211422
}

cmd/execution_config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ import (
1616
"github.com/onflow/flow-go/model/flow"
1717
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
1818
"github.com/onflow/flow-go/module/mempool"
19+
"github.com/onflow/flow-go/storage/store"
1920
"github.com/onflow/flow-go/utils/grpcutils"
2021

2122
"github.com/onflow/flow-go/engine/execution/computation"
2223
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
2324
"github.com/onflow/flow-go/engine/execution/rpc"
2425
"github.com/onflow/flow-go/fvm/storage/derived"
25-
storage "github.com/onflow/flow-go/storage/badger"
2626
)
2727

2828
// ExecutionConfig contains the configs for starting up execution nodes
@@ -99,7 +99,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
9999
flags.IntVar(&exeConf.computationConfig.MaxConcurrency, "computer-max-concurrency", 1, "set to greater than 1 to enable concurrent transaction execution")
100100
flags.StringVar(&exeConf.chunkDataPackDir, "chunk-data-pack-dir", filepath.Join(datadir, "chunk_data_packs"), "directory to use for storing chunk data packs")
101101
flags.StringVar(&exeConf.chunkDataPackCheckpointsDir, "chunk-data-pack-checkpoints-dir", filepath.Join(datadir, "chunk_data_packs_checkpoints_dir"), "directory to use storing chunk data packs pebble database checkpoints for querying while the node is running")
102-
flags.UintVar(&exeConf.chunkDataPackCacheSize, "chdp-cache", storage.DefaultCacheSize, "cache size for chunk data packs")
102+
flags.UintVar(&exeConf.chunkDataPackCacheSize, "chdp-cache", store.DefaultCacheSize, "cache size for chunk data packs")
103103
flags.Uint32Var(&exeConf.chunkDataPackRequestsCacheSize, "chdp-request-queue", mempool.DefaultChunkDataPackRequestQueueSize, "queue size for chunk data pack requests")
104104
flags.DurationVar(&exeConf.requestInterval, "request-interval", 60*time.Second, "the interval between requests for the requester engine")
105105
flags.Uint32Var(&exeConf.receiptRequestsCacheSize, "receipt-request-cache", provider.DefaultEntityRequestCacheSize, "queue size for entity requests at common provider engine")

cmd/observer/node_builder/observer_builder.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ func (builder *ObserverServiceBuilder) buildFollowerCore() *ObserverServiceBuild
465465
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
466466
// create a finalizer that will handle updating the protocol
467467
// state when the follower detects newly finalized blocks
468-
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
468+
final := finalizer.NewFinalizer(node.ProtocolDB.Reader(), node.Storage.Headers, builder.FollowerState, node.Tracer)
469469

470470
followerCore, err := consensus.NewFollower(
471471
node.Logger,
@@ -1452,6 +1452,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
14521452
builder.RootChainID.Chain(),
14531453
indexerDerivedChainData,
14541454
collectionExecutedMetric,
1455+
node.StorageLockMgr,
14551456
)
14561457
if err != nil {
14571458
return nil, err

0 commit comments

Comments
 (0)