From f3c7603be8b104a04911058362c3a083f15c2302 Mon Sep 17 00:00:00 2001
From: Tim Barry
Date: Mon, 1 Dec 2025 14:53:15 -0800
Subject: [PATCH 01/16] add ChainID parameter to Header storage

---
 .../storage/read_range_cluster_blocks.go | 8 ++---
 cmd/collection/main.go | 4 +--
 cmd/scaffold.go | 27 ++++++++++++++++-
 cmd/util/cmd/common/storage.go | 5 ++--
 .../exec-data-json-export/block_exporter.go | 7 ++++-
 .../delta_snapshot_exporter.go | 7 ++++-
 .../exec-data-json-export/event_exporter.go | 7 ++++-
 .../exec-data-json-export/result_exporter.go | 7 ++++-
 .../transaction_exporter.go | 8 ++++-
 cmd/util/cmd/export-json-transactions/cmd.go | 7 ++++-
 .../transactions/range_test.go | 2 +-
 cmd/util/cmd/find-inconsistent-result/cmd.go | 7 ++++-
 cmd/util/cmd/read-badger/cmd/blocks.go | 8 +++--
 .../cmd/read-badger/cmd/cluster_blocks.go | 6 ++--
 cmd/util/cmd/read-badger/cmd/collections.go | 7 ++++-
 cmd/util/cmd/read-badger/cmd/epoch_commit.go | 7 ++++-
 .../read-badger/cmd/epoch_protocol_state.go | 7 ++++-
 cmd/util/cmd/read-badger/cmd/guarantees.go | 7 ++++-
 .../cmd/read-badger/cmd/protocol_kvstore.go | 7 ++++-
 cmd/util/cmd/read-badger/cmd/seals.go | 7 ++++-
 .../read-badger/cmd/transaction_results.go | 7 ++++-
 cmd/util/cmd/read-badger/cmd/transactions.go | 7 ++++-
 .../read-light-block/read_light_block_test.go | 2 +-
 .../cmd/read-protocol-state/cmd/blocks.go | 7 ++++-
 .../cmd/read-protocol-state/cmd/snapshot.go | 7 ++++-
 .../cmd/rollback_executed_height.go | 9 ++++--
 .../cmd/rollback_executed_height_test.go | 4 +--
 cmd/util/cmd/snapshot/cmd.go | 7 ++++-
 .../cmd/verify-evm-offchain-replay/verify.go | 9 ++++--
 consensus/integration/nodes_test.go | 2 +-
 consensus/recovery/protocol/state_test.go | 2 +-
 engine/access/access_test.go | 8 ++---
 .../ingestion/collections/indexer_test.go | 2 +-
 .../transactions_functional_test.go | 2 +-
 .../epochmgr/factories/cluster_state.go | 2 +-
 engine/common/follower/integration_test.go | 2 +-
 engine/execution/pruner/core_test.go | 2 +-
 engine/testutil/nodes.go | 4 +--
 engine/verification/verifier/verifiers.go | 2 +-
 integration/testnet/container.go | 2 +-
 .../tests/access/cohort4/access_test.go | 2 +-
 .../cohort4/execution_data_pruning_test.go | 2 +-
 module/block_iterator/iterator_test.go | 2 +-
 module/builder/collection/builder_test.go | 4 +--
 .../pipeline_functional_test.go | 2 +-
 .../finalizedreader/finalizedreader_test.go | 3 +-
 module/finalizer/consensus/finalizer_test.go | 6 ++--
 state/cluster/badger/mutator_test.go | 2 +-
 state/cluster/badger/snapshot_test.go | 2 +-
 state/protocol/badger/mutator_test.go | 8 ++---
 state/protocol/badger/state.go | 30 +++++++++++++++++++
 state/protocol/badger/state_test.go | 6 ++--
 state/protocol/util/testing.go | 18 +++++------
 storage/badger/all.go | 5 ++--
 storage/store/blocks_test.go | 13 ++++----
 storage/store/cluster_blocks_test.go | 4 +--
 storage/store/guarantees_test.go | 6 ++--
 storage/store/headers.go | 15 ++++++++--
 storage/store/headers_test.go | 10 +++---
 storage/store/init.go | 5 ++--
 storage/store/payloads_test.go | 3 +-
 61 files changed, 283 insertions(+), 106 deletions(-)

diff --git a/admin/commands/storage/read_range_cluster_blocks.go b/admin/commands/storage/read_range_cluster_blocks.go
index b0e41b86fe8..7fe1c068ff2 100644
--- a/admin/commands/storage/read_range_cluster_blocks.go
+++ b/admin/commands/storage/read_range_cluster_blocks.go
@@ -10,6 +10,7 @@ import (
 	"github.com/onflow/flow-go/admin/commands"
 	"github.com/onflow/flow-go/cmd/util/cmd/read-light-block"
 	"github.com/onflow/flow-go/model/flow"
+
"github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -22,14 +23,12 @@ const Max_Range_Cluster_Block_Limit = uint64(10001) type ReadRangeClusterBlocksCommand struct { db storage.DB - headers *store.Headers payloads *store.ClusterPayloads } -func NewReadRangeClusterBlocksCommand(db storage.DB, headers *store.Headers, payloads *store.ClusterPayloads) commands.AdminCommand { +func NewReadRangeClusterBlocksCommand(db storage.DB, payloads *store.ClusterPayloads) commands.AdminCommand { return &ReadRangeClusterBlocksCommand{ db: db, - headers: headers, payloads: payloads, } } @@ -51,8 +50,9 @@ func (c *ReadRangeClusterBlocksCommand) Handler(ctx context.Context, req *admin. return nil, admin.NewInvalidAdminReqErrorf("getting for more than %v blocks at a time might have an impact to node's performance and is not allowed", Max_Range_Cluster_Block_Limit) } + clusterHeaders := store.NewHeaders(&metrics.NoopCollector{}, c.db, flow.ChainID(chainID)) clusterBlocks := store.NewClusterBlocks( - c.db, flow.ChainID(chainID), c.headers, c.payloads, + c.db, flow.ChainID(chainID), clusterHeaders, c.payloads, ) lights, err := read.ReadClusterLightBlockByHeightRange(clusterBlocks, reqData.startHeight, reqData.endHeight) diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 08fd542449b..5bcaa9b44f1 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -235,8 +235,8 @@ func main() { }). AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand { clusterPayloads := store.NewClusterPayloads(&metrics.NoopCollector{}, conf.ProtocolDB) - headers := store.NewHeaders(&metrics.NoopCollector{}, conf.ProtocolDB) - return storageCommands.NewReadRangeClusterBlocksCommand(conf.ProtocolDB, headers, clusterPayloads) + // defer construction of Headers since the cluster's ChainID is provided by the command + return storageCommands.NewReadRangeClusterBlocksCommand(conf.ProtocolDB, clusterPayloads) }). 
Module("follower distributor", func(node *cmd.NodeConfig) error { followerDistributor = pubsub.NewFollowerDistributor() diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 142f2d55c6d..7b084e44f29 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -1186,8 +1186,29 @@ func (fnb *FlowNodeBuilder) initStorageLockManager() error { return nil } +func (fnb *FlowNodeBuilder) determineChainID() error { + if ok, _ := badgerState.IsBootstrapped(fnb.ProtocolDB); ok { + chainID, err := badgerState.GetChainIDFromLatestFinalizedHeader(fnb.ProtocolDB) + if err == nil { + fnb.RootChainID = chainID + return nil + } + } + // could not read from DB; try reading root snapshot from disk + fnb.Logger.Info().Msgf("loading root protocol state snapshot from disk") + rootSnapshot, err := loadRootProtocolSnapshot(fnb.BaseConfig.BootstrapDir) + if err != nil { + return fmt.Errorf("failed to read protocol snapshot from disk: %w", err) + } + // set root snapshot fields (including RootChainID) + if err := fnb.setRootSnapshot(rootSnapshot); err != nil { + return err + } + return nil +} + func (fnb *FlowNodeBuilder) initStorage() error { - headers := store.NewHeaders(fnb.Metrics.Cache, fnb.ProtocolDB) + headers := store.NewHeaders(fnb.Metrics.Cache, fnb.ProtocolDB, fnb.RootChainID) guarantees := store.NewGuarantees(fnb.Metrics.Cache, fnb.ProtocolDB, fnb.BaseConfig.guaranteesCacheSize, store.DefaultCacheSize) seals := store.NewSeals(fnb.Metrics.Cache, fnb.ProtocolDB) @@ -2081,6 +2102,10 @@ func (fnb *FlowNodeBuilder) onStart() error { return err } + if err := fnb.determineChainID(); err != nil { + return err + } + if err := fnb.initStorage(); err != nil { return err } diff --git a/cmd/util/cmd/common/storage.go b/cmd/util/cmd/common/storage.go index 13cd2170f98..1c6f4606ae0 100644 --- a/cmd/util/cmd/common/storage.go +++ b/cmd/util/cmd/common/storage.go @@ -5,6 +5,7 @@ import ( "github.com/rs/zerolog/log" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" storagebadger "github.com/onflow/flow-go/storage/badger" @@ -53,9 +54,9 @@ func IsPebbleFolder(dataDir string) (bool, error) { return pebblestorage.IsPebbleFolder(dataDir) } -func InitStorages(db storage.DB) *store.All { +func InitStorages(db storage.DB, chainID flow.ChainID) *store.All { metrics := &metrics.NoopCollector{} - return store.InitAll(metrics, db) + return store.InitAll(metrics, db, chainID) } // WithStorage runs the given function with the storage depending on the flags. 
diff --git a/cmd/util/cmd/exec-data-json-export/block_exporter.go b/cmd/util/cmd/exec-data-json-export/block_exporter.go index 32efa2e78a5..354c75420f5 100644 --- a/cmd/util/cmd/exec-data-json-export/block_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/block_exporter.go @@ -12,6 +12,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -37,9 +38,13 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) (fl // traverse backward from the given block (parent block) and fetch by blockHash err := common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) index := store.NewIndex(cacheMetrics, db) guarantees := store.NewGuarantees(cacheMetrics, db, store.DefaultCacheSize, store.DefaultCacheSize) seals := store.NewSeals(cacheMetrics, db) diff --git a/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go b/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go index 3e3d6971a51..ca14cb9bb32 100644 --- a/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/fvm/storage/snapshot" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" "github.com/onflow/flow-go/storage/store" @@ -26,9 +27,13 @@ func ExportDeltaSnapshots(blockID flow.Identifier, dbPath string, outputPath str // traverse backward from the given block (parent block) and fetch by blockHash return common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) activeBlockID := blockID outputFile := filepath.Join(outputPath, "delta.jsonl") diff --git a/cmd/util/cmd/exec-data-json-export/event_exporter.go b/cmd/util/cmd/exec-data-json-export/event_exporter.go index 600f4d45af3..0020dccb2a7 100644 --- a/cmd/util/cmd/exec-data-json-export/event_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/event_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -30,9 +31,13 @@ func ExportEvents(blockID flow.Identifier, dbPath string, outputPath string) err // traverse backward from the given block (parent block) and fetch by blockHash return common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, 
chainID) events := store.NewEvents(cacheMetrics, db) activeBlockID := blockID diff --git a/cmd/util/cmd/exec-data-json-export/result_exporter.go b/cmd/util/cmd/exec-data-json-export/result_exporter.go index 6afefd7ee9a..df78cabd221 100644 --- a/cmd/util/cmd/exec-data-json-export/result_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/result_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -28,9 +29,13 @@ func ExportResults(blockID flow.Identifier, dbPath string, outputPath string) er // traverse backward from the given block (parent block) and fetch by blockHash return common.WithStorage(dbPath, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) results := store.NewExecutionResults(cacheMetrics, db) activeBlockID := blockID diff --git a/cmd/util/cmd/exec-data-json-export/transaction_exporter.go b/cmd/util/cmd/exec-data-json-export/transaction_exporter.go index 5c8f937437d..23437a99894 100644 --- a/cmd/util/cmd/exec-data-json-export/transaction_exporter.go +++ b/cmd/util/cmd/exec-data-json-export/transaction_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage/store" ) @@ -48,6 +49,11 @@ func ExportExecutedTransactions(blockID flow.Identifier, dbPath string, outputPa } defer db.Close() + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + cacheMetrics := &metrics.NoopCollector{} index := store.NewIndex(cacheMetrics, db) guarantees := store.NewGuarantees(cacheMetrics, db, store.DefaultCacheSize, store.DefaultCacheSize) @@ -55,7 +61,7 @@ func ExportExecutedTransactions(blockID flow.Identifier, dbPath string, outputPa results := store.NewExecutionResults(cacheMetrics, db) receipts := store.NewExecutionReceipts(cacheMetrics, db, results, store.DefaultCacheSize) transactions := store.NewTransactions(cacheMetrics, db) - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) payloads := store.NewPayloads(db, index, guarantees, seals, receipts, results) blocks := store.NewBlocks(db, headers, payloads) collections := store.NewCollections(db, transactions) diff --git a/cmd/util/cmd/export-json-transactions/cmd.go b/cmd/util/cmd/export-json-transactions/cmd.go index c8ba51cf2b5..13e09aa93a0 100644 --- a/cmd/util/cmd/export-json-transactions/cmd.go +++ b/cmd/util/cmd/export-json-transactions/cmd.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/cmd/util/cmd/export-json-transactions/transactions" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -67,7 +68,11 @@ func ExportTransactions(lockManager lockctx.Manager, dataDir string, outputDir s // init dependencies return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := 
badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { diff --git a/cmd/util/cmd/export-json-transactions/transactions/range_test.go b/cmd/util/cmd/export-json-transactions/transactions/range_test.go index 9d85f5c6232..d8f6d58644c 100644 --- a/cmd/util/cmd/export-json-transactions/transactions/range_test.go +++ b/cmd/util/cmd/export-json-transactions/transactions/range_test.go @@ -60,7 +60,7 @@ func TestFindBlockTransactions(t *testing.T) { ) // prepare dependencies - storages := common.InitStorages(db) + storages := common.InitStorages(db, b1.ChainID) payloads, collections := storages.Payloads, storages.Collections snap4 := &mock.Snapshot{} snap4.On("Head").Return(b1.ToHeader(), nil) diff --git a/cmd/util/cmd/find-inconsistent-result/cmd.go b/cmd/util/cmd/find-inconsistent-result/cmd.go index 16636dd5fc9..f20593cec91 100644 --- a/cmd/util/cmd/find-inconsistent-result/cmd.go +++ b/cmd/util/cmd/find-inconsistent-result/cmd.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/block_iterator/latest" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -95,7 +96,11 @@ func findFirstMismatch(datadir string, startHeight, endHeight uint64, lockManage func createStorages(db storage.DB, lockManager lockctx.Manager) ( storage.Headers, storage.ExecutionResults, storage.Seals, protocol.State, error) { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return nil, nil, nil, nil, err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return nil, nil, nil, nil, fmt.Errorf("could not open protocol state: %v", err) diff --git a/cmd/util/cmd/read-badger/cmd/blocks.go b/cmd/util/cmd/read-badger/cmd/blocks.go index 1e60bf85487..839a942e643 100644 --- a/cmd/util/cmd/read-badger/cmd/blocks.go +++ b/cmd/util/cmd/read-badger/cmd/blocks.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -28,8 +29,12 @@ var blocksCmd = &cobra.Command{ Short: "get a block by block ID or height", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } cacheMetrics := &metrics.NoopCollector{} - headers := store.NewHeaders(cacheMetrics, db) + headers := store.NewHeaders(cacheMetrics, db, chainID) index := store.NewIndex(cacheMetrics, db) guarantees := store.NewGuarantees(cacheMetrics, db, store.DefaultCacheSize, store.DefaultCacheSize) seals := store.NewSeals(cacheMetrics, db) @@ -39,7 +44,6 @@ var blocksCmd = &cobra.Command{ blocks := store.NewBlocks(db, headers, payloads) var block *flow.Block - var err error if flagBlockID != "" { log.Info().Msgf("got flag block id: %s", flagBlockID) diff --git a/cmd/util/cmd/read-badger/cmd/cluster_blocks.go b/cmd/util/cmd/read-badger/cmd/cluster_blocks.go index 6d094fd10ae..c2c9362d087 100644 --- 
a/cmd/util/cmd/read-badger/cmd/cluster_blocks.go +++ b/cmd/util/cmd/read-badger/cmd/cluster_blocks.go @@ -34,12 +34,12 @@ var clusterBlocksCmd = &cobra.Command{ return common.WithStorage(flagDatadir, func(db storage.DB) error { metrics := metrics.NewNoopCollector() - headers := store.NewHeaders(metrics, db) - clusterPayloads := store.NewClusterPayloads(metrics, db) - // get chain id log.Info().Msgf("got flag chain name: %s", flagChainName) chainID := flow.ChainID(flagChainName) + + headers := store.NewHeaders(metrics, db, chainID) + clusterPayloads := store.NewClusterPayloads(metrics, db) clusterBlocks := store.NewClusterBlocks(db, chainID, headers, clusterPayloads) if flagClusterBlockID != "" && flagHeight != 0 { diff --git a/cmd/util/cmd/read-badger/cmd/collections.go b/cmd/util/cmd/read-badger/cmd/collections.go index b59817cad8e..b16b0ffac6e 100644 --- a/cmd/util/cmd/read-badger/cmd/collections.go +++ b/cmd/util/cmd/read-badger/cmd/collections.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -28,7 +29,11 @@ var collectionsCmd = &cobra.Command{ Short: "get collection by collection or transaction ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used if flagCollectionID != "" { log.Info().Msgf("got flag collection id: %s", flagCollectionID) diff --git a/cmd/util/cmd/read-badger/cmd/epoch_commit.go b/cmd/util/cmd/read-badger/cmd/epoch_commit.go index 23deb37105b..a6d7341dab7 100644 --- a/cmd/util/cmd/read-badger/cmd/epoch_commit.go +++ b/cmd/util/cmd/read-badger/cmd/epoch_commit.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -25,7 +26,11 @@ var epochCommitCmd = &cobra.Command{ Short: "get epoch commit by ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag commit id: %s", flagEpochCommitID) commitID, err := flow.HexStringToIdentifier(flagEpochCommitID) diff --git a/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go b/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go index 0a7922e4cf3..73d6c4e7838 100644 --- a/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go +++ b/cmd/util/cmd/read-badger/cmd/epoch_protocol_state.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -23,7 +24,11 @@ var epochProtocolStateCmd = &cobra.Command{ Short: "get epoch protocol state by block ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := 
badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag block id: %s", flagBlockID) blockID, err := flow.HexStringToIdentifier(flagBlockID) diff --git a/cmd/util/cmd/read-badger/cmd/guarantees.go b/cmd/util/cmd/read-badger/cmd/guarantees.go index 14adf48588a..e9196fa2a24 100644 --- a/cmd/util/cmd/read-badger/cmd/guarantees.go +++ b/cmd/util/cmd/read-badger/cmd/guarantees.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -23,7 +24,11 @@ var guaranteesCmd = &cobra.Command{ Short: "get guarantees by collection ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag collection id: %s", flagCollectionID) collectionID, err := flow.HexStringToIdentifier(flagCollectionID) diff --git a/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go b/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go index d434d6e1aaa..2908b6cead7 100644 --- a/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go +++ b/cmd/util/cmd/read-badger/cmd/protocol_kvstore.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -30,7 +31,11 @@ var protocolStateKVStore = &cobra.Command{ Short: "get protocol state kvstore by block ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag block id: %s", flagBlockID) blockID, err := flow.HexStringToIdentifier(flagBlockID) diff --git a/cmd/util/cmd/read-badger/cmd/seals.go b/cmd/util/cmd/read-badger/cmd/seals.go index f6ebaa2e180..5baff53402f 100644 --- a/cmd/util/cmd/read-badger/cmd/seals.go +++ b/cmd/util/cmd/read-badger/cmd/seals.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -25,7 +26,11 @@ var sealsCmd = &cobra.Command{ Short: "get seals by block or seal ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used if flagSealID != "" && flagBlockID != "" { return fmt.Errorf("provide one of the flags --id or --block-id") diff --git a/cmd/util/cmd/read-badger/cmd/transaction_results.go b/cmd/util/cmd/read-badger/cmd/transaction_results.go index 44285007cc7..01dfe91018e 100644 --- 
a/cmd/util/cmd/read-badger/cmd/transaction_results.go +++ b/cmd/util/cmd/read-badger/cmd/transaction_results.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -25,11 +26,15 @@ var transactionResultsCmd = &cobra.Command{ Short: "get transaction-result by block ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } transactionResults, err := store.NewTransactionResults(metrics.NewNoopCollector(), db, 1) if err != nil { return err } - storages := common.InitStorages(db) + storages := common.InitStorages(db, chainID) log.Info().Msgf("got flag block id: %s", flagBlockID) blockID, err := flow.HexStringToIdentifier(flagBlockID) if err != nil { diff --git a/cmd/util/cmd/read-badger/cmd/transactions.go b/cmd/util/cmd/read-badger/cmd/transactions.go index c3e123d4808..e2211486634 100644 --- a/cmd/util/cmd/read-badger/cmd/transactions.go +++ b/cmd/util/cmd/read-badger/cmd/transactions.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -23,7 +24,11 @@ var transactionsCmd = &cobra.Command{ Short: "get transaction by ID", RunE: func(cmd *cobra.Command, args []string) error { return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - header storage not used log.Info().Msgf("got flag transaction id: %s", flagTransactionID) transactionID, err := flow.HexStringToIdentifier(flagTransactionID) diff --git a/cmd/util/cmd/read-light-block/read_light_block_test.go b/cmd/util/cmd/read-light-block/read_light_block_test.go index 84e5abfc002..94329b8837b 100644 --- a/cmd/util/cmd/read-light-block/read_light_block_test.go +++ b/cmd/util/cmd/read-light-block/read_light_block_test.go @@ -56,7 +56,7 @@ func TestReadClusterRange(t *testing.T) { clusterBlocks := store.NewClusterBlocks( db, blocks[0].ChainID, - store.NewHeaders(metrics.NewNoopCollector(), db), + store.NewHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), store.NewClusterPayloads(metrics.NewNoopCollector(), db), ) diff --git a/cmd/util/cmd/read-protocol-state/cmd/blocks.go b/cmd/util/cmd/read-protocol-state/cmd/blocks.go index 4c2ef0d13bd..6bca11c0009 100644 --- a/cmd/util/cmd/read-protocol-state/cmd/blocks.go +++ b/cmd/util/cmd/read-protocol-state/cmd/blocks.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -150,7 +151,11 @@ func (r *Reader) IsExecuted(blockID flow.Identifier) (bool, error) { func runE(*cobra.Command, []string) error { lockManager := storage.MakeSingletonLockManager() return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := 
badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { diff --git a/cmd/util/cmd/read-protocol-state/cmd/snapshot.go b/cmd/util/cmd/read-protocol-state/cmd/snapshot.go index 921fc26ba9b..a08434232c8 100644 --- a/cmd/util/cmd/read-protocol-state/cmd/snapshot.go +++ b/cmd/util/cmd/read-protocol-state/cmd/snapshot.go @@ -11,6 +11,7 @@ import ( commonFuncs "github.com/onflow/flow-go/cmd/util/common" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/state/protocol/inmem" "github.com/onflow/flow-go/storage" ) @@ -56,7 +57,11 @@ func init() { func runSnapshotE(*cobra.Command, []string) error { lockManager := storage.MakeSingletonLockManager() return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return fmt.Errorf("could not init protocol state") diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go index fa39fc67f80..fb58b8ef441 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/state/protocol" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger" "github.com/onflow/flow-go/storage/operation/pebbleimpl" @@ -59,7 +60,11 @@ func runE(*cobra.Command, []string) error { } return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - clean up duplicated storage types state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return fmt.Errorf("could not open protocol states: %w", err) @@ -75,7 +80,7 @@ func runE(*cobra.Command, []string) error { results := store.NewExecutionResults(metrics, db) receipts := store.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize) myReceipts := store.NewMyExecutionReceipts(metrics, db, receipts) - headers := store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, chainID) events := store.NewEvents(metrics, db) serviceEvents := store.NewServiceEvents(metrics, db) transactions := store.NewTransactions(metrics, db) diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go index 386acefd145..548f019a7ee 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go @@ -39,7 +39,7 @@ func TestReExecuteBlock(t *testing.T) { // create all modules metrics := &metrics.NoopCollector{} - all := store.InitAll(metrics, db) + all 
:= store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks txResults, err := store.NewTransactionResults(metrics, db, store.DefaultCacheSize) @@ -199,7 +199,7 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) { // create all modules metrics := &metrics.NoopCollector{} - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks commits := store.NewCommits(metrics, db) diff --git a/cmd/util/cmd/snapshot/cmd.go b/cmd/util/cmd/snapshot/cmd.go index b384c81220f..c5216377a36 100644 --- a/cmd/util/cmd/snapshot/cmd.go +++ b/cmd/util/cmd/snapshot/cmd.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/engine/common/rpc/convert" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" ) @@ -47,7 +48,11 @@ func runE(*cobra.Command, []string) error { lockManager := storage.MakeSingletonLockManager() return common.WithStorage(flagDatadir, func(db storage.DB) error { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return err + } + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return fmt.Errorf("could not open protocol state: %w", err) diff --git a/cmd/util/cmd/verify-evm-offchain-replay/verify.go b/cmd/util/cmd/verify-evm-offchain-replay/verify.go index 96eeeb3c9a3..e6bdafd416e 100644 --- a/cmd/util/cmd/verify-evm-offchain-replay/verify.go +++ b/cmd/util/cmd/verify-evm-offchain-replay/verify.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/blobs" "github.com/onflow/flow-go/module/executiondatasync/execution_data" + badgerstate "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/store" ) @@ -144,10 +145,14 @@ func initStorages(db storage.DB, executionDataDir string) ( io.Closer, error, ) { - storages := common.InitStorages(db) + chainID, err := badgerstate.GetChainIDFromLatestFinalizedHeader(db) + if err != nil { + return nil, nil, nil, err + } + storages := common.InitStorages(db, chainID) // TODO(4204) - check usage datastoreDir := filepath.Join(executionDataDir, "blobstore") - err := os.MkdirAll(datastoreDir, 0700) + err = os.MkdirAll(datastoreDir, 0700) if err != nil { return nil, nil, nil, err } diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 165d2450933..54e596c7306 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -387,7 +387,7 @@ func createNode( db := pebbleimpl.ToDB(pdb) lockManager := fstorage.NewTestingLockManager() - headersDB := store.NewHeaders(metricsCollector, db) + headersDB := store.NewHeaders(metricsCollector, db, rootSnapshot.Params().ChainID()) guaranteesDB := store.NewGuarantees(metricsCollector, db, store.DefaultCacheSize, store.DefaultCacheSize) sealsDB := store.NewSeals(metricsCollector, db) indexDB := store.NewIndex(metricsCollector, db) diff --git a/consensus/recovery/protocol/state_test.go b/consensus/recovery/protocol/state_test.go index 790aa8751a4..13626de8b03 100644 --- a/consensus/recovery/protocol/state_test.go +++ b/consensus/recovery/protocol/state_test.go @@ -46,7 +46,7 @@ func TestSaveBlockAsReplica(t *testing.T) { require.NoError(t, err) metrics := metrics.NewNoopCollector() - headers := 
store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, flow.Emulator) finalized, pending, err := recovery.FindLatest(state, headers) require.NoError(t, err) require.Equal(t, b0.ID(), finalized.ID(), "recover find latest returns inconsistent finalized block") diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 1b77dc19518..7b7c0fff947 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -161,7 +161,7 @@ func (suite *Suite) RunTest( ) { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) var err error suite.backend, err = backend.New(backend.Params{ @@ -653,7 +653,7 @@ func (suite *Suite) TestGetSealedTransaction() { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) enNodeIDs := enIdentities.NodeIDs() @@ -873,7 +873,7 @@ func (suite *Suite) TestGetTransactionResult() { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) originID := unittest.IdentifierFixture() *suite.state = protocol.State{} @@ -1210,7 +1210,7 @@ func (suite *Suite) TestExecuteScript() { unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) { lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics.NewNoopCollector(), db) + all := store.InitAll(metrics.NewNoopCollector(), db, flow.Emulator) identities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) suite.sealedSnapshot.On("Identities", mock.Anything).Return(identities, nil) suite.finalSnapshot.On("Identities", mock.Anything).Return(identities, nil) diff --git a/engine/access/ingestion/collections/indexer_test.go b/engine/access/ingestion/collections/indexer_test.go index 294ac3c034a..1892e555094 100644 --- a/engine/access/ingestion/collections/indexer_test.go +++ b/engine/access/ingestion/collections/indexer_test.go @@ -392,7 +392,7 @@ func newBlockchain(t *testing.T, pdb *pebble.DB) *blockchain { db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) transactions := store.NewTransactions(metrics, db) collections := store.NewCollections(db, transactions) diff --git a/engine/access/rpc/backend/transactions/transactions_functional_test.go b/engine/access/rpc/backend/transactions/transactions_functional_test.go index 263f8022029..c960e80348f 100644 --- a/engine/access/rpc/backend/transactions/transactions_functional_test.go +++ b/engine/access/rpc/backend/transactions/transactions_functional_test.go @@ -121,7 +121,7 @@ func (s *TransactionsFunctionalSuite) SetupTest() { s.db = pebbleimpl.ToDB(pdb) // Instantiate storages - all := store.InitAll(metrics, s.db) + all := store.InitAll(metrics, s.db, flow.Emulator) s.blocks = all.Blocks s.collections = all.Collections diff --git a/engine/collection/epochmgr/factories/cluster_state.go 
b/engine/collection/epochmgr/factories/cluster_state.go index 9548f033943..714104c8218 100644 --- a/engine/collection/epochmgr/factories/cluster_state.go +++ b/engine/collection/epochmgr/factories/cluster_state.go @@ -41,7 +41,7 @@ func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot) ( error, ) { - headers := store.NewHeaders(f.metrics, f.db) + headers := store.NewHeaders(f.metrics, f.db, stateRoot.ClusterID()) payloads := store.NewClusterPayloads(f.metrics, f.db) blocks := store.NewClusterBlocks(f.db, stateRoot.ClusterID(), headers, payloads) diff --git a/engine/common/follower/integration_test.go b/engine/common/follower/integration_test.go index 486f1b236a0..bb38459e572 100644 --- a/engine/common/follower/integration_test.go +++ b/engine/common/follower/integration_test.go @@ -51,7 +51,7 @@ func TestFollowerHappyPath(t *testing.T) { tracer := trace.NewNoopTracer() log := unittest.Logger() consumer := events.NewNoop() - all := store.InitAll(metrics, pebbleimpl.ToDB(pdb)) + all := store.InitAll(metrics, pebbleimpl.ToDB(pdb), flow.Emulator) // bootstrap root snapshot state, err := pbadger.Bootstrap( diff --git a/engine/execution/pruner/core_test.go b/engine/execution/pruner/core_test.go index a50811582b1..0897269cc4c 100644 --- a/engine/execution/pruner/core_test.go +++ b/engine/execution/pruner/core_test.go @@ -33,7 +33,7 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) { db := pebbleimpl.ToDB(pdb) ctx, cancel := context.WithCancel(context.Background()) metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blockstore := all.Blocks results := all.Results diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index 6dbb9f33f3c..cc924726948 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -238,7 +238,7 @@ func CompleteStateFixture( pdb := unittest.TypedPebbleDB(t, publicDBDir, pebble.Open) db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() - s := store.InitAll(metric, db) + s := store.InitAll(metric, db, flow.Emulator) secretsDB := unittest.TypedBadgerDB(t, secretsDBDir, storagebadger.InitSecret) consumer := events.NewDistributor() @@ -560,7 +560,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ide receipts := store.NewExecutionReceipts(node.Metrics, db, results, storagebadger.DefaultCacheSize) myReceipts := store.NewMyExecutionReceipts(node.Metrics, db, receipts) versionBeacons := store.NewVersionBeacons(db) - headersStorage := store.NewHeaders(node.Metrics, db) + headersStorage := store.NewHeaders(node.Metrics, db, chainID) checkAuthorizedAtBlock := func(blockID flow.Identifier) (bool, error) { return protocol.IsNodeAuthorizedAt(node.State.AtBlockID(blockID), node.Me.NodeID()) diff --git a/engine/verification/verifier/verifiers.go b/engine/verification/verifier/verifiers.go index e52897f109c..e0659c69ffc 100644 --- a/engine/verification/verifier/verifiers.go +++ b/engine/verification/verifier/verifiers.go @@ -241,7 +241,7 @@ func initStorages( return nil, nil, nil, nil, nil, fmt.Errorf("could not init storage database: %w", err) } - storages := common.InitStorages(db) + storages := common.InitStorages(db, chainID) state, err := common.OpenProtocolState(lockManager, db, storages) if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("could not open protocol state: %w", err) diff --git a/integration/testnet/container.go b/integration/testnet/container.go index 
b32af817506..af1a3559b37 100644 --- a/integration/testnet/container.go +++ b/integration/testnet/container.go @@ -385,7 +385,7 @@ func (c *Container) OpenState() (*state.State, error) { } metrics := metrics.NewNoopCollector() index := store.NewIndex(metrics, db) - headers := store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, c.net.Root().ChainID) seals := store.NewSeals(metrics, db) results := store.NewExecutionResults(metrics, db) receipts := store.NewExecutionReceipts(metrics, db, results, store.DefaultCacheSize) diff --git a/integration/tests/access/cohort4/access_test.go b/integration/tests/access/cohort4/access_test.go index 5f059783f2c..eabf87b892f 100644 --- a/integration/tests/access/cohort4/access_test.go +++ b/integration/tests/access/cohort4/access_test.go @@ -154,7 +154,7 @@ func (s *AccessSuite) runTestSignerIndicesDecoding() { err = container.WaitForContainerStopped(5 * time.Second) require.NoError(s.T(), err) - // open state to build a block singer decoder + // open state to build a block signer decoder state, err := container.OpenState() require.NoError(s.T(), err) diff --git a/integration/tests/access/cohort4/execution_data_pruning_test.go b/integration/tests/access/cohort4/execution_data_pruning_test.go index edf899c921f..814b9f0d635 100644 --- a/integration/tests/access/cohort4/execution_data_pruning_test.go +++ b/integration/tests/access/cohort4/execution_data_pruning_test.go @@ -163,7 +163,7 @@ func (s *ExecutionDataPruningSuite) TestHappyPath() { // setup storage objects needed to get the execution data id db, err := accessNode.DB() require.NoError(s.T(), err, "could not open db") - anHeaders := store.NewHeaders(metrics, db) + anHeaders := store.NewHeaders(metrics, db, flow.Localnet) anResults := store.NewExecutionResults(metrics, db) anSeals := store.NewSeals(metrics, db) diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index ee5a6115f03..6f57414cf62 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -41,7 +41,7 @@ func TestIterateHeight(t *testing.T) { // create iterator // b0 is the root block, iterate from b1 to b3 iterRange := module.IteratorRange{Start: b1.Height, End: b3.Height} - headers := store.NewHeaders(&metrics.NoopCollector{}, db) + headers := store.NewHeaders(&metrics.NoopCollector{}, db, "") // TODO(4204) set chainID in this test? getBlockIDByIndex := func(height uint64) (flow.Identifier, bool, error) { blockID, err := headers.BlockIDByHeight(height) if err != nil { diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index a63e3583e81..dc408b09039 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -95,7 +95,7 @@ func (suite *BuilderSuite) SetupTest() { tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, flow.Emulator) // TODO(4204) - handle cluster and non-cluster blocks? 
consumer := events.NewNoop() suite.headers = all.Headers @@ -1488,7 +1488,7 @@ func benchmarkBuildOn(b *testing.B, size int) { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, flow.Emulator) suite.headers = all.Headers suite.blocks = all.Blocks suite.payloads = store.NewClusterPayloads(metrics, suite.db) diff --git a/module/executiondatasync/optimistic_sync/pipeline_functional_test.go b/module/executiondatasync/optimistic_sync/pipeline_functional_test.go index be23d5b99c3..3a97f5dc12e 100644 --- a/module/executiondatasync/optimistic_sync/pipeline_functional_test.go +++ b/module/executiondatasync/optimistic_sync/pipeline_functional_test.go @@ -123,7 +123,7 @@ func (p *PipelineFunctionalSuite) SetupTest() { p.Require().NoError(err) // store and index the root header - p.headers = store.NewHeaders(p.metrics, p.db) + p.headers = store.NewHeaders(p.metrics, p.db, g.ChainID()) err = unittest.WithLock(t, p.lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { return p.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { diff --git a/module/finalizedreader/finalizedreader_test.go b/module/finalizedreader/finalizedreader_test.go index cd6bd194878..06433c00814 100644 --- a/module/finalizedreader/finalizedreader_test.go +++ b/module/finalizedreader/finalizedreader_test.go @@ -7,6 +7,7 @@ import ( "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" @@ -20,7 +21,7 @@ func TestFinalizedReader(t *testing.T) { lockManager := storage.NewTestingLockManager() // prepare the storage.Headers instance metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks headers := all.Headers proposal := unittest.ProposalFixture() diff --git a/module/finalizer/consensus/finalizer_test.go b/module/finalizer/consensus/finalizer_test.go index 965654328dc..03906cbdcc6 100644 --- a/module/finalizer/consensus/finalizer_test.go +++ b/module/finalizer/consensus/finalizer_test.go @@ -107,7 +107,7 @@ func TestMakeFinalValidChain(t *testing.T) { metrics := metrics.NewNoopCollector() fin := Finalizer{ dbReader: pebbleimpl.ToDB(pdb).Reader(), - headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb)), + headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb), flow.Emulator), state: state, tracer: trace.NewNoopTracer(), cleanup: LogCleanup(&list), @@ -177,7 +177,7 @@ func TestMakeFinalInvalidHeight(t *testing.T) { metrics := metrics.NewNoopCollector() fin := Finalizer{ dbReader: pebbleimpl.ToDB(pdb).Reader(), - headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb)), + headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb), flow.Emulator), state: state, tracer: trace.NewNoopTracer(), cleanup: LogCleanup(&list), @@ -236,7 +236,7 @@ func TestMakeFinalDuplicate(t *testing.T) { metrics := metrics.NewNoopCollector() fin := Finalizer{ dbReader: pebbleimpl.ToDB(pdb).Reader(), - headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb)), + headers: store.NewHeaders(metrics, pebbleimpl.ToDB(pdb), flow.Emulator), state: state, tracer: trace.NewNoopTracer(), cleanup: LogCleanup(&list), diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index cbf68967be3..130794b3f8f 100644 --- 
a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -68,7 +68,7 @@ func (suite *MutatorSuite) SetupTest() { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, suite.chainID) // TODO(4204) - handle cluster and non-cluster blocks? colPayloads := store.NewClusterPayloads(metrics, suite.db) // just bootstrap with a genesis block, we'll use this as reference diff --git a/state/cluster/badger/snapshot_test.go b/state/cluster/badger/snapshot_test.go index 76db04a71f5..fb6a5bb2269 100644 --- a/state/cluster/badger/snapshot_test.go +++ b/state/cluster/badger/snapshot_test.go @@ -56,7 +56,7 @@ func (suite *SnapshotSuite) SetupTest() { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() - all := store.InitAll(metrics, suite.db) + all := store.InitAll(metrics, suite.db, flow.Emulator) // TODO(4204) - handle cluster and non-cluster blocks? colPayloads := store.NewClusterPayloads(metrics, suite.db) root := unittest.RootSnapshotFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) diff --git a/state/protocol/badger/mutator_test.go b/state/protocol/badger/mutator_test.go index 47055894268..c0d209b00c1 100644 --- a/state/protocol/badger/mutator_test.go +++ b/state/protocol/badger/mutator_test.go @@ -90,7 +90,7 @@ func TestExtendValid(t *testing.T) { tracer := trace.NewNoopTracer() db := pebbleimpl.ToDB(pdb) log := zerolog.Nop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) distributor := events.NewDistributor() consumer := mockprotocol.NewConsumer(t) @@ -920,7 +920,7 @@ func TestExtendEpochTransitionValid(t *testing.T) { tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(mmetrics.NewNoopCollector(), db) + all := store.InitAll(mmetrics.NewNoopCollector(), db, flow.Emulator) protoState, err := protocol.Bootstrap( metrics, db, @@ -2803,7 +2803,7 @@ func TestExtendInvalidSealsInBlock(t *testing.T) { tracer := trace.NewNoopTracer() log := zerolog.Nop() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) // create a event consumer to test epoch transition events distributor := events.NewDistributor() @@ -3467,7 +3467,7 @@ func TestHeaderInvalidTimestamp(t *testing.T) { tracer := trace.NewNoopTracer() log := zerolog.Nop() db := pebbleimpl.ToDB(pdb) - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) // create a event consumer to test epoch transition events distributor := events.NewDistributor() diff --git a/state/protocol/badger/state.go b/state/protocol/badger/state.go index cf0ae743c48..f692e83dea8 100644 --- a/state/protocol/badger/state.go +++ b/state/protocol/badger/state.go @@ -989,6 +989,36 @@ func IsBootstrapped(db storage.DB) (bool, error) { return true, nil } +func GetChainIDFromLatestFinalizedHeader(db storage.DB) (flow.ChainID, error) { + h, err := GetLatestFinalizedHeader(db) + if err != nil { + return "", err + } + return h.ChainID, nil +} + +// GetLatestFinalizedHeader attempts to retrieve the latest finalized header +// without going through the storage.Headers interface. 
+func GetLatestFinalizedHeader(db storage.DB) (*flow.Header, error) { + var finalized uint64 + r := db.Reader() + err := operation.RetrieveFinalizedHeight(r, &finalized) + if err != nil { + return nil, err + } + var id flow.Identifier + err = operation.LookupBlockHeight(r, finalized, &id) + if err != nil { + return nil, err + } + var header flow.Header + err = operation.RetrieveHeader(r, id, &header) + if err != nil { + return nil, err + } + return &header, nil +} + // updateEpochMetrics update the `consensus_compliance_current_epoch_counter` and the // `consensus_compliance_current_epoch_phase` metric func updateEpochMetrics(metrics module.ComplianceMetrics, snap protocol.Snapshot) error { diff --git a/state/protocol/badger/state_test.go b/state/protocol/badger/state_test.go index cc9c024d3e6..6643e7490be 100644 --- a/state/protocol/badger/state_test.go +++ b/state/protocol/badger/state_test.go @@ -56,7 +56,7 @@ func TestBootstrapAndOpen(t *testing.T) { epoch.DKGPhase1FinalView(), epoch.DKGPhase2FinalView(), epoch.DKGPhase3FinalView()).Once() noopMetrics := new(metrics.NoopCollector) - all := store.InitAll(noopMetrics, db) + all := store.InitAll(noopMetrics, db, flow.Emulator) // protocol state has been bootstrapped, now open a protocol state with the database state, err := bprotocol.OpenState( complianceMetrics, @@ -136,7 +136,7 @@ func TestBootstrapAndOpen_EpochCommitted(t *testing.T) { complianceMetrics.On("BlockSealed", testmock.Anything).Once() noopMetrics := new(metrics.NoopCollector) - all := store.InitAll(noopMetrics, db) + all := store.InitAll(noopMetrics, db, flow.Emulator) state, err := bprotocol.OpenState( complianceMetrics, db, @@ -851,7 +851,7 @@ func bootstrap(t *testing.T, rootSnapshot protocol.Snapshot, f func(*bprotocol.S db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() defer db.Close() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := bprotocol.Bootstrap( metrics, db, diff --git a/state/protocol/util/testing.go b/state/protocol/util/testing.go index 220fb2d41bb..f3dc9643a42 100644 --- a/state/protocol/util/testing.go +++ b/state/protocol/util/testing.go @@ -71,7 +71,7 @@ func RunWithBootstrapState(t testing.TB, rootSnapshot protocol.Snapshot, f func( lockManager := storage.NewTestingLockManager() db := pebbleimpl.ToDB(pdb) metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -101,7 +101,7 @@ func RunWithFullProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, f fu tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -145,7 +145,7 @@ func RunWithFullProtocolStateAndMetrics(t testing.TB, rootSnapshot protocol.Snap tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(mmetrics.NewNoopCollector(), db) + all := store.InitAll(mmetrics.NewNoopCollector(), db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -191,7 +191,7 @@ func RunWithFullProtocolStateAndValidator(t testing.TB, rootSnapshot protocol.Sn tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -235,7 +235,7 @@ func 
RunWithFollowerProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -275,7 +275,7 @@ func RunWithFullProtocolStateAndConsumer(t testing.TB, rootSnapshot protocol.Sna metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -318,7 +318,7 @@ func RunWithFullProtocolStateAndMetricsAndConsumer(t testing.TB, rootSnapshot pr db := pebbleimpl.ToDB(pdb) tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(mmetrics.NewNoopCollector(), db) + all := store.InitAll(mmetrics.NewNoopCollector(), db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -373,7 +373,7 @@ func RunWithFollowerProtocolStateAndHeaders(t testing.TB, rootSnapshot protocol. tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, @@ -414,7 +414,7 @@ func RunWithFullProtocolStateAndMutator(t testing.TB, rootSnapshot protocol.Snap tracer := trace.NewNoopTracer() log := zerolog.Nop() consumer := events.NewNoop() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) state, err := pbadger.Bootstrap( metrics, db, diff --git a/storage/badger/all.go b/storage/badger/all.go index 4a8e4dfb3c2..b06a25847bc 100644 --- a/storage/badger/all.go +++ b/storage/badger/all.go @@ -3,15 +3,16 @@ package badger import ( "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation/badgerimpl" "github.com/onflow/flow-go/storage/store" ) -func InitAll(metrics module.CacheMetrics, db *badger.DB) *storage.All { +func InitAll(metrics module.CacheMetrics, db *badger.DB, chainID flow.ChainID) *storage.All { sdb := badgerimpl.ToDB(db) - headers := store.NewHeaders(metrics, sdb) + headers := store.NewHeaders(metrics, sdb, chainID) guarantees := store.NewGuarantees(metrics, sdb, DefaultCacheSize, DefaultCacheSize) seals := store.NewSeals(metrics, sdb) index := store.NewIndex(metrics, sdb) diff --git a/storage/store/blocks_test.go b/storage/store/blocks_test.go index f88ceeb977b..0179b3e1606 100644 --- a/storage/store/blocks_test.go +++ b/storage/store/blocks_test.go @@ -6,6 +6,7 @@ import ( "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" @@ -19,7 +20,7 @@ func TestBlockStoreAndRetrieve(t *testing.T) { lockManager := storage.NewTestingLockManager() cacheMetrics := &metrics.NoopCollector{} // verify after storing a block should be able to retrieve it back - blocks := store.InitAll(cacheMetrics, db).Blocks + blocks := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks block := unittest.FullBlockFixture() prop := unittest.ProposalFromBlock(block) @@ -46,7 +47,7 @@ func TestBlockStoreAndRetrieve(t *testing.T) { // verify after a restart, the block stored in the database is the same // as the original - blocksAfterRestart := 
store.InitAll(cacheMetrics, db).Blocks + blocksAfterRestart := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks receivedAfterRestart, err := blocksAfterRestart.ByID(block.ID()) require.NoError(t, err) require.Equal(t, *block, *receivedAfterRestart) @@ -57,7 +58,7 @@ func TestBlockIndexByHeightAndRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() cacheMetrics := &metrics.NoopCollector{} - blocks := store.InitAll(cacheMetrics, db).Blocks + blocks := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks block := unittest.FullBlockFixture() prop := unittest.ProposalFromBlock(block) @@ -102,7 +103,7 @@ func TestBlockIndexByHeightAndRetrieve(t *testing.T) { require.ErrorIs(t, err, storage.ErrNotFound) // Verify after a restart, the block indexed by height is still retrievable - blocksAfterRestart := store.InitAll(cacheMetrics, db).Blocks + blocksAfterRestart := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks receivedAfterRestart, err := blocksAfterRestart.ByHeight(block.Height) require.NoError(t, err) require.Equal(t, *block, *receivedAfterRestart) @@ -113,7 +114,7 @@ func TestBlockIndexByViewAndRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() cacheMetrics := &metrics.NoopCollector{} - blocks := store.InitAll(cacheMetrics, db).Blocks + blocks := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks block := unittest.FullBlockFixture() prop := unittest.ProposalFromBlock(block) @@ -153,7 +154,7 @@ func TestBlockIndexByViewAndRetrieve(t *testing.T) { require.ErrorIs(t, err, storage.ErrNotFound) // Verify after a restart, the block indexed by view is still retrievable - blocksAfterRestart := store.InitAll(cacheMetrics, db).Blocks + blocksAfterRestart := store.InitAll(cacheMetrics, db, flow.Emulator).Blocks receivedAfterRestart, err := blocksAfterRestart.ByView(block.View) require.NoError(t, err) require.Equal(t, *block, *receivedAfterRestart) diff --git a/storage/store/cluster_blocks_test.go b/storage/store/cluster_blocks_test.go index 1f9377348d3..d4c039db602 100644 --- a/storage/store/cluster_blocks_test.go +++ b/storage/store/cluster_blocks_test.go @@ -17,7 +17,7 @@ import ( func TestClusterBlocks(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() - chain := unittest.ClusterBlockFixtures(5) + chain := unittest.ClusterBlockFixtures(5) // TODO(4204) set an appropriate cluster chainID on blocks parent, blocks := chain[0], chain[1:] // add parent and mark its height as the latest finalized block @@ -55,7 +55,7 @@ func TestClusterBlocks(t *testing.T) { clusterBlocks := NewClusterBlocks( db, blocks[0].ChainID, - NewHeaders(metrics.NewNoopCollector(), db), + NewHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), NewClusterPayloads(metrics.NewNoopCollector(), db), ) diff --git a/storage/store/guarantees_test.go b/storage/store/guarantees_test.go index de27ab3f5ab..addf69dfa6e 100644 --- a/storage/store/guarantees_test.go +++ b/storage/store/guarantees_test.go @@ -26,7 +26,7 @@ func TestGuaranteeStoreRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks guarantees := all.Guarantees @@ -92,7 +92,7 @@ func TestStoreDuplicateGuarantee(t *testing.T) { 
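An illustrative sketch of the chain-scoped initialization these tests now exercise; `db` stands in for a `storage.DB` handle as provided by `dbtest`, and the emulator chain is used as in the surrounding tests:

	cacheMetrics := &metrics.NoopCollector{}
	all := store.InitAll(cacheMetrics, db, flow.Emulator)
	blocks := all.Blocks   // block storage bound to the emulator chain
	headers := all.Headers // header reads for a different chain now surface an error
	_ = blocks
	_ = headers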
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks store1 := all.Guarantees expected := unittest.CollectionGuaranteeFixture() @@ -131,7 +131,7 @@ func TestStoreConflictingGuarantee(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) blocks := all.Blocks store1 := all.Guarantees expected := unittest.CollectionGuaranteeFixture() diff --git a/storage/store/headers.go b/storage/store/headers.go index 8fd1210d76d..5bfed7d8145 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -21,20 +21,30 @@ type Headers struct { heightCache *Cache[uint64, flow.Identifier] viewCache *Cache[uint64, flow.Identifier] sigs *proposalSignatures + chainID flow.ChainID } var _ storage.Headers = (*Headers)(nil) // NewHeaders creates a Headers instance, which stores block headers. // It supports storing, caching and retrieving by block ID and the additionally indexed by header height. -func NewHeaders(collector module.CacheMetrics, db storage.DB) *Headers { +func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.ChainID) *Headers { storeWithLock := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, header *flow.Header) error { + if header.ChainID != chainID { + return fmt.Errorf("expected chain ID %v, got %v", chainID, header.ChainID) // TODO(4204) error sentinel + } return operation.InsertHeader(lctx, rw, blockID, header) } retrieve := func(r storage.Reader, blockID flow.Identifier) (*flow.Header, error) { var header flow.Header err := operation.RetrieveHeader(r, blockID, &header) + if err != nil { + return nil, err + } + if header.ChainID != chainID { + return nil, fmt.Errorf("expected chain ID '%v', got '%v'", chainID, header.ChainID) // TODO(4204) error sentinel + } return &header, err } @@ -65,7 +75,8 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB) *Headers { withLimit[uint64, flow.Identifier](4*flow.DefaultTransactionExpiry), withRetrieve(retrieveView)), - sigs: newProposalSignatures(collector, db), + sigs: newProposalSignatures(collector, db), + chainID: chainID, } return h diff --git a/storage/store/headers_test.go b/storage/store/headers_test.go index bc4f8ceb12f..2ca3860cdb7 100644 --- a/storage/store/headers_test.go +++ b/storage/store/headers_test.go @@ -21,7 +21,7 @@ func TestHeaderStoreRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks @@ -55,7 +55,7 @@ func TestHeaderIndexByViewAndRetrieve(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks @@ -88,7 +88,7 @@ func TestHeaderIndexByViewAndRetrieve(t *testing.T) { func TestHeaderRetrieveWithoutStore(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - headers 
:= store.NewHeaders(metrics, db) + headers := store.NewHeaders(metrics, db, flow.Emulator) header := unittest.BlockHeaderFixture() @@ -106,7 +106,7 @@ func TestHeadersByParentID(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks @@ -172,7 +172,7 @@ func TestHeadersByParentIDChainStructure(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) headers := all.Headers blocks := all.Blocks diff --git a/storage/store/init.go b/storage/store/init.go index a4ec067d16c..b08a224becf 100644 --- a/storage/store/init.go +++ b/storage/store/init.go @@ -1,6 +1,7 @@ package store import ( + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/storage" ) @@ -27,8 +28,8 @@ type All struct { Collections *Collections } -func InitAll(metrics module.CacheMetrics, db storage.DB) *All { - headers := NewHeaders(metrics, db) +func InitAll(metrics module.CacheMetrics, db storage.DB, chainID flow.ChainID) *All { + headers := NewHeaders(metrics, db, chainID) guarantees := NewGuarantees(metrics, db, DefaultCacheSize, DefaultCacheSize) seals := NewSeals(metrics, db) index := NewIndex(metrics, db) diff --git a/storage/store/payloads_test.go b/storage/store/payloads_test.go index 604ff48b9cc..8752dd35df8 100644 --- a/storage/store/payloads_test.go +++ b/storage/store/payloads_test.go @@ -6,6 +6,7 @@ import ( "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation/dbtest" @@ -18,7 +19,7 @@ func TestPayloadStoreRetrieve(t *testing.T) { lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - all := store.InitAll(metrics, db) + all := store.InitAll(metrics, db, flow.Emulator) payloads := all.Payloads blocks := all.Blocks From 0fe40075c30552d4f6239514f65408c1feebe9af Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 2 Dec 2025 12:18:31 -0800 Subject: [PATCH 02/16] update cluster mutator/snapshot tests --- engine/collection/epochmgr/engine.go | 2 +- .../epochmgr/factories/cluster_state.go | 17 ++--- engine/collection/epochmgr/factories/epoch.go | 4 +- engine/collection/epochmgr/factory.go | 4 +- .../epochmgr/mock/epoch_components_factory.go | 44 ++++++------- engine/testutil/nodes.go | 2 +- module/builder/collection/builder_test.go | 63 ++++++++++--------- state/cluster/badger/mutator.go | 28 +++++---- state/cluster/badger/mutator_test.go | 5 +- state/cluster/badger/snapshot_test.go | 3 +- 10 files changed, 94 insertions(+), 78 deletions(-) diff --git a/engine/collection/epochmgr/engine.go b/engine/collection/epochmgr/engine.go index c0bc2d87a81..5db5e9624d0 100644 --- a/engine/collection/epochmgr/engine.go +++ b/engine/collection/epochmgr/engine.go @@ -288,7 +288,7 @@ func (e *Engine) Done() <-chan struct{} { // Error returns: // - ErrNotAuthorizedForEpoch if this node is not authorized in the epoch. 
func (e *Engine) createEpochComponents(epoch protocol.CommittedEpoch) (*EpochComponents, error) { - state, prop, sync, hot, voteAggregator, timeoutAggregator, messageHub, err := e.factory.Create(epoch) + state, prop, sync, hot, voteAggregator, timeoutAggregator, messageHub, err := e.factory.Create(epoch, e.state.Params().ChainID()) if err != nil { return nil, fmt.Errorf("could not setup requirements for epoch (%d): %w", epoch.Counter(), err) } diff --git a/engine/collection/epochmgr/factories/cluster_state.go b/engine/collection/epochmgr/factories/cluster_state.go index 714104c8218..fef89841053 100644 --- a/engine/collection/epochmgr/factories/cluster_state.go +++ b/engine/collection/epochmgr/factories/cluster_state.go @@ -5,6 +5,7 @@ import ( "github.com/jordanschalm/lockctx" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" clusterkv "github.com/onflow/flow-go/state/cluster/badger" "github.com/onflow/flow-go/storage" @@ -33,38 +34,40 @@ func NewClusterStateFactory( return factory, nil } -func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot) ( +func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot, chainID flow.ChainID) ( *clusterkv.MutableState, *store.Headers, storage.ClusterPayloads, storage.ClusterBlocks, + *store.Headers, error, ) { headers := store.NewHeaders(f.metrics, f.db, stateRoot.ClusterID()) payloads := store.NewClusterPayloads(f.metrics, f.db) blocks := store.NewClusterBlocks(f.db, stateRoot.ClusterID(), headers, payloads) + consensusHeaders := store.NewHeaders(f.metrics, f.db, chainID) // for reference blocks isBootStrapped, err := clusterkv.IsBootstrapped(f.db, stateRoot.ClusterID()) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could not check cluster state db: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could not check cluster state db: %w", err) } var clusterState *clusterkv.State if isBootStrapped { clusterState, err = clusterkv.OpenState(f.db, f.tracer, headers, payloads, stateRoot.ClusterID(), stateRoot.EpochCounter()) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could not open cluster state: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could not open cluster state: %w", err) } } else { clusterState, err = clusterkv.Bootstrap(f.db, f.lockManager, stateRoot) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could not bootstrap cluster state: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could not bootstrap cluster state: %w", err) } } - mutableState, err := clusterkv.NewMutableState(clusterState, f.lockManager, f.tracer, headers, payloads) + mutableState, err := clusterkv.NewMutableState(clusterState, f.lockManager, f.tracer, headers, payloads, consensusHeaders) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("could create mutable cluster state: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("could create mutable cluster state: %w", err) } - return mutableState, headers, payloads, blocks, err + return mutableState, headers, payloads, blocks, consensusHeaders, err } diff --git a/engine/collection/epochmgr/factories/epoch.go b/engine/collection/epochmgr/factories/epoch.go index c55224a62bc..7bb81160fa6 100644 --- a/engine/collection/epochmgr/factories/epoch.go +++ b/engine/collection/epochmgr/factories/epoch.go @@ -5,6 +5,7 @@ import ( "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/engine/collection/epochmgr" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" 
"github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/mempool/epochs" @@ -56,6 +57,7 @@ func NewEpochComponentsFactory( func (factory *EpochComponentsFactory) Create( epoch protocol.CommittedEpoch, + consensusChainID flow.ChainID, ) ( state cluster.State, compliance component.Component, @@ -107,7 +109,7 @@ func (factory *EpochComponentsFactory) Create( return } var mutableState *badger.MutableState - mutableState, headers, payloads, blocks, err = factory.state.Create(stateRoot) + mutableState, headers, payloads, blocks, _, err = factory.state.Create(stateRoot, consensusChainID) state = mutableState if err != nil { err = fmt.Errorf("could not create cluster state: %w", err) diff --git a/engine/collection/epochmgr/factory.go b/engine/collection/epochmgr/factory.go index c6370674e51..d27b1652bbe 100644 --- a/engine/collection/epochmgr/factory.go +++ b/engine/collection/epochmgr/factory.go @@ -2,6 +2,7 @@ package epochmgr import ( "github.com/onflow/flow-go/consensus/hotstuff" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/state/cluster" @@ -16,9 +17,10 @@ type EpochComponentsFactory interface { // be used either for an ongoing epoch (for example, after a restart) or // for an epoch that will start soon. It is safe to call multiple times for // a given epoch counter. + // ChainID refers to the consensus chain, from which reference blocks are used. // // Must return ErrNotAuthorizedForEpoch if this node is not authorized in the epoch. - Create(epoch protocol.CommittedEpoch) ( + Create(epoch protocol.CommittedEpoch, chainID flow.ChainID) ( state cluster.State, proposal component.Component, sync module.ReadyDoneAware, diff --git a/engine/collection/epochmgr/mock/epoch_components_factory.go b/engine/collection/epochmgr/mock/epoch_components_factory.go index 4e58c01c41a..f53f4930c77 100644 --- a/engine/collection/epochmgr/mock/epoch_components_factory.go +++ b/engine/collection/epochmgr/mock/epoch_components_factory.go @@ -6,6 +6,8 @@ import ( component "github.com/onflow/flow-go/module/component" cluster "github.com/onflow/flow-go/state/cluster" + flow "github.com/onflow/flow-go/model/flow" + hotstuff "github.com/onflow/flow-go/consensus/hotstuff" mock "github.com/stretchr/testify/mock" @@ -20,9 +22,9 @@ type EpochComponentsFactory struct { mock.Mock } -// Create provides a mock function with given fields: epoch -func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error) { - ret := _m.Called(epoch) +// Create provides a mock function with given fields: epoch, chainID +func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error) { + ret := _m.Called(epoch, chainID) if len(ret) == 0 { panic("no return value specified for Create") @@ -36,67 +38,67 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch) (cluster var r5 hotstuff.TimeoutAggregator var r6 component.Component var r7 error - if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error)); ok { - 
return rf(epoch) + if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch, flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error)); ok { + return rf(epoch, chainID) } - if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch) cluster.State); ok { - r0 = rf(epoch) + if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch, flow.ChainID) cluster.State); ok { + r0 = rf(epoch, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(cluster.State) } } - if rf, ok := ret.Get(1).(func(protocol.CommittedEpoch) component.Component); ok { - r1 = rf(epoch) + if rf, ok := ret.Get(1).(func(protocol.CommittedEpoch, flow.ChainID) component.Component); ok { + r1 = rf(epoch, chainID) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(component.Component) } } - if rf, ok := ret.Get(2).(func(protocol.CommittedEpoch) module.ReadyDoneAware); ok { - r2 = rf(epoch) + if rf, ok := ret.Get(2).(func(protocol.CommittedEpoch, flow.ChainID) module.ReadyDoneAware); ok { + r2 = rf(epoch, chainID) } else { if ret.Get(2) != nil { r2 = ret.Get(2).(module.ReadyDoneAware) } } - if rf, ok := ret.Get(3).(func(protocol.CommittedEpoch) module.HotStuff); ok { - r3 = rf(epoch) + if rf, ok := ret.Get(3).(func(protocol.CommittedEpoch, flow.ChainID) module.HotStuff); ok { + r3 = rf(epoch, chainID) } else { if ret.Get(3) != nil { r3 = ret.Get(3).(module.HotStuff) } } - if rf, ok := ret.Get(4).(func(protocol.CommittedEpoch) hotstuff.VoteAggregator); ok { - r4 = rf(epoch) + if rf, ok := ret.Get(4).(func(protocol.CommittedEpoch, flow.ChainID) hotstuff.VoteAggregator); ok { + r4 = rf(epoch, chainID) } else { if ret.Get(4) != nil { r4 = ret.Get(4).(hotstuff.VoteAggregator) } } - if rf, ok := ret.Get(5).(func(protocol.CommittedEpoch) hotstuff.TimeoutAggregator); ok { - r5 = rf(epoch) + if rf, ok := ret.Get(5).(func(protocol.CommittedEpoch, flow.ChainID) hotstuff.TimeoutAggregator); ok { + r5 = rf(epoch, chainID) } else { if ret.Get(5) != nil { r5 = ret.Get(5).(hotstuff.TimeoutAggregator) } } - if rf, ok := ret.Get(6).(func(protocol.CommittedEpoch) component.Component); ok { - r6 = rf(epoch) + if rf, ok := ret.Get(6).(func(protocol.CommittedEpoch, flow.ChainID) component.Component); ok { + r6 = rf(epoch, chainID) } else { if ret.Get(6) != nil { r6 = ret.Get(6).(component.Component) } } - if rf, ok := ret.Get(7).(func(protocol.CommittedEpoch) error); ok { - r7 = rf(epoch) + if rf, ok := ret.Get(7).(func(protocol.CommittedEpoch, flow.ChainID) error); ok { + r7 = rf(epoch, chainID) } else { r7 = ret.Error(7) } diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index cc924726948..9d106f67ac3 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -238,7 +238,7 @@ func CompleteStateFixture( pdb := unittest.TypedPebbleDB(t, publicDBDir, pebble.Open) db := pebbleimpl.ToDB(pdb) lockManager := storage.NewTestingLockManager() - s := store.InitAll(metric, db, flow.Emulator) + s := store.InitAll(metric, db, rootSnapshot.Params().ChainID()) secretsDB := unittest.TypedBadgerDB(t, secretsDBDir, storagebadger.InitSecret) consumer := events.NewDistributor() diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index dc408b09039..191527218e4 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -62,9 +62,10 @@ type BuilderSuite struct { chainID flow.ChainID epochCounter uint64 - headers storage.Headers - payloads 
storage.ClusterPayloads - blocks storage.Blocks + headers storage.Headers + payloads storage.ClusterPayloads + blocks storage.Blocks + consensusHeaders storage.Headers state cluster.MutableState @@ -98,9 +99,10 @@ func (suite *BuilderSuite) SetupTest() { all := store.InitAll(metrics, suite.db, flow.Emulator) // TODO(4204) - handle cluster and non-cluster blocks? consumer := events.NewNoop() - suite.headers = all.Headers + suite.headers = store.NewHeaders(metrics, suite.db, suite.chainID) suite.blocks = all.Blocks suite.payloads = store.NewClusterPayloads(metrics, suite.db) + suite.consensusHeaders = all.Headers // just bootstrap with a genesis block, we'll use this as reference root, result, seal := unittest.BootstrapFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) @@ -132,7 +134,7 @@ func (suite *BuilderSuite) SetupTest() { clusterState, err := clusterkv.Bootstrap(suite.db, suite.lockManager, clusterStateRoot) suite.Require().NoError(err) - suite.state, err = clusterkv.NewMutableState(clusterState, suite.lockManager, tracer, suite.headers, suite.payloads) + suite.state, err = clusterkv.NewMutableState(clusterState, suite.lockManager, tracer, suite.headers, suite.payloads, suite.consensusHeaders) suite.Require().NoError(err) state, err := pbadger.Bootstrap( @@ -182,7 +184,7 @@ func (suite *BuilderSuite) SetupTest() { metrics, suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -602,7 +604,7 @@ func (suite *BuilderSuite) TestBuildOn_LargeHistory() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -698,7 +700,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSize() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -731,7 +733,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionByteSize() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -764,7 +766,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionTotalGas() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -823,7 +825,7 @@ func (suite *BuilderSuite) TestBuildOn_ExpiredTransaction() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -879,7 +881,7 @@ func (suite *BuilderSuite) TestBuildOn_EmptyMempool() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -920,7 +922,7 @@ func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -945,7 +947,7 @@ func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { h.WithHeight(1). - WithChainID(flow.Emulator). + WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). 
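A condensed sketch of the new cluster-state wiring, assuming `clusterState`, `lockManager`, `tracer`, `metrics`, `db`, and the two header stores (`clusterHeaders` for the collection cluster's own chain, `consensusHeaders` for the consensus chain) already exist as in the surrounding suite:

	payloads := store.NewClusterPayloads(metrics, db)
	mutable, err := clusterkv.NewMutableState(
		clusterState, lockManager, tracer,
		clusterHeaders,   // headers of this cluster's own chain
		payloads,
		consensusHeaders, // consensus-chain headers, used to resolve reference blocks
	)
	if err != nil {
		// construction failed
	}
	_ = mutable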
@@ -987,7 +989,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1017,7 +1019,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { // since rate limiting does not apply to non-payer keys, we should fill all collections in 10 blocks parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1055,7 +1057,7 @@ func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1079,7 +1081,7 @@ func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { // rate-limiting should be applied, resulting in half-full collections (5/10) parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1119,7 +1121,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1163,7 +1165,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() { // rate-limiting should be applied, resulting in minimum collection size. parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1200,7 +1202,7 @@ func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1225,7 +1227,7 @@ func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { // having one transaction and empty collections otherwise parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1266,7 +1268,7 @@ func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1290,7 +1292,7 @@ func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { // rate-limiting should not be applied, since the payer is marked as unlimited parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). WithParentView(1336). @@ -1331,7 +1333,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitDryRun() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1355,7 +1357,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitDryRun() { // rate-limiting should not be applied, since dry-run setting is enabled parentID := suite.genesis.ID() setter := func(h *flow.HeaderBodyBuilder) error { - h.WithChainID(flow.Emulator). + h.WithChainID(suite.chainID). WithParentID(parentID). WithView(1337). 
WithParentView(1336). @@ -1394,7 +1396,7 @@ func (suite *BuilderSuite) TestBuildOn_SystemTxAlwaysIncluded() { metrics.NewNoopCollector(), suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, @@ -1489,9 +1491,10 @@ func benchmarkBuildOn(b *testing.B, size int) { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() all := store.InitAll(metrics, suite.db, flow.Emulator) - suite.headers = all.Headers + suite.headers = store.NewHeaders(metrics, suite.db, suite.chainID) suite.blocks = all.Blocks suite.payloads = store.NewClusterPayloads(metrics, suite.db) + suite.consensusHeaders = all.Headers qc := unittest.QuorumCertificateFixture(unittest.QCWithRootBlockID(suite.genesis.ID())) stateRoot, err := clusterkv.NewStateRoot(suite.genesis, qc, suite.epochCounter) @@ -1499,7 +1502,7 @@ func benchmarkBuildOn(b *testing.B, size int) { state, err := clusterkv.Bootstrap(suite.db, suite.lockManager, stateRoot) assert.NoError(b, err) - suite.state, err = clusterkv.NewMutableState(state, suite.lockManager, tracer, suite.headers, suite.payloads) + suite.state, err = clusterkv.NewMutableState(state, suite.lockManager, tracer, suite.headers, suite.payloads, suite.consensusHeaders) assert.NoError(b, err) // add some transactions to transaction pool @@ -1517,7 +1520,7 @@ func benchmarkBuildOn(b *testing.B, size int) { metrics, suite.protoState, suite.state, - suite.headers, + suite.consensusHeaders, suite.headers, suite.payloads, suite.pool, diff --git a/state/cluster/badger/mutator.go b/state/cluster/badger/mutator.go index c27a02ae955..fbf5bd95c98 100644 --- a/state/cluster/badger/mutator.go +++ b/state/cluster/badger/mutator.go @@ -22,21 +22,23 @@ import ( type MutableState struct { *State - lockManager lockctx.Manager - tracer module.Tracer - headers storage.Headers - payloads storage.ClusterPayloads + lockManager lockctx.Manager + tracer module.Tracer + headers storage.Headers + payloads storage.ClusterPayloads + consensusHeaders storage.Headers } var _ clusterstate.MutableState = (*MutableState)(nil) -func NewMutableState(state *State, lockManager lockctx.Manager, tracer module.Tracer, headers storage.Headers, payloads storage.ClusterPayloads) (*MutableState, error) { +func NewMutableState(state *State, lockManager lockctx.Manager, tracer module.Tracer, headers storage.Headers, payloads storage.ClusterPayloads, consensusHeaders storage.Headers) (*MutableState, error) { mutableState := &MutableState{ - State: state, - lockManager: lockManager, - tracer: tracer, - headers: headers, - payloads: payloads, + State: state, + lockManager: lockManager, + tracer: tracer, + headers: headers, + payloads: payloads, + consensusHeaders: consensusHeaders, } return mutableState, nil } @@ -224,7 +226,7 @@ func (m *MutableState) checkPayloadReferenceBlock(ctx extendContext) error { payload := ctx.candidate.Payload // 1 - the reference block must be known - refBlock, err := m.headers.ByBlockID(payload.ReferenceBlockID) + refBlock, err := m.consensusHeaders.ByBlockID(payload.ReferenceBlockID) if err != nil { if errors.Is(err, storage.ErrNotFound) { return state.NewUnverifiableExtensionError("cluster block references unknown reference block (id=%x)", payload.ReferenceBlockID) @@ -237,7 +239,7 @@ func (m *MutableState) checkPayloadReferenceBlock(ctx extendContext) error { // a reference block which is above the finalized boundary can't be verified yet return state.NewUnverifiableExtensionError("reference block is above finalized boundary (%d>%d)", 
refBlock.Height, ctx.finalizedConsensusHeight) } else { - storedBlockIDForHeight, err := m.headers.BlockIDByHeight(refBlock.Height) + storedBlockIDForHeight, err := m.consensusHeaders.BlockIDByHeight(refBlock.Height) if err != nil { return irrecoverable.NewExceptionf("could not look up block ID for finalized height: %w", err) } @@ -291,7 +293,7 @@ func (m *MutableState) checkPayloadTransactions(lctx lockctx.Proof, ctx extendCo minRefHeight := uint64(math.MaxUint64) maxRefHeight := uint64(0) for _, flowTx := range payload.Collection.Transactions { - refBlock, err := m.headers.ByBlockID(flowTx.ReferenceBlockID) + refBlock, err := m.consensusHeaders.ByBlockID(flowTx.ReferenceBlockID) if errors.Is(err, storage.ErrNotFound) { // Reject collection if it contains a transaction with an unknown reference block, because we cannot verify its validity. return state.NewUnverifiableExtensionError("collection contains tx (tx_id=%x) with unknown reference block (block_id=%x): %w", flowTx.ID(), flowTx.ReferenceBlockID, err) diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index 130794b3f8f..5db05eb98ee 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -68,7 +68,8 @@ func (suite *MutatorSuite) SetupTest() { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, suite.db, suite.chainID) // TODO(4204) - handle cluster and non-cluster blocks? + all := store.InitAll(metrics, suite.db, flow.Emulator) + clusterHeaders := store.NewHeaders(metrics, suite.db, suite.chainID) colPayloads := store.NewClusterPayloads(metrics, suite.db) // just bootstrap with a genesis block, we'll use this as reference @@ -135,7 +136,7 @@ func (suite *MutatorSuite) SetupTest() { suite.NoError(err) clusterState, err := Bootstrap(suite.db, suite.lockManager, clusterStateRoot) suite.Assert().Nil(err) - suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, all.Headers, colPayloads) + suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, clusterHeaders, colPayloads, all.Headers) suite.Assert().Nil(err) } diff --git a/state/cluster/badger/snapshot_test.go b/state/cluster/badger/snapshot_test.go index fb6a5bb2269..22cde4b4b1f 100644 --- a/state/cluster/badger/snapshot_test.go +++ b/state/cluster/badger/snapshot_test.go @@ -58,6 +58,7 @@ func (suite *SnapshotSuite) SetupTest() { all := store.InitAll(metrics, suite.db, flow.Emulator) // TODO(4204) - handle cluster and non-cluster blocks? 
colPayloads := store.NewClusterPayloads(metrics, suite.db) + clusterHeaders := store.NewHeaders(metrics, suite.db, suite.chainID) root := unittest.RootSnapshotFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) suite.epochCounter = root.Encodable().SealingSegment.LatestProtocolStateEntry().EpochEntry.EpochCounter() @@ -84,7 +85,7 @@ func (suite *SnapshotSuite) SetupTest() { suite.Require().NoError(err) clusterState, err := Bootstrap(suite.db, suite.lockManager, clusterStateRoot) suite.Require().NoError(err) - suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, all.Headers, colPayloads) + suite.state, err = NewMutableState(clusterState, suite.lockManager, tracer, clusterHeaders, colPayloads, all.Headers) suite.Require().NoError(err) } From 467b732684946c90848795a3f51cdcb0b222ab96 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 2 Dec 2025 12:23:12 -0800 Subject: [PATCH 03/16] update header generation in cluster builder tests --- module/builder/collection/builder_test.go | 25 ++++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index 191527218e4..191496c5de0 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -39,18 +39,7 @@ import ( ) var signer = func(*flow.Header) ([]byte, error) { return unittest.SignatureFixture(), nil } -var setter = func(h *flow.HeaderBodyBuilder) error { - h.WithHeight(42). - WithChainID(flow.Emulator). - WithParentID(unittest.IdentifierFixture()). - WithView(1337). - WithParentView(1336). - WithParentVoterIndices(unittest.SignerIndicesFixture(4)). - WithParentVoterSigData(unittest.QCSigDataFixture()). - WithProposerID(unittest.IdentifierFixture()) - - return nil -} +var setter func(*flow.HeaderBodyBuilder) error type BuilderSuite struct { suite.Suite @@ -85,6 +74,18 @@ func (suite *BuilderSuite) SetupTest() { suite.genesis, err = unittest.ClusterBlock.Genesis() require.NoError(suite.T(), err) suite.chainID = suite.genesis.ChainID + setter = func(h *flow.HeaderBodyBuilder) error { + h.WithHeight(42). + WithChainID(suite.chainID). + WithParentID(unittest.IdentifierFixture()). + WithView(1337). + WithParentView(1336). + WithParentVoterIndices(unittest.SignerIndicesFixture(4)). + WithParentVoterSigData(unittest.QCSigDataFixture()). + WithProposerID(unittest.IdentifierFixture()) + + return nil + } suite.pool = herocache.NewTransactions(1000, unittest.Logger(), metrics.NewNoopCollector()) From 585c0e9ec5bc61365e5287f52c4a43845ed8a629 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 3 Dec 2025 13:28:24 -0800 Subject: [PATCH 04/16] update mock usage in epochmgr tests --- engine/collection/epochmgr/engine_test.go | 23 +++++++++++-------- .../test/cluster_switchover_test.go | 2 +- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/engine/collection/epochmgr/engine_test.go b/engine/collection/epochmgr/engine_test.go index e5309df3f82..c3a96c8d00e 100644 --- a/engine/collection/epochmgr/engine_test.go +++ b/engine/collection/epochmgr/engine_test.go @@ -108,35 +108,35 @@ type Suite struct { // MockFactoryCreate mocks the epoch factory to create epoch components for the given epoch. func (suite *Suite) MockFactoryCreate(arg any) { - suite.factory.On("Create", arg). + suite.factory.On("Create", arg, mock.Anything). 
Run(func(args mock.Arguments) { epoch, ok := args.Get(0).(realprotocol.CommittedEpoch) suite.Require().Truef(ok, "invalid type %T", args.Get(0)) suite.components[epoch.Counter()] = newMockComponents(suite.T()) }). Return( - func(epoch realprotocol.CommittedEpoch) realcluster.State { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) realcluster.State { return suite.ComponentsForEpoch(epoch).state }, - func(epoch realprotocol.CommittedEpoch) component.Component { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) component.Component { return suite.ComponentsForEpoch(epoch).prop }, - func(epoch realprotocol.CommittedEpoch) realmodule.ReadyDoneAware { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) realmodule.ReadyDoneAware { return suite.ComponentsForEpoch(epoch).sync }, - func(epoch realprotocol.CommittedEpoch) realmodule.HotStuff { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) realmodule.HotStuff { return suite.ComponentsForEpoch(epoch).hotstuff }, - func(epoch realprotocol.CommittedEpoch) hotstuff.VoteAggregator { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) hotstuff.VoteAggregator { return suite.ComponentsForEpoch(epoch).voteAggregator }, - func(epoch realprotocol.CommittedEpoch) hotstuff.TimeoutAggregator { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) hotstuff.TimeoutAggregator { return suite.ComponentsForEpoch(epoch).timeoutAggregator }, - func(epoch realprotocol.CommittedEpoch) component.Component { + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) component.Component { return suite.ComponentsForEpoch(epoch).messageHub }, - func(epoch realprotocol.CommittedEpoch) error { return nil }, + func(epoch realprotocol.CommittedEpoch, chainID flow.ChainID) error { return nil }, ).Maybe() } @@ -164,6 +164,9 @@ func (suite *Suite) SetupTest() { suite.state.On("Final").Return(suite.snap) suite.state.On("AtBlockID", suite.header.ID()).Return(suite.snap).Maybe() + params := protocol.NewParams(suite.T()) + params.On("ChainID").Return(flow.ChainID("chain-id")).Maybe() + suite.state.On("Params").Return(params) suite.snap.On("Epochs").Return(suite.epochQuery) suite.snap.On("Head").Return( func() *flow.Header { return suite.header }, @@ -274,7 +277,7 @@ func (suite *Suite) MockAsUnauthorizedNode(forEpoch uint64) { suite.factory = epochmgr.NewEpochComponentsFactory(suite.T()) suite.factory. - On("Create", mock.MatchedBy(unauthorizedMatcher)). + On("Create", mock.MatchedBy(unauthorizedMatcher), mock.Anything). Return(nil, nil, nil, nil, nil, nil, nil, ErrNotAuthorizedForEpoch) suite.MockFactoryCreate(mock.MatchedBy(authorizedMatcher)) diff --git a/engine/collection/test/cluster_switchover_test.go b/engine/collection/test/cluster_switchover_test.go index a0ae19371a2..b9f1b25234c 100644 --- a/engine/collection/test/cluster_switchover_test.go +++ b/engine/collection/test/cluster_switchover_test.go @@ -377,7 +377,7 @@ func (tc *ClusterSwitchoverTestCase) BlockInEpoch(epochCounter uint64) *flow.Hea func (tc *ClusterSwitchoverTestCase) SubmitTransactionToCluster( epochCounter uint64, // the epoch we are submitting the transacting w.r.t. 
clustering flow.ClusterList, // the clustering for the epoch - clusterIndex uint, // the index of the cluster we are targetting + clusterIndex uint, // the index of the cluster we are targeting ) { clusterMembers := clustering[int(clusterIndex)] From 6d9c3890a34e2cd0ede87f07acf30cf862e7a79c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 3 Dec 2025 13:29:20 -0800 Subject: [PATCH 05/16] enable TestExtend_WithReferenceBlockFromClusterChain --- state/cluster/badger/mutator_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index 5db05eb98ee..26bc9000ce3 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -445,8 +445,6 @@ func (suite *MutatorSuite) TestExtend_WithExpiredReferenceBlock() { } func (suite *MutatorSuite) TestExtend_WithReferenceBlockFromClusterChain() { - // TODO skipping as this isn't implemented yet - unittest.SkipUnless(suite.T(), unittest.TEST_TODO, "skipping as this isn't implemented yet") // set genesis from cluster chain as reference block proposal := suite.ProposalWithParentAndPayload(suite.genesis, *model.NewEmptyPayload(suite.genesis.ID())) err := suite.state.Extend(&proposal) From 927b229e44b9be6280124aa25b94866721f55c14 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 3 Dec 2025 13:47:28 -0800 Subject: [PATCH 06/16] fix FinalizedAncestryLookup during cluster switchover Weakens the chainID requirement for cluster chains when reading from storage. --- module/builder/collection/builder.go | 2 +- storage/store/headers.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/module/builder/collection/builder.go b/module/builder/collection/builder.go index 1fa49d6154a..59fd9ca5b3c 100644 --- a/module/builder/collection/builder.go +++ b/module/builder/collection/builder.go @@ -386,7 +386,7 @@ func (b *Builder) populateFinalizedAncestryLookup(lctx lockctx.Proof, ctx *block } for _, blockID := range clusterBlockIDs { - header, err := b.clusterHeaders.ByBlockID(blockID) + header, err := b.clusterHeaders.ByBlockID(blockID) // TODO(4204) transaction deduplication crosses clusterHeaders epoch boundary if err != nil { return fmt.Errorf("could not retrieve cluster header (id=%x): %w", blockID, err) } diff --git a/storage/store/headers.go b/storage/store/headers.go index 5bfed7d8145..e4718382c51 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -3,6 +3,7 @@ package store import ( "errors" "fmt" + "strings" "github.com/jordanschalm/lockctx" @@ -36,13 +37,18 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.Chain return operation.InsertHeader(lctx, rw, blockID, header) } + isClusterChain := func(chainID flow.ChainID) bool { + return strings.HasPrefix(chainID.String(), "cluster") + } retrieve := func(r storage.Reader, blockID flow.Identifier) (*flow.Header, error) { var header flow.Header err := operation.RetrieveHeader(r, blockID, &header) if err != nil { return nil, err } - if header.ChainID != chainID { + // raise an error when the retrieved header is for a different chain than expected, + // except in the case of cluster chains where the previous epoch(=chain) can be checked for transaction deduplication + if header.ChainID != chainID && !(isClusterChain(chainID) && isClusterChain(header.ChainID)) { return nil, fmt.Errorf("expected chain ID '%v', got '%v'", chainID, header.ChainID) // TODO(4204) error sentinel } return &header, err From 81ffdeb3a4878d1e9dd4bbdffae822acccc5e2d5 Mon 
Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 3 Dec 2025 13:56:03 -0800 Subject: [PATCH 07/16] Use appropriate height index for header storage --- storage/store/headers.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/storage/store/headers.go b/storage/store/headers.go index e4718382c51..457666f43d7 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -60,6 +60,14 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.Chain return id, err } + if isClusterChain(chainID) { + retrieveHeight = func(r storage.Reader, height uint64) (flow.Identifier, error) { + var id flow.Identifier + err := operation.LookupClusterBlockHeight(r, chainID, height, &id) + return id, err + } + } + retrieveView := func(r storage.Reader, view uint64) (flow.Identifier, error) { var id flow.Identifier err := operation.LookupCertifiedBlockByView(r, view, &id) From a22fd0b4c849c4b9281aff5f825eba6b1ca891b3 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 8 Dec 2025 09:49:37 -0800 Subject: [PATCH 08/16] introduce sentinel error for incorrect header chain --- storage/errors.go | 4 ++++ storage/store/headers.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/storage/errors.go b/storage/errors.go index b3d81d9709c..5e268f5a2f2 100644 --- a/storage/errors.go +++ b/storage/errors.go @@ -31,6 +31,10 @@ var ( // ErrNotBootstrapped is returned when the database has not been bootstrapped. ErrNotBootstrapped = errors.New("pebble database not bootstrapped") + + // ErrWrongChain is returned when data from a specific chain (consensus or cluster) + // is expected to be read or inserted, but the actual chainID does not match. + ErrWrongChain = errors.New("data is not part of the expected chain") ) // InvalidDKGStateTransitionError is a sentinel error that is returned in case an invalid state transition is attempted. 
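A short sketch of how a caller might distinguish the new sentinel from a plain not-found error; the `headers` store and `blockID` are assumed to exist:

	header, err := headers.ByBlockID(blockID)
	switch {
	case errors.Is(err, storage.ErrNotFound):
		// the block is simply unknown
	case errors.Is(err, storage.ErrWrongChain):
		// a header exists, but it belongs to a different chain than this store serves
	case err != nil:
		// unexpected storage error
	default:
		_ = header // header belongs to the expected chain
	}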
diff --git a/storage/store/headers.go b/storage/store/headers.go index 457666f43d7..bdc6ed9b4b2 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -32,7 +32,7 @@ var _ storage.Headers = (*Headers)(nil) func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.ChainID) *Headers { storeWithLock := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, header *flow.Header) error { if header.ChainID != chainID { - return fmt.Errorf("expected chain ID %v, got %v", chainID, header.ChainID) // TODO(4204) error sentinel + return fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) } return operation.InsertHeader(lctx, rw, blockID, header) } @@ -49,7 +49,7 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.Chain // raise an error when the retrieved header is for a different chain than expected, // except in the case of cluster chains where the previous epoch(=chain) can be checked for transaction deduplication if header.ChainID != chainID && !(isClusterChain(chainID) && isClusterChain(header.ChainID)) { - return nil, fmt.Errorf("expected chain ID '%v', got '%v'", chainID, header.ChainID) // TODO(4204) error sentinel + return nil, fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) } return &header, err } From ee8052533c22e109f3852a8ae37b0d4677e69ed4 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 8 Dec 2025 09:51:21 -0800 Subject: [PATCH 09/16] update default ChainID for cluster block fixture in tests --- storage/store/cluster_blocks_test.go | 2 +- utils/unittest/cluster_block.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/store/cluster_blocks_test.go b/storage/store/cluster_blocks_test.go index d4c039db602..ed3bffcb3d7 100644 --- a/storage/store/cluster_blocks_test.go +++ b/storage/store/cluster_blocks_test.go @@ -17,7 +17,7 @@ import ( func TestClusterBlocks(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { lockManager := storage.NewTestingLockManager() - chain := unittest.ClusterBlockFixtures(5) // TODO(4204) set an appropriate cluster chainID on blocks + chain := unittest.ClusterBlockFixtures(5) parent, blocks := chain[0], chain[1:] // add parent and mark its height as the latest finalized block diff --git a/utils/unittest/cluster_block.go b/utils/unittest/cluster_block.go index 55e2206a64b..ab02a48e1c2 100644 --- a/utils/unittest/cluster_block.go +++ b/utils/unittest/cluster_block.go @@ -17,6 +17,7 @@ func ClusterBlockFixture(opts ...func(*cluster.Block)) *cluster.Block { HeaderBody: HeaderBodyFixture(), Payload: *ClusterPayloadFixture(3), } + block.ChainID = "cluster" for _, opt := range opts { opt(block) } From 2ecfee1d51d7e2a4d780994ca148a1b9e6cb0b0e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 8 Dec 2025 10:02:23 -0800 Subject: [PATCH 10/16] update tests --- module/block_iterator/iterator_test.go | 8 ++++---- module/builder/collection/builder_test.go | 2 +- state/cluster/badger/snapshot_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index 6f57414cf62..a6c5f9f2fd4 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -21,9 +21,9 @@ func TestIterateHeight(t *testing.T) { lockManager := storage.NewTestingLockManager() dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { // create blocks with siblings - 
b1 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 1}} - b2 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 2}} - b3 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 3}} + b1 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 1, ChainID: flow.Emulator}} + b2 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 2, ChainID: flow.Emulator}} + b3 := &flow.Header{HeaderBody: flow.HeaderBody{Height: 3, ChainID: flow.Emulator}} bs := []*flow.Header{b1, b2, b3} // index height @@ -41,7 +41,7 @@ func TestIterateHeight(t *testing.T) { // create iterator // b0 is the root block, iterate from b1 to b3 iterRange := module.IteratorRange{Start: b1.Height, End: b3.Height} - headers := store.NewHeaders(&metrics.NoopCollector{}, db, "") // TODO(4204) set chainID in this test? + headers := store.NewHeaders(&metrics.NoopCollector{}, db, flow.Emulator) getBlockIDByIndex := func(height uint64) (flow.Identifier, bool, error) { blockID, err := headers.BlockIDByHeight(height) if err != nil { diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index 191496c5de0..a8c64656f9b 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -97,7 +97,7 @@ func (suite *BuilderSuite) SetupTest() { tracer := trace.NewNoopTracer() log := zerolog.Nop() - all := store.InitAll(metrics, suite.db, flow.Emulator) // TODO(4204) - handle cluster and non-cluster blocks? + all := store.InitAll(metrics, suite.db, flow.Emulator) consumer := events.NewNoop() suite.headers = store.NewHeaders(metrics, suite.db, suite.chainID) diff --git a/state/cluster/badger/snapshot_test.go b/state/cluster/badger/snapshot_test.go index 22cde4b4b1f..feb26fed5df 100644 --- a/state/cluster/badger/snapshot_test.go +++ b/state/cluster/badger/snapshot_test.go @@ -56,7 +56,7 @@ func (suite *SnapshotSuite) SetupTest() { metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() - all := store.InitAll(metrics, suite.db, flow.Emulator) // TODO(4204) - handle cluster and non-cluster blocks? + all := store.InitAll(metrics, suite.db, flow.Emulator) colPayloads := store.NewClusterPayloads(metrics, suite.db) clusterHeaders := store.NewHeaders(metrics, suite.db, suite.chainID) From 6fe94ecf02f91a5014308767e15b02e0baa44680 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Dec 2025 11:57:49 -0800 Subject: [PATCH 11/16] move IsClusterChain to a method on ChainID --- model/flow/chain.go | 6 ++++++ storage/store/headers.go | 6 +----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/model/flow/chain.go b/model/flow/chain.go index 7cc4df23244..b3864b64549 100644 --- a/model/flow/chain.go +++ b/model/flow/chain.go @@ -2,6 +2,7 @@ package flow import ( "fmt" + "strings" "github.com/pkg/errors" @@ -62,6 +63,11 @@ func (c ChainID) Transient() bool { return c == Emulator || c == MonotonicEmulator || c == Localnet || c == Benchnet || c == BftTestnet || c == Previewnet } +// IsClusterChain returns whether the chain ID is for a collection cluster during an epoch, rather than a full network. +func (c ChainID) IsClusterChain() bool { + return strings.HasPrefix(string(c), "cluster") +} + // getChainCodeWord derives the network type used for address generation from the globally // configured chain ID. 
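A tiny illustration of the new predicate; the cluster-style ID below is made up and relies only on the documented "cluster" prefix:

	_ = flow.Emulator.IsClusterChain()                      // false: a full network chain
	_ = flow.ChainID("cluster-42-example").IsClusterChain() // true: collection cluster chain for an epoch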
func (c ChainID) getChainCodeWord() uint64 { diff --git a/storage/store/headers.go b/storage/store/headers.go index bdc6ed9b4b2..d518e1805fb 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -3,7 +3,6 @@ package store import ( "errors" "fmt" - "strings" "github.com/jordanschalm/lockctx" @@ -37,9 +36,6 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.Chain return operation.InsertHeader(lctx, rw, blockID, header) } - isClusterChain := func(chainID flow.ChainID) bool { - return strings.HasPrefix(chainID.String(), "cluster") - } retrieve := func(r storage.Reader, blockID flow.Identifier) (*flow.Header, error) { var header flow.Header err := operation.RetrieveHeader(r, blockID, &header) @@ -60,7 +56,7 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.Chain return id, err } - if isClusterChain(chainID) { + if chainID.IsClusterChain() { retrieveHeight = func(r storage.Reader, height uint64) (flow.Identifier, error) { var id flow.Identifier err := operation.LookupClusterBlockHeight(r, chainID, height, &id) From 2fbe0b56606413b0070dcfc229a8801e24c7503d Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Dec 2025 12:33:16 -0800 Subject: [PATCH 12/16] Add NewClusterHeaders constructor and make chainID checks more explicit --- .../storage/read_range_cluster_blocks.go | 2 +- .../cmd/read-badger/cmd/cluster_blocks.go | 2 +- .../read-light-block/read_light_block_test.go | 2 +- .../epochmgr/factories/cluster_state.go | 2 +- module/builder/collection/builder_test.go | 2 +- state/cluster/badger/mutator_test.go | 2 +- state/cluster/badger/snapshot_test.go | 2 +- storage/store/cluster_blocks_test.go | 2 +- storage/store/headers.go | 63 ++++++++++++------- 9 files changed, 50 insertions(+), 29 deletions(-) diff --git a/admin/commands/storage/read_range_cluster_blocks.go b/admin/commands/storage/read_range_cluster_blocks.go index 7fe1c068ff2..6e2a288f70c 100644 --- a/admin/commands/storage/read_range_cluster_blocks.go +++ b/admin/commands/storage/read_range_cluster_blocks.go @@ -50,7 +50,7 @@ func (c *ReadRangeClusterBlocksCommand) Handler(ctx context.Context, req *admin. 
return nil, admin.NewInvalidAdminReqErrorf("getting for more than %v blocks at a time might have an impact to node's performance and is not allowed", Max_Range_Cluster_Block_Limit) } - clusterHeaders := store.NewHeaders(&metrics.NoopCollector{}, c.db, flow.ChainID(chainID)) + clusterHeaders := store.NewClusterHeaders(&metrics.NoopCollector{}, c.db, flow.ChainID(chainID)) clusterBlocks := store.NewClusterBlocks( c.db, flow.ChainID(chainID), clusterHeaders, c.payloads, ) diff --git a/cmd/util/cmd/read-badger/cmd/cluster_blocks.go b/cmd/util/cmd/read-badger/cmd/cluster_blocks.go index c2c9362d087..af2f98b8b62 100644 --- a/cmd/util/cmd/read-badger/cmd/cluster_blocks.go +++ b/cmd/util/cmd/read-badger/cmd/cluster_blocks.go @@ -38,7 +38,7 @@ var clusterBlocksCmd = &cobra.Command{ log.Info().Msgf("got flag chain name: %s", flagChainName) chainID := flow.ChainID(flagChainName) - headers := store.NewHeaders(metrics, db, chainID) + headers := store.NewClusterHeaders(metrics, db, chainID) clusterPayloads := store.NewClusterPayloads(metrics, db) clusterBlocks := store.NewClusterBlocks(db, chainID, headers, clusterPayloads) diff --git a/cmd/util/cmd/read-light-block/read_light_block_test.go b/cmd/util/cmd/read-light-block/read_light_block_test.go index 94329b8837b..7a77b05f126 100644 --- a/cmd/util/cmd/read-light-block/read_light_block_test.go +++ b/cmd/util/cmd/read-light-block/read_light_block_test.go @@ -56,7 +56,7 @@ func TestReadClusterRange(t *testing.T) { clusterBlocks := store.NewClusterBlocks( db, blocks[0].ChainID, - store.NewHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), + store.NewClusterHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), store.NewClusterPayloads(metrics.NewNoopCollector(), db), ) diff --git a/engine/collection/epochmgr/factories/cluster_state.go b/engine/collection/epochmgr/factories/cluster_state.go index fef89841053..688efb614a5 100644 --- a/engine/collection/epochmgr/factories/cluster_state.go +++ b/engine/collection/epochmgr/factories/cluster_state.go @@ -43,7 +43,7 @@ func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot, chainID flo error, ) { - headers := store.NewHeaders(f.metrics, f.db, stateRoot.ClusterID()) + headers := store.NewClusterHeaders(f.metrics, f.db, stateRoot.ClusterID()) payloads := store.NewClusterPayloads(f.metrics, f.db) blocks := store.NewClusterBlocks(f.db, stateRoot.ClusterID(), headers, payloads) consensusHeaders := store.NewHeaders(f.metrics, f.db, chainID) // for reference blocks diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index a8c64656f9b..94a685d7921 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -100,7 +100,7 @@ func (suite *BuilderSuite) SetupTest() { all := store.InitAll(metrics, suite.db, flow.Emulator) consumer := events.NewNoop() - suite.headers = store.NewHeaders(metrics, suite.db, suite.chainID) + suite.headers = store.NewClusterHeaders(metrics, suite.db, suite.chainID) suite.blocks = all.Blocks suite.payloads = store.NewClusterPayloads(metrics, suite.db) suite.consensusHeaders = all.Headers diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index 26bc9000ce3..b0abedbd251 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -69,7 +69,7 @@ func (suite *MutatorSuite) SetupTest() { tracer := trace.NewNoopTracer() log := zerolog.Nop() all := store.InitAll(metrics, suite.db, flow.Emulator) - clusterHeaders := 
store.NewHeaders(metrics, suite.db, suite.chainID) + clusterHeaders := store.NewClusterHeaders(metrics, suite.db, suite.chainID) colPayloads := store.NewClusterPayloads(metrics, suite.db) // just bootstrap with a genesis block, we'll use this as reference diff --git a/state/cluster/badger/snapshot_test.go b/state/cluster/badger/snapshot_test.go index feb26fed5df..a9a6de1ee76 100644 --- a/state/cluster/badger/snapshot_test.go +++ b/state/cluster/badger/snapshot_test.go @@ -58,7 +58,7 @@ func (suite *SnapshotSuite) SetupTest() { all := store.InitAll(metrics, suite.db, flow.Emulator) colPayloads := store.NewClusterPayloads(metrics, suite.db) - clusterHeaders := store.NewHeaders(metrics, suite.db, suite.chainID) + clusterHeaders := store.NewClusterHeaders(metrics, suite.db, suite.chainID) root := unittest.RootSnapshotFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) suite.epochCounter = root.Encodable().SealingSegment.LatestProtocolStateEntry().EpochEntry.EpochCounter() diff --git a/storage/store/cluster_blocks_test.go b/storage/store/cluster_blocks_test.go index ed3bffcb3d7..bae0c1e2484 100644 --- a/storage/store/cluster_blocks_test.go +++ b/storage/store/cluster_blocks_test.go @@ -55,7 +55,7 @@ func TestClusterBlocks(t *testing.T) { clusterBlocks := NewClusterBlocks( db, blocks[0].ChainID, - NewHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), + NewClusterHeaders(metrics.NewNoopCollector(), db, blocks[0].ChainID), NewClusterPayloads(metrics.NewNoopCollector(), db), ) diff --git a/storage/store/headers.go b/storage/store/headers.go index d518e1805fb..482bb12e593 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -27,8 +27,49 @@ type Headers struct { var _ storage.Headers = (*Headers)(nil) // NewHeaders creates a Headers instance, which stores block headers. -// It supports storing, caching and retrieving by block ID and the additionally indexed by header height. +// It supports storing, caching and retrieving by block ID, and additionally indexes by header height and view. func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.ChainID) *Headers { + if chainID.IsClusterChain() { + panic("NewHeaders called on cluster chain ID - use NewClusterHeaders instead") + } + retrieveHeight := func(r storage.Reader, height uint64) (flow.Identifier, error) { + var id flow.Identifier + err := operation.LookupBlockHeight(r, height, &id) + return id, err + } + retrieveView := func(r storage.Reader, view uint64) (flow.Identifier, error) { + var id flow.Identifier + err := operation.LookupCertifiedBlockByView(r, view, &id) + return id, err + } + return newHeaders(collector, db, chainID, retrieveHeight, retrieveView) +} + +// NewClusterHeaders creates a Headers instance for a collection cluster chain, which stores block headers for cluster blocks. +// It supports storing, caching and retrieving by block ID, and additionally an index by header height. 
+func NewClusterHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.ChainID) *Headers { + if !chainID.IsClusterChain() { + panic("NewClusterHeaders called on non-cluster chain ID - use NewHeaders instead") + } + retrieveHeight := func(r storage.Reader, height uint64) (flow.Identifier, error) { + var id flow.Identifier + err := operation.LookupClusterBlockHeight(r, chainID, height, &id) + return id, err + } + retrieveView := func(r storage.Reader, height uint64) (flow.Identifier, error) { + var id flow.Identifier + return id, fmt.Errorf("retrieve by view not implemented for cluster headers") + } + return newHeaders(collector, db, chainID, retrieveHeight, retrieveView) +} + +// newHeaders contains shared logic for Header storage, including storing and retrieving by block ID +func newHeaders(collector module.CacheMetrics, + db storage.DB, + chainID flow.ChainID, + retrieveHeight retrieveFunc[uint64, flow.Identifier], + retrieveView retrieveFunc[uint64, flow.Identifier], +) *Headers { storeWithLock := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, header *flow.Header) error { if header.ChainID != chainID { return fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) @@ -50,26 +91,6 @@ func NewHeaders(collector module.CacheMetrics, db storage.DB, chainID flow.Chain return &header, err } - retrieveHeight := func(r storage.Reader, height uint64) (flow.Identifier, error) { - var id flow.Identifier - err := operation.LookupBlockHeight(r, height, &id) - return id, err - } - - if chainID.IsClusterChain() { - retrieveHeight = func(r storage.Reader, height uint64) (flow.Identifier, error) { - var id flow.Identifier - err := operation.LookupClusterBlockHeight(r, chainID, height, &id) - return id, err - } - } - - retrieveView := func(r storage.Reader, view uint64) (flow.Identifier, error) { - var id flow.Identifier - err := operation.LookupCertifiedBlockByView(r, view, &id) - return id, err - } - h := &Headers{ db: db, cache: newCache(collector, metrics.ResourceHeader, From f16ccbfff0d64b3e5482fdd2cf2c219074469db4 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Dec 2025 12:53:10 -0800 Subject: [PATCH 13/16] add chain-specific lock checks for header insertion --- storage/store/headers.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/storage/store/headers.go b/storage/store/headers.go index 482bb12e593..02a674a9d28 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -74,6 +74,15 @@ func newHeaders(collector module.CacheMetrics, if header.ChainID != chainID { return fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) } + if chainID.IsClusterChain() { + if !lctx.HoldsLock(storage.LockInsertOrFinalizeClusterBlock) { + return fmt.Errorf("missing lock: %v", storage.LockInsertOrFinalizeClusterBlock) + } + } else { + if !lctx.HoldsLock(storage.LockInsertBlock) { + return fmt.Errorf("missing lock: %v", storage.LockInsertBlock) + } + } return operation.InsertHeader(lctx, rw, blockID, header) } From c24be76816f4bb5620421ca0727415a4aa07cb1c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Dec 2025 13:50:15 -0800 Subject: [PATCH 14/16] update determineChainID() and document expected errors --- cmd/scaffold.go | 36 +++++++++++++++++++++------------- state/protocol/badger/state.go | 12 ++++++++++++ storage/store/headers.go | 2 ++ 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 
7b084e44f29..469f4e16c43 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -1186,24 +1186,32 @@ func (fnb *FlowNodeBuilder) initStorageLockManager() error { return nil } +// determineChainID attempts to determine the chain this node is running on +// directly from the database or root snapshot, before storage interfaces have been initialized. +// No errors expected during normal operation. func (fnb *FlowNodeBuilder) determineChainID() error { - if ok, _ := badgerState.IsBootstrapped(fnb.ProtocolDB); ok { - chainID, err := badgerState.GetChainIDFromLatestFinalizedHeader(fnb.ProtocolDB) - if err == nil { - fnb.RootChainID = chainID - return nil - } - } - // could not read from DB; try reading root snapshot from disk - fnb.Logger.Info().Msgf("loading root protocol state snapshot from disk") - rootSnapshot, err := loadRootProtocolSnapshot(fnb.BaseConfig.BootstrapDir) + bootstrapped, err := badgerState.IsBootstrapped(fnb.ProtocolDB) if err != nil { - return fmt.Errorf("failed to read protocol snapshot from disk: %w", err) - } - // set root snapshot fields (including RootChainID) - if err := fnb.setRootSnapshot(rootSnapshot); err != nil { return err } + if bootstrapped { + chainID, err := badgerState.GetChainIDFromLatestFinalizedHeader(fnb.ProtocolDB) + if err != nil { + return err + } + fnb.RootChainID = chainID + } else { + // try reading root snapshot from disk (full bootstrap will happen later) + fnb.Logger.Info().Msgf("loading root protocol state snapshot from disk") + rootSnapshot, err := loadRootProtocolSnapshot(fnb.BaseConfig.BootstrapDir) + if err != nil { + return fmt.Errorf("failed to read protocol snapshot from disk: %w", err) + } + // set root snapshot fields, including fnb.RootChainID + if err := fnb.setRootSnapshot(rootSnapshot); err != nil { + return err + } + } return nil } diff --git a/state/protocol/badger/state.go b/state/protocol/badger/state.go index f692e83dea8..d8b437aae8c 100644 --- a/state/protocol/badger/state.go +++ b/state/protocol/badger/state.go @@ -989,6 +989,10 @@ func IsBootstrapped(db storage.DB) (bool, error) { return true, nil } +// GetChainIDFromLatestFinalizedHeader attempts to retrieve the consensus chainID +// from the latest finalized header in the database, before storage or protocol state have been initialized. +// Expected errors during normal operations: +// - [storage.ErrNotFound] if the node is not bootstrapped. func GetChainIDFromLatestFinalizedHeader(db storage.DB) (flow.ChainID, error) { h, err := GetLatestFinalizedHeader(db) if err != nil { @@ -999,6 +1003,8 @@ func GetChainIDFromLatestFinalizedHeader(db storage.DB) (flow.ChainID, error) { // GetLatestFinalizedHeader attempts to retrieve the latest finalized header // without going through the storage.Headers interface. +// Expected errors during normal operations: +// - [storage.ErrNotFound] if the node is not bootstrapped. 
func GetLatestFinalizedHeader(db storage.DB) (*flow.Header, error) { var finalized uint64 r := db.Reader() @@ -1009,11 +1015,17 @@ func GetLatestFinalizedHeader(db storage.DB) (*flow.Header, error) { var id flow.Identifier err = operation.LookupBlockHeight(r, finalized, &id) if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("could not retrieve finalized header: not present in height index") + } return nil, err } var header flow.Header err = operation.RetrieveHeader(r, id, &header) if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("could not retrieve finalized header: block not known") + } return nil, err } return &header, nil diff --git a/storage/store/headers.go b/storage/store/headers.go index 02a674a9d28..38474e70537 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -134,6 +134,7 @@ func newHeaders(collector module.CacheMetrics, // It returns [storage.ErrAlreadyExists] if the header already exists, i.e. we only insert a new header once. // This error allows the caller to detect duplicate inserts. If the header is stored along with other parts // of the block in the same batch, similar duplication checks can be skipped for storing other parts of the block. +// Returns [storage.ErrWrongChain] if the header's ChainID does not match the one used when initializing the storage. // No other errors are expected during normal operation. func (h *Headers) storeTx( lctx lockctx.Proof, @@ -182,6 +183,7 @@ func (h *Headers) retrieveIdByHeightTx(height uint64) (flow.Identifier, error) { // ByBlockID returns the header with the given ID. It is available for finalized blocks and those pending finalization. // Error returns: // - [storage.ErrNotFound] if no block header with the given ID exists +// - [storage.ErrWrongChain] if the block header exists in the database but is part of a different chain than expected func (h *Headers) ByBlockID(blockID flow.Identifier) (*flow.Header, error) { return h.retrieveTx(blockID) } From e397124b32b73905f76b33ad1f211ce153bb4dad Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Dec 2025 14:11:51 -0800 Subject: [PATCH 15/16] Fix bug in populateFinalizedAncestryLookup and remove workaround The check was unintentionally crossing an epoch boundary and retrieving headers from a previous cluster chain. In addition, `ctx.refEpochFirstHeight` was never initialized. 
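
In rough terms, the corrected lookup bound behaves like the following sketch (illustrative only: the standalone helper name and signature, and passing the expiry window as a plain parameter, are simplifications rather than the literal code in the diff below):

    // searchRange clamps the reference-height window used for transaction
    // deduplication to the cluster's operating epoch, so the lookup never
    // reaches back into a previous epoch's cluster chain.
    func searchRange(minRefHeight, maxRefHeight, epochFirstHeight, expiry uint64) (start, end uint64) {
        delta := expiry + 1
        if minRefHeight <= epochFirstHeight+delta {
            // the expiry window would reach at or below the epoch's first
            // reference height (or underflow); bound the search at the
            // start of the epoch instead
            return epochFirstHeight, maxRefHeight
        }
        return minRefHeight - delta, maxRefHeight
    }
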
See https://github.com/onflow/flow-go/pull/8222#discussion_r2612489290 for details --- module/builder/collection/builder.go | 41 +++++++++++++++++++++------- storage/store/headers.go | 5 ++-- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/module/builder/collection/builder.go b/module/builder/collection/builder.go index 59fd9ca5b3c..81cad19d433 100644 --- a/module/builder/collection/builder.go +++ b/module/builder/collection/builder.go @@ -45,6 +45,7 @@ type Builder struct { log zerolog.Logger clusterEpoch uint64 // the operating epoch for this cluster // cache of values about the operating epoch which never change + epochFirstHeight *uint64 // first height of this cluster's operating epoch epochFinalHeight *uint64 // last height of this cluster's operating epoch (nil if epoch not ended) epochFinalID *flow.Identifier // ID of last block in this cluster's operating epoch (nil if epoch not ended) } @@ -285,6 +286,29 @@ func (b *Builder) getBlockBuildContext(parentID flow.Identifier) (*blockBuildCon ctx.refChainFinalizedHeight = mainChainFinalizedHeader.Height ctx.refChainFinalizedID = mainChainFinalizedHeader.ID() + // If we don't have the epoch boundaries (first/final height ON MAIN CHAIN) cached, try retrieve and cache them + r := b.db.Reader() + + if b.epochFirstHeight != nil { + ctx.refEpochFirstHeight = *b.epochFirstHeight + } else { + var refEpochFirstHeight uint64 + + err = operation.RetrieveEpochFirstHeight(r, b.clusterEpoch, &refEpochFirstHeight) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + // can be missing if we joined (dynamic bootstrapped) in the middle of an epoch. + // 0 means FinalizedAncestryLookup will not be bounded by the epoch start, + // but only by which cluster blocks we have available. + refEpochFirstHeight = 0 + } else { + return nil, fmt.Errorf("unexpected failure to retrieve first height of operating epoch: %w", err) + } + } + b.epochFirstHeight = &refEpochFirstHeight + ctx.refEpochFirstHeight = *b.epochFirstHeight + } + // if the epoch has ended and the final block is cached, use the cached values if b.epochFinalHeight != nil && b.epochFinalID != nil { ctx.refEpochFinalID = b.epochFinalID @@ -292,8 +316,6 @@ func (b *Builder) getBlockBuildContext(parentID flow.Identifier) (*blockBuildCon return ctx, nil } - r := b.db.Reader() - var refEpochFinalHeight uint64 var refEpochFinalID flow.Identifier @@ -379,14 +401,14 @@ func (b *Builder) populateFinalizedAncestryLookup(lctx lockctx.Proof, ctx *block // the finalized cluster blocks which could possibly contain any conflicting transactions var clusterBlockIDs []flow.Identifier - start, end := findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight) + start, end := findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight, ctx) err := operation.LookupClusterBlocksByReferenceHeightRange(lctx, b.db.Reader(), start, end, &clusterBlockIDs) if err != nil { return fmt.Errorf("could not lookup finalized cluster blocks by reference height range [%d,%d]: %w", start, end, err) } for _, blockID := range clusterBlockIDs { - header, err := b.clusterHeaders.ByBlockID(blockID) // TODO(4204) transaction deduplication crosses clusterHeaders epoch boundary + header, err := b.clusterHeaders.ByBlockID(blockID) if err != nil { return fmt.Errorf("could not retrieve cluster header (id=%x): %w", blockID, err) } @@ -617,11 +639,10 @@ func (b *Builder) buildHeader( // Input range is the (inclusive) range of reference heights of transactions included // in the collection 
under construction. Output range is the (inclusive) range of // reference heights which need to be searched. -func findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight uint64) (start, end uint64) { - start = minRefHeight - flow.DefaultTransactionExpiry + 1 - if start > minRefHeight { - start = 0 // overflow check +func findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight uint64, ctx *blockBuildContext) (start, end uint64) { + delta := uint64(flow.DefaultTransactionExpiry + 1) + if minRefHeight <= ctx.refEpochFirstHeight+delta { + return ctx.refEpochFirstHeight, maxRefHeight // bound at start of epoch } - end = maxRefHeight - return start, end + return minRefHeight - delta, maxRefHeight } diff --git a/storage/store/headers.go b/storage/store/headers.go index 38474e70537..e4996551fbb 100644 --- a/storage/store/headers.go +++ b/storage/store/headers.go @@ -92,9 +92,8 @@ func newHeaders(collector module.CacheMetrics, if err != nil { return nil, err } - // raise an error when the retrieved header is for a different chain than expected, - // except in the case of cluster chains where the previous epoch(=chain) can be checked for transaction deduplication - if header.ChainID != chainID && !(isClusterChain(chainID) && isClusterChain(header.ChainID)) { + // raise an error when the retrieved header is for a different chain than expected + if header.ChainID != chainID { return nil, fmt.Errorf("expected chain ID %v, got %v: %w", chainID, header.ChainID, storage.ErrWrongChain) } return &header, err From f53d3616ccba076bf2f53e2f56fa87e970ec1838 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Dec 2025 16:44:15 -0800 Subject: [PATCH 16/16] clarify chainID is for consensus --- engine/collection/epochmgr/factory.go | 3 +-- .../epochmgr/mock/epoch_components_factory.go | 24 +++++++++---------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/engine/collection/epochmgr/factory.go b/engine/collection/epochmgr/factory.go index d27b1652bbe..fec38261b38 100644 --- a/engine/collection/epochmgr/factory.go +++ b/engine/collection/epochmgr/factory.go @@ -17,10 +17,9 @@ type EpochComponentsFactory interface { // be used either for an ongoing epoch (for example, after a restart) or // for an epoch that will start soon. It is safe to call multiple times for // a given epoch counter. - // ChainID refers to the consensus chain, from which reference blocks are used. // // Must return ErrNotAuthorizedForEpoch if this node is not authorized in the epoch. 
- Create(epoch protocol.CommittedEpoch, chainID flow.ChainID) ( + Create(epoch protocol.CommittedEpoch, consensusChainID flow.ChainID) ( state cluster.State, proposal component.Component, sync module.ReadyDoneAware, diff --git a/engine/collection/epochmgr/mock/epoch_components_factory.go b/engine/collection/epochmgr/mock/epoch_components_factory.go index f53f4930c77..171c88676f6 100644 --- a/engine/collection/epochmgr/mock/epoch_components_factory.go +++ b/engine/collection/epochmgr/mock/epoch_components_factory.go @@ -22,9 +22,9 @@ type EpochComponentsFactory struct { mock.Mock } -// Create provides a mock function with given fields: epoch, chainID -func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error) { - ret := _m.Called(epoch, chainID) +// Create provides a mock function with given fields: epoch, consensusChainID +func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, consensusChainID flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error) { + ret := _m.Called(epoch, consensusChainID) if len(ret) == 0 { panic("no return value specified for Create") @@ -39,10 +39,10 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID var r6 component.Component var r7 error if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch, flow.ChainID) (cluster.State, component.Component, module.ReadyDoneAware, module.HotStuff, hotstuff.VoteAggregator, hotstuff.TimeoutAggregator, component.Component, error)); ok { - return rf(epoch, chainID) + return rf(epoch, consensusChainID) } if rf, ok := ret.Get(0).(func(protocol.CommittedEpoch, flow.ChainID) cluster.State); ok { - r0 = rf(epoch, chainID) + r0 = rf(epoch, consensusChainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(cluster.State) @@ -50,7 +50,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(1).(func(protocol.CommittedEpoch, flow.ChainID) component.Component); ok { - r1 = rf(epoch, chainID) + r1 = rf(epoch, consensusChainID) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(component.Component) @@ -58,7 +58,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(2).(func(protocol.CommittedEpoch, flow.ChainID) module.ReadyDoneAware); ok { - r2 = rf(epoch, chainID) + r2 = rf(epoch, consensusChainID) } else { if ret.Get(2) != nil { r2 = ret.Get(2).(module.ReadyDoneAware) @@ -66,7 +66,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(3).(func(protocol.CommittedEpoch, flow.ChainID) module.HotStuff); ok { - r3 = rf(epoch, chainID) + r3 = rf(epoch, consensusChainID) } else { if ret.Get(3) != nil { r3 = ret.Get(3).(module.HotStuff) @@ -74,7 +74,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(4).(func(protocol.CommittedEpoch, flow.ChainID) hotstuff.VoteAggregator); ok { - r4 = rf(epoch, chainID) + r4 = rf(epoch, consensusChainID) } else { if ret.Get(4) != nil { r4 = ret.Get(4).(hotstuff.VoteAggregator) @@ -82,7 +82,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(5).(func(protocol.CommittedEpoch, flow.ChainID) 
hotstuff.TimeoutAggregator); ok { - r5 = rf(epoch, chainID) + r5 = rf(epoch, consensusChainID) } else { if ret.Get(5) != nil { r5 = ret.Get(5).(hotstuff.TimeoutAggregator) @@ -90,7 +90,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(6).(func(protocol.CommittedEpoch, flow.ChainID) component.Component); ok { - r6 = rf(epoch, chainID) + r6 = rf(epoch, consensusChainID) } else { if ret.Get(6) != nil { r6 = ret.Get(6).(component.Component) @@ -98,7 +98,7 @@ func (_m *EpochComponentsFactory) Create(epoch protocol.CommittedEpoch, chainID } if rf, ok := ret.Get(7).(func(protocol.CommittedEpoch, flow.ChainID) error); ok { - r7 = rf(epoch, chainID) + r7 = rf(epoch, consensusChainID) } else { r7 = ret.Error(7) }