diff --git a/admin/commands/storage/read_range_cluster_blocks.go b/admin/commands/storage/read_range_cluster_blocks.go index b0e41b86fe8..6e2a288f70c 100644 --- a/admin/commands/storage/read_range_cluster_blocks.go +++ b/admin/commands/storage/read_range_cluster_blocks.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/admin/commands" "github.com/onflow/flow-go/cmd/util/cmd/read-light-block" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -22,14 +23,12 @@ const Max_Range_Cluster_Block_Limit = uint64(10001) type ReadRangeClusterBlocksCommand struct { db storage.DB - headers *store.Headers payloads *store.ClusterPayloads } -func NewReadRangeClusterBlocksCommand(db storage.DB, headers *store.Headers, payloads *store.ClusterPayloads) commands.AdminCommand { +func NewReadRangeClusterBlocksCommand(db storage.DB, payloads *store.ClusterPayloads) commands.AdminCommand { return &ReadRangeClusterBlocksCommand{ db: db, - headers: headers, payloads: payloads, } } @@ -51,8 +50,9 @@ func (c *ReadRangeClusterBlocksCommand) Handler(ctx context.Context, req *admin. return nil, admin.NewInvalidAdminReqErrorf("getting for more than %v blocks at a time might have an impact to node's performance and is not allowed", Max_Range_Cluster_Block_Limit) } + clusterHeaders := store.NewClusterHeaders(&metrics.NoopCollector{}, c.db, flow.ChainID(chainID)) clusterBlocks := store.NewClusterBlocks( - c.db, flow.ChainID(chainID), c.headers, c.payloads, + c.db, flow.ChainID(chainID), clusterHeaders, c.payloads, ) lights, err := read.ReadClusterLightBlockByHeightRange(clusterBlocks, reqData.startHeight, reqData.endHeight) diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 08fd542449b..5bcaa9b44f1 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -235,8 +235,8 @@ func main() { }). AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand { clusterPayloads := store.NewClusterPayloads(&metrics.NoopCollector{}, conf.ProtocolDB) - headers := store.NewHeaders(&metrics.NoopCollector{}, conf.ProtocolDB) - return storageCommands.NewReadRangeClusterBlocksCommand(conf.ProtocolDB, headers, clusterPayloads) + // defer construction of Headers since the cluster's ChainID is provided by the command + return storageCommands.NewReadRangeClusterBlocksCommand(conf.ProtocolDB, clusterPayloads) }). Module("follower distributor", func(node *cmd.NodeConfig) error { followerDistributor = pubsub.NewFollowerDistributor() diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 142f2d55c6d..469f4e16c43 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -1186,8 +1186,37 @@ func (fnb *FlowNodeBuilder) initStorageLockManager() error { return nil } +// determineChainID attempts to determine the chain this node is running on +// directly from the database or root snapshot, before storage interfaces have been initialized. +// No errors expected during normal operation. +func (fnb *FlowNodeBuilder) determineChainID() error { + bootstrapped, err := badgerState.IsBootstrapped(fnb.ProtocolDB) + if err != nil { + return err + } + if bootstrapped { + chainID, err := badgerState.GetChainIDFromLatestFinalizedHeader(fnb.ProtocolDB) + if err != nil { + return err + } + fnb.RootChainID = chainID + } else { + // try reading root snapshot from disk (full bootstrap will happen later) + fnb.Logger.Info().Msgf("loading root protocol state snapshot from disk") + rootSnapshot, err := loadRootProtocolSnapshot(fnb.BaseConfig.BootstrapDir) + if err != nil { + return fmt.Errorf("failed to read protocol snapshot from disk: %w", err) + } + // set root snapshot fields, including fnb.RootChainID + if err := fnb.setRootSnapshot(rootSnapshot); err != nil { + return err + } + } + return nil +} + func (fnb *FlowNodeBuilder) initStorage() error { - headers := store.NewHeaders(fnb.Metrics.Cache, fnb.ProtocolDB) + headers := store.NewHeaders(fnb.Metrics.Cache, fnb.ProtocolDB, fnb.RootChainID) guarantees := store.NewGuarantees(fnb.Metrics.Cache, fnb.ProtocolDB, fnb.BaseConfig.guaranteesCacheSize, store.DefaultCacheSize) seals := store.NewSeals(fnb.Metrics.Cache, fnb.ProtocolDB) @@ -2081,6 +2110,10 @@ func (fnb *FlowNodeBuilder) onStart() error { return err } + if err := fnb.determineChainID(); err != nil { + return err + } + if err := fnb.initStorage(); err != nil { return err } diff --git a/cmd/util/cmd/common/storage.go b/cmd/util/cmd/common/storage.go index 13cd2170f98..1c6f4606ae0 100644 --- a/cmd/util/cmd/common/storage.go +++ b/cmd/util/cmd/common/storage.go @@ -5,6 +5,7 @@ import ( "github.com/rs/zerolog/log" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" storagebadger "github.com/onflow/flow-go/storage/badger" @@ -53,9 +54,9 @@ func IsPebbleFolder(dataDir string) (bool, error) { return pebblestorage.IsPebbleFolder(dataDir) } -func InitStorages(db storage.DB) *store.All { +func InitStorages(db storage.DB, chainID flow.ChainID) *store.All { metrics := &metrics.NoopCollector{} - return store.InitAll(metrics, db) + return store.InitAll(metrics, db, chainID) } // WithStorage runs the given function with the storage depending on the flags. diff --git a/cmd/util/cmd/exec-data-json-export/block_exporter.go b/cmd/util/cmd/exec-data-json-export/block_exporter.go index 32efa2e78a5..354c75420f5 100644 --- a/cmd/util/cmd/exec-data-json-export/block_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/block_exporter.go @@ -12,6 +12,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -37,9 +38,13 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) (fl // traverse backward from the given block (parent block) and fetch by blockHash err := common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) index := store.NewIndex(cacheMetrics, db) guarantees := store.NewGuarantees(cacheMetrics, db, store.DefaultCacheSize, store.DefaultCacheSize) seals := store.NewSeals(cacheMetrics, db) diff --git a/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go b/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go index 3e3d6971a51..ca14cb9bb32 100644 --- a/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/fvm/storage/snapshot" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" "github.com/onflow/flow-go/storage/store" @@ -26,9 +27,13 @@ func ExportDeltaSnapshots(blockID flow.Identifier, dbPath string, outputPath str // traverse backward from the given block (parent block) and fetch by blockHash return common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) activeBlockID := blockID outputFile := filepath.Join(outputPath, "delta.jsonl") diff --git a/cmd/util/cmd/exec-data-json-export/event_exporter.go b/cmd/util/cmd/exec-data-json-export/event_exporter.go index 600f4d45af3..0020dccb2a7 100644 --- a/cmd/util/cmd/exec-data-json-export/event_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/event_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -30,9 +31,13 @@ func ExportEvents(blockID flow.Identifier, dbPath string, outputPath string) err // traverse backward from the given block (parent block) and fetch by blockHash return common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) events := store.NewEvents(cacheMetrics, db) activeBlockID := blockID diff --git a/cmd/util/cmd/exec-data-json-export/result_exporter.go b/cmd/util/cmd/exec-data-json-export/result_exporter.go index 6afefd7ee9a..df78cabd221 100644 --- a/cmd/util/cmd/exec-data-json-export/result_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/result_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -28,9 +29,13 @@ func ExportResults(blockID flow.Identifier, dbPath string, outputPath string) er // traverse backward from the given block (parent block) and fetch by blockHash return common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) results := store.NewExecutionResults(cacheMetrics, db) activeBlockID := blockID diff --git a/cmd/util/cmd/exec-data-json-export/transaction_exporter.go b/cmd/util/cmd/exec-data-json-export/transaction_exporter.go index 5c8f937437d..23437a99894 100644 --- a/cmd/util/cmd/exec-data-json-export/transaction_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/transaction_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage/store" ) @@ -48,6 +49,11 @@ func ExportExecutedTransactions(blockID flow.Identifier, dbPath string, outputPa } defer db.Close() + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + cacheMetrics := &metrics.NoopCollector{} index := store.NewIndex(cacheMetrics, db) guarantees := store.NewGuarantees(cacheMetrics, db, store.DefaultCacheSize, store.DefaultCacheSize) @@ -55,7 +61,7 @@ func ExportExecutedTransactions(blockID flow.Identifier, dbPath string, outputPa results := store.NewExecutionResults(cacheMetrics, db) receipts := store.NewExecutionReceipts(cacheMetrics, db, results, store.DefaultCacheSize) transactions := store.NewTransactions(cacheMetrics, db) - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) payloads := store.NewPayloads(db, index, guarantees, seals, receipts, results) blocks := store.NewBlocks(db, headers, payloads) collections := store.NewCollections(db, transactions) diff --git a/cmd/util/cmd/export-json-transactions/cmd.go b/cmd/util/cmd/export-json-transactions/cmd.go index c8ba51cf2b5..13e09aa93a0 100644 --- a/cmd/util/cmd/export-json-transactions/cmd.go +++ b/cmd/util/cmd/export-json-transactions/cmd.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/cmd/util/cmd/export-json-transactions/transactions" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -67,7 +68,11 @@ func ExportTransactions(lockManager lockctx.Manager, dataDir string, outputDir s // init dependencies return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { diff --git a/cmd/util/cmd/export-json-transactions/transactions/range_test.go b/cmd/util/cmd/export-json-transactions/transactions/range_test.go index 9d85f5c6232..d8f6d58644c 100644 --- a/cmd/util/cmd/export-json-transactions/transactions/range_test.go +++ b/cmd/util/cmd/export-json-transactions/transactions/range_test.go @@ -60,7 +60,7 @@ func TestFindBlockTransactions(t *testing.T) { ) // prepare dependencies - storages := common.InitStorages(db) + storages := common.InitStorages(db, b1.ChainID) payloads, collections := storages.Payloads, storages.Collections snap4 := &mock.Snapshot{} snap4.On("Head").Return(b1.ToHeader(), nil) diff --git a/cmd/util/cmd/find-inconsistent-result/cmd.go b/cmd/util/cmd/find-inconsistent-result/cmd.go index 16636dd5fc9..f20593cec91 100644 --- a/cmd/util/cmd/find-inconsistent-result/cmd.go +++ b/cmd/util/cmd/find-inconsistent-result/cmd.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/block_iterator/latest" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -95,7 +96,11 @@ func findFirstMismatch(datadir string, startHeight, endHeight uint64, lockManage func createStorages(db storage.DB, lockManager lockctx.Manager) ( storage.Headers, storage.ExecutionResults, storage.Seals, protocol.State, error) { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return nil, nil, nil, nil, err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return nil, nil, nil, nil, fmt.Errorf("could not open protocol state: %v", err) diff --git a/cmd/util/cmd/read-badger/cmd/blocks.go b/cmd/util/cmd/read-badger/cmd/blocks.go index 1e60bf85487..839a942e643 100644 --- a/cmd/util/cmd/read-badger/cmd/blocks.go +++ b/cmd/util/cmd/read-badger/cmd/blocks.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -28,8 +29,12 @@ var blocksCmd = &cobra.Command{ Short: "get a block by block ID or height", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) index := store.NewIndex(cacheMetrics, db) guarantees := store.NewGuarantees(cacheMetrics, db, store.DefaultCacheSize, store.DefaultCacheSize) seals := store.NewSeals(cacheMetrics, db) @@ -39,7 +44,6 @@ var blocksCmd = &cobra.Command{ blocks := store.NewBlocks(db, headers, payloads) var block *flow.Block - var err error if flagBlockID != "" { log.Info().Msgf("got flag block id: %s", flagBlockID) diff --git a/cmd/util/cmd/read-badger/cmd/cluster_blocks.go b/cmd/util/cmd/read-badger/cmd/cluster_blocks.go index 6d094fd10ae..af2f98b8b62 100644 --- a/cmd/util/cmd/read-badger/cmd/cluster_blocks.go +++ b/cmd/util/cmd/read-badger/cmd/cluster_blocks.go @@ -34,12 +34,12 @@ var clusterBlocksCmd = &cobra.Command{ return common.WithStorage(flagDatadir, func(db storage.DB) error { metrics := metrics.NewNoopCollector() - headers := store.NewHeaders(metrics, db) - clusterPayloads := store.NewClusterPayloads(metrics, db) - // get chain id log.Info().Msgf("got flag chain name: %s", flagChainName) chainID := flow.ChainID(flagChainName) + + headers := store.NewClusterHeaders(metrics, db, chainID) + clusterPayloads := store.NewClusterPayloads(metrics, db) clusterBlocks := store.NewClusterBlocks(db, chainID, headers, clusterPayloads) if flagClusterBlockID != "" && flagHeight != 0 { diff --git a/cmd/util/cmd/read-badger/cmd/collections.go b/cmd/util/cmd/read-badger/cmd/collections.go index b59817cad8e..b16b0ffac6e 100644 --- a/cmd/util/cmd/read-badger/cmd/collections.go +++ b/cmd/util/cmd/read-badger/cmd/collections.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -28,7 +29,11 @@ var collectionsCmd = &cobra.Command{ Short: "get collection by collection or transaction ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used if flagCollectionID != "" { log.Info().Msgf("got flag collection id: %s", flagCollectionID) diff --git a/cmd/util/cmd/read-badger/cmd/epoch_commit.go b/cmd/util/cmd/read-badger/cmd/epoch_commit.go index 23deb37105b..a6d7341dab7 100644 --- a/cmd/util/cmd/read-badger/cmd/epoch_commit.go +++ b/cmd/util/cmd/read-badger/cmd/epoch_commit.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -25,7 +26,11 @@ var epochCommitCmd = &cobra.Command{ Short: "get epoch commit by ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag commit id: %s", flagEpochCommitID) commitID, err := flow.HexStringToIdentifier(flagEpochCommitID) diff --git a/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go b/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go index 0a7922e4cf3..73d6c4e7838 100644 --- a/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go +++ b/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -23,7 +24,11 @@ var epochProtocolStateCmd = &cobra.Command{ Short: "get epoch protocol state by block ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag block id: %s", flagBlockID) blockID, err := flow.HexStringToIdentifier(flagBlockID) diff --git a/cmd/util/cmd/read-badger/cmd/guarantees.go b/cmd/util/cmd/read-badger/cmd/guarantees.go index 14adf48588a..e9196fa2a24 100644 --- a/cmd/util/cmd/read-badger/cmd/guarantees.go +++ b/cmd/util/cmd/read-badger/cmd/guarantees.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -23,7 +24,11 @@ var guaranteesCmd = &cobra.Command{ Short: "get guarantees by collection ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag collection id: %s", flagCollectionID) collectionID, err := flow.HexStringToIdentifier(flagCollectionID) diff --git a/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go b/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go index d434d6e1aaa..2908b6cead7 100644 --- a/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go +++ b/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -30,7 +31,11 @@ var protocolStateKVStore = &cobra.Command{ Short: "get protocol state kvstore by block ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag block id: %s", flagBlockID) blockID, err := flow.HexStringToIdentifier(flagBlockID) diff --git a/cmd/util/cmd/read-badger/cmd/seals.go b/cmd/util/cmd/read-badger/cmd/seals.go index f6ebaa2e180..5baff53402f 100644 --- a/cmd/util/cmd/read-badger/cmd/seals.go +++ b/cmd/util/cmd/read-badger/cmd/seals.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -25,7 +26,11 @@ var sealsCmd = &cobra.Command{ Short: "get seals by block or seal ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used if flagSealID != "" && flagBlockID != "" { return fmt.Errorf("provide one of the flags --id or --block-id") diff --git a/cmd/util/cmd/read-badger/cmd/transaction_results.go b/cmd/util/cmd/read-badger/cmd/transaction_results.go index 44285007cc7..01dfe91018e 100644 --- a/cmd/util/cmd/read-badger/cmd/transaction_results.go +++ b/cmd/util/cmd/read-badger/cmd/transaction_results.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -25,11 +26,15 @@ var transactionResultsCmd = &cobra.Command{ Short: "get transaction-result by block ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } transactionResults, err := store.NewTransactionResults(metrics.NewNoopCollector(), db, 1) if err != nil { return err } - storages := common.InitStorages(db) + storages := common.InitStorages(db, chainID) log.Info().Msgf("got flag block id: %s", flagBlockID) blockID, err := flow.HexStringToIdentifier(flagBlockID) if err != nil { diff --git a/cmd/util/cmd/read-badger/cmd/transactions.go b/cmd/util/cmd/read-badger/cmd/transactions.go index c3e123d4808..e2211486634 100644 --- a/cmd/util/cmd/read-badger/cmd/transactions.go +++ b/cmd/util/cmd/read-badger/cmd/transactions.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -23,7 +24,11 @@ var transactionsCmd = &cobra.Command{ Short: "get transaction by ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag transaction id: %s", flagTransactionID) transactionID, err := flow.HexStringToIdentifier(flagTransactionID) diff --git a/cmd/util/cmd/read-light-block/read_light_block_test.go b/cmd/util/cmd/read-light-block/read_light_block_test.go index 84e5abfc002..7a77b05f126 100644 --- a/cmd/util/cmd/read-light-block/read_light_block_test.go +++ b/cmd/util/cmd/read-light-block/read_light_block_test.go @@ -56,7 +56,7 @@ func TestReadClusterRange(t *testing.T) { clusterBlocks := store.NewClusterBlocks( db, blocks[0].ChainID, - store.NewHeaders(metrics.NewNoopCollector(), db), + store.NewClusterHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), store.NewClusterPayloads(metrics.NewNoopCollector(), db), ) diff --git a/cmd/util/cmd/read-protocol-state/cmd/blocks.go b/cmd/util/cmd/read-protocol-state/cmd/blocks.go index 4c2ef0d13bd..6bca11c0009 100644 --- a/cmd/util/cmd/read-protocol-state/cmd/blocks.go +++ b/cmd/util/cmd/read-protocol-state/cmd/blocks.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -150,7 +151,11 @@ func (r *Reader) IsExecuted(blockID flow.Identifier) (bool, error) { func runE(*cobra.Command, []string) error { lockManager := storage.MakeSingletonLockManager() return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { diff --git a/cmd/util/cmd/read-protocol-state/cmd/snapshot.go b/cmd/util/cmd/read-protocol-state/cmd/snapshot.go index 921fc26ba9b..a08434232c8 100644 --- a/cmd/util/cmd/read-protocol-state/cmd/snapshot.go +++ b/cmd/util/cmd/read-protocol-state/cmd/snapshot.go @@ -11,6 +11,7 @@ import ( commonFuncs "github.com/onflow/flow-go/cmd/util/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/state/protocol/inmem" "github.com/onflow/flow-go/storage" ) @@ -56,7 +57,11 @@ func init() { func runSnapshotE(*cobra.Command, []string) error { lockManager := storage.MakeSingletonLockManager() return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return fmt.Errorf("could not init protocol state") diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go index fa39fc67f80..fb58b8ef441 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger" "github.com/onflow/flow-go/storage/operation/pebbleimpl" @@ -59,7 +60,11 @@ func runE(*cobra.Command, []string) error { } return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - clean up duplicated storage types state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return fmt.Errorf("could not open protocol states: %w", err) @@ -75,7 +80,7 @@ func runE(*cobra.Command, []string) error { results := store.NewExecutionResults(metrics, db) receipts := store.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize) myReceipts := store.NewMyExecutionReceipts(metrics, db, receipts) - headers := store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, chainID) events := store.NewEvents(metrics, db) serviceEvents := store.NewServiceEvents(metrics, db) transactions := store.NewTransactions(metrics, db) diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go index 386acefd145..548f019a7ee 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go @@ -39,7 +39,7 @@ func TestReExecuteBlock(t *testing.T) { // create all modules metrics := &metrics.NoopCollector{} - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks txResults, err := store.NewTransactionResults(metrics, db, store.DefaultCacheSize) @@ -199,7 +199,7 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) { // create all modules metrics := &metrics.NoopCollector{} - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks commits := store.NewCommits(metrics, db) diff --git a/cmd/util/cmd/snapshot/cmd.go b/cmd/util/cmd/snapshot/cmd.go index b384c81220f..c5216377a36 100644 --- a/cmd/util/cmd/snapshot/cmd.go +++ b/cmd/util/cmd/snapshot/cmd.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/engine/common/rpc/convert" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -47,7 +48,11 @@ func runE(*cobra.Command, []string) error { lockManager := storage.MakeSingletonLockManager() return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return fmt.Errorf("could not open protocol state: %w", err) diff --git a/cmd/util/cmd/verify-evm-offchain-replay/verify.go b/cmd/util/cmd/verify-evm-offchain-replay/verify.go index 96eeeb3c9a3..e6bdafd416e 100644 --- a/cmd/util/cmd/verify-evm-offchain-replay/verify.go +++ b/cmd/util/cmd/verify-evm-offchain-replay/verify.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/blobs" "github.com/onflow/flow-go/module/executiondatasync/execution_data" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -144,10 +145,14 @@ func initStorages(db storage.DB, executionDataDir string) ( io.Closer, error, ) { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return nil, nil, nil, err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - check usage datastoreDir := filepath.Join(executionDataDir, "blobstore") - err := os.MkdirAll(datastoreDir, 0700) + err = os.MkdirAll(datastoreDir, 0700) if err != nil { return nil, nil, nil, err } diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 165d2450933..54e596c7306 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -387,7 +387,7 @@ func createNode( db := pebbleimpl.ToDB(pdb) lockManager := fstorage.NewTestingLockManager() - headersDB := store.NewHeaders(metricsCollector, db) + headersDB := store.NewHeaders(metricsCollector, db, rootSnapshot.Params().ChainID()) guaranteesDB := store.NewGuarantees(metricsCollector, db, store.DefaultCacheSize, store.DefaultCacheSize) sealsDB := store.NewSeals(metricsCollector, db) indexDB := store.NewIndex(metricsCollector, db) diff --git a/consensus/recovery/protocol/state_test.go b/consensus/recovery/protocol/state_test.go index 790aa8751a4..13626de8b03 100644 --- a/consensus/recovery/protocol/state_test.go +++ b/consensus/recovery/protocol/state_test.go @@ -46,7 +46,7 @@ func TestSaveBlockAsReplica(t *testing.T) { require.NoError(t, err) metrics := metrics.NewNoopCollector() - headers := store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, flow.Emulator) finalized, pending, err := recovery.FindLatest(state, headers) require.NoError(t, err) require.Equal(t, b0.ID(), finalized.ID(), "recover find latest returns inconsistent finalized block") diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 1b77dc19518..7b7c0fff947 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -161,7 +161,7 @@ func (suite *Suite) RunTest( ) { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) var err error suite.backend, err = backend.New(backend.Params{ @@ -653,7 +653,7 @@ func (suite *Suite) TestGetSealedTransaction() { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) enNodeIDs := enIdentities.NodeIDs() @@ -873,7 +873,7 @@ func (suite *Suite) TestGetTransactionResult() { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) originID := unittest.IdentifierFixture() *suite.state = protocol.State{} @@ -1210,7 +1210,7 @@ func (suite *Suite) TestExecuteScript() { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) identities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) suite.sealedSnapshot.On("Identities", mock.Anything).Return(identities, nil) suite.finalSnapshot.On("Identities", mock.Anything).Return(identities, nil) diff --git a/engine/access/ingestion/collections/indexer_test.go b/engine/access/ingestion/collections/indexer_test.go index 294ac3c034a..1892e555094 100644 --- a/engine/access/ingestion/collections/indexer_test.go +++ b/engine/access/ingestion/collections/indexer_test.go @@ -392,7 +392,7 @@ func newBlockchain(t *testing.T, pdb *pebble.DB) *blockchain { db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) transactions := store.NewTransactions(metrics, db) collections := store.NewCollections(db, transactions) diff --git a/engine/access/rpc/backend/transactions/transactions_functional_test.go b/engine/access/rpc/backend/transactions/transactions_functional_test.go index aa3c1061778..2356beb91c6 100644 --- a/engine/access/rpc/backend/transactions/transactions_functional_test.go +++ b/engine/access/rpc/backend/transactions/transactions_functional_test.go @@ -123,7 +123,7 @@ func (s *TransactionsFunctionalSuite) SetupTest() { s.db = pebbleimpl.ToDB(pdb) // Instantiate storages - all := store.InitAll(metrics, s.db) + all := store.InitAll(metrics, s.db, flow.Emulator) s.blocks = all.Blocks s.collections = all.Collections diff --git a/engine/collection/epochmgr/engine.go b/engine/collection/epochmgr/engine.go index c0bc2d87a81..5db5e9624d0 100644 --- a/engine/collection/epochmgr/engine.go +++ b/engine/collection/epochmgr/engine.go @@ -288,7 +288,7 @@ func (e *Engine) Done() <-chan struct{} { // Error returns: // - ErrNotAuthorizedForEpoch if this node is not authorized in the epoch. func (e *Engine) createEpochComponents(epoch protocol.CommittedEpoch) (*EpochComponents, error) { - state, prop, sync, hot, voteAggregator, timeoutAggregator, messageHub, err := e.factory.Create(epoch) + state, prop, sync, hot, voteAggregator, timeoutAggregator, messageHub, err := e.factory.Create(epoch, e.state.Params().ChainID()) if err != nil { return nil, fmt.Errorf("could not setup requirements for epoch (%d): %w", epoch.Counter(), err) } diff --git a/engine/collection/epochmgr/engine_test.go b/engine/collection/epochmgr/engine_test.go index e5309df3f82..c3a96c8d00e 100644 --- a/engine/collection/epochmgr/engine_test.go +++ b/engine/collection/epochmgr/engine_test.go @@ -108,35 +108,35 @@ type Suite struct { // MockFactoryCreate mocks the epoch factory to create epoch components for the given epoch. func (suite *Suite) MockFactoryCreate(arg any) { - suite.factory.On("Create", arg). + suite.factory.On("Create", arg, mock.Anything). Run(func(args mock.Arguments) { epoch, ok := args.Get(0).(realprotocol.CommittedEpoch) suite.Require().Truef(ok, "invalid type %T", args.Get(0)) suite.components[epoch.Counter()] = newMockComponents(suite.T()) }). Return( - func(epoch realprotocol.CommittedEpoch) realcluster.State { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) realcluster.State { return suite.ComponentsForEpoch(epoch).state }, - func(epoch realprotocol.CommittedEpoch) component.Component { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) component.Component { return suite.ComponentsForEpoch(epoch).prop }, - func(epoch realprotocol.CommittedEpoch) realmodule.ReadyDoneAware { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) realmodule.ReadyDoneAware { return suite.ComponentsForEpoch(epoch).sync }, - func(epoch realprotocol.CommittedEpoch) realmodule.HotStuff { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) realmodule.HotStuff { return suite.ComponentsForEpoch(epoch).hotstuff }, - func(epoch realprotocol.CommittedEpoch) hotstuff.VoteAggregator { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) hotstuff.VoteAggregator { return suite.ComponentsForEpoch(epoch).voteAggregator }, - func(epoch realprotocol.CommittedEpoch) hotstuff.TimeoutAggregator { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) hotstuff.TimeoutAggregator { return suite.ComponentsForEpoch(epoch).timeoutAggregator }, - func(epoch realprotocol.CommittedEpoch) component.Component { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) component.Component { return suite.ComponentsForEpoch(epoch).messageHub }, - func(epoch realprotocol.CommittedEpoch) error { return nil }, + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) error { return nil }, ).Maybe() } @@ -164,6 +164,9 @@ func (suite *Suite) SetupTest() { suite.state.On("Final").Return(suite.snap) suite.state.On("AtBlockID", suite.header.ID()).Return(suite.snap).Maybe() + params := protocol.NewParams(suite.T()) + params.On("ChainID").Return(flow.ChainID("chain-id")).Maybe() + suite.state.On("Params").Return(params) suite.snap.On("Epochs").Return(suite.epochQuery) suite.snap.On("Head").Return( func() *flow.Header { return suite.header }, @@ -274,7 +277,7 @@ func (suite *Suite) MockAsUnauthorizedNode(forEpoch uint64) { suite.factory = epochmgr.NewEpochComponentsFactory(suite.T()) suite.factory. - On("Create", mock.MatchedBy(unauthorizedMatcher)). + On("Create", mock.MatchedBy(unauthorizedMatcher), mock.Anything). Return(nil, nil, nil, nil, nil, nil, nil, ErrNotAuthorizedForEpoch) suite.MockFactoryCreate(mock.MatchedBy(authorizedMatcher)) diff --git a/engine/collection/epochmgr/factories/cluster_state.go b/engine/collection/epochmgr/factories/cluster_state.go index 9548f033943..688efb614a5 100644 --- a/engine/collection/epochmgr/factories/cluster_state.go +++ b/engine/collection/epochmgr/factories/cluster_state.go @@ -5,6 +5,7 @@ import ( "github.com/jordanschalm/lockctx" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" clusterkv "github.com/onflow/flow-go/state/cluster/badger" "github.com/onflow/flow-go/storage" @@ -33,38 +34,40 @@ func NewClusterStateFactory( return factory, nil } -func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot) ( +func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot, chainID flow.ChainID) ( *clusterkv.MutableState, *store.Headers, storage.ClusterPayloads, storage.ClusterBlocks, + *store.Headers, error, ) { - headers := store.NewHeaders(f.metrics, f.db) + headers := store.NewClusterHeaders(f.metrics, f.db, stateRoot.ClusterID()) payloads := store.NewClusterPayloads(f.metrics, f.db) blocks := store.NewClusterBlocks(f.db, stateRoot.ClusterID(), headers, payloads) + consensusHeaders := store.NewHeaders(f.metrics, f.db, chainID) // for reference blocks isBootStrapped, err := clusterkv.IsBootstrapped(f.db, stateRoot.ClusterID()) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could not check cluster state db: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could not check cluster state db: %w", err) } var clusterState *clusterkv.State if isBootStrapped { clusterState, err = clusterkv.OpenState(f.db, f.tracer, headers, payloads, stateRoot.ClusterID(), stateRoot.EpochCounter()) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could not open cluster state: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could not open cluster state: %w", err) } } else { clusterState, err = clusterkv.Bootstrap(f.db, f.lockManager, stateRoot) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could not bootstrap cluster state: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could not bootstrap cluster state: %w", err) } } - mutableState, err := clusterkv.NewMutableState(clusterState, f.lockManager, f.tracer, headers, payloads) + mutableState, err := clusterkv.NewMutableState(clusterState, f.lockManager, f.tracer, headers, payloads, consensusHeaders) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could create mutable cluster state: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could create mutable cluster state: %w", err) } - return mutableState, headers, payloads, blocks, err + return mutableState, headers, payloads, blocks, consensusHeaders, err } diff --git a/engine/collection/epochmgr/factories/epoch.go b/engine/collection/epochmgr/factories/epoch.go index c55224a62bc..7bb81160fa6 100644 --- a/engine/collection/epochmgr/factories/epoch.go +++ b/engine/collection/epochmgr/factories/epoch.go @@ -5,6 +5,7 @@ import ( "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/engine/collection/epochmgr" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/mempool/epochs" @@ -56,6 +57,7 @@ func NewEpochComponentsFactory( func (factory *EpochComponentsFactory) Create( epoch protocol.CommittedEpoch, + consensusChainID flow.ChainID, ) ( state cluster.State, compliance component.Component, @@ -107,7 +109,7 @@ func (factory *EpochComponentsFactory) Create( return } var mutableState *badger.MutableState - mutableState, headers, payloads, blocks, err = factory.state.Create(stateRoot) + mutableState, headers, payloads, blocks, _, err = factory.state.Create(stateRoot, consensusChainID) state = mutableState if err != nil { err = fmt.Errorf("could not create cluster state: %w", err) diff --git a/engine/collection/epochmgr/factory.go b/engine/collection/epochmgr/factory.go index c6370674e51..d27b1652bbe 100644 --- a/engine/collection/epochmgr/factory.go +++ b/engine/collection/epochmgr/factory.go @@ -2,6 +2,7 @@ package epochmgr import ( "github.com/onflow/flow-go/consensus/hotstuff" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/state/cluster" @@ -16,9 +17,10 @@ type EpochComponentsFactory interface { // be used either for an ongoing epoch (for example, after a restart) or // for an epoch that will start soon. It is safe to call multiple times for // a given epoch counter. + // ChainID refers to the consensus chain, from which reference blocks are used. // // Must return ErrNotAuthorizedForEpoch if this node is not authorized in the epoch. - Create(epoch protocol.CommittedEpoch) ( + Create(epoch protocol.CommittedEpoch, chainID flow.ChainID) ( state cluster.State, proposal component.Component, sync module.ReadyDoneAware, diff --git a/engine/collection/epochmgr/mock/epoch_components_factory.go b/engine/collection/epochmgr/mock/epoch_components_factory.go index 4e58c01c41a..f53f4930c77 100644 --- a/engine/collection/epochmgr/mock/epoch_components_factory.go +++ b/engine/collection/epochmgr/mock/epoch_components_factory.go @@ -6,6 +6,8 @@ import ( component "github.com/onflow/flow-go/module/component" cluster "github.com/onflow/flow-go/state/cluster" + flow "github.com/onflow/flow-go/model/flow" + hotstuff "github.com/onflow/flow-go/consensus/hotstuff" mock "github.com/stretchr/testify/mock" @@ -20,9 +22,9 @@ type EpochComponentsFactory struct { mock.Mock } -// Create provides a mock function with given fields: epoch -func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error) { - ret := _m.Called(epoch) +// Create provides a mock function with given fields: epoch, chainID +func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error) { + ret := _m.Called(epoch, chainID) if len(ret) == 0 { panic("no return value specified for Create") @@ -36,67 +38,67 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch) (cluster var r5 hotstuff.TimeoutAggregator var r6 component.Component var r7 error - if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error)); ok { - return rf(epoch) + if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch, flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error)); ok { + return rf(epoch, chainID) } - if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch) cluster.State); ok { - r0 = rf(epoch) + if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch, flow.ChainID) cluster.State); ok { + r0 = rf(epoch, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(cluster.State) } } - if rf, ok := ret.Get(1).(func(protocol.CommittedEpoch) component.Component); ok { - r1 = rf(epoch) + if rf, ok := ret.Get(1).(func(protocol.CommittedEpoch, flow.ChainID) component.Component); ok { + r1 = rf(epoch, chainID) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(component.Component) } } - if rf, ok := ret.Get(2).(func(protocol.CommittedEpoch) module.ReadyDoneAware); ok { - r2 = rf(epoch) + if rf, ok := ret.Get(2).(func(protocol.CommittedEpoch, flow.ChainID) module.ReadyDoneAware); ok { + r2 = rf(epoch, chainID) } else { if ret.Get(2) != nil { r2 = ret.Get(2).(module.ReadyDoneAware) } } - if rf, ok := ret.Get(3).(func(protocol.CommittedEpoch) module.HotStuff); ok { - r3 = rf(epoch) + if rf, ok := ret.Get(3).(func(protocol.CommittedEpoch, flow.ChainID) module.HotStuff); ok { + r3 = rf(epoch, chainID) } else { if ret.Get(3) != nil { r3 = ret.Get(3).(module.HotStuff) } } - if rf, ok := ret.Get(4).(func(protocol.CommittedEpoch) hotstuff.VoteAggregator); ok { - r4 = rf(epoch) + if rf, ok := ret.Get(4).(func(protocol.CommittedEpoch, flow.ChainID) hotstuff.VoteAggregator); ok { + r4 = rf(epoch, chainID) } else { if ret.Get(4) != nil { r4 = ret.Get(4).(hotstuff.VoteAggregator) } } - if rf, ok := ret.Get(5).(func(protocol.CommittedEpoch) hotstuff.TimeoutAggregator); ok { - r5 = rf(epoch) + if rf, ok := ret.Get(5).(func(protocol.CommittedEpoch, flow.ChainID) hotstuff.TimeoutAggregator); ok { + r5 = rf(epoch, chainID) } else { if ret.Get(5) != nil { r5 = ret.Get(5).(hotstuff.TimeoutAggregator) } } - if rf, ok := ret.Get(6).(func(protocol.CommittedEpoch) component.Component); ok { - r6 = rf(epoch) + if rf, ok := ret.Get(6).(func(protocol.CommittedEpoch, flow.ChainID) component.Component); ok { + r6 = rf(epoch, chainID) } else { if ret.Get(6) != nil { r6 = ret.Get(6).(component.Component) } } - if rf, ok := ret.Get(7).(func(protocol.CommittedEpoch) error); ok { - r7 = rf(epoch) + if rf, ok := ret.Get(7).(func(protocol.CommittedEpoch, flow.ChainID) error); ok { + r7 = rf(epoch, chainID) } else { r7 = ret.Error(7) } diff --git a/engine/collection/test/cluster_switchover_test.go b/engine/collection/test/cluster_switchover_test.go index 8eb42d61314..2d2325918fc 100644 --- a/engine/collection/test/cluster_switchover_test.go +++ b/engine/collection/test/cluster_switchover_test.go @@ -377,7 +377,7 @@ func (tc *ClusterSwitchoverTestCase) BlockInEpoch(epochCounter uint64) *flow.Hea func (tc *ClusterSwitchoverTestCase) SubmitTransactionToCluster( epochCounter uint64, // the epoch we are submitting the transacting w.r.t. clustering flow.ClusterList, // the clustering for the epoch - clusterIndex uint, // the index of the cluster we are targetting + clusterIndex uint, // the index of the cluster we are targeting ) { clusterMembers := clustering[int(clusterIndex)] diff --git a/engine/common/follower/integration_test.go b/engine/common/follower/integration_test.go index 486f1b236a0..bb38459e572 100644 --- a/engine/common/follower/integration_test.go +++ b/engine/common/follower/integration_test.go @@ -51,7 +51,7 @@ func TestFollowerHappyPath(t *testing.T) { tracer := trace.NewNoopTracer() log := unittest.Logger() consumer := events.NewNoop() - all := store.InitAll(metrics, pebbleimpl.ToDB(pdb)) + all := store.InitAll(metrics, pebbleimpl.ToDB(pdb), flow.Emulator) // bootstrap root snapshot state, err := pbadger.Bootstrap( diff --git a/engine/execution/pruner/core_test.go b/engine/execution/pruner/core_test.go index a50811582b1..0897269cc4c 100644 --- a/engine/execution/pruner/core_test.go +++ b/engine/execution/pruner/core_test.go @@ -33,7 +33,7 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) { db := pebbleimpl.ToDB(pdb) ctx, cancel := context.WithCancel(context.Background()) metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blockstore := all.Blocks results := all.Results diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index 6dbb9f33f3c..9d106f67ac3 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -238,7 +238,7 @@ func CompleteStateFixture( pdb := unittest.TypedPebbleDB(t, publicDBDir, pebble.Open) db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() - s := store.InitAll(metric, db) + s := store.InitAll(metric, db, rootSnapshot.Params().ChainID()) secretsDB := unittest.TypedBadgerDB(t, secretsDBDir, storagebadger.InitSecret) consumer := events.NewDistributor() @@ -560,7 +560,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ide receipts := store.NewExecutionReceipts(node.Metrics, db, results, storagebadger.DefaultCacheSize) myReceipts := store.NewMyExecutionReceipts(node.Metrics, db, receipts) versionBeacons := store.NewVersionBeacons(db) - headersStorage := store.NewHeaders(node.Metrics, db) + headersStorage := store.NewHeaders(node.Metrics, db, chainID) checkAuthorizedAtBlock := func(blockID flow.Identifier) (bool, error) { return protocol.IsNodeAuthorizedAt(node.State.AtBlockID(blockID), node.Me.NodeID()) diff --git a/engine/verification/verifier/verifiers.go b/engine/verification/verifier/verifiers.go index e52897f109c..e0659c69ffc 100644 --- a/engine/verification/verifier/verifiers.go +++ b/engine/verification/verifier/verifiers.go @@ -241,7 +241,7 @@ func initStorages( return nil, nil, nil, nil, nil, fmt.Errorf("could not init storage database: %w", err) } - storages := common.InitStorages(db) + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("could not open protocol state: %w", err) diff --git a/integration/testnet/container.go b/integration/testnet/container.go index b32af817506..af1a3559b37 100644 --- a/integration/testnet/container.go +++ b/integration/testnet/container.go @@ -385,7 +385,7 @@ func (c *Container) OpenState() (*state.State, error) { } metrics := metrics.NewNoopCollector() index := store.NewIndex(metrics, db) - headers := store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, c.net.Root().ChainID) seals := store.NewSeals(metrics, db) results := store.NewExecutionResults(metrics, db) receipts := store.NewExecutionReceipts(metrics, db, results, store.DefaultCacheSize) diff --git a/integration/tests/access/cohort4/access_test.go b/integration/tests/access/cohort4/access_test.go index 5f059783f2c..eabf87b892f 100644 --- a/integration/tests/access/cohort4/access_test.go +++ b/integration/tests/access/cohort4/access_test.go @@ -154,7 +154,7 @@ func (s *AccessSuite) runTestSignerIndicesDecoding() { err = container.WaitForContainerStopped(5 * time.Second) require.NoError(s.T(), err) - // open state to build a block singer decoder + // open state to build a block signer decoder state, err := container.OpenState() require.NoError(s.T(), err) diff --git a/integration/tests/access/cohort4/execution_data_pruning_test.go b/integration/tests/access/cohort4/execution_data_pruning_test.go index edf899c921f..814b9f0d635 100644 --- a/integration/tests/access/cohort4/execution_data_pruning_test.go +++ b/integration/tests/access/cohort4/execution_data_pruning_test.go @@ -163,7 +163,7 @@ func (s *ExecutionDataPruningSuite) TestHappyPath() { // setup storage objects needed to get the execution data id db, err := accessNode.DB() require.NoError(s.T(), err, "could not open db") - anHeaders := store.NewHeaders(metrics, db) + anHeaders := store.NewHeaders(metrics, db, flow.Localnet) anResults := store.NewExecutionResults(metrics, db) anSeals := store.NewSeals(metrics, db) diff --git a/model/flow/chain.go b/model/flow/chain.go index 7cc4df23244..b3864b64549 100644 --- a/model/flow/chain.go +++ b/model/flow/chain.go @@ -2,6 +2,7 @@ package flow import ( "fmt" + "strings" "github.com/pkg/errors" @@ -62,6 +63,11 @@ func (c ChainID) Transient() bool { return c == Emulator || c == MonotonicEmulator || c == Localnet || c == Benchnet || c == BftTestnet || c == Previewnet } +// IsClusterChain returns whether the chain ID is for a collection cluster during an epoch, rather than a full network. +func (c ChainID) IsClusterChain() bool { + return strings.HasPrefix(string(c), "cluster") +} + // getChainCodeWord derives the network type used for address generation from the globally // configured chain ID. func (c ChainID) getChainCodeWord() uint64 { diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index ee5a6115f03..a6c5f9f2fd4 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -21,9 +21,9 @@ func TestIterateHeight(t *testing.T) { lockManager := storage.NewTestingLockManager() dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { // create blocks with siblings - b1 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 1}} - b2 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 2}} - b3 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 3}} + b1 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 1, ChainID: flow.Emulator}} + b2 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 2, ChainID: flow.Emulator}} + b3 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 3, ChainID: flow.Emulator}} bs := []*flow.Header{b1, b2, b3} // index height @@ -41,7 +41,7 @@ func TestIterateHeight(t *testing.T) { // create iterator // b0 is the root block, iterate from b1 to b3 iterRange := module.IteratorRange{Start: b1.Height, End: b3.Height} - headers := store.NewHeaders(&metrics.NoopCollector{}, db) + headers := store.NewHeaders(&metrics.NoopCollector{}, db, flow.Emulator) getBlockIDByIndex := func(height uint64) (flow.Identifier, bool, error) { blockID, err := headers.BlockIDByHeight(height) if err != nil { diff --git a/module/builder/collection/builder.go b/module/builder/collection/builder.go index 1fa49d6154a..81cad19d433 100644 --- a/module/builder/collection/builder.go +++ b/module/builder/collection/builder.go @@ -45,6 +45,7 @@ type Builder struct { log zerolog.Logger clusterEpoch uint64 // the operating epoch for this cluster // cache of values about the operating epoch which never change + epochFirstHeight *uint64 // first height of this cluster's operating epoch epochFinalHeight *uint64 // last height of this cluster's operating epoch (nil if epoch not ended) epochFinalID *flow.Identifier // ID of last block in this cluster's operating epoch (nil if epoch not ended) } @@ -285,6 +286,29 @@ func (b *Builder) getBlockBuildContext(parentID flow.Identifier) (*blockBuildCon ctx.refChainFinalizedHeight = mainChainFinalizedHeader.Height ctx.refChainFinalizedID = mainChainFinalizedHeader.ID() + // If we don't have the epoch boundaries (first/final height ON MAIN CHAIN) cached, try retrieve and cache them + r := b.db.Reader() + + if b.epochFirstHeight != nil { + ctx.refEpochFirstHeight = *b.epochFirstHeight + } else { + var refEpochFirstHeight uint64 + + err = operation.RetrieveEpochFirstHeight(r, b.clusterEpoch, &refEpochFirstHeight) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + // can be missing if we joined (dynamic bootstrapped) in the middle of an epoch. + // 0 means FinalizedAncestryLookup will not be bounded by the epoch start, + // but only by which cluster blocks we have available. + refEpochFirstHeight = 0 + } else { + return nil, fmt.Errorf("unexpected failure to retrieve first height of operating epoch: %w", err) + } + } + b.epochFirstHeight = &refEpochFirstHeight + ctx.refEpochFirstHeight = *b.epochFirstHeight + } + // if the epoch has ended and the final block is cached, use the cached values if b.epochFinalHeight != nil && b.epochFinalID != nil { ctx.refEpochFinalID = b.epochFinalID @@ -292,8 +316,6 @@ func (b *Builder) getBlockBuildContext(parentID flow.Identifier) (*blockBuildCon return ctx, nil } - r := b.db.Reader() - var refEpochFinalHeight uint64 var refEpochFinalID flow.Identifier @@ -379,7 +401,7 @@ func (b *Builder) populateFinalizedAncestryLookup(lctx lockctx.Proof, ctx *block // the finalized cluster blocks which could possibly contain any conflicting transactions var clusterBlockIDs []flow.Identifier - start, end := findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight) + start, end := findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight, ctx) err := operation.LookupClusterBlocksByReferenceHeightRange(lctx, b.db.Reader(), start, end, &clusterBlockIDs) if err != nil { return fmt.Errorf("could not lookup finalized cluster blocks by reference height range [%d,%d]: %w", start, end, err) @@ -617,11 +639,10 @@ func (b *Builder) buildHeader( // Input range is the (inclusive) range of reference heights of transactions included // in the collection under construction. Output range is the (inclusive) range of // reference heights which need to be searched. -func findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight uint64) (start, end uint64) { - start = minRefHeight - flow.DefaultTransactionExpiry + 1 - if start > minRefHeight { - start = 0 // overflow check +func findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight uint64, ctx *blockBuildContext) (start, end uint64) { + delta := uint64(flow.DefaultTransactionExpiry + 1) + if minRefHeight <= ctx.refEpochFirstHeight+delta { + return ctx.refEpochFirstHeight, maxRefHeight // bound at start of epoch } - end = maxRefHeight - return start, end + return minRefHeight - delta, maxRefHeight } diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index a63e3583e81..94a685d7921 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -39,18 +39,7 @@ import ( ) var signer = func(*flow.Header) ([]byte, error) { return unittest.SignatureFixture(), nil } -var setter = func(h *flow.HeaderBodyBuilder) error { - h.WithHeight(42). - WithChainID(flow.Emulator). - WithParentID(unittest.IdentifierFixture()). - WithView(1337). - WithParentView(1336). - WithParentVoterIndices(unittest.SignerIndicesFixture(4)). - WithParentVoterSigData(unittest.QCSigDataFixture()). - WithProposerID(unittest.IdentifierFixture()) - - return nil -} +var setter func(*flow.HeaderBodyBuilder) error type BuilderSuite struct { suite.Suite @@ -62,9 +51,10 @@ type BuilderSuite struct { chainID flow.ChainID epochCounter uint64 - headers storage.Headers - payloads storage.ClusterPayloads - blocks storage.Blocks + headers storage.Headers + payloads storage.ClusterPayloads + blocks storage.Blocks + consensusHeaders storage.Headers state cluster.MutableState @@ -84,6 +74,18 @@ func (suite *BuilderSuite) SetupTest() { suite.genesis, err = unittest.ClusterBlock.Genesis() require.NoError(suite.T(), err) suite.chainID = suite.genesis.ChainID + setter = func(h *flow.HeaderBodyBuilder) error { + h.WithHeight(42). + WithChainID(suite.chainID). + WithParentID(unittest.IdentifierFixture()). + WithView(1337). + WithParentView(1336). + WithParentVoterIndices(unittest.SignerIndicesFixture(4)). + WithParentVoterSigData(unittest.QCSigDataFixture()). + WithProposerID(unittest.IdentifierFixture()) + + return nil + } suite.pool = herocache.NewTransactions(1000, unittest.Logger(), metrics.NewNoopCollector()) @@ -95,12 +97,13 @@ func (suite *BuilderSuite) SetupTest() { tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, flow.Emulator) consumer := events.NewNoop() - suite.headers = all.Headers + suite.headers = store.NewClusterHeaders(metrics, suite.db, suite.chainID) suite.blocks = all.Blocks suite.payloads = store.NewClusterPayloads(metrics, suite.db) + suite.consensusHeaders = all.Headers // just bootstrap with a genesis block, we'll use this as reference root, result, seal := unittest.BootstrapFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) @@ -132,7 +135,7 @@ func (suite *BuilderSuite) SetupTest() { clusterState, err := clusterkv.Bootstrap(suite.db, suite.lockManager, clusterStateRoot) suite.Require().NoError(err) - suite.state, err = clusterkv.NewMutableState(clusterState, suite.lockManager, tracer, suite.headers, suite.payloads) + suite.state, err = clusterkv.NewMutableState(clusterState, suite.lockManager, tracer, suite.headers, suite.payloads, suite.consensusHeaders) suite.Require().NoError(err) state, err := pbadger.Bootstrap( @@ -182,7 +185,7 @@ func (suite *BuilderSuite) SetupTest() { metrics, suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -602,7 +605,7 @@ func (suite *BuilderSuite) TestBuildOn_LargeHistory() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -698,7 +701,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSize() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -731,7 +734,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionByteSize() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -764,7 +767,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionTotalGas() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -823,7 +826,7 @@ func (suite *BuilderSuite) TestBuildOn_ExpiredTransaction() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -879,7 +882,7 @@ func (suite *BuilderSuite) TestBuildOn_EmptyMempool() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -920,7 +923,7 @@ func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -945,7 +948,7 @@ func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { h.WithHeight(1). - WithChainID(flow.Emulator). + WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -987,7 +990,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1017,7 +1020,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { // since rate limiting does not apply to non-payer keys, we should fill all collections in 10 blocks parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1055,7 +1058,7 @@ func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1079,7 +1082,7 @@ func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { // rate-limiting should be applied, resulting in half-full collections (5/10) parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1119,7 +1122,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1163,7 +1166,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() { // rate-limiting should be applied, resulting in minimum collection size. parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1200,7 +1203,7 @@ func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1225,7 +1228,7 @@ func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { // having one transaction and empty collections otherwise parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1266,7 +1269,7 @@ func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1290,7 +1293,7 @@ func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { // rate-limiting should not be applied, since the payer is marked as unlimited parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1331,7 +1334,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitDryRun() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1355,7 +1358,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitDryRun() { // rate-limiting should not be applied, since dry-run setting is enabled parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1394,7 +1397,7 @@ func (suite *BuilderSuite) TestBuildOn_SystemTxAlwaysIncluded() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1488,10 +1491,11 @@ func benchmarkBuildOn(b *testing.B, size int) { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() - all := store.InitAll(metrics, suite.db) - suite.headers = all.Headers + all := store.InitAll(metrics, suite.db, flow.Emulator) + suite.headers = store.NewHeaders(metrics, suite.db, suite.chainID) suite.blocks = all.Blocks suite.payloads = store.NewClusterPayloads(metrics, suite.db) + suite.consensusHeaders = all.Headers qc := unittest.QuorumCertificateFixture(unittest.QCWithRootBlockID(suite.genesis.ID())) stateRoot, err := clusterkv.NewStateRoot(suite.genesis, qc, suite.epochCounter) @@ -1499,7 +1503,7 @@ func benchmarkBuildOn(b *testing.B, size int) { state, err := clusterkv.Bootstrap(suite.db, suite.lockManager, stateRoot) assert.NoError(b, err) - suite.state, err = clusterkv.NewMutableState(state, suite.lockManager, tracer, suite.headers, suite.payloads) + suite.state, err = clusterkv.NewMutableState(state, suite.lockManager, tracer, suite.headers, suite.payloads, suite.consensusHeaders) assert.NoError(b, err) // add some transactions to transaction pool @@ -1517,7 +1521,7 @@ func benchmarkBuildOn(b *testing.B, size int) { metrics, suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, diff --git a/module/executiondatasync/optimistic_sync/pipeline/pipeline_functional_test.go b/module/executiondatasync/optimistic_sync/pipeline/pipeline_functional_test.go index c7da63bec2f..fc44e8b260a 100644 --- a/module/executiondatasync/optimistic_sync/pipeline/pipeline_functional_test.go +++ b/module/executiondatasync/optimistic_sync/pipeline/pipeline_functional_test.go @@ -124,7 +124,7 @@ func (p *PipelineFunctionalSuite) SetupTest() { p.Require().NoError(err) // store and index the root header - p.headers = store.NewHeaders(p.metrics, p.db) + p.headers = store.NewHeaders(p.metrics, p.db, g.ChainID()) err = unittest.WithLock(t, p.lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { diff --git a/module/finalizedreader/finalizedreader_test.go b/module/finalizedreader/finalizedreader_test.go index cd6bd194878..06433c00814 100644 --- a/module/finalizedreader/finalizedreader_test.go +++ b/module/finalizedreader/finalizedreader_test.go @@ -7,6 +7,7 @@ import ( "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" @@ -20,7 +21,7 @@ func TestFinalizedReader(t *testing.T) { lockManager := storage.NewTestingLockManager() // prepare the storage.Headers instance metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks headers := all.Headers proposal := unittest.ProposalFixture() diff --git a/module/finalizer/consensus/finalizer_test.go b/module/finalizer/consensus/finalizer_test.go index 965654328dc..03906cbdcc6 100644 --- a/module/finalizer/consensus/finalizer_test.go +++ b/module/finalizer/consensus/finalizer_test.go @@ -107,7 +107,7 @@ func TestMakeFinalValidChain(t *testing.T) { metrics := metrics.NewNoopCollector() fin := Finalizer{ dbReader: pebbleimpl.ToDB(pdb).Reader(), - headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb)), + headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb), flow.Emulator), state: state, tracer: trace.NewNoopTracer(), cleanup: LogCleanup(&list), @@ -177,7 +177,7 @@ func TestMakeFinalInvalidHeight(t *testing.T) { metrics := metrics.NewNoopCollector() fin := Finalizer{ dbReader: pebbleimpl.ToDB(pdb).Reader(), - headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb)), + headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb), flow.Emulator), state: state, tracer: trace.NewNoopTracer(), cleanup: LogCleanup(&list), @@ -236,7 +236,7 @@ func TestMakeFinalDuplicate(t *testing.T) { metrics := metrics.NewNoopCollector() fin := Finalizer{ dbReader: pebbleimpl.ToDB(pdb).Reader(), - headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb)), + headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb), flow.Emulator), state: state, tracer: trace.NewNoopTracer(), cleanup: LogCleanup(&list), diff --git a/state/cluster/badger/mutator.go b/state/cluster/badger/mutator.go index c27a02ae955..fbf5bd95c98 100644 --- a/state/cluster/badger/mutator.go +++ b/state/cluster/badger/mutator.go @@ -22,21 +22,23 @@ import ( type MutableState struct { *State - lockManager lockctx.Manager - tracer module.Tracer - headers storage.Headers - payloads storage.ClusterPayloads + lockManager lockctx.Manager + tracer module.Tracer + headers storage.Headers + payloads storage.ClusterPayloads + consensusHeaders storage.Headers } var _ clusterstate.MutableState = (*MutableState)(nil) -func NewMutableState(state *State, lockManager lockctx.Manager, tracer module.Tracer, headers storage.Headers, payloads storage.ClusterPayloads) (*MutableState, error) { +func NewMutableState(state *State, lockManager lockctx.Manager, tracer module.Tracer, headers storage.Headers, payloads storage.ClusterPayloads, consensusHeaders storage.Headers) (*MutableState, error) { mutableState := &MutableState{ - State: state, - lockManager: lockManager, - tracer: tracer, - headers: headers, - payloads: payloads, + State: state, + lockManager: lockManager, + tracer: tracer, + headers: headers, + payloads: payloads, + consensusHeaders: consensusHeaders, } return mutableState, nil } @@ -224,7 +226,7 @@ func (m *MutableState) checkPayloadReferenceBlock(ctx extendContext) error { payload := ctx.candidate.Payload // 1 - the reference block must be known - refBlock, err := m.headers.ByBlockID(payload.ReferenceBlockID) + refBlock, err := m.consensusHeaders.ByBlockID(payload.ReferenceBlockID) if err != nil { if errors.Is(err, storage.ErrNotFound) { return state.NewUnverifiableExtensionError("cluster block references unknown reference block (id=%x)", payload.ReferenceBlockID) @@ -237,7 +239,7 @@ func (m *MutableState) checkPayloadReferenceBlock(ctx extendContext) error { // a reference block which is above the finalized boundary can't be verified yet return state.NewUnverifiableExtensionError("reference block is above finalized boundary (%d>%d)", refBlock.Height, ctx.finalizedConsensusHeight) } else { - storedBlockIDForHeight, err := m.headers.BlockIDByHeight(refBlock.Height) + storedBlockIDForHeight, err := m.consensusHeaders.BlockIDByHeight(refBlock.Height) if err != nil { return irrecoverable.NewExceptionf("could not look up block ID for finalized height: %w", err) } @@ -291,7 +293,7 @@ func (m *MutableState) checkPayloadTransactions(lctx lockctx.Proof, ctx extendCo minRefHeight := uint64(math.MaxUint64) maxRefHeight := uint64(0) for _, flowTx := range payload.Collection.Transactions { - refBlock, err := m.headers.ByBlockID(flowTx.ReferenceBlockID) + refBlock, err := m.consensusHeaders.ByBlockID(flowTx.ReferenceBlockID) if errors.Is(err, storage.ErrNotFound) { // Reject collection if it contains a transaction with an unknown reference block, because we cannot verify its validity. return state.NewUnverifiableExtensionError("collection contains tx (tx_id=%x) with unknown reference block (block_id=%x): %w", flowTx.ID(), flowTx.ReferenceBlockID, err) diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index cbf68967be3..b0abedbd251 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -68,7 +68,8 @@ func (suite *MutatorSuite) SetupTest() { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, flow.Emulator) + clusterHeaders := store.NewClusterHeaders(metrics, suite.db, suite.chainID) colPayloads := store.NewClusterPayloads(metrics, suite.db) // just bootstrap with a genesis block, we'll use this as reference @@ -135,7 +136,7 @@ func (suite *MutatorSuite) SetupTest() { suite.NoError(err) clusterState, err := Bootstrap(suite.db, suite.lockManager, clusterStateRoot) suite.Assert().Nil(err) - suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, all.Headers, colPayloads) + suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, clusterHeaders, colPayloads, all.Headers) suite.Assert().Nil(err) } @@ -444,8 +445,6 @@ func (suite *MutatorSuite) TestExtend_WithExpiredReferenceBlock() { } func (suite *MutatorSuite) TestExtend_WithReferenceBlockFromClusterChain() { - // TODO skipping as this isn't implemented yet - unittest.SkipUnless(suite.T(), unittest.TEST_TODO, "skipping as this isn't implemented yet") // set genesis from cluster chain as reference block proposal := suite.ProposalWithParentAndPayload(suite.genesis, *model.NewEmptyPayload(suite.genesis.ID())) err := suite.state.Extend(&proposal) diff --git a/state/cluster/badger/snapshot_test.go b/state/cluster/badger/snapshot_test.go index 76db04a71f5..a9a6de1ee76 100644 --- a/state/cluster/badger/snapshot_test.go +++ b/state/cluster/badger/snapshot_test.go @@ -56,8 +56,9 @@ func (suite *SnapshotSuite) SetupTest() { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, flow.Emulator) colPayloads := store.NewClusterPayloads(metrics, suite.db) + clusterHeaders := store.NewClusterHeaders(metrics, suite.db, suite.chainID) root := unittest.RootSnapshotFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) suite.epochCounter = root.Encodable().SealingSegment.LatestProtocolStateEntry().EpochEntry.EpochCounter() @@ -84,7 +85,7 @@ func (suite *SnapshotSuite) SetupTest() { suite.Require().NoError(err) clusterState, err := Bootstrap(suite.db, suite.lockManager, clusterStateRoot) suite.Require().NoError(err) - suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, all.Headers, colPayloads) + suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, clusterHeaders, colPayloads, all.Headers) suite.Require().NoError(err) } diff --git a/state/protocol/badger/mutator_test.go b/state/protocol/badger/mutator_test.go index 47055894268..c0d209b00c1 100644 --- a/state/protocol/badger/mutator_test.go +++ b/state/protocol/badger/mutator_test.go @@ -90,7 +90,7 @@ func TestExtendValid(t *testing.T) { tracer := trace.NewNoopTracer() db := pebbleimpl.ToDB(pdb) log := zerolog.Nop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) distributor := events.NewDistributor() consumer := mockprotocol.NewConsumer(t) @@ -920,7 +920,7 @@ func TestExtendEpochTransitionValid(t *testing.T) { tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(mmetrics.NewNoopCollector(), db) + all := store.InitAll(mmetrics.NewNoopCollector(), db, flow.Emulator) protoState, err := protocol.Bootstrap( metrics, db, @@ -2803,7 +2803,7 @@ func TestExtendInvalidSealsInBlock(t *testing.T) { tracer := trace.NewNoopTracer() log := zerolog.Nop() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) // create a event consumer to test epoch transition events distributor := events.NewDistributor() @@ -3467,7 +3467,7 @@ func TestHeaderInvalidTimestamp(t *testing.T) { tracer := trace.NewNoopTracer() log := zerolog.Nop() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) // create a event consumer to test epoch transition events distributor := events.NewDistributor() diff --git a/state/protocol/badger/state.go b/state/protocol/badger/state.go index cf0ae743c48..d8b437aae8c 100644 --- a/state/protocol/badger/state.go +++ b/state/protocol/badger/state.go @@ -989,6 +989,48 @@ func IsBootstrapped(db storage.DB) (bool, error) { return true, nil } +// GetChainIDFromLatestFinalizedHeader attempts to retrieve the consensus chainID +// from the latest finalized header in the database, before storage or protocol state have been initialized. +// Expected errors during normal operations: +// - [storage.ErrNotFound] if the node is not bootstrapped. +func GetChainIDFromLatestFinalizedHeader(db storage.DB) (flow.ChainID, error) { + h, err := GetLatestFinalizedHeader(db) + if err != nil { + return "", err + } + return h.ChainID, nil +} + +// GetLatestFinalizedHeader attempts to retrieve the latest finalized header +// without going through the storage.Headers interface. +// Expected errors during normal operations: +// - [storage.ErrNotFound] if the node is not bootstrapped. +func GetLatestFinalizedHeader(db storage.DB) (*flow.Header, error) { + var finalized uint64 + r := db.Reader() + err := operation.RetrieveFinalizedHeight(r, &finalized) + if err != nil { + return nil, err + } + var id flow.Identifier + err = operation.LookupBlockHeight(r, finalized, &id) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("could not retrieve finalized header: not present in height index") + } + return nil, err + } + var header flow.Header + err = operation.RetrieveHeader(r, id, &header) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("could not retrieve finalized header: block not known") + } + return nil, err + } + return &header, nil +} + // updateEpochMetrics update the `consensus_compliance_current_epoch_counter` and the // `consensus_compliance_current_epoch_phase` metric func updateEpochMetrics(metrics module.ComplianceMetrics, snap protocol.Snapshot) error { diff --git a/state/protocol/badger/state_test.go b/state/protocol/badger/state_test.go index cc9c024d3e6..6643e7490be 100644 --- a/state/protocol/badger/state_test.go +++ b/state/protocol/badger/state_test.go @@ -56,7 +56,7 @@ func TestBootstrapAndOpen(t *testing.T) { epoch.DKGPhase1FinalView(), epoch.DKGPhase2FinalView(), epoch.DKGPhase3FinalView()).Once() noopMetrics := new(metrics.NoopCollector) - all := store.InitAll(noopMetrics, db) + all := store.InitAll(noopMetrics, db, flow.Emulator) // protocol state has been bootstrapped, now open a protocol state with the database state, err := bprotocol.OpenState( complianceMetrics, @@ -136,7 +136,7 @@ func TestBootstrapAndOpen_EpochCommitted(t *testing.T) { complianceMetrics.On("BlockSealed", testmock.Anything).Once() noopMetrics := new(metrics.NoopCollector) - all := store.InitAll(noopMetrics, db) + all := store.InitAll(noopMetrics, db, flow.Emulator) state, err := bprotocol.OpenState( complianceMetrics, db, @@ -851,7 +851,7 @@ func bootstrap(t *testing.T, rootSnapshot protocol.Snapshot, f func(*bprotocol.S db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() defer db.Close() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := bprotocol.Bootstrap( metrics, db, diff --git a/state/protocol/util/testing.go b/state/protocol/util/testing.go index 220fb2d41bb..f3dc9643a42 100644 --- a/state/protocol/util/testing.go +++ b/state/protocol/util/testing.go @@ -71,7 +71,7 @@ func RunWithBootstrapState(t testing.TB, rootSnapshot protocol.Snapshot, f func( lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -101,7 +101,7 @@ func RunWithFullProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, f fu tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -145,7 +145,7 @@ func RunWithFullProtocolStateAndMetrics(t testing.TB, rootSnapshot protocol.Snap tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(mmetrics.NewNoopCollector(), db) + all := store.InitAll(mmetrics.NewNoopCollector(), db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -191,7 +191,7 @@ func RunWithFullProtocolStateAndValidator(t testing.TB, rootSnapshot protocol.Sn tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -235,7 +235,7 @@ func RunWithFollowerProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -275,7 +275,7 @@ func RunWithFullProtocolStateAndConsumer(t testing.TB, rootSnapshot protocol.Sna metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -318,7 +318,7 @@ func RunWithFullProtocolStateAndMetricsAndConsumer(t testing.TB, rootSnapshot pr db := pebbleimpl.ToDB(pdb) tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(mmetrics.NewNoopCollector(), db) + all := store.InitAll(mmetrics.NewNoopCollector(), db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -373,7 +373,7 @@ func RunWithFollowerProtocolStateAndHeaders(t testing.TB, rootSnapshot protocol. tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -414,7 +414,7 @@ func RunWithFullProtocolStateAndMutator(t testing.TB, rootSnapshot protocol.Snap tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, diff --git a/storage/badger/all.go b/storage/badger/all.go index 4a8e4dfb3c2..b06a25847bc 100644 --- a/storage/badger/all.go +++ b/storage/badger/all.go @@ -3,15 +3,16 @@ package badger import ( "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation/badgerimpl" "github.com/onflow/flow-go/storage/store" ) -func InitAll(metrics module.CacheMetrics, db *badger.DB) *storage.All { +func InitAll(metrics module.CacheMetrics, db *badger.DB, chainID flow.ChainID) *storage.All { sdb := badgerimpl.ToDB(db) - headers := store.NewHeaders(metrics, sdb) + headers := store.NewHeaders(metrics, sdb, chainID) guarantees := store.NewGuarantees(metrics, sdb, DefaultCacheSize, DefaultCacheSize) seals := store.NewSeals(metrics, sdb) index := store.NewIndex(metrics, sdb) diff --git a/storage/errors.go b/storage/errors.go index b3d81d9709c..5e268f5a2f2 100644 --- a/storage/errors.go +++ b/storage/errors.go @@ -31,6 +31,10 @@ var ( // ErrNotBootstrapped is returned when the database has not been bootstrapped. ErrNotBootstrapped = errors.New("pebble database not bootstrapped") + + // ErrWrongChain is returned when data from a specific chain (consensus or cluster) + // is expected to be read or inserted, but the actual chainID does not match. + ErrWrongChain = errors.New("data is not part of the expected chain") ) // InvalidDKGStateTransitionError is a sentinel error that is returned in case an invalid state transition is attempted. diff --git a/storage/store/blocks_test.go b/storage/store/blocks_test.go index f88ceeb977b..0179b3e1606 100644 --- a/storage/store/blocks_test.go +++ b/storage/store/blocks_test.go @@ -6,6 +6,7 @@ import ( "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" @@ -19,7 +20,7 @@ func TestBlockStoreAndRetrieve(t *testing.T) { lockManager := storage.NewTestingLockManager() cacheMetrics := &metrics.NoopCollector{} // verify after storing a block should be able to retrieve it back - blocks := store.InitAll(cacheMetrics, db).Blocks + blocks := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks block := unittest.FullBlockFixture() prop := unittest.ProposalFromBlock(block) @@ -46,7 +47,7 @@ func TestBlockStoreAndRetrieve(t *testing.T) { // verify after a restart, the block stored in the database is the same // as the original - blocksAfterRestart := store.InitAll(cacheMetrics, db).Blocks + blocksAfterRestart := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks receivedAfterRestart, err := blocksAfterRestart.ByID(block.ID()) require.NoError(t, err) require.Equal(t, *block, *receivedAfterRestart) @@ -57,7 +58,7 @@ func TestBlockIndexByHeightAndRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() cacheMetrics := &metrics.NoopCollector{} - blocks := store.InitAll(cacheMetrics, db).Blocks + blocks := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks block := unittest.FullBlockFixture() prop := unittest.ProposalFromBlock(block) @@ -102,7 +103,7 @@ func TestBlockIndexByHeightAndRetrieve(t *testing.T) { require.ErrorIs(t, err, storage.ErrNotFound) // Verify after a restart, the block indexed by height is still retrievable - blocksAfterRestart := store.InitAll(cacheMetrics, db).Blocks + blocksAfterRestart := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks receivedAfterRestart, err := blocksAfterRestart.ByHeight(block.Height) require.NoError(t, err) require.Equal(t, *block, *receivedAfterRestart) @@ -113,7 +114,7 @@ func TestBlockIndexByViewAndRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() cacheMetrics := &metrics.NoopCollector{} - blocks := store.InitAll(cacheMetrics, db).Blocks + blocks := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks block := unittest.FullBlockFixture() prop := unittest.ProposalFromBlock(block) @@ -153,7 +154,7 @@ func TestBlockIndexByViewAndRetrieve(t *testing.T) { require.ErrorIs(t, err, storage.ErrNotFound) // Verify after a restart, the block indexed by view is still retrievable - blocksAfterRestart := store.InitAll(cacheMetrics, db).Blocks + blocksAfterRestart := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks receivedAfterRestart, err := blocksAfterRestart.ByView(block.View) require.NoError(t, err) require.Equal(t, *block, *receivedAfterRestart) diff --git a/storage/store/cluster_blocks_test.go b/storage/store/cluster_blocks_test.go index 1f9377348d3..bae0c1e2484 100644 --- a/storage/store/cluster_blocks_test.go +++ b/storage/store/cluster_blocks_test.go @@ -55,7 +55,7 @@ func TestClusterBlocks(t *testing.T) { clusterBlocks := NewClusterBlocks( db, blocks[0].ChainID, - NewHeaders(metrics.NewNoopCollector(), db), + NewClusterHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), NewClusterPayloads(metrics.NewNoopCollector(), db), ) diff --git a/storage/store/guarantees_test.go b/storage/store/guarantees_test.go index de27ab3f5ab..addf69dfa6e 100644 --- a/storage/store/guarantees_test.go +++ b/storage/store/guarantees_test.go @@ -26,7 +26,7 @@ func TestGuaranteeStoreRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks guarantees := all.Guarantees @@ -92,7 +92,7 @@ func TestStoreDuplicateGuarantee(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks store1 := all.Guarantees expected := unittest.CollectionGuaranteeFixture() @@ -131,7 +131,7 @@ func TestStoreConflictingGuarantee(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks store1 := all.Guarantees expected := unittest.CollectionGuaranteeFixture() diff --git a/storage/store/headers.go b/storage/store/headers.go index 8fd1210d76d..e4996551fbb 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -21,34 +21,83 @@ type Headers struct { heightCache *Cache[uint64, flow.Identifier] viewCache *Cache[uint64, flow.Identifier] sigs *proposalSignatures + chainID flow.ChainID } var _ storage.Headers = (*Headers)(nil) // NewHeaders creates a Headers instance, which stores block headers. -// It supports storing, caching and retrieving by block ID and the additionally indexed by header height. -func NewHeaders(collector module.CacheMetrics, db storage.DB) *Headers { - storeWithLock := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, header *flow.Header) error { - return operation.InsertHeader(lctx, rw, blockID, header) - } - - retrieve := func(r storage.Reader, blockID flow.Identifier) (*flow.Header, error) { - var header flow.Header - err := operation.RetrieveHeader(r, blockID, &header) - return &header, err +// It supports storing, caching and retrieving by block ID, and additionally indexes by header height and view. +func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.ChainID) *Headers { + if chainID.IsClusterChain() { + panic("NewHeaders called on cluster chain ID - use NewClusterHeaders instead") } - retrieveHeight := func(r storage.Reader, height uint64) (flow.Identifier, error) { var id flow.Identifier err := operation.LookupBlockHeight(r, height, &id) return id, err } - retrieveView := func(r storage.Reader, view uint64) (flow.Identifier, error) { var id flow.Identifier err := operation.LookupCertifiedBlockByView(r, view, &id) return id, err } + return newHeaders(collector, db, chainID, retrieveHeight, retrieveView) +} + +// NewClusterHeaders creates a Headers instance for a collection cluster chain, which stores block headers for cluster blocks. +// It supports storing, caching and retrieving by block ID, and additionally an index by header height. +func NewClusterHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.ChainID) *Headers { + if !chainID.IsClusterChain() { + panic("NewClusterHeaders called on non-cluster chain ID - use NewHeaders instead") + } + retrieveHeight := func(r storage.Reader, height uint64) (flow.Identifier, error) { + var id flow.Identifier + err := operation.LookupClusterBlockHeight(r, chainID, height, &id) + return id, err + } + retrieveView := func(r storage.Reader, height uint64) (flow.Identifier, error) { + var id flow.Identifier + return id, fmt.Errorf("retrieve by view not implemented for cluster headers") + } + return newHeaders(collector, db, chainID, retrieveHeight, retrieveView) +} + +// newHeaders contains shared logic for Header storage, including storing and retrieving by block ID +func newHeaders(collector module.CacheMetrics, + db storage.DB, + chainID flow.ChainID, + retrieveHeight retrieveFunc[uint64, flow.Identifier], + retrieveView retrieveFunc[uint64, flow.Identifier], +) *Headers { + storeWithLock := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, header *flow.Header) error { + if header.ChainID != chainID { + return fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) + } + if chainID.IsClusterChain() { + if !lctx.HoldsLock(storage.LockInsertOrFinalizeClusterBlock) { + return fmt.Errorf("missing lock: %v", storage.LockInsertOrFinalizeClusterBlock) + } + } else { + if !lctx.HoldsLock(storage.LockInsertBlock) { + return fmt.Errorf("missing lock: %v", storage.LockInsertBlock) + } + } + return operation.InsertHeader(lctx, rw, blockID, header) + } + + retrieve := func(r storage.Reader, blockID flow.Identifier) (*flow.Header, error) { + var header flow.Header + err := operation.RetrieveHeader(r, blockID, &header) + if err != nil { + return nil, err + } + // raise an error when the retrieved header is for a different chain than expected + if header.ChainID != chainID { + return nil, fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) + } + return &header, err + } h := &Headers{ db: db, @@ -65,7 +114,8 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB) *Headers { withLimit[uint64, flow.Identifier](4*flow.DefaultTransactionExpiry), withRetrieve(retrieveView)), - sigs: newProposalSignatures(collector, db), + sigs: newProposalSignatures(collector, db), + chainID: chainID, } return h @@ -83,6 +133,7 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB) *Headers { // It returns [storage.ErrAlreadyExists] if the header already exists, i.e. we only insert a new header once. // This error allows the caller to detect duplicate inserts. If the header is stored along with other parts // of the block in the same batch, similar duplication checks can be skipped for storing other parts of the block. +// Returns [storage.ErrWrongChain] if the header's ChainID does not match the one used when initializing the storage. // No other errors are expected during normal operation. func (h *Headers) storeTx( lctx lockctx.Proof, @@ -131,6 +182,7 @@ func (h *Headers) retrieveIdByHeightTx(height uint64) (flow.Identifier, error) { // ByBlockID returns the header with the given ID. It is available for finalized blocks and those pending finalization. // Error returns: // - [storage.ErrNotFound] if no block header with the given ID exists +// - [storage.ErrWrongChain] if the block header exists in the database but is part of a different chain than expected func (h *Headers) ByBlockID(blockID flow.Identifier) (*flow.Header, error) { return h.retrieveTx(blockID) } diff --git a/storage/store/headers_test.go b/storage/store/headers_test.go index bc4f8ceb12f..2ca3860cdb7 100644 --- a/storage/store/headers_test.go +++ b/storage/store/headers_test.go @@ -21,7 +21,7 @@ func TestHeaderStoreRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks @@ -55,7 +55,7 @@ func TestHeaderIndexByViewAndRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks @@ -88,7 +88,7 @@ func TestHeaderIndexByViewAndRetrieve(t *testing.T) { func TestHeaderRetrieveWithoutStore(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - headers := store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, flow.Emulator) header := unittest.BlockHeaderFixture() @@ -106,7 +106,7 @@ func TestHeadersByParentID(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks @@ -172,7 +172,7 @@ func TestHeadersByParentIDChainStructure(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks diff --git a/storage/store/init.go b/storage/store/init.go index a4ec067d16c..b08a224becf 100644 --- a/storage/store/init.go +++ b/storage/store/init.go @@ -1,6 +1,7 @@ package store import ( + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/storage" ) @@ -27,8 +28,8 @@ type All struct { Collections *Collections } -func InitAll(metrics module.CacheMetrics, db storage.DB) *All { - headers := NewHeaders(metrics, db) +func InitAll(metrics module.CacheMetrics, db storage.DB, chainID flow.ChainID) *All { + headers := NewHeaders(metrics, db, chainID) guarantees := NewGuarantees(metrics, db, DefaultCacheSize, DefaultCacheSize) seals := NewSeals(metrics, db) index := NewIndex(metrics, db) diff --git a/storage/store/payloads_test.go b/storage/store/payloads_test.go index 604ff48b9cc..8752dd35df8 100644 --- a/storage/store/payloads_test.go +++ b/storage/store/payloads_test.go @@ -6,6 +6,7 @@ import ( "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation/dbtest" @@ -18,7 +19,7 @@ func TestPayloadStoreRetrieve(t *testing.T) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) payloads := all.Payloads blocks := all.Blocks diff --git a/utils/unittest/cluster_block.go b/utils/unittest/cluster_block.go index 55e2206a64b..ab02a48e1c2 100644 --- a/utils/unittest/cluster_block.go +++ b/utils/unittest/cluster_block.go @@ -17,6 +17,7 @@ func ClusterBlockFixture(opts ...func(*cluster.Block)) *cluster.Block { HeaderBody: HeaderBodyFixture(), Payload: *ClusterPayloadFixture(3), } + block.ChainID = "cluster" for _, opt := range opts { opt(block) }