diff --git a/plugin/evm/database/block_cache.go b/plugin/evm/database/block_cache.go new file mode 100644 index 0000000000..3ae786b34d --- /dev/null +++ b/plugin/evm/database/block_cache.go @@ -0,0 +1,51 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package database + +import ( + "sync" + + "github.com/ava-labs/libevm/common" +) + +// blockCache is a cache for block header and body. +// It allows saving body and header separately via save while +// fetching both body and header at once via get. +type blockCache struct { + bodyCache map[common.Hash][]byte + headerCache map[common.Hash][]byte + mu sync.RWMutex +} + +func newBlockCache() *blockCache { + return &blockCache{ + bodyCache: make(map[common.Hash][]byte), + headerCache: make(map[common.Hash][]byte), + } +} + +func (b *blockCache) get(blockHash common.Hash) ([]byte, []byte, bool) { + b.mu.RLock() + defer b.mu.RUnlock() + bodyData, hasBody := b.bodyCache[blockHash] + headerData, hasHeader := b.headerCache[blockHash] + return bodyData, headerData, hasBody && hasHeader +} + +func (b *blockCache) clear(blockHash common.Hash) { + b.mu.Lock() + defer b.mu.Unlock() + delete(b.bodyCache, blockHash) + delete(b.headerCache, blockHash) +} + +func (b *blockCache) save(key []byte, blockHash common.Hash, value []byte) { + b.mu.Lock() + defer b.mu.Unlock() + if isBodyKey(key) { + b.bodyCache[blockHash] = value + } else if isHeaderKey(key) { + b.headerCache[blockHash] = value + } +} diff --git a/plugin/evm/database/block_database.go b/plugin/evm/database/block_database.go new file mode 100644 index 0000000000..4d3188b932 --- /dev/null +++ b/plugin/evm/database/block_database.go @@ -0,0 +1,510 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package database + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "path/filepath" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/heightindexdb/meterdb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/log" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + _ ethdb.Database = (*BlockDatabase)(nil) + + // Key prefixes for block data in the ethdb. + // REVIEW: I opted to just copy these from libevm since these should never change + // and we can avoid libevm changes by not needing to export them. + // Alternatively, we can update libevm to export these or create reader functions. + chainDBHeaderPrefix = []byte("h") + chainDBBlockBodyPrefix = []byte("b") + chainDBReceiptsPrefix = []byte("r") + + // Database prefix for storing block database migrator's internal state and progress tracking + migratorDBPrefix = []byte("migrator") + + // Key for storing the minimum height configuration of the block databases. + // This value determines the lowest block height that will be stored in the + // height-indexed block databases. Once set during initialization, the block + // database min height cannot be changed without recreating the databases. 
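+	// The value stored under this key is the minimum height encoded as an
+	// 8-byte big-endian integer (see encodeBlockNumber / getDatabaseMinHeight).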
+ blockDBMinHeightKey = []byte("blockdb_min_height") +) + +const ( + // Number of elements in the RLP encoded block data + blockDataElements = 3 + // Number of elements in the RLP encoded receipt data + receiptDataElements = 2 + blockNumberSize = 8 + blockHashSize = 32 +) + +// BlockDatabase is a wrapper around an ethdb.Database that stores block header +// body, and receipts in separate height-indexed block databases. +// All other keys are stored in the underlying ethdb.Database (chainDB). +type BlockDatabase struct { + ethdb.Database + // DB for storing block database state data (ie min height) + stateDB database.Database + blockDB database.HeightIndex + receiptsDB database.HeightIndex + + config blockdb.DatabaseConfig + blockDBPath string + minHeight uint64 + + bCache *blockCache // cache for block header and body + migrator *blockDatabaseMigrator + + reg prometheus.Registerer + // todo: use logger for blockdb instances + //nolint:unused + logger logging.Logger + + initialized bool +} + +type blockDatabaseBatch struct { + database *BlockDatabase + ethdb.Batch +} + +func NewBlockDatabase( + stateDB database.Database, + chainDB ethdb.Database, + config blockdb.DatabaseConfig, + blockDBPath string, + logger logging.Logger, + reg prometheus.Registerer, +) *BlockDatabase { + return &BlockDatabase{ + stateDB: stateDB, + Database: chainDB, + bCache: newBlockCache(), + blockDBPath: blockDBPath, + config: config, + reg: reg, + logger: logger, + } +} + +// InitWithStateSync initializes the height-indexed databases with the +// appropriate minimum height based on existing configuration and state sync settings. +// +// Initialization cases (in order of precedence): +// 1. Databases already exist → loads with existing min height +// 2. Data to migrate exists → initializes with min block height to migrate +// 3. No data to migrate + state sync enabled → defers initialization +// 4. No data to migrate + state sync disabled → initializes with min height 1 +// +// Returns true if databases were initialized, false otherwise. +func (db *BlockDatabase) InitWithStateSync(stateSyncEnabled bool) (bool, error) { + minHeight, err := getDatabaseMinHeight(db.stateDB) + if err != nil { + return false, err + } + + // Databases already exist, load with existing min height + if minHeight != nil { + if err := db.InitWithMinHeight(*minHeight); err != nil { + return false, err + } + return true, nil + } + + // Data to migrate exists, initialize with min block height to migrate + minMigrateHeight := minBlockHeightToMigrate(db.Database) + if minMigrateHeight != nil { + if err := db.InitWithMinHeight(*minMigrateHeight); err != nil { + return false, err + } + return true, nil + } + + // No data to migrate and state sync disabled, initialize with min height 1 + if !stateSyncEnabled { + // Genesis block is not stored in height-indexed databases to avoid + // min height complexity with different node types (pruned vs archive). + if err := db.InitWithMinHeight(1); err != nil { + return false, err + } + return true, nil + } + return false, nil +} + +// InitWithMinHeight initializes the height-indexed databases with the provided minimum height. 
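+//
+// A minimal initialization flow (illustrative sketch only; the surrounding VM
+// wiring and caller variables are assumed, not part of this change):
+//
+//	bdb := NewBlockDatabase(stateDB, chainDB, blockdb.DefaultConfig(), blockDBPath, logger, reg)
+//	if err := bdb.InitWithMinHeight(1); err != nil {
+//		return err
+//	}
+//	if err := bdb.Migrate(); err != nil { // no-op once migration has completed
+//		return err
+//	}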
+func (db *BlockDatabase) InitWithMinHeight(minHeight uint64) error { + if db.initialized { + log.Warn("InitWithMinHeight called on a block database that is already initialized") + return nil + } + log.Info("Initializing block database with min height", "minHeight", minHeight) + + if err := db.stateDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { + return err + } + blockDB, err := db.createMeteredBlockDatabase("blockdb", minHeight) + if err != nil { + return err + } + db.blockDB = blockDB + receiptsDB, err := db.createMeteredBlockDatabase("receiptsdb", minHeight) + if err != nil { + return err + } + db.receiptsDB = receiptsDB + + if err := db.initMigrator(); err != nil { + return fmt.Errorf("failed to init migrator: %w", err) + } + + db.initialized = true + db.minHeight = minHeight + return nil +} + +// Migrate migrates block headers, bodies, and receipts from ethDB to the block databases. +func (db *BlockDatabase) Migrate() error { + if !db.initialized { + return errors.New("block database must be initialized before migrating") + } + return db.migrator.Migrate() +} + +func (db *BlockDatabase) Put(key []byte, value []byte) error { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Put(key, value) + } + + blockNumber, blockHash := blockNumberAndHashFromKey(key) + + if isReceiptKey(key) { + return db.writeReceipts(blockNumber, blockHash, value) + } + + db.bCache.save(key, blockHash, value) + if bodyData, headerData, ok := db.bCache.get(blockHash); ok { + db.bCache.clear(blockHash) + return db.writeBlock(blockNumber, blockHash, headerData, bodyData) + } + return nil +} + +func (db *BlockDatabase) Get(key []byte) ([]byte, error) { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Get(key) + } + blockNumber, blockHash := blockNumberAndHashFromKey(key) + + if isReceiptKey(key) { + return db.getReceipts(key, blockNumber, blockHash) + } + return db.getBlock(key, blockNumber, blockHash) +} + +func (db *BlockDatabase) Has(key []byte) (bool, error) { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Has(key) + } + data, err := db.Get(key) + if err != nil { + return false, err + } + return data != nil, nil +} + +func (db *BlockDatabase) Delete(key []byte) error { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Delete(key) + } + + // no-op since deleting written blocks is not supported + return nil +} + +func (db *BlockDatabase) Close() error { + if db.migrator != nil { + db.migrator.Stop() + } + if db.blockDB != nil { + err := db.blockDB.Close() + if err != nil { + return err + } + } + if db.receiptsDB != nil { + err := db.receiptsDB.Close() + if err != nil { + return err + } + } + return db.Database.Close() +} + +func (db *BlockDatabase) initMigrator() error { + if db.migrator != nil { + return nil + } + migratorStateDB := prefixdb.New(migratorDBPrefix, db.stateDB) + migrator, err := NewBlockDatabaseMigrator(migratorStateDB, db.blockDB, db.receiptsDB, db.Database) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +func (db *BlockDatabase) createMeteredBlockDatabase(namespace string, minHeight uint64) (database.HeightIndex, error) { + path := filepath.Join(db.blockDBPath, namespace) + config := db.config.WithDir(path).WithMinimumHeight(minHeight) + newDB, err := blockdb.New(config, db.logger) + if err != nil { + return nil, fmt.Errorf("failed to create block database at %s: %w", path, err) + } + + meteredDB, err := meterdb.New(db.reg, namespace, newDB) + if err != nil { + if err := 
newDB.Close(); err != nil { + return nil, fmt.Errorf("failed to close block database: %w", err) + } + return nil, fmt.Errorf("failed to create metered %s database: %w", namespace, err) + } + + return meteredDB, nil +} + +func (db *BlockDatabase) getReceipts(key []byte, blockNumber uint64, blockHash common.Hash) ([]byte, error) { + encodedReceiptData, err := db.receiptsDB.Get(blockNumber) + if err != nil { + if errors.Is(err, database.ErrNotFound) && db.isMigrationNeeded() { + return db.Database.Get(key) + } + return nil, err + } + + decodedHash, receiptData, err := decodeReceiptData(encodedReceiptData) + if err != nil { + return nil, err + } + if decodedHash != blockHash { + return nil, nil + } + + return receiptData, nil +} + +func (db *BlockDatabase) getBlock(key []byte, blockNumber uint64, blockHash common.Hash) ([]byte, error) { + encodedBlock, err := db.blockDB.Get(blockNumber) + if err != nil { + if errors.Is(err, database.ErrNotFound) && db.isMigrationNeeded() { + return db.Database.Get(key) + } + return nil, err + } + + decodedHash, header, body, err := decodeBlockData(encodedBlock) + if err != nil { + return nil, err + } + if decodedHash != blockHash { + return nil, nil + } + + if isBodyKey(key) { + return body, nil + } + if isHeaderKey(key) { + return header, nil + } + + return nil, fmt.Errorf("unexpected key for block data: %s", key) +} + +func (db *BlockDatabase) isMigrationNeeded() bool { + return db.migrator != nil && db.migrator.Status() != migrationCompleted +} + +func (db *BlockDatabase) writeReceipts(blockNumber uint64, blockHash common.Hash, data []byte) error { + encodedReceipts, err := encodeReceiptData(data, blockHash) + if err != nil { + return err + } + return db.receiptsDB.Put(blockNumber, encodedReceipts) +} + +func (db *BlockDatabase) writeBlock(blockNumber uint64, blockHash common.Hash, header []byte, body []byte) error { + encodedBlock, err := encodeBlockData(header, body, blockHash) + if err != nil { + return err + } + return db.blockDB.Put(blockNumber, encodedBlock) +} + +func (db *BlockDatabase) shouldWriteToBlockDatabase(key []byte) bool { + if !db.initialized { + return false + } + + isTargetKey := isBodyKey(key) || isHeaderKey(key) || isReceiptKey(key) + if !isTargetKey { + return false + } + + // Only blocks with height >= min height are stored in block database + if blockNumber, _ := blockNumberAndHashFromKey(key); blockNumber < db.minHeight { + return false + } + + return true +} + +func (db *BlockDatabase) NewBatch() ethdb.Batch { + return blockDatabaseBatch{ + database: db, + Batch: db.Database.NewBatch(), + } +} + +func (db *BlockDatabase) NewBatchWithSize(size int) ethdb.Batch { + return blockDatabaseBatch{ + database: db, + Batch: db.Database.NewBatchWithSize(size), + } +} + +func (batch blockDatabaseBatch) Put(key []byte, value []byte) error { + if !batch.database.shouldWriteToBlockDatabase(key) { + return batch.Batch.Put(key, value) + } + return batch.database.Put(key, value) +} + +func (batch blockDatabaseBatch) Delete(key []byte) error { + if !batch.database.shouldWriteToBlockDatabase(key) { + return batch.Batch.Delete(key) + } + return batch.database.Delete(key) +} + +func blockNumberAndHashFromKey(key []byte) (uint64, common.Hash) { + var prefixLen int + switch { + case isBodyKey(key): + prefixLen = len(chainDBBlockBodyPrefix) + case isHeaderKey(key): + prefixLen = len(chainDBHeaderPrefix) + case isReceiptKey(key): + prefixLen = len(chainDBReceiptsPrefix) + } + blockNumber := binary.BigEndian.Uint64(key[prefixLen : prefixLen+8]) + blockHash 
:= common.BytesToHash(key[prefixLen+blockNumberSize:]) + return blockNumber, blockHash +} + +func decodeBlockData(encodedBlock []byte) (hash common.Hash, header []byte, body []byte, err error) { + var blockData [][]byte + if err := rlp.DecodeBytes(encodedBlock, &blockData); err != nil { + return common.Hash{}, nil, nil, err + } + + if len(blockData) != blockDataElements { + return common.Hash{}, nil, nil, fmt.Errorf( + "invalid block data format: expected %d elements, got %d", + blockDataElements, + len(blockData), + ) + } + + return common.BytesToHash(blockData[0]), blockData[1], blockData[2], nil +} + +func encodeBlockData(header []byte, body []byte, blockHash common.Hash) ([]byte, error) { + return rlp.EncodeToBytes([][]byte{blockHash.Bytes(), header, body}) +} + +func encodeReceiptData(receipts []byte, blockHash common.Hash) ([]byte, error) { + return rlp.EncodeToBytes([][]byte{blockHash.Bytes(), receipts}) +} + +func decodeReceiptData(encodedReceipts []byte) (common.Hash, []byte, error) { + var receiptData [][]byte + if err := rlp.DecodeBytes(encodedReceipts, &receiptData); err != nil { + return common.Hash{}, nil, err + } + + if len(receiptData) != receiptDataElements { + return common.Hash{}, nil, fmt.Errorf( + "invalid receipt data format: expected %d elements, got %d", + receiptDataElements, + len(receiptData), + ) + } + + return common.BytesToHash(receiptData[0]), receiptData[1], nil +} + +func encodeBlockNumber(number uint64) []byte { + enc := make([]byte, blockNumberSize) + binary.BigEndian.PutUint64(enc, number) + return enc +} + +func isBodyKey(key []byte) bool { + if len(key) != len(chainDBBlockBodyPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBBlockBodyPrefix) +} + +func isHeaderKey(key []byte) bool { + if len(key) != len(chainDBHeaderPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBHeaderPrefix) +} + +func isReceiptKey(key []byte) bool { + if len(key) != len(chainDBReceiptsPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBReceiptsPrefix) +} + +func getDatabaseMinHeight(db database.Database) (*uint64, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return nil, err + } + if !has { + return nil, nil + } + minHeightBytes, err := db.Get(blockDBMinHeightKey) + if err != nil { + return nil, err + } + minHeight := binary.BigEndian.Uint64(minHeightBytes) + return &minHeight, nil +} + +// IsBlockDatabaseCreated checks if block database has already been created +func IsBlockDatabaseCreated(db database.Database) (bool, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return false, err + } + return has, nil +} diff --git a/plugin/evm/database/block_database_migration_test.go b/plugin/evm/database/block_database_migration_test.go new file mode 100644 index 0000000000..62268d88e9 --- /dev/null +++ b/plugin/evm/database/block_database_migration_test.go @@ -0,0 +1,355 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package database + +import ( + "path/filepath" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +// Tests migration status is correctly set to "not started" after initializing a new migrator. +func TestBlockDatabaseMigrator_StatusNotStarted(t *testing.T) { + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationNotStarted) +} + +// Tests migration status transitions to "in progress" while a migration is actively running. +func TestBlockDatabaseMigrator_StatusInProgress(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + blocks, receipts := createBlocks(t, 10) + writeBlocks(chainDB, blocks, receipts) + + // migrate with a slow block database + slowDB := &slowBlockDatabase{ + HeightIndex: wrapper.blockDB, + shouldSlow: func() bool { + return true + }, + } + wrapper.migrator.blockDB = slowDB + require.NoError(t, wrapper.migrator.Migrate()) + + // Wait for it to be in progress + require.Eventually(t, func() bool { + return wrapper.migrator.Status() == migrationInProgress + }, 2*time.Second, 100*time.Millisecond) + + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationInProgress) +} + +// Tests migration status is correctly set to "completed" after a successful migration. 
+func TestBlockDatabaseMigrator_StatusCompleted(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + blocks, receipts := createBlocks(t, 3) + writeBlocks(chainDB, blocks, receipts) + wrapper.migrator.blockDB = wrapper.blockDB + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 5*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationCompleted) +} + +func TestBlockDatabaseMigrator_Migration(t *testing.T) { + testCases := []struct { + name string + initStatus migrationStatus + toMigrateHeights []uint64 + migratedHeights []uint64 + expStatus migrationStatus + }{ + { + name: "migrate_5_blocks", + toMigrateHeights: []uint64{0, 1, 2, 3, 4}, + expStatus: migrationCompleted, + }, + { + name: "migrate_5_blocks_20_24", + toMigrateHeights: []uint64{20, 21, 22, 23, 24}, + expStatus: migrationCompleted, + }, + { + name: "migrate_non_consecutive_blocks", + toMigrateHeights: []uint64{20, 21, 22, 29, 30, 40}, + expStatus: migrationCompleted, + }, + { + name: "half_blocks_migrated", + initStatus: migrationInProgress, + toMigrateHeights: []uint64{6, 7, 8, 9, 10}, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + expStatus: migrationCompleted, + }, + { + name: "all_blocks_migrated_in_progress", + initStatus: migrationInProgress, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + expStatus: migrationCompleted, + }, + { + name: "empty_chainDB_no_blocks_to_migrate", + expStatus: migrationCompleted, + }, + { + name: "half_non_consecutive_blocks_migrated", + initStatus: migrationInProgress, + toMigrateHeights: []uint64{2, 3, 7, 8, 10}, + migratedHeights: []uint64{0, 1, 4, 5, 9}, + expStatus: migrationCompleted, + }, + { + name: "migration_already_completed", + initStatus: migrationCompleted, + migratedHeights: []uint64{0, 1, 2}, + expStatus: migrationCompleted, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + // find the max block height to create + maxBlockHeight := uint64(0) + heightSets := [][]uint64{tc.toMigrateHeights, tc.migratedHeights} + for _, heights := range heightSets { + for _, height := range heights { + if height > maxBlockHeight { + maxBlockHeight = height + } + } + } + + // create blocks and receipts + blocks, receipts := createBlocks(t, int(maxBlockHeight)+1) + + // set initial state + if tc.initStatus != migrationNotStarted { + require.NoError(t, wrapper.migrator.setStatus(tc.initStatus)) + } + for _, height := range tc.toMigrateHeights { + writeBlocks(chainDB, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + for _, height := range tc.migratedHeights { + writeBlocks(wrapper, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + + // Migrate database + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, tc.expStatus) + + // Verify that all blocks and receipts are accessible from the block wrapper + totalBlocks := len(tc.toMigrateHeights) + len(tc.migratedHeights) + allExpectedBlocks := make(map[uint64]*types.Block, totalBlocks) + allExpectedReceipts := make(map[uint64]types.Receipts, totalBlocks) + for _, heights := range heightSets { + for _, height := range 
heights { + allExpectedBlocks[height] = blocks[height] + allExpectedReceipts[height] = receipts[height] + } + } + require.Len(t, allExpectedBlocks, totalBlocks) + for blockNum, expectedBlock := range allExpectedBlocks { + actualBlock := rawdb.ReadBlock(wrapper, expectedBlock.Hash(), blockNum) + require.NotNil(t, actualBlock, "Block %d should be accessible", blockNum) + assertRLPEqual(t, expectedBlock, actualBlock) + expectedReceipts := allExpectedReceipts[blockNum] + actualReceipts := rawdb.ReadReceipts(wrapper, expectedBlock.Hash(), blockNum, expectedBlock.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, actualReceipts) + + // verify chainDB no longer has any blocks or receipts (except for genesis) + if expectedBlock.NumberU64() != 0 { + require.False(t, rawdb.HasHeader(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64()), "Block %d should not be in chainDB", expectedBlock.NumberU64()) + require.False(t, rawdb.HasBody(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64())) + require.False(t, rawdb.HasReceipts(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64())) + } + } + }) + } +} + +// Tests that a migration in progress can be stopped and resumed, verifying +// that the migration state is properly persisted and that the migration +// can continue from where it left off after restart. +func TestBlockDatabaseMigrator_AbruptStop(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + if wrapper != nil { + require.NoError(t, wrapper.Close()) + } + }) + + // Create blocks and write them to chainDB + blocks, receipts := createBlocks(t, 10) + writeBlocks(chainDB, blocks, receipts) + + // Create a slow block database that slows down after 3 blocks + blockCount := 0 + slowBdb := &slowBlockDatabase{ + HeightIndex: wrapper.blockDB, + shouldSlow: func() bool { + blockCount++ + return blockCount > 3 + }, + } + + // Start migration with slow block database and wait for 3 blocks to be migrated + wrapper.migrator.blockDB = slowBdb + require.NoError(t, wrapper.migrator.Migrate()) + require.Eventually(t, func() bool { + return wrapper.migrator.blocksProcessed() >= 3 + }, 10*time.Second, 100*time.Millisecond) + require.NoError(t, wrapper.Close()) + + // Create new wrapper and verify migration status + wrapper, chainDB = newDatabasesFromDir(t, dataDir) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationInProgress) + + // Check blockDB and chainDB to ensure we have the correct amount of blocks + chainDBBlocks := 0 + blockdbBlocks := 0 + wrapperBlocks := 0 + for _, block := range blocks { + if rawdb.HasHeader(chainDB, block.Hash(), block.NumberU64()) { + chainDBBlocks++ + } + if has, _ := wrapper.blockDB.Has(block.NumberU64()); has { + blockdbBlocks++ + } + if rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64()) { + wrapperBlocks++ + } + } + require.Equal(t, 10, chainDBBlocks+blockdbBlocks) + require.Equal(t, 10, wrapperBlocks) + + // Start migration again and finish + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 15*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationCompleted) + + // Verify that all blocks are accessible from the new wrapper + for i, block := range blocks { + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + require.NotNil(t, actualBlock, "Block %d should be accessible after resumption", block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + 
actualReceipts := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, receipts[i], actualReceipts) + + // chainDB no longer has blocks except for genesis + if block.NumberU64() != 0 { + require.False(t, rawdb.HasHeader(chainDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(chainDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasReceipts(chainDB, block.Hash(), block.NumberU64())) + } + } +} + +// Test that the genesis block is not migrated; the rest of the blocks should be migrated. +func TestBlockDatabaseMigrator_Genesis(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(WrapDatabase(base)) + blocks, receipts := createBlocks(t, 10) + + // Write only genesis and block 5-9 + writeBlocks(chainDB, blocks[0:1], receipts[0:1]) + writeBlocks(chainDB, blocks[5:10], receipts[5:10]) + + blockDBPath := filepath.Join(dataDir, "blockdb") + wrapper := NewBlockDatabase(base, chainDB, blockdb.DefaultConfig(), blockDBPath, logging.NoLog{}, prometheus.NewRegistry()) + initialized, err := wrapper.InitWithStateSync(false) + require.NoError(t, err) + require.True(t, initialized) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + // validate our min height is 5 since we don't store genesis block and first + // block to migrate is block 5. + require.True(t, wrapper.initialized) + require.Equal(t, uint64(5), wrapper.minHeight) + + // migrate and wait for completion + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + + // verify genesis block is not migrated + genesisHash := rawdb.ReadCanonicalHash(chainDB, 0) + require.True(t, rawdb.HasHeader(chainDB, genesisHash, 0)) + has, _ := wrapper.blockDB.Has(0) + require.False(t, has) + + // verify blocks 1-4 are missing + for i := 1; i < 5; i++ { + hash := rawdb.ReadCanonicalHash(wrapper, uint64(i)) + require.Equal(t, common.Hash{}, hash) + } + + // verify blocks 5-9 are migrated + for _, expectedBlock := range blocks[5:10] { + actualBlock := rawdb.ReadBlock(wrapper, expectedBlock.Hash(), expectedBlock.NumberU64()) + require.NotNil(t, actualBlock) + assertRLPEqual(t, expectedBlock, actualBlock) + has, err := wrapper.blockDB.Has(expectedBlock.NumberU64()) + require.NoError(t, err) + require.True(t, has) + } +} + +func waitForMigratorCompletion(t *testing.T, migrator *blockDatabaseMigrator, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for time.Now().Before(deadline) { + if migrator.Status() == migrationCompleted { + return + } + <-ticker.C + } + + require.Failf(t, "migration did not complete within timeout", "timeout: %v", timeout) +} + +func assertMigrationStatus(t *testing.T, db database.Database, migrator *blockDatabaseMigrator, expectedStatus migrationStatus) { + t.Helper() + + require.Equal(t, expectedStatus, migrator.Status(), "migrator status should match expected") + diskStatus, err := getMigrationStatus(db) + require.NoError(t, err) + require.Equal(t, expectedStatus, diskStatus, "disk database status should match expected") +} diff --git a/plugin/evm/database/block_database_migrator.go b/plugin/evm/database/block_database_migrator.go new file mode 100644 index 0000000000..f15b309327 --- /dev/null +++ 
b/plugin/evm/database/block_database_migrator.go @@ -0,0 +1,506 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package database + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils/timer" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/log" + "github.com/ava-labs/libevm/rlp" +) + +type migrationStatus int + +const ( + migrationNotStarted migrationStatus = iota + migrationInProgress + migrationCompleted + + logProgressInterval = 5 * time.Minute // Log every 5 minutes + compactionInterval = 250_000 // Compact every 250k blocks processed + batchDeleteInterval = 25_000 // Write delete batch every 25k blocks +) + +var ( + // migrationStatusKey stores the persisted progress state for the migrator. + migrationStatusKey = []byte("migration_status") + + // endBlockNumberKey stores the target block number to migrate to. + endBlockNumberKey = []byte("migration_end_block_number") +) + +// blockDatabaseMigrator migrates canonical block data and receipts from +// ethdb.Database into the height-indexed block and receipt databases. +type blockDatabaseMigrator struct { + stateDB database.Database + chainDB ethdb.Database + blockDB database.HeightIndex + receiptsDB database.HeightIndex + + status migrationStatus + mu sync.RWMutex // protects status/running/cancel + running bool + cancel context.CancelFunc + wg sync.WaitGroup + + processed uint64 + endHeight uint64 +} + +// NewBlockDatabaseMigrator creates a new block database migrator with +// current migration status and target migration end block number. 
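+// If no end block number has been persisted yet, it is derived from the current
+// head header in chainDB and saved so migration progress/ETA can be reported
+// (see loadAndSaveBlockEndHeight).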
+func NewBlockDatabaseMigrator( + stateDB database.Database, + blockDB database.HeightIndex, + receiptsDB database.HeightIndex, + chainDB ethdb.Database, +) (*blockDatabaseMigrator, error) { + m := &blockDatabaseMigrator{ + blockDB: blockDB, + receiptsDB: receiptsDB, + stateDB: stateDB, + chainDB: chainDB, + } + + // load status + status, err := getMigrationStatus(stateDB) + if err != nil { + return nil, err + } + m.status = status + + // load end block height + endHeight, err := getEndBlockHeight(stateDB) + if err != nil { + return nil, err + } + + if endHeight == 0 { + if endHeight, err = loadAndSaveBlockEndHeight(stateDB, chainDB); err != nil { + return nil, err + } + } + m.endHeight = endHeight + + return m, nil +} + +func (b *blockDatabaseMigrator) Status() migrationStatus { + b.mu.RLock() + defer b.mu.RUnlock() + return b.status +} + +func (b *blockDatabaseMigrator) Stop() { + b.mu.Lock() + cancel := b.cancel + b.mu.Unlock() + if cancel != nil { + cancel() + // Wait for migration goroutine to finish cleanup + b.wg.Wait() + } +} + +func (b *blockDatabaseMigrator) Migrate() error { + if b.status == migrationCompleted { + return nil + } + ctx, err := b.beginRun() + if err != nil { + return err + } + + if err := b.setStatus(migrationInProgress); err != nil { + b.endRun() + return err + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer b.endRun() + if err := b.run(ctx); err != nil && !errors.Is(err, context.Canceled) { + log.Info("migration failed", "err", err) + } + }() + return nil +} + +func (b *blockDatabaseMigrator) beginRun() (context.Context, error) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.running { + return nil, errors.New("migration already running") + } + ctx, cancel := context.WithCancel(context.Background()) + b.cancel = cancel + b.running = true + return ctx, nil +} + +func (b *blockDatabaseMigrator) endRun() { + b.mu.Lock() + defer b.mu.Unlock() + b.cancel = nil + b.running = false +} + +func (b *blockDatabaseMigrator) setStatus(s migrationStatus) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.status == s { + return nil + } + if err := b.stateDB.Put(migrationStatusKey, []byte{byte(s)}); err != nil { + return err + } + b.status = s + return nil +} + +func (b *blockDatabaseMigrator) run(ctx context.Context) error { + var ( + etaTarget uint64 + etaTracker = timer.NewEtaTracker(10, 1.2) + startTime = time.Now() + timeOfNextLog = startTime.Add(logProgressInterval) + deleteBatch = b.chainDB.NewBatch() + lastCompactionNum uint64 + firstBlockInRange uint64 + lastBlockInRange uint64 + // Iterate over block bodies instead of headers since there are keys + // under the header prefix that we are not migrating. 
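+		// (For example, in go-ethereum's key schema the canonical-hash and
+		// total-difficulty entries also live under the header prefix.)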
+ iter = b.chainDB.NewIterator(chainDBBlockBodyPrefix, nil) + ) + + // Defer cleanup logic + defer func() { + // Release iterator (safe to call multiple times) + iter.Release() + + // Write any remaining deletes in batch + if deleteBatch.ValueSize() > 0 { + if err := deleteBatch.Write(); err != nil { + log.Error("failed to write final delete batch", "err", err) + } + } + + // Compact final range if we processed any blocks + if lastBlockInRange > 0 { + b.compactBlockRange(firstBlockInRange, lastBlockInRange) + } + + // Log final statistics + processingTime := time.Since(startTime) + log.Info("blockdb migration completed", + "blocks_processed", atomic.LoadUint64(&b.processed), + "total_processing_time", processingTime.String()) + }() + + log.Info("blockdb migration started") + + // iterator will iterate all block headers in ascending order by block number + for iter.Next() { + // Check if migration should be stopped + select { + case <-ctx.Done(): + log.Info("migration stopped", "blocks_processed", atomic.LoadUint64(&b.processed)) + return ctx.Err() + default: + // Continue with migration + } + + key := iter.Key() + if !shouldMigrateKey(b.chainDB, key) { + continue + } + + blockNum, hash := blockNumberAndHashFromKey(key) + + if etaTarget == 0 && b.endHeight > 0 && blockNum < b.endHeight { + etaTarget = b.endHeight - blockNum + etaTracker.AddSample(0, etaTarget, startTime) + } + + // Track the range of blocks for compaction + if firstBlockInRange == 0 { + firstBlockInRange = blockNum + } + lastBlockInRange = blockNum + + // Migrate block data (header + body) + if err := b.migrateBlock(blockNum, hash, iter.Value()); err != nil { + return fmt.Errorf("failed to migrate block data: %w", err) + } + + // Migrate receipts + if err := b.migrateReceipts(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate receipt data: %w", err) + } + + // Add deletes to batch + if err := b.deleteBlock(deleteBatch, blockNum, hash); err != nil { + return fmt.Errorf("failed to add block deletes to batch: %w", err) + } + + processed := atomic.AddUint64(&b.processed, 1) + + // Write delete batch every batchDeleteInterval blocks + if processed%batchDeleteInterval == 0 { + if err := deleteBatch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch: %w", err) + } + deleteBatch.Reset() + } + + // Compact every compactionInterval blocks + if processed-lastCompactionNum >= compactionInterval { + log.Info("compaction interval reached, releasing iterator and compacting", + "blocks_since_last_compaction", processed-lastCompactionNum, + "total_processed", processed) + + // Write any remaining deletes in batch before compaction + if deleteBatch.ValueSize() > 0 { + if err := deleteBatch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch before compaction: %w", err) + } + deleteBatch.Reset() + } + + // Release iterator before compaction + iter.Release() + + // Compact the range we just processed + b.compactBlockRange(firstBlockInRange, lastBlockInRange) + + // Recreate iterator + start := encodeBlockNumber(blockNum + 1) + iter = b.chainDB.NewIterator(chainDBBlockBodyPrefix, start) + lastCompactionNum = processed + firstBlockInRange = 0 + } + + // Log progress every logProgressInterval + if now := time.Now(); now.After(timeOfNextLog) { + logFields := []interface{}{ + "blocks_processed", processed, + "last_processed_height", blockNum, + "time_elapsed", time.Since(startTime), + } + if b.endHeight != 0 && etaTarget > 0 { + etaPtr, progressPercentage := etaTracker.AddSample(processed, 
etaTarget, now) + if etaPtr != nil { + logFields = append(logFields, "eta", etaPtr.String()) + logFields = append(logFields, "pctComplete", progressPercentage) + } + } + + log.Info("blockdb migration status", logFields...) + timeOfNextLog = now.Add(logProgressInterval) + } + } + + if iter.Error() != nil { + return fmt.Errorf("failed to iterate over chainDB: %w", iter.Error()) + } + + if err := b.setStatus(migrationCompleted); err != nil { + log.Error("failed to save completed migration status", "err", err) + } + + return nil +} + +func (b *blockDatabaseMigrator) compactBlockRange(startBlock, endBlock uint64) { + startTime := time.Now() + + // Compact block headers + startHeaderKey := blockHeaderKey(startBlock, common.Hash{}) + endHeaderKey := blockHeaderKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startHeaderKey, endHeaderKey); err != nil { + log.Error("failed to compact block headers in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + // Compact block bodies + startBodyKey := blockBodyKey(startBlock, common.Hash{}) + endBodyKey := blockBodyKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startBodyKey, endBodyKey); err != nil { + log.Error("failed to compact block bodies in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + // Compact receipts for this range + startReceiptsKey := receiptsKey(startBlock, common.Hash{}) + endReceiptsKey := receiptsKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startReceiptsKey, endReceiptsKey); err != nil { + log.Error("failed to compact receipts in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + log.Info("compaction of block range completed", + "start_block", startBlock, + "end_block", endBlock, + "duration", time.Since(startTime)) +} + +func (b *blockDatabaseMigrator) migrateBlock(blockNum uint64, hash common.Hash, bodyBytes []byte) error { + header := rawdb.ReadHeader(b.chainDB, hash, blockNum) + headerBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + encodedBlock, err := encodeBlockData(headerBytes, bodyBytes, hash) + if err != nil { + return fmt.Errorf("failed to encode block data: %w", err) + } + if err := b.blockDB.Put(blockNum, encodedBlock); err != nil { + return fmt.Errorf("failed to write block to blockDB: %w", err) + } + return nil +} + +func (b *blockDatabaseMigrator) migrateReceipts(blockNum uint64, hash common.Hash) error { + // Read raw receipt bytes directly from chainDB + receiptBytes := rawdb.ReadReceiptsRLP(b.chainDB, hash, blockNum) + if receiptBytes == nil { + // No receipts for this block, skip + return nil + } + + encodedReceipts, err := encodeReceiptData(receiptBytes, hash) + if err != nil { + return fmt.Errorf("failed to encode receipts with hash: %w", err) + } + if err := b.receiptsDB.Put(blockNum, encodedReceipts); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + + return nil +} + +// deleteBlock adds delete operations for a block to the provided batch +func (b *blockDatabaseMigrator) deleteBlock(batch ethdb.Batch, blockNum uint64, hash common.Hash) error { + headerKey := blockHeaderKey(blockNum, hash) + if err := batch.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from chainDB: %w", err) + } + rawdb.DeleteBody(batch, hash, blockNum) + rawdb.DeleteReceipts(batch, hash, blockNum) + + return nil +} + +func (b *blockDatabaseMigrator) blocksProcessed() uint64 { + 
return atomic.LoadUint64(&b.processed) +} + +func getMigrationStatus(db database.Database) (migrationStatus, error) { + var status migrationStatus + has, err := db.Has(migrationStatusKey) + if err != nil { + return status, err + } + if !has { + return status, nil + } + b, err := db.Get(migrationStatusKey) + if err != nil { + return status, err + } + if len(b) != 1 { + return status, fmt.Errorf("invalid migration status encoding length=%d", len(b)) + } + return migrationStatus(b[0]), nil +} + +func getEndBlockHeight(db database.Database) (uint64, error) { + has, err := db.Has(endBlockNumberKey) + if err != nil { + return 0, err + } + if !has { + return 0, nil + } + blockNumberBytes, err := db.Get(endBlockNumberKey) + if err != nil { + return 0, err + } + if len(blockNumberBytes) != blockNumberSize { + return 0, fmt.Errorf("invalid block number encoding length=%d", len(blockNumberBytes)) + } + return binary.BigEndian.Uint64(blockNumberBytes), nil +} + +func loadAndSaveBlockEndHeight(stateDB database.Database, chainDB ethdb.Database) (uint64, error) { + headHash := rawdb.ReadHeadHeaderHash(chainDB) + if headHash == (common.Hash{}) { + return 0, nil + } + headBlockNumber := rawdb.ReadHeaderNumber(chainDB, headHash) + if headBlockNumber == nil || *headBlockNumber == 0 { + return 0, nil + } + endHeight := *headBlockNumber + if err := stateDB.Put(endBlockNumberKey, encodeBlockNumber(endHeight)); err != nil { + return 0, fmt.Errorf("failed to save head block number %d: %w", endHeight, err) + } + log.Info("blockdb migration: saved head block number", "head_block_number", endHeight) + return endHeight, nil +} + +func shouldMigrateKey(db ethdb.Database, key []byte) bool { + if !isBodyKey(key) { + return false + } + blockNum, hash := blockNumberAndHashFromKey(key) + + // Skip genesis blocks to avoid complicating state-sync min-height handling. + if blockNum == 0 { + return false + } + + canonicalHash := rawdb.ReadCanonicalHash(db, blockNum) + return canonicalHash == hash +} + +// blockHeaderKey = headerPrefix + num (uint64 big endian) + hash +func blockHeaderKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBHeaderPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// blockBodyKey = bodyPrefix + num (uint64 big endian) + hash +func blockBodyKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBBlockBodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// receiptsKey = receiptsPrefix + num (uint64 big endian) + hash +func receiptsKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// minBlockHeightToMigrate returns the smallest block number that should be migrated. +func minBlockHeightToMigrate(db ethdb.Database) *uint64 { + iter := db.NewIterator(chainDBBlockBodyPrefix, nil) + defer iter.Release() + for iter.Next() { + key := iter.Key() + if !shouldMigrateKey(db, key) { + continue + } + blockNum, _ := blockNumberAndHashFromKey(key) + return &blockNum + } + return nil +} diff --git a/plugin/evm/database/block_database_test.go b/plugin/evm/database/block_database_test.go new file mode 100644 index 0000000000..08a3dc1e0f --- /dev/null +++ b/plugin/evm/database/block_database_test.go @@ -0,0 +1,676 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package database + +import ( + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/coreth/consensus/dummy" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/coreth/plugin/evm/customtypes" +) + +func TestMain(m *testing.M) { + customtypes.Register() + params.RegisterExtras() + os.Exit(m.Run()) +} + +// slowBlockDatabase wraps a BlockDatabase to add artificial delays +type slowBlockDatabase struct { + database.HeightIndex + shouldSlow func() bool +} + +func (s *slowBlockDatabase) Put(blockNumber uint64, encodedBlock []byte) error { + // Sleep to make migration hang for a bit + if s.shouldSlow == nil || s.shouldSlow() { + time.Sleep(100 * time.Millisecond) + } + return s.HeightIndex.Put(blockNumber, encodedBlock) +} + +var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = crypto.PubkeyToAddress(key2.PublicKey) +) + +func newDatabasesFromDir(t *testing.T, dataDir string) (*BlockDatabase, ethdb.Database) { + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(WrapDatabase(base)) + + // Create wrapped block database + blockDBPath := filepath.Join(dataDir, "blockdb") + wrapper := NewBlockDatabase(base, chainDB, blockdb.DefaultConfig(), blockDBPath, logging.NoLog{}, prometheus.NewRegistry()) + wrapper.InitWithMinHeight(1) + + return wrapper, chainDB +} + +func createBlocks(t *testing.T, numBlocks int) ([]*types.Block, []types.Receipts) { + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Number: 0, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + db, blocks, receipts, err := core.GenerateChainWithGenesis(gspec, engine, numBlocks-1, 10, func(_ int, gen *core.BlockGen) { + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &addr2, + Gas: 500000, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + + // add genesis block from db to blocks + genesisHash := rawdb.ReadCanonicalHash(db, 0) + genesisBlock := rawdb.ReadBlock(db, genesisHash, 0) + genesisReceipts := rawdb.ReadReceipts(db, genesisHash, 0, 0, params.TestChainConfig) + blocks = append([]*types.Block{genesisBlock}, blocks...) + receipts = append([]types.Receipts{genesisReceipts}, receipts...) 
+ + return blocks, receipts +} + +func writeBlocks(db ethdb.Database, blocks []*types.Block, receipts []types.Receipts) { + for i, block := range blocks { + rawdb.WriteBlock(db, block) + if len(receipts) > 0 { + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) + } + + // ensure written blocks are canonical + rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) + } +} + +// todo: break this down into read blocks and test everything +// make sure to include reading genesis block +func TestBlockDatabase_Read(t *testing.T) { + // Test that BlockDatabase reads block data correctly + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(wrapper, blocks, receipts) + + testCases := []struct { + name string + test func(t *testing.T, block *types.Block, blockReceipts types.Receipts) + }{ + { + name: "header", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + }, + }, + { + name: "body", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + body := rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Body(), body) + }, + }, + { + name: "block", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + }, + }, + { + name: "receipts_and_logs", + test: func(t *testing.T, block *types.Block, blockReceipts types.Receipts) { + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), block.NumberU64())) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, recs) + + logs := rawdb.ReadLogs(wrapper, block.Hash(), block.NumberU64()) + expectedLogs := make([][]*types.Log, len(blockReceipts)) + for j, receipt := range blockReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, logs) + }, + }, + { + name: "block_number", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Equal(t, block.NumberU64(), *blockNumber) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for i, block := range blocks { + blockReceipts := receipts[i] + tc.test(t, block, blockReceipts) + } + }) + } +} + +func TestBlockDatabase_Delete(t *testing.T) { + // Test BlockDatabase delete operations. + // We are verifying that block header, body and receipts cannot be deleted, + // but hash to height mapping should be deleted. 
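+	// This follows from BlockDatabase.Delete being a no-op for header, body, and
+	// receipt keys, while all other keys (such as the hash-to-number mapping)
+	// still go to the underlying chainDB and are removed.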
+ + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 4) + targetBlocks := blocks[1:] + targetReceipts := receipts[1:] + writeBlocks(wrapper, targetBlocks, targetReceipts) + + // delete block data + for i, block := range targetBlocks { + rawdb.DeleteBlock(wrapper, block.Hash(), block.NumberU64()) + + // we cannot delete header or body + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + body := rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Body(), body) + + // hash to height mapping should be deleted + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Nil(t, blockNumber) + + // Receipts and logs should not be deleted + expectedReceipts := targetReceipts[i] + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, recs) + logs := rawdb.ReadLogs(wrapper, block.Hash(), block.NumberU64()) + expectedLogs := make([][]*types.Log, len(expectedReceipts)) + for j, receipt := range expectedReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, logs) + } +} + +func TestBlockDatabase_WriteOrder_CachesUntilBoth(t *testing.T) { + // Test that partial writes (header or body only) are cached and not persisted + // until both header and body are available, then written as a single RLP block + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 2) + block := blocks[1] + + // Write header first - should be cached but not persisted + rawdb.WriteHeader(wrapper, block.Header()) + + // Verify header is not yet available (cached but not written to blockdb) + require.False(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.Nil(t, rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64())) + + // Verify underlying chainDB also has no header + require.Nil(t, rawdb.ReadHeader(chainDB, block.Hash(), block.NumberU64())) + + // Write body - should trigger writing both header and body to blockdb + rawdb.WriteBody(wrapper, block.Hash(), block.NumberU64(), block.Body()) + + // Now both should be available + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + + // Underlying chainDB should still have no header/body + require.Nil(t, rawdb.ReadHeader(chainDB, block.Hash(), block.NumberU64())) + require.Nil(t, rawdb.ReadBody(chainDB, block.Hash(), block.NumberU64())) +} + +func TestBlockDatabase_Batch(t *testing.T) { + // Test that batch operations work correctly for both writing and reading block data and receipts + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block := blocks[1] + blockReceipts := receipts[1] + + batch := wrapper.NewBatch() + + // Write header, body, and receipts to batch + rawdb.WriteBlock(batch, block) + rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), blockReceipts) + + // After writing both header and body to batch, both should be available immediately + require.True(t, rawdb.HasHeader(wrapper, 
block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + + // Receipts should also be available immediately since they're stored separately + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), block.NumberU64())) + + // Before write, header number should not be available + require.Nil(t, rawdb.ReadHeaderNumber(wrapper, block.Hash())) + + // After Write(), verify header number is available + require.NoError(t, batch.Write()) + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Equal(t, block.NumberU64(), *blockNumber) +} + +func TestBlockDatabase_SameBlockWrites(t *testing.T) { + // Test that writing the same block twice via rawdb.WriteBlock doesn't cause issues + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 1) + block := blocks[0] + + // Write block twice + rawdb.WriteBlock(wrapper, block) + rawdb.WriteBlock(wrapper, block) + + // Verify block data is still correct after duplicate writes + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) +} + +func TestBlockDatabase_DifferentBlocksSameHeight(t *testing.T) { + // Test that writing different blocks to the same height overwrites the first block + // and reading by the first block's hash returns nothing. + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block1 := blocks[1] + receipt1 := receipts[1] + + // Manually create a second block with the same height but different content + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + _, blocks2, receipts2, err := core.GenerateChainWithGenesis(gspec, engine, 1, 10, func(_ int, gen *core.BlockGen) { + gen.OffsetTime(int64(5000)) + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &addr2, + Gas: 450000, + GasTipCap: big.NewInt(5), + GasFeeCap: big.NewInt(5), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + block2 := blocks2[0] + receipt2 := receipts2[0] + + // Ensure both blocks have the same height but different hashes + require.Equal(t, block1.NumberU64(), block2.NumberU64()) + require.NotEqual(t, block1.Hash(), block2.Hash()) + + // Write two blocks with the same height + writeBlocks(wrapper, []*types.Block{block1, block2}, []types.Receipts{receipt1, receipt2}) + + // Reading by the first block's hash does not return anything + require.False(t, rawdb.HasHeader(wrapper, block1.Hash(), block1.NumberU64())) + require.False(t, rawdb.HasBody(wrapper, block1.Hash(), block1.NumberU64())) + firstHeader := rawdb.ReadHeader(wrapper, block1.Hash(), block1.NumberU64()) + require.Nil(t, firstHeader) + firstBody := rawdb.ReadBody(wrapper, block1.Hash(), block1.NumberU64()) + require.Nil(t, firstBody) + require.False(t, rawdb.HasReceipts(wrapper, block1.Hash(), block1.NumberU64())) + firstReceipts := rawdb.ReadReceipts(wrapper, block1.Hash(), block1.NumberU64(), block1.Time(), params.TestChainConfig) + require.Nil(t, firstReceipts) + + // Reading by the second block's hash returns second block data + require.True(t, 
rawdb.HasHeader(wrapper, block2.Hash(), block2.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block2.Hash(), block2.NumberU64())) + secondBlock := rawdb.ReadBlock(wrapper, block2.Hash(), block2.NumberU64()) + assertRLPEqual(t, block2, secondBlock) + require.True(t, rawdb.HasReceipts(wrapper, block2.Hash(), block2.NumberU64())) + secondReceipts := rawdb.ReadReceipts(wrapper, block2.Hash(), block2.NumberU64(), block2.Time(), params.TestChainConfig) + assertRLPEqual(t, receipt2, secondReceipts) +} + +func TestBlockDatabase_EmptyReceipts(t *testing.T) { + // Test that blocks with no transactions (empty receipts) are handled correctly + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + + // Create blocks without any transactions (empty receipts) + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + _, blocks, receipts, err := core.GenerateChainWithGenesis(gspec, engine, 3, 10, func(_ int, _ *core.BlockGen) { + // Don't add any transactions - this will create blocks with empty receipts + }) + require.NoError(t, err) + + // Verify all blocks have empty receipts + for i := range blocks { + require.Empty(t, receipts[i], "Block %d should have empty receipts", i) + } + + // write some blocks to chain db and some to wrapper and trigger migration + writeBlocks(chainDB, blocks[:2], receipts[:2]) + writeBlocks(wrapper, blocks[2:], receipts[2:]) + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + + // Verify that blocks with empty receipts are handled correctly + for _, block := range blocks { + blockNum := block.NumberU64() + + // Block data should be accessible + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), blockNum)) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), blockNum)) + + // Receipts should only be stored in the receiptsDB + require.False(t, rawdb.HasReceipts(chainDB, block.Hash(), blockNum)) + _, err := wrapper.receiptsDB.Get(blockNum) + require.NoError(t, err) + + // to be consistent with ethdb behavior, empty receipts should return true for HasReceipts + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), blockNum)) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), blockNum, block.Time(), params.TestChainConfig) + require.Empty(t, recs) + logs := rawdb.ReadLogs(wrapper, block.Hash(), blockNum) + require.Empty(t, logs) + } +} + +func TestBlockDatabase_Close_PersistsData(t *testing.T) { + // Test that Close() properly closes both databases and data persists + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 1) + block := blocks[0] + blockReceipts := receipts[0] + + // Write block and receipts + writeBlocks(wrapper, blocks, receipts) + + // Verify data is present + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, recs) + + // Close the wrapper db + require.NoError(t, wrapper.Close()) + + // Test we should no longer be able to read the block + require.False(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + _, err := wrapper.blockDB.Get(block.NumberU64()) + require.Error(t, 
err) + require.ErrorIs(t, err, database.ErrClosed) + + // Reopen the database and verify data is still present + wrapper, _ = newDatabasesFromDir(t, dataDir) + persistedBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, persistedBlock) + persistedRecs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, persistedRecs) + require.NoError(t, wrapper.Close()) +} + +func TestBlockDatabase_ReadDuringMigration(t *testing.T) { + // Test that blocks are readable during migration for both migrated and un-migrated blocks. + // This test: + // 1. Generates 21 blocks with receipts + // 2. Adds first 20 blocks and their receipts to chainDB + // 3. Creates wrapper database with migration disabled + // 4. Start migration with slow block database to control migration speed + // 5. Waits for at least 5 blocks to be migrated + // 6. Writes block 21 during migration (this is fast) + // 7. Verifies all 21 blocks and their receipts are readable via rawdb using wrapper + + dataDir := t.TempDir() + // Create initial databases using newDatabasesFromDir + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 21) + + // Add first 20 blocks to KVDB (chainDB) - these will be migrated + writeBlocks(chainDB, blocks[:20], receipts[:20]) + + // Create a slow block database to control migration speed + blockCount := 0 + slowBdb := &slowBlockDatabase{ + HeightIndex: wrapper.blockDB, + shouldSlow: func() bool { + blockCount++ + return blockCount > 5 // Slow down after 5 blocks + }, + } + + // Create and start migrator manually with slow block database + wrapper.migrator.blockDB = slowBdb + require.NoError(t, wrapper.migrator.Migrate()) + + // Wait for at least 5 blocks to be migrated + require.Eventually(t, func() bool { + return wrapper.migrator.blocksProcessed() >= 5 + }, 15*time.Second, 100*time.Millisecond) + + // Write block 21 to the wrapper database (this simulates a new block being added during migration) + writeBlocks(wrapper, blocks[20:21], receipts[20:21]) + + // Verify all 21 blocks are readable via rawdb using the wrapper + for i, block := range blocks { + blockNum := block.NumberU64() + expectedReceipts := receipts[i] + + // Test reading block header and body + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), blockNum)) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), blockNum)) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualBlock, "Block %d should be readable", blockNum) + assertRLPEqual(t, block, actualBlock) + actualHeader := rawdb.ReadHeader(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualHeader, "Block %d header should be readable", blockNum) + assertRLPEqual(t, block.Header(), actualHeader) + actualBody := rawdb.ReadBody(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualBody, "Block %d body should be readable", blockNum) + assertRLPEqual(t, block.Body(), actualBody) + + // Test reading receipts and logs + actualReceipts := rawdb.ReadReceipts(wrapper, block.Hash(), blockNum, block.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, actualReceipts) + actualLogs := rawdb.ReadLogs(wrapper, block.Hash(), blockNum) + expectedLogs := make([][]*types.Log, len(expectedReceipts)) + for j, receipt := range expectedReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, actualLogs) + + // Header number should be readable + 
actualBlockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.NotNil(t, actualBlockNumber, "Block %d number mapping should be readable", blockNum) + require.Equal(t, blockNum, *actualBlockNumber) + } + + require.NoError(t, wrapper.Close()) +} + +func TestBlockDatabase_Initialization(t *testing.T) { + blocks, _ := createBlocks(t, 10) + + testCases := []struct { + name string + stateSyncEnabled bool + chainDBBlocks []*types.Block + existingDBMinHeight *uint64 + expInitialized bool + expMinHeight uint64 + expMinHeightSet bool + }{ + { + name: "empty_chainDB_no_state_sync", + stateSyncEnabled: false, + expInitialized: true, + expMinHeight: 1, + expMinHeightSet: true, + }, + { + name: "empty_chainDB_state_sync_no_init", + stateSyncEnabled: true, + expInitialized: false, + expMinHeight: 0, + expMinHeightSet: false, + }, + { + name: "migration_needed", + stateSyncEnabled: false, + chainDBBlocks: blocks[5:10], + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "migration_needed_with_genesis", + stateSyncEnabled: false, + chainDBBlocks: append([]*types.Block{blocks[0]}, blocks[5:10]...), + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "migration_needed_state_sync", + stateSyncEnabled: true, + chainDBBlocks: blocks[5:10], + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "existing_db_created_with_min_height", + existingDBMinHeight: func() *uint64 { v := uint64(2); return &v }(), + stateSyncEnabled: false, + chainDBBlocks: blocks[5:8], + expInitialized: true, + expMinHeight: 2, + expMinHeightSet: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(WrapDatabase(base)) + blockDBPath := filepath.Join(dataDir, "blockdb") + + // Create the block database with an existing min height if needed + if tc.existingDBMinHeight != nil { + minHeight, err := getDatabaseMinHeight(base) + require.NoError(t, err) + require.Nil(t, minHeight) + wrapper := NewBlockDatabase( + base, + chainDB, + blockdb.DefaultConfig(), + blockDBPath, + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, wrapper.InitWithMinHeight(*tc.existingDBMinHeight)) + require.NoError(t, wrapper.blockDB.Close()) + require.NoError(t, wrapper.receiptsDB.Close()) + minHeight, err = getDatabaseMinHeight(base) + require.NoError(t, err) + require.Equal(t, *tc.existingDBMinHeight, *minHeight) + } + + // write chainDB blocks if needed + if len(tc.chainDBBlocks) > 0 { + writeBlocks(chainDB, tc.chainDBBlocks, []types.Receipts{}) + } + + // Create wrapper database + wrapper := NewBlockDatabase( + base, + chainDB, + blockdb.DefaultConfig(), + blockDBPath, + logging.NoLog{}, + prometheus.NewRegistry(), + ) + initialized, err := wrapper.InitWithStateSync(tc.stateSyncEnabled) + require.NoError(t, err) + require.Equal(t, tc.expInitialized, initialized) + require.Equal(t, tc.expInitialized, wrapper.initialized) + + // Verify initialization state and min height + if tc.expMinHeightSet { + require.Equal(t, tc.expMinHeight, wrapper.minHeight) + } else { + require.Equal(t, uint64(0), wrapper.minHeight) + } + + require.NoError(t, wrapper.Close()) + }) + } +} + +// Test that genesis block (block number 0) behavior works correctly. +// Genesis blocks should only exist in chainDB and not in the wrapper's block database. 
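+// Because the wrapper is initialized with a minimum height of 1, reads at height 0 fall through to the underlying chainDB.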
+func TestBlockDatabase_Genesis(t *testing.T) {
+ dataDir := t.TempDir()
+ wrapper, chainDB := newDatabasesFromDir(t, dataDir)
+ require.True(t, wrapper.initialized)
+ require.Equal(t, uint64(1), wrapper.minHeight)
+ blocks, receipts := createBlocks(t, 10)
+ writeBlocks(wrapper, blocks, receipts)
+
+ // Validate that the genesis block can be retrieved and that it is stored in chainDB
+ genesisHash := rawdb.ReadCanonicalHash(chainDB, 0)
+ genesisBlock := rawdb.ReadBlock(wrapper, genesisHash, 0)
+ assertRLPEqual(t, blocks[0], genesisBlock)
+ has, err := wrapper.blockDB.Has(0)
+ require.NoError(t, err)
+ require.False(t, has)
+ require.Equal(t, uint64(1), wrapper.minHeight)
+}
+
+func assertRLPEqual(t *testing.T, expected, actual interface{}) {
+ t.Helper()
+
+ expectedBytes, err := rlp.EncodeToBytes(expected)
+ require.NoError(t, err)
+ actualBytes, err := rlp.EncodeToBytes(actual)
+ require.NoError(t, err)
+ require.Equal(t, expectedBytes, actualBytes)
+}
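Editor's note, not part of the patch: TestBlockDatabase_ReadDuringMigration relies on a slowBlockDatabase test helper (defined elsewhere in this file) to throttle writes to the height-indexed store so migration progress can be observed mid-flight. Its real definition is not shown in this section; the sketch below only illustrates the wrapping pattern. All names in it (heightWriter, throttledHeightIndex, delay) are hypothetical, and the Put signature is an assumption about what database.HeightIndex exposes.

// Illustrative sketch only; assumes "time" is imported.

// heightWriter captures just the write method this sketch needs. The real
// database.HeightIndex interface is assumed to provide a compatible Put.
type heightWriter interface {
	Put(height uint64, blob []byte) error
}

// throttledHeightIndex delays writes whenever shouldSlow reports true, so a
// test can observe the migrator while some heights are still unmigrated.
type throttledHeightIndex struct {
	heightWriter
	shouldSlow func() bool
	delay      time.Duration
}

func (s *throttledHeightIndex) Put(height uint64, blob []byte) error {
	if s.shouldSlow() {
		time.Sleep(s.delay) // slow each write so migration progress stays observable
	}
	return s.heightWriter.Put(height, blob)
}

In the test above, the shouldSlow callback starts returning true only after the first five blocks, which is why require.Eventually can wait for blocksProcessed() >= 5 while later heights are still being copied and block 21 is written through the wrapper.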