diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8d40d8e0b4..51cefb172a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,7 +104,7 @@ jobs: path: avalanchego token: ${{ secrets.AVALANCHE_PAT }} - name: Run e2e tests - uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@84e9aebcfbc04602865f4c0a3e8b46a27409a3f5 + uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@ca04567537150e2d71e6f1ef4fae606ade0bdc89 with: run: ./scripts/run_task.sh test-e2e-ci prometheus_url: ${{ secrets.PROMETHEUS_URL || '' }} @@ -132,7 +132,7 @@ jobs: ref: ${{ github.event.inputs.avalanchegoBranch }} path: avalanchego - name: Run Warp E2E Tests - uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@84e9aebcfbc04602865f4c0a3e8b46a27409a3f5 + uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@ca04567537150e2d71e6f1ef4fae606ade0bdc89 with: run: ./scripts/run_task.sh test-e2e-warp-ci artifact_prefix: warp diff --git a/core/blockchain.go b/core/blockchain.go index ba97bd8df7..064e7f7f9f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -108,6 +108,7 @@ var ( blockTrieOpsTimer = metrics.GetOrRegisterCounter("chain/block/trie", nil) blockValidationTimer = metrics.GetOrRegisterCounter("chain/block/validations/state", nil) blockWriteTimer = metrics.GetOrRegisterCounter("chain/block/writes", nil) + blockAcceptTimer = metrics.GetOrRegisterCounter("chain/block/accepts", nil) acceptorQueueGauge = metrics.GetOrRegisterGauge("chain/acceptor/queue/size", nil) acceptorWorkTimer = metrics.GetOrRegisterCounter("chain/acceptor/work", nil) @@ -137,11 +138,12 @@ var ( ) const ( - bodyCacheLimit = 256 - blockCacheLimit = 256 - receiptsCacheLimit = 32 - txLookupCacheLimit = 1024 - badBlockLimit = 10 + bodyCacheLimit = 256 + blockCacheLimit = 256 + verifiedBlockCacheLimit = 1024 + receiptsCacheLimit = 32 + txLookupCacheLimit = 1024 + badBlockLimit = 10 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // @@ -293,12 +295,14 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triedb *triedb.Database // The database handler for maintaining trie nodes. - stateCache state.Database // State database to reuse between imports (contains state cache) - txIndexer *txIndexer // Transaction indexer, might be nil if not enabled - stateManager TrieWriter + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triedb *triedb.Database // The database handler for maintaining trie nodes. 
+ stateCache state.Database // State database to reuse between imports (contains state cache) + txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + stateManager TrieWriter + verifiedBlockCache FIFOCache[common.Hash, *types.Block] // cache for verified but not accepted blocks + verifiedReceiptsCache FIFOCache[common.Hash, types.Receipts] // cache for verified but not accepted receipts hc *HeaderChain rmLogsFeed event.Feed @@ -412,21 +416,23 @@ func NewBlockChain( log.Info("") bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triedb: triedb, - bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), - receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), - txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), - badBlocks: lru.NewCache[common.Hash, *badBlock](badBlockLimit), - engine: engine, - vmConfig: vmConfig, - senderCacher: NewTxSenderCacher(runtime.NumCPU()), - acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), - quit: make(chan struct{}), - acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triedb: triedb, + bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), + receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), + txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), + badBlocks: lru.NewCache[common.Hash, *badBlock](badBlockLimit), + verifiedBlockCache: NewFIFOCache[common.Hash, *types.Block](verifiedBlockCacheLimit), + verifiedReceiptsCache: NewFIFOCache[common.Hash, types.Receipts](verifiedBlockCacheLimit), + engine: engine, + vmConfig: vmConfig, + senderCacher: NewTxSenderCacher(runtime.NumCPU()), + acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), + quit: make(chan struct{}), + acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize), } bc.stateCache = extstate.NewDatabaseWithNodeDB(bc.db, bc.triedb) bc.validator = NewBlockValidator(chainConfig, bc, engine) @@ -591,6 +597,17 @@ func (bc *BlockChain) warmAcceptedCaches() { log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime)) } +func (bc *BlockChain) writeAcceptedBlockAndReceipts(b *types.Block) { + rawdb.WriteBlock(bc.db, b) + + if receipts, ok := bc.verifiedReceiptsCache.Get(b.Hash()); ok { + rawdb.WriteReceipts(bc.db, b.Hash(), b.NumberU64(), receipts) + } + + bc.verifiedBlockCache.Remove(b.Hash()) + bc.verifiedReceiptsCache.Remove(b.Hash()) +} + // startAcceptor starts processing items on the [acceptorQueue]. If a [nil] // object is placed on the [acceptorQueue], the [startAcceptor] will exit. 
func (bc *BlockChain) startAcceptor() { @@ -724,10 +741,26 @@ func (bc *BlockChain) loadLastState(lastAcceptedHash common.Hash) error { if head == (common.Hash{}) { return errors.New("could not read head block hash") } - // Make sure the entire head block is available - headBlock := bc.GetBlockByHash(head) + var headBlock *types.Block + headBlock = bc.GetBlockByHash(head) if headBlock == nil { - return fmt.Errorf("could not load head block %s", head.Hex()) + // Fallback: if the recorded head is missing (or its block data is not available), + // reset head markers to the provided last accepted block. + // This can happen because head block is updated on verify while blocks are + // saved on accept. Therefore, if a crash happens after verify but before accept, + // the head block will be missing. + lastAccepted := bc.GetBlockByHash(lastAcceptedHash) + if lastAccepted == nil { + return fmt.Errorf("could not load last accepted block to repair head markers") + } + batch := bc.db.NewBatch() + rawdb.WriteCanonicalHash(batch, lastAccepted.Hash(), lastAccepted.NumberU64()) + rawdb.WriteHeadBlockHash(batch, lastAccepted.Hash()) + rawdb.WriteHeadHeaderHash(batch, lastAccepted.Hash()) + if err := batch.Write(); err != nil { + log.Crit("Failed to repair head markers", "err", err) + } + headBlock = lastAccepted } // Everything seems to be fine, set as the head block bc.currentBlock.Store(headBlock.Header()) @@ -1086,6 +1119,8 @@ func (bc *BlockChain) Accept(block *types.Block) error { bc.chainmu.Lock() defer bc.chainmu.Unlock() + start := time.Now() + // The parent of [block] must be the last accepted block. if bc.lastAccepted.Hash() != block.ParentHash() { return fmt.Errorf( @@ -1107,6 +1142,9 @@ func (bc *BlockChain) Accept(block *types.Block) error { } } + // ensure last accepted block is persisted on disk before setting it + bc.writeAcceptedBlockAndReceipts(block) + // Enqueue block in the acceptor bc.lastAccepted = block bc.addAcceptorQueue(block) @@ -1128,6 +1166,8 @@ func (bc *BlockChain) Accept(block *types.Block) error { latestGasCapacityGauge.Update(int64(s.Gas.Capacity)) latestGasTargetGauge.Update(int64(s.Target())) } + + blockAcceptTimer.Inc(time.Since(start).Milliseconds()) return nil } @@ -1155,6 +1195,8 @@ func (bc *BlockChain) Reject(block *types.Block) error { // Remove the block from the block cache (ignore return value of whether it was in the cache) _ = bc.blockCache.Remove(block.Hash()) + bc.verifiedBlockCache.Remove(block.Hash()) + bc.verifiedReceiptsCache.Remove(block.Hash()) return nil } @@ -1213,13 +1255,17 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, parentRoot common // writeBlockWithState writes the block and all associated state to the database, // but it expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, parentRoot common.Hash, receipts []*types.Receipt, state *state.StateDB) error { + // only write the block to cache to avoid storing non-accepted blocks in the database + bc.verifiedBlockCache.Put(block.Hash(), block) + // also cache the receipts for verified but not accepted blocks + bc.verifiedReceiptsCache.Put(block.Hash(), receipts) + // Irrelevant of the canonical status, write the block itself to the database. // // Note all the components of block(hash->number map, header, body, receipts) // should be written atomically. BlockBatch is used for containing all components. 
blockBatch := bc.db.NewBatch() - rawdb.WriteBlock(blockBatch, block) - rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WriteHeaderNumber(blockBatch, block.Hash(), block.NumberU64()) rawdb.WritePreimages(blockBatch, state.Preimages()) if err := blockBatch.Write(); err != nil { log.Crit("Failed to write block into disk", "err", err) @@ -1453,7 +1499,7 @@ func (bc *BlockChain) collectUnflattenedLogs(b *types.Block, removed bool) [][]* if excessBlobGas != nil { blobGasPrice = eip4844.CalcBlobFee(*excessBlobGas) } - receipts := rawdb.ReadRawReceipts(bc.db, b.Hash(), b.NumberU64()) + receipts := bc.GetReceiptsByHash(b.Hash()) if err := receipts.DeriveFields(bc.chainConfig, b.Hash(), b.NumberU64(), b.Time(), b.BaseFee(), blobGasPrice, b.Transactions()); err != nil { log.Error("Failed to derive block receipts fields", "hash", b.Hash(), "number", b.NumberU64(), "err", err) } diff --git a/core/blockchain_ext_test.go b/core/blockchain_ext_test.go index 566f85274a..6311e31e11 100644 --- a/core/blockchain_ext_test.go +++ b/core/blockchain_ext_test.go @@ -1652,6 +1652,9 @@ func ReexecCorruptedStateTest(t *testing.T, create ReexecTestFunc) { // Simulate a crash by updating the acceptor tip blockchain.writeBlockAcceptedIndices(chain[1]) + + // need to write accepted block to disk as we are no longer writing it on verify + rawdb.WriteBlock(blockchain.db, chain[1]) blockchain.Stop() // Restart blockchain with existing state diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index faa6786316..dba2916b58 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -61,18 +61,32 @@ func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool { // GetHeader retrieves a block header from the database by hash and number, // caching it if found. func (bc *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header { - return bc.hc.GetHeader(hash, number) + if block, ok := bc.verifiedBlockCache.Get(hash); ok { + return block.Header() + } + header := bc.hc.GetHeader(hash, number) + if header == nil { + return nil + } + return header } // GetHeaderByHash retrieves a block header from the database by hash, caching it if // found. func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header { + if block, ok := bc.verifiedBlockCache.Get(hash); ok { + return block.Header() + } return bc.hc.GetHeaderByHash(hash) } // GetHeaderByNumber retrieves a block header from the database by number, // caching it (associated with its hash) if found. 
func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header { + hash := rawdb.ReadCanonicalHash(bc.db, number) + if block, ok := bc.verifiedBlockCache.Get(hash); ok { + return block.Header() + } return bc.hc.GetHeaderByNumber(number) } @@ -83,6 +97,9 @@ func (bc *BlockChain) GetBody(hash common.Hash) *types.Body { if cached, ok := bc.bodyCache.Get(hash); ok { return cached } + if block, ok := bc.verifiedBlockCache.Get(hash); ok { + return block.Body() + } number := bc.hc.GetBlockNumber(hash) if number == nil { return nil @@ -101,6 +118,9 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool { if bc.blockCache.Contains(hash) { return true } + if _, ok := bc.verifiedBlockCache.Get(hash); ok { + return true + } if !bc.HasHeader(hash, number) { return false } @@ -112,6 +132,9 @@ func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool { if !bc.HasBlock(hash, number) { return false } + if _, ok := bc.verifiedReceiptsCache.Get(hash); ok { + return true + } if bc.receiptsCache.Contains(hash) { return true } @@ -125,6 +148,9 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { if block, ok := bc.blockCache.Get(hash); ok { return block } + if block, ok := bc.verifiedBlockCache.Get(hash); ok { + return block + } block := rawdb.ReadBlock(bc.db, hash, number) if block == nil { return nil @@ -174,6 +200,12 @@ func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*type // GetReceiptsByHash retrieves the receipts for all transactions in a given block. func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { + // Check verified receipts cache first + if receipts, ok := bc.verifiedReceiptsCache.Get(hash); ok { + return receipts + } + + // Check main receipts cache if receipts, ok := bc.receiptsCache.Get(hash); ok { return receipts } diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index c493e367d7..2f13c7e5e5 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -35,17 +35,24 @@ import ( "math/big" "testing" + avalanchedatabase "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" "github.com/ava-labs/coreth/consensus/dummy" "github.com/ava-labs/coreth/params" "github.com/ava-labs/coreth/plugin/evm/customrawdb" + "github.com/ava-labs/coreth/plugin/evm/database" "github.com/ava-labs/coreth/plugin/evm/upgrade/ap3" "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/core/vm" "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/ethdb" ethparams "github.com/ava-labs/libevm/params" "github.com/ava-labs/libevm/triedb" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -510,12 +517,21 @@ func testLongReorgedDeepRepair(t *testing.T, snapshots bool) { func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme, customrawdb.FirewoodScheme} { t.Run(scheme, func(t *testing.T) { - testRepairWithScheme(t, tt, snapshots, scheme) + testRepairWithScheme(t, tt, snapshots, scheme, false) + }) + t.Run(scheme+"- blockdb", func(t *testing.T) { + testRepairWithScheme(t, tt, snapshots, scheme, true) }) } } -func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string) { +func 
useBlockDatabase(t *testing.T, kvdb avalanchedatabase.Database, chaindb ethdb.Database, datadir string) ethdb.Database { + db := database.NewBlockDatabase(kvdb, chaindb, blockdb.DefaultConfig(), datadir, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, db.InitWithMinHeight(1)) + return db +} + +func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string, useBlockDB bool) { // It's hard to follow the test case, visualize the input //log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) // fmt.Println(tt.dump(true)) @@ -534,6 +550,11 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s if err != nil { t.Fatalf("Failed to create persistent database: %v", err) } + kvdb := memdb.New() + if useBlockDB { + db = useBlockDatabase(t, kvdb, db, datadir) + } + defer db.Close() // Might double close, should be fine // Initialize a fresh chain @@ -627,8 +648,10 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s if err != nil { t.Fatalf("Failed to reopen persistent database: %v", err) } + if useBlockDB { + db = useBlockDatabase(t, kvdb, db, datadir) + } defer db.Close() - newChain, err := NewBlockChain(db, config, gspec, engine, vm.Config{}, lastAcceptedHash, false) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) @@ -638,8 +661,11 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s // Iterate over all the remaining blocks and ensure there are no gaps verifyNoGaps(t, newChain, true, canonblocks) verifyNoGaps(t, newChain, false, sideblocks) - verifyCutoff(t, newChain, true, canonblocks, tt.expCanonicalBlocks) - verifyCutoff(t, newChain, false, sideblocks, tt.expSidechainBlocks) + // Only accepted (committed) blocks are persisted after restart. + cutoffHead := int(newChain.LastAcceptedBlock().NumberU64()) + verifyCutoff(t, newChain, true, canonblocks, cutoffHead) + // Sidechain blocks are not persisted after restart; expect absence. + verifyCutoff(t, newChain, false, sideblocks, 0) if head := newChain.CurrentHeader(); head.Number.Uint64() != tt.expHeadBlock { t.Errorf("Head header mismatch: have %d, want %d", head.Number, tt.expHeadBlock) diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index d17c32fec9..6f4725448f 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -143,7 +143,9 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks []*types.Block) { // Iterate over all the remaining blocks and ensure there are no gaps verifyNoGaps(t, chain, true, blocks) - verifyCutoff(t, chain, true, blocks, basic.expCanonicalBlocks) + // Only accepted (committed) blocks are persisted after restart. + acceptedHead := int(chain.LastAcceptedBlock().NumberU64()) + verifyCutoff(t, chain, true, blocks, acceptedHead) if head := chain.CurrentHeader(); head.Number.Uint64() != basic.expHeadBlock { t.Errorf("Head header mismatch: have %d, want %d", head.Number, basic.expHeadBlock) diff --git a/core/fifo_cache.go b/core/fifo_cache.go index 9f1eb8cf1e..16cef153f5 100644 --- a/core/fifo_cache.go +++ b/core/fifo_cache.go @@ -15,6 +15,7 @@ var ( type FIFOCache[K comparable, V any] interface { Put(K, V) Get(K) (V, bool) + Remove(K) error } // NewFIFOCache creates a new First-In-First-Out cache of size [limit]. 
@@ -56,6 +57,13 @@ func (f *BufferFIFOCache[K, V]) Get(key K) (V, bool) { return v, ok } +func (f *BufferFIFOCache[K, V]) Remove(key K) error { + f.l.Lock() + defer f.l.Unlock() + + return f.remove(key) +} + // remove is used as the callback in [BoundedBuffer]. It is assumed that the // [WriteLock] is held when this is accessed. func (f *BufferFIFOCache[K, V]) remove(key K) error { @@ -69,3 +77,5 @@ func (*NoOpFIFOCache[K, V]) Put(K, V) {} func (*NoOpFIFOCache[K, V]) Get(K) (V, bool) { return *new(V), false } + +func (_ *NoOpFIFOCache[K, V]) Remove(_ K) error { return nil } diff --git a/core/headerchain_test.go b/core/headerchain_test.go index dadf28f7f2..c9e68e5005 100644 --- a/core/headerchain_test.go +++ b/core/headerchain_test.go @@ -54,7 +54,7 @@ func verifyUnbrokenCanonchain(bc *BlockChain) error { if h.Number.Uint64() == 0 { break } - h = bc.hc.GetHeader(h.ParentHash, h.Number.Uint64()-1) + h = bc.GetHeader(h.ParentHash, h.Number.Uint64()-1) } return nil } diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b0fbd5a86f..fbbf893672 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -272,6 +272,17 @@ func TestFilters(t *testing.T) { if err != nil { t.Fatal(err) } + // Persist headers/bodies for testBackend lookups. The filter testBackend resolves + // headers directly from the database via hash->number and header reads, and + // our insert path does not write headers for unaccepted blocks. + for _, block := range chain { + rawdb.WriteBlock(db, block) + + // need to also persist receipts on disk since the testBackend resolves receipts + // directly from the database while non-accepted block receipts are not persisted in the chain. + receipts := bc.GetReceiptsByHash(block.Hash()) + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts) + } // Set block 998 as Finalized (-3) // bc.SetFinalized(chain[998].Header()) diff --git a/go.mod b/go.mod index 2760703e4d..b0df2fa4da 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ go 1.24.7 require ( github.com/VictoriaMetrics/fastcache v1.12.1 - github.com/ava-labs/avalanchego v1.13.6-0.20251003124629-84e9aebcfbc0 + github.com/ava-labs/avalanchego v1.13.6-0.20251007222745-2a5a3cfb2fe2 github.com/ava-labs/firewood-go-ethhash/ffi v0.0.12 github.com/ava-labs/libevm v1.13.15-0.20251002164226-35926db4d661 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc diff --git a/go.sum b/go.sum index 8880cb2ebf..4c2e0fc22f 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/ava-labs/avalanchego v1.13.6-0.20251003124629-84e9aebcfbc0 h1:B3ti+6P0r7V6BZw5PG7nP7hZ4AJj/1tSsmOOAwnonhs= -github.com/ava-labs/avalanchego v1.13.6-0.20251003124629-84e9aebcfbc0/go.mod h1:yplWYV/FzAZeYAhy0yOj8wjJA1PCdTPxQf8Wzpwg6DY= +github.com/ava-labs/avalanchego v1.13.6-0.20251007222745-2a5a3cfb2fe2 h1:cS+nGwUvZT7SmJcuPKttDTmhmizC3BGPvDVyCm8RvZ0= +github.com/ava-labs/avalanchego v1.13.6-0.20251007222745-2a5a3cfb2fe2/go.mod h1:yplWYV/FzAZeYAhy0yOj8wjJA1PCdTPxQf8Wzpwg6DY= github.com/ava-labs/firewood-go-ethhash/ffi v0.0.12 h1:aMcrLbpJ/dyu2kZDf/Di/4JIWsUcYPyTDKymiHpejt0= 
github.com/ava-labs/firewood-go-ethhash/ffi v0.0.12/go.mod h1:cq89ua3iiZ5wPBALTEQS5eG8DIZcs7ov6OiL4YR1BVY= github.com/ava-labs/libevm v1.13.15-0.20251002164226-35926db4d661 h1:lt4yQE1HMvxWrdD5RFj+h9kWUsZK2rmNohvkeQsbG9M= diff --git a/plugin/evm/config/config.go b/plugin/evm/config/config.go index ff449dd15d..b170af6e32 100644 --- a/plugin/evm/config/config.go +++ b/plugin/evm/config/config.go @@ -138,7 +138,13 @@ type Config struct { StateSyncRequestSize uint16 `json:"state-sync-request-size"` // Database Settings - InspectDatabase bool `json:"inspect-database"` // Inspects the database on startup if enabled. + InspectDatabase bool `json:"inspect-database"` // Inspects the database on startup if enabled. + BlockDatabaseEnabled bool `json:"block-database-enabled"` // Use block database for storing block data + BlockDatabaseSyncToDisk bool `json:"block-database-sync-to-disk"` // Sync block database to disk + + // BlockDatabaseMigrationDisabled disables migrating block data from key-value + // database to the block database. Only new blocks will be stored in the block database. + BlockDatabaseMigrationDisabled bool `json:"block-database-migration-disabled"` // SkipUpgradeCheck disables checking that upgrades must take place before the last // accepted block. Skipping this check is useful when a node operator does not update diff --git a/plugin/evm/config/default_config.go b/plugin/evm/config/default_config.go index 5366f9b6ce..66d5d304d7 100644 --- a/plugin/evm/config/default_config.go +++ b/plugin/evm/config/default_config.go @@ -80,6 +80,10 @@ func NewDefaultConfig() Config { StateHistory: uint64(32), // Estimated block count in 24 hours with 2s block accept period HistoricalProofQueryWindow: uint64(24 * time.Hour / (2 * time.Second)), + // Database Settings + BlockDatabaseEnabled: false, + BlockDatabaseSyncToDisk: false, + BlockDatabaseMigrationDisabled: false, // Price Option Defaults PriceOptionSlowFeePercentage: uint64(95), PriceOptionFastFeePercentage: uint64(105), diff --git a/plugin/evm/database/block_cache.go b/plugin/evm/database/block_cache.go new file mode 100644 index 0000000000..3ae786b34d --- /dev/null +++ b/plugin/evm/database/block_cache.go @@ -0,0 +1,51 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package database + +import ( + "sync" + + "github.com/ava-labs/libevm/common" +) + +// blockCache is a cache for block header and body. +// It allows saving body and header separately via save while +// fetching both body and header at once via get. 
+type blockCache struct { + bodyCache map[common.Hash][]byte + headerCache map[common.Hash][]byte + mu sync.RWMutex +} + +func newBlockCache() *blockCache { + return &blockCache{ + bodyCache: make(map[common.Hash][]byte), + headerCache: make(map[common.Hash][]byte), + } +} + +func (b *blockCache) get(blockHash common.Hash) ([]byte, []byte, bool) { + b.mu.RLock() + defer b.mu.RUnlock() + bodyData, hasBody := b.bodyCache[blockHash] + headerData, hasHeader := b.headerCache[blockHash] + return bodyData, headerData, hasBody && hasHeader +} + +func (b *blockCache) clear(blockHash common.Hash) { + b.mu.Lock() + defer b.mu.Unlock() + delete(b.bodyCache, blockHash) + delete(b.headerCache, blockHash) +} + +func (b *blockCache) save(key []byte, blockHash common.Hash, value []byte) { + b.mu.Lock() + defer b.mu.Unlock() + if isBodyKey(key) { + b.bodyCache[blockHash] = value + } else if isHeaderKey(key) { + b.headerCache[blockHash] = value + } +} diff --git a/plugin/evm/database/block_database.go b/plugin/evm/database/block_database.go new file mode 100644 index 0000000000..4d3188b932 --- /dev/null +++ b/plugin/evm/database/block_database.go @@ -0,0 +1,510 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package database + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "path/filepath" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/heightindexdb/meterdb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/log" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + _ ethdb.Database = (*BlockDatabase)(nil) + + // Key prefixes for block data in the ethdb. + // REVIEW: I opted to just copy these from libevm since these should never change + // and we can avoid libevm changes by not needing to export them. + // Alternatively, we can update libevm to export these or create reader functions. + chainDBHeaderPrefix = []byte("h") + chainDBBlockBodyPrefix = []byte("b") + chainDBReceiptsPrefix = []byte("r") + + // Database prefix for storing block database migrator's internal state and progress tracking + migratorDBPrefix = []byte("migrator") + + // Key for storing the minimum height configuration of the block databases. + // This value determines the lowest block height that will be stored in the + // height-indexed block databases. Once set during initialization, the block + // database min height cannot be changed without recreating the databases. + blockDBMinHeightKey = []byte("blockdb_min_height") +) + +const ( + // Number of elements in the RLP encoded block data + blockDataElements = 3 + // Number of elements in the RLP encoded receipt data + receiptDataElements = 2 + blockNumberSize = 8 + blockHashSize = 32 +) + +// BlockDatabase is a wrapper around an ethdb.Database that stores block header +// body, and receipts in separate height-indexed block databases. +// All other keys are stored in the underlying ethdb.Database (chainDB). 
+type BlockDatabase struct { + ethdb.Database + // DB for storing block database state data (ie min height) + stateDB database.Database + blockDB database.HeightIndex + receiptsDB database.HeightIndex + + config blockdb.DatabaseConfig + blockDBPath string + minHeight uint64 + + bCache *blockCache // cache for block header and body + migrator *blockDatabaseMigrator + + reg prometheus.Registerer + // todo: use logger for blockdb instances + //nolint:unused + logger logging.Logger + + initialized bool +} + +type blockDatabaseBatch struct { + database *BlockDatabase + ethdb.Batch +} + +func NewBlockDatabase( + stateDB database.Database, + chainDB ethdb.Database, + config blockdb.DatabaseConfig, + blockDBPath string, + logger logging.Logger, + reg prometheus.Registerer, +) *BlockDatabase { + return &BlockDatabase{ + stateDB: stateDB, + Database: chainDB, + bCache: newBlockCache(), + blockDBPath: blockDBPath, + config: config, + reg: reg, + logger: logger, + } +} + +// InitWithStateSync initializes the height-indexed databases with the +// appropriate minimum height based on existing configuration and state sync settings. +// +// Initialization cases (in order of precedence): +// 1. Databases already exist → loads with existing min height +// 2. Data to migrate exists → initializes with min block height to migrate +// 3. No data to migrate + state sync enabled → defers initialization +// 4. No data to migrate + state sync disabled → initializes with min height 1 +// +// Returns true if databases were initialized, false otherwise. +func (db *BlockDatabase) InitWithStateSync(stateSyncEnabled bool) (bool, error) { + minHeight, err := getDatabaseMinHeight(db.stateDB) + if err != nil { + return false, err + } + + // Databases already exist, load with existing min height + if minHeight != nil { + if err := db.InitWithMinHeight(*minHeight); err != nil { + return false, err + } + return true, nil + } + + // Data to migrate exists, initialize with min block height to migrate + minMigrateHeight := minBlockHeightToMigrate(db.Database) + if minMigrateHeight != nil { + if err := db.InitWithMinHeight(*minMigrateHeight); err != nil { + return false, err + } + return true, nil + } + + // No data to migrate and state sync disabled, initialize with min height 1 + if !stateSyncEnabled { + // Genesis block is not stored in height-indexed databases to avoid + // min height complexity with different node types (pruned vs archive). + if err := db.InitWithMinHeight(1); err != nil { + return false, err + } + return true, nil + } + return false, nil +} + +// InitWithMinHeight initializes the height-indexed databases with the provided minimum height. 
+func (db *BlockDatabase) InitWithMinHeight(minHeight uint64) error { + if db.initialized { + log.Warn("InitWithMinHeight called on a block database that is already initialized") + return nil + } + log.Info("Initializing block database with min height", "minHeight", minHeight) + + if err := db.stateDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { + return err + } + blockDB, err := db.createMeteredBlockDatabase("blockdb", minHeight) + if err != nil { + return err + } + db.blockDB = blockDB + receiptsDB, err := db.createMeteredBlockDatabase("receiptsdb", minHeight) + if err != nil { + return err + } + db.receiptsDB = receiptsDB + + if err := db.initMigrator(); err != nil { + return fmt.Errorf("failed to init migrator: %w", err) + } + + db.initialized = true + db.minHeight = minHeight + return nil +} + +// Migrate migrates block headers, bodies, and receipts from ethDB to the block databases. +func (db *BlockDatabase) Migrate() error { + if !db.initialized { + return errors.New("block database must be initialized before migrating") + } + return db.migrator.Migrate() +} + +func (db *BlockDatabase) Put(key []byte, value []byte) error { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Put(key, value) + } + + blockNumber, blockHash := blockNumberAndHashFromKey(key) + + if isReceiptKey(key) { + return db.writeReceipts(blockNumber, blockHash, value) + } + + db.bCache.save(key, blockHash, value) + if bodyData, headerData, ok := db.bCache.get(blockHash); ok { + db.bCache.clear(blockHash) + return db.writeBlock(blockNumber, blockHash, headerData, bodyData) + } + return nil +} + +func (db *BlockDatabase) Get(key []byte) ([]byte, error) { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Get(key) + } + blockNumber, blockHash := blockNumberAndHashFromKey(key) + + if isReceiptKey(key) { + return db.getReceipts(key, blockNumber, blockHash) + } + return db.getBlock(key, blockNumber, blockHash) +} + +func (db *BlockDatabase) Has(key []byte) (bool, error) { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Has(key) + } + data, err := db.Get(key) + if err != nil { + return false, err + } + return data != nil, nil +} + +func (db *BlockDatabase) Delete(key []byte) error { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Delete(key) + } + + // no-op since deleting written blocks is not supported + return nil +} + +func (db *BlockDatabase) Close() error { + if db.migrator != nil { + db.migrator.Stop() + } + if db.blockDB != nil { + err := db.blockDB.Close() + if err != nil { + return err + } + } + if db.receiptsDB != nil { + err := db.receiptsDB.Close() + if err != nil { + return err + } + } + return db.Database.Close() +} + +func (db *BlockDatabase) initMigrator() error { + if db.migrator != nil { + return nil + } + migratorStateDB := prefixdb.New(migratorDBPrefix, db.stateDB) + migrator, err := NewBlockDatabaseMigrator(migratorStateDB, db.blockDB, db.receiptsDB, db.Database) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +func (db *BlockDatabase) createMeteredBlockDatabase(namespace string, minHeight uint64) (database.HeightIndex, error) { + path := filepath.Join(db.blockDBPath, namespace) + config := db.config.WithDir(path).WithMinimumHeight(minHeight) + newDB, err := blockdb.New(config, db.logger) + if err != nil { + return nil, fmt.Errorf("failed to create block database at %s: %w", path, err) + } + + meteredDB, err := meterdb.New(db.reg, namespace, newDB) + if err != nil { + if err := 
newDB.Close(); err != nil { + return nil, fmt.Errorf("failed to close block database: %w", err) + } + return nil, fmt.Errorf("failed to create metered %s database: %w", namespace, err) + } + + return meteredDB, nil +} + +func (db *BlockDatabase) getReceipts(key []byte, blockNumber uint64, blockHash common.Hash) ([]byte, error) { + encodedReceiptData, err := db.receiptsDB.Get(blockNumber) + if err != nil { + if errors.Is(err, database.ErrNotFound) && db.isMigrationNeeded() { + return db.Database.Get(key) + } + return nil, err + } + + decodedHash, receiptData, err := decodeReceiptData(encodedReceiptData) + if err != nil { + return nil, err + } + if decodedHash != blockHash { + return nil, nil + } + + return receiptData, nil +} + +func (db *BlockDatabase) getBlock(key []byte, blockNumber uint64, blockHash common.Hash) ([]byte, error) { + encodedBlock, err := db.blockDB.Get(blockNumber) + if err != nil { + if errors.Is(err, database.ErrNotFound) && db.isMigrationNeeded() { + return db.Database.Get(key) + } + return nil, err + } + + decodedHash, header, body, err := decodeBlockData(encodedBlock) + if err != nil { + return nil, err + } + if decodedHash != blockHash { + return nil, nil + } + + if isBodyKey(key) { + return body, nil + } + if isHeaderKey(key) { + return header, nil + } + + return nil, fmt.Errorf("unexpected key for block data: %s", key) +} + +func (db *BlockDatabase) isMigrationNeeded() bool { + return db.migrator != nil && db.migrator.Status() != migrationCompleted +} + +func (db *BlockDatabase) writeReceipts(blockNumber uint64, blockHash common.Hash, data []byte) error { + encodedReceipts, err := encodeReceiptData(data, blockHash) + if err != nil { + return err + } + return db.receiptsDB.Put(blockNumber, encodedReceipts) +} + +func (db *BlockDatabase) writeBlock(blockNumber uint64, blockHash common.Hash, header []byte, body []byte) error { + encodedBlock, err := encodeBlockData(header, body, blockHash) + if err != nil { + return err + } + return db.blockDB.Put(blockNumber, encodedBlock) +} + +func (db *BlockDatabase) shouldWriteToBlockDatabase(key []byte) bool { + if !db.initialized { + return false + } + + isTargetKey := isBodyKey(key) || isHeaderKey(key) || isReceiptKey(key) + if !isTargetKey { + return false + } + + // Only blocks with height >= min height are stored in block database + if blockNumber, _ := blockNumberAndHashFromKey(key); blockNumber < db.minHeight { + return false + } + + return true +} + +func (db *BlockDatabase) NewBatch() ethdb.Batch { + return blockDatabaseBatch{ + database: db, + Batch: db.Database.NewBatch(), + } +} + +func (db *BlockDatabase) NewBatchWithSize(size int) ethdb.Batch { + return blockDatabaseBatch{ + database: db, + Batch: db.Database.NewBatchWithSize(size), + } +} + +func (batch blockDatabaseBatch) Put(key []byte, value []byte) error { + if !batch.database.shouldWriteToBlockDatabase(key) { + return batch.Batch.Put(key, value) + } + return batch.database.Put(key, value) +} + +func (batch blockDatabaseBatch) Delete(key []byte) error { + if !batch.database.shouldWriteToBlockDatabase(key) { + return batch.Batch.Delete(key) + } + return batch.database.Delete(key) +} + +func blockNumberAndHashFromKey(key []byte) (uint64, common.Hash) { + var prefixLen int + switch { + case isBodyKey(key): + prefixLen = len(chainDBBlockBodyPrefix) + case isHeaderKey(key): + prefixLen = len(chainDBHeaderPrefix) + case isReceiptKey(key): + prefixLen = len(chainDBReceiptsPrefix) + } + blockNumber := binary.BigEndian.Uint64(key[prefixLen : prefixLen+8]) + blockHash 
:= common.BytesToHash(key[prefixLen+blockNumberSize:]) + return blockNumber, blockHash +} + +func decodeBlockData(encodedBlock []byte) (hash common.Hash, header []byte, body []byte, err error) { + var blockData [][]byte + if err := rlp.DecodeBytes(encodedBlock, &blockData); err != nil { + return common.Hash{}, nil, nil, err + } + + if len(blockData) != blockDataElements { + return common.Hash{}, nil, nil, fmt.Errorf( + "invalid block data format: expected %d elements, got %d", + blockDataElements, + len(blockData), + ) + } + + return common.BytesToHash(blockData[0]), blockData[1], blockData[2], nil +} + +func encodeBlockData(header []byte, body []byte, blockHash common.Hash) ([]byte, error) { + return rlp.EncodeToBytes([][]byte{blockHash.Bytes(), header, body}) +} + +func encodeReceiptData(receipts []byte, blockHash common.Hash) ([]byte, error) { + return rlp.EncodeToBytes([][]byte{blockHash.Bytes(), receipts}) +} + +func decodeReceiptData(encodedReceipts []byte) (common.Hash, []byte, error) { + var receiptData [][]byte + if err := rlp.DecodeBytes(encodedReceipts, &receiptData); err != nil { + return common.Hash{}, nil, err + } + + if len(receiptData) != receiptDataElements { + return common.Hash{}, nil, fmt.Errorf( + "invalid receipt data format: expected %d elements, got %d", + receiptDataElements, + len(receiptData), + ) + } + + return common.BytesToHash(receiptData[0]), receiptData[1], nil +} + +func encodeBlockNumber(number uint64) []byte { + enc := make([]byte, blockNumberSize) + binary.BigEndian.PutUint64(enc, number) + return enc +} + +func isBodyKey(key []byte) bool { + if len(key) != len(chainDBBlockBodyPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBBlockBodyPrefix) +} + +func isHeaderKey(key []byte) bool { + if len(key) != len(chainDBHeaderPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBHeaderPrefix) +} + +func isReceiptKey(key []byte) bool { + if len(key) != len(chainDBReceiptsPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBReceiptsPrefix) +} + +func getDatabaseMinHeight(db database.Database) (*uint64, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return nil, err + } + if !has { + return nil, nil + } + minHeightBytes, err := db.Get(blockDBMinHeightKey) + if err != nil { + return nil, err + } + minHeight := binary.BigEndian.Uint64(minHeightBytes) + return &minHeight, nil +} + +// IsBlockDatabaseCreated checks if block database has already been created +func IsBlockDatabaseCreated(db database.Database) (bool, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return false, err + } + return has, nil +} diff --git a/plugin/evm/database/block_database_migration_test.go b/plugin/evm/database/block_database_migration_test.go new file mode 100644 index 0000000000..62268d88e9 --- /dev/null +++ b/plugin/evm/database/block_database_migration_test.go @@ -0,0 +1,355 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package database + +import ( + "path/filepath" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +// Tests migration status is correctly set to "not started" after initializing a new migrator. +func TestBlockDatabaseMigrator_StatusNotStarted(t *testing.T) { + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationNotStarted) +} + +// Tests migration status transitions to "in progress" while a migration is actively running. +func TestBlockDatabaseMigrator_StatusInProgress(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + blocks, receipts := createBlocks(t, 10) + writeBlocks(chainDB, blocks, receipts) + + // migrate with a slow block database + slowDB := &slowBlockDatabase{ + HeightIndex: wrapper.blockDB, + shouldSlow: func() bool { + return true + }, + } + wrapper.migrator.blockDB = slowDB + require.NoError(t, wrapper.migrator.Migrate()) + + // Wait for it to be in progress + require.Eventually(t, func() bool { + return wrapper.migrator.Status() == migrationInProgress + }, 2*time.Second, 100*time.Millisecond) + + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationInProgress) +} + +// Tests migration status is correctly set to "completed" after a successful migration. 
+func TestBlockDatabaseMigrator_StatusCompleted(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + blocks, receipts := createBlocks(t, 3) + writeBlocks(chainDB, blocks, receipts) + wrapper.migrator.blockDB = wrapper.blockDB + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 5*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationCompleted) +} + +func TestBlockDatabaseMigrator_Migration(t *testing.T) { + testCases := []struct { + name string + initStatus migrationStatus + toMigrateHeights []uint64 + migratedHeights []uint64 + expStatus migrationStatus + }{ + { + name: "migrate_5_blocks", + toMigrateHeights: []uint64{0, 1, 2, 3, 4}, + expStatus: migrationCompleted, + }, + { + name: "migrate_5_blocks_20_24", + toMigrateHeights: []uint64{20, 21, 22, 23, 24}, + expStatus: migrationCompleted, + }, + { + name: "migrate_non_consecutive_blocks", + toMigrateHeights: []uint64{20, 21, 22, 29, 30, 40}, + expStatus: migrationCompleted, + }, + { + name: "half_blocks_migrated", + initStatus: migrationInProgress, + toMigrateHeights: []uint64{6, 7, 8, 9, 10}, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + expStatus: migrationCompleted, + }, + { + name: "all_blocks_migrated_in_progress", + initStatus: migrationInProgress, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + expStatus: migrationCompleted, + }, + { + name: "empty_chainDB_no_blocks_to_migrate", + expStatus: migrationCompleted, + }, + { + name: "half_non_consecutive_blocks_migrated", + initStatus: migrationInProgress, + toMigrateHeights: []uint64{2, 3, 7, 8, 10}, + migratedHeights: []uint64{0, 1, 4, 5, 9}, + expStatus: migrationCompleted, + }, + { + name: "migration_already_completed", + initStatus: migrationCompleted, + migratedHeights: []uint64{0, 1, 2}, + expStatus: migrationCompleted, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + // find the max block height to create + maxBlockHeight := uint64(0) + heightSets := [][]uint64{tc.toMigrateHeights, tc.migratedHeights} + for _, heights := range heightSets { + for _, height := range heights { + if height > maxBlockHeight { + maxBlockHeight = height + } + } + } + + // create blocks and receipts + blocks, receipts := createBlocks(t, int(maxBlockHeight)+1) + + // set initial state + if tc.initStatus != migrationNotStarted { + require.NoError(t, wrapper.migrator.setStatus(tc.initStatus)) + } + for _, height := range tc.toMigrateHeights { + writeBlocks(chainDB, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + for _, height := range tc.migratedHeights { + writeBlocks(wrapper, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + + // Migrate database + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, tc.expStatus) + + // Verify that all blocks and receipts are accessible from the block wrapper + totalBlocks := len(tc.toMigrateHeights) + len(tc.migratedHeights) + allExpectedBlocks := make(map[uint64]*types.Block, totalBlocks) + allExpectedReceipts := make(map[uint64]types.Receipts, totalBlocks) + for _, heights := range heightSets { + for _, height := range 
heights { + allExpectedBlocks[height] = blocks[height] + allExpectedReceipts[height] = receipts[height] + } + } + require.Len(t, allExpectedBlocks, totalBlocks) + for blockNum, expectedBlock := range allExpectedBlocks { + actualBlock := rawdb.ReadBlock(wrapper, expectedBlock.Hash(), blockNum) + require.NotNil(t, actualBlock, "Block %d should be accessible", blockNum) + assertRLPEqual(t, expectedBlock, actualBlock) + expectedReceipts := allExpectedReceipts[blockNum] + actualReceipts := rawdb.ReadReceipts(wrapper, expectedBlock.Hash(), blockNum, expectedBlock.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, actualReceipts) + + // verify chainDB no longer has any blocks or receipts (except for genesis) + if expectedBlock.NumberU64() != 0 { + require.False(t, rawdb.HasHeader(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64()), "Block %d should not be in chainDB", expectedBlock.NumberU64()) + require.False(t, rawdb.HasBody(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64())) + require.False(t, rawdb.HasReceipts(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64())) + } + } + }) + } +} + +// Tests that a migration in progress can be stopped and resumed, verifying +// that the migration state is properly persisted and that the migration +// can continue from where it left off after restart. +func TestBlockDatabaseMigrator_AbruptStop(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + if wrapper != nil { + require.NoError(t, wrapper.Close()) + } + }) + + // Create blocks and write them to chainDB + blocks, receipts := createBlocks(t, 10) + writeBlocks(chainDB, blocks, receipts) + + // Create a slow block database that slows down after 3 blocks + blockCount := 0 + slowBdb := &slowBlockDatabase{ + HeightIndex: wrapper.blockDB, + shouldSlow: func() bool { + blockCount++ + return blockCount > 3 + }, + } + + // Start migration with slow block database and wait for 3 blocks to be migrated + wrapper.migrator.blockDB = slowBdb + require.NoError(t, wrapper.migrator.Migrate()) + require.Eventually(t, func() bool { + return wrapper.migrator.blocksProcessed() >= 3 + }, 10*time.Second, 100*time.Millisecond) + require.NoError(t, wrapper.Close()) + + // Create new wrapper and verify migration status + wrapper, chainDB = newDatabasesFromDir(t, dataDir) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationInProgress) + + // Check blockDB and chainDB to ensure we have the correct amount of blocks + chainDBBlocks := 0 + blockdbBlocks := 0 + wrapperBlocks := 0 + for _, block := range blocks { + if rawdb.HasHeader(chainDB, block.Hash(), block.NumberU64()) { + chainDBBlocks++ + } + if has, _ := wrapper.blockDB.Has(block.NumberU64()); has { + blockdbBlocks++ + } + if rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64()) { + wrapperBlocks++ + } + } + require.Equal(t, 10, chainDBBlocks+blockdbBlocks) + require.Equal(t, 10, wrapperBlocks) + + // Start migration again and finish + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 15*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationCompleted) + + // Verify that all blocks are accessible from the new wrapper + for i, block := range blocks { + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + require.NotNil(t, actualBlock, "Block %d should be accessible after resumption", block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + 
actualReceipts := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, receipts[i], actualReceipts) + + // chainDB no longer has blocks except for genesis + if block.NumberU64() != 0 { + require.False(t, rawdb.HasHeader(chainDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(chainDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasReceipts(chainDB, block.Hash(), block.NumberU64())) + } + } +} + +// Test that the genesis block is not migrated; the rest of the blocks should be migrated. +func TestBlockDatabaseMigrator_Genesis(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(WrapDatabase(base)) + blocks, receipts := createBlocks(t, 10) + + // Write only genesis and block 5-9 + writeBlocks(chainDB, blocks[0:1], receipts[0:1]) + writeBlocks(chainDB, blocks[5:10], receipts[5:10]) + + blockDBPath := filepath.Join(dataDir, "blockdb") + wrapper := NewBlockDatabase(base, chainDB, blockdb.DefaultConfig(), blockDBPath, logging.NoLog{}, prometheus.NewRegistry()) + initialized, err := wrapper.InitWithStateSync(false) + require.NoError(t, err) + require.True(t, initialized) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + // validate our min height is 5 since we don't store genesis block and first + // block to migrate is block 5. + require.True(t, wrapper.initialized) + require.Equal(t, uint64(5), wrapper.minHeight) + + // migrate and wait for completion + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + + // verify genesis block is not migrated + genesisHash := rawdb.ReadCanonicalHash(chainDB, 0) + require.True(t, rawdb.HasHeader(chainDB, genesisHash, 0)) + has, _ := wrapper.blockDB.Has(0) + require.False(t, has) + + // verify blocks 1-4 are missing + for i := 1; i < 5; i++ { + hash := rawdb.ReadCanonicalHash(wrapper, uint64(i)) + require.Equal(t, common.Hash{}, hash) + } + + // verify blocks 5-9 are migrated + for _, expectedBlock := range blocks[5:10] { + actualBlock := rawdb.ReadBlock(wrapper, expectedBlock.Hash(), expectedBlock.NumberU64()) + require.NotNil(t, actualBlock) + assertRLPEqual(t, expectedBlock, actualBlock) + has, err := wrapper.blockDB.Has(expectedBlock.NumberU64()) + require.NoError(t, err) + require.True(t, has) + } +} + +func waitForMigratorCompletion(t *testing.T, migrator *blockDatabaseMigrator, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for time.Now().Before(deadline) { + if migrator.Status() == migrationCompleted { + return + } + <-ticker.C + } + + require.Failf(t, "migration did not complete within timeout", "timeout: %v", timeout) +} + +func assertMigrationStatus(t *testing.T, db database.Database, migrator *blockDatabaseMigrator, expectedStatus migrationStatus) { + t.Helper() + + require.Equal(t, expectedStatus, migrator.Status(), "migrator status should match expected") + diskStatus, err := getMigrationStatus(db) + require.NoError(t, err) + require.Equal(t, expectedStatus, diskStatus, "disk database status should match expected") +} diff --git a/plugin/evm/database/block_database_migrator.go b/plugin/evm/database/block_database_migrator.go new file mode 100644 index 0000000000..f15b309327 --- /dev/null +++ 
b/plugin/evm/database/block_database_migrator.go @@ -0,0 +1,506 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package database + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils/timer" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/log" + "github.com/ava-labs/libevm/rlp" +) + +type migrationStatus int + +const ( + migrationNotStarted migrationStatus = iota + migrationInProgress + migrationCompleted + + logProgressInterval = 5 * time.Minute // Log every 5 minutes + compactionInterval = 250_000 // Compact every 250k blocks processed + batchDeleteInterval = 25_000 // Write delete batch every 25k blocks +) + +var ( + // migrationStatusKey stores the persisted progress state for the migrator. + migrationStatusKey = []byte("migration_status") + + // endBlockNumberKey stores the target block number to migrate to. + endBlockNumberKey = []byte("migration_end_block_number") +) + +// blockDatabaseMigrator migrates canonical block data and receipts from +// ethdb.Database into the height-indexed block and receipt databases. +type blockDatabaseMigrator struct { + stateDB database.Database + chainDB ethdb.Database + blockDB database.HeightIndex + receiptsDB database.HeightIndex + + status migrationStatus + mu sync.RWMutex // protects status/running/cancel + running bool + cancel context.CancelFunc + wg sync.WaitGroup + + processed uint64 + endHeight uint64 +} + +// NewBlockDatabaseMigrator creates a new block database migrator with +// current migration status and target migration end block number. 
+func NewBlockDatabaseMigrator( + stateDB database.Database, + blockDB database.HeightIndex, + receiptsDB database.HeightIndex, + chainDB ethdb.Database, +) (*blockDatabaseMigrator, error) { + m := &blockDatabaseMigrator{ + blockDB: blockDB, + receiptsDB: receiptsDB, + stateDB: stateDB, + chainDB: chainDB, + } + + // load status + status, err := getMigrationStatus(stateDB) + if err != nil { + return nil, err + } + m.status = status + + // load end block height + endHeight, err := getEndBlockHeight(stateDB) + if err != nil { + return nil, err + } + + if endHeight == 0 { + if endHeight, err = loadAndSaveBlockEndHeight(stateDB, chainDB); err != nil { + return nil, err + } + } + m.endHeight = endHeight + + return m, nil +} + +func (b *blockDatabaseMigrator) Status() migrationStatus { + b.mu.RLock() + defer b.mu.RUnlock() + return b.status +} + +func (b *blockDatabaseMigrator) Stop() { + b.mu.Lock() + cancel := b.cancel + b.mu.Unlock() + if cancel != nil { + cancel() + // Wait for migration goroutine to finish cleanup + b.wg.Wait() + } +} + +func (b *blockDatabaseMigrator) Migrate() error { + if b.status == migrationCompleted { + return nil + } + ctx, err := b.beginRun() + if err != nil { + return err + } + + if err := b.setStatus(migrationInProgress); err != nil { + b.endRun() + return err + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer b.endRun() + if err := b.run(ctx); err != nil && !errors.Is(err, context.Canceled) { + log.Info("migration failed", "err", err) + } + }() + return nil +} + +func (b *blockDatabaseMigrator) beginRun() (context.Context, error) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.running { + return nil, errors.New("migration already running") + } + ctx, cancel := context.WithCancel(context.Background()) + b.cancel = cancel + b.running = true + return ctx, nil +} + +func (b *blockDatabaseMigrator) endRun() { + b.mu.Lock() + defer b.mu.Unlock() + b.cancel = nil + b.running = false +} + +func (b *blockDatabaseMigrator) setStatus(s migrationStatus) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.status == s { + return nil + } + if err := b.stateDB.Put(migrationStatusKey, []byte{byte(s)}); err != nil { + return err + } + b.status = s + return nil +} + +func (b *blockDatabaseMigrator) run(ctx context.Context) error { + var ( + etaTarget uint64 + etaTracker = timer.NewEtaTracker(10, 1.2) + startTime = time.Now() + timeOfNextLog = startTime.Add(logProgressInterval) + deleteBatch = b.chainDB.NewBatch() + lastCompactionNum uint64 + firstBlockInRange uint64 + lastBlockInRange uint64 + // Iterate over block bodies instead of headers since there are keys + // under the header prefix that we are not migrating. 
+ iter = b.chainDB.NewIterator(chainDBBlockBodyPrefix, nil) + ) + + // Defer cleanup logic + defer func() { + // Release iterator (safe to call multiple times) + iter.Release() + + // Write any remaining deletes in batch + if deleteBatch.ValueSize() > 0 { + if err := deleteBatch.Write(); err != nil { + log.Error("failed to write final delete batch", "err", err) + } + } + + // Compact final range if we processed any blocks + if lastBlockInRange > 0 { + b.compactBlockRange(firstBlockInRange, lastBlockInRange) + } + + // Log final statistics + processingTime := time.Since(startTime) + log.Info("blockdb migration completed", + "blocks_processed", atomic.LoadUint64(&b.processed), + "total_processing_time", processingTime.String()) + }() + + log.Info("blockdb migration started") + + // iterator will iterate all block headers in ascending order by block number + for iter.Next() { + // Check if migration should be stopped + select { + case <-ctx.Done(): + log.Info("migration stopped", "blocks_processed", atomic.LoadUint64(&b.processed)) + return ctx.Err() + default: + // Continue with migration + } + + key := iter.Key() + if !shouldMigrateKey(b.chainDB, key) { + continue + } + + blockNum, hash := blockNumberAndHashFromKey(key) + + if etaTarget == 0 && b.endHeight > 0 && blockNum < b.endHeight { + etaTarget = b.endHeight - blockNum + etaTracker.AddSample(0, etaTarget, startTime) + } + + // Track the range of blocks for compaction + if firstBlockInRange == 0 { + firstBlockInRange = blockNum + } + lastBlockInRange = blockNum + + // Migrate block data (header + body) + if err := b.migrateBlock(blockNum, hash, iter.Value()); err != nil { + return fmt.Errorf("failed to migrate block data: %w", err) + } + + // Migrate receipts + if err := b.migrateReceipts(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate receipt data: %w", err) + } + + // Add deletes to batch + if err := b.deleteBlock(deleteBatch, blockNum, hash); err != nil { + return fmt.Errorf("failed to add block deletes to batch: %w", err) + } + + processed := atomic.AddUint64(&b.processed, 1) + + // Write delete batch every batchDeleteInterval blocks + if processed%batchDeleteInterval == 0 { + if err := deleteBatch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch: %w", err) + } + deleteBatch.Reset() + } + + // Compact every compactionInterval blocks + if processed-lastCompactionNum >= compactionInterval { + log.Info("compaction interval reached, releasing iterator and compacting", + "blocks_since_last_compaction", processed-lastCompactionNum, + "total_processed", processed) + + // Write any remaining deletes in batch before compaction + if deleteBatch.ValueSize() > 0 { + if err := deleteBatch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch before compaction: %w", err) + } + deleteBatch.Reset() + } + + // Release iterator before compaction + iter.Release() + + // Compact the range we just processed + b.compactBlockRange(firstBlockInRange, lastBlockInRange) + + // Recreate iterator + start := encodeBlockNumber(blockNum + 1) + iter = b.chainDB.NewIterator(chainDBBlockBodyPrefix, start) + lastCompactionNum = processed + firstBlockInRange = 0 + } + + // Log progress every logProgressInterval + if now := time.Now(); now.After(timeOfNextLog) { + logFields := []interface{}{ + "blocks_processed", processed, + "last_processed_height", blockNum, + "time_elapsed", time.Since(startTime), + } + if b.endHeight != 0 && etaTarget > 0 { + etaPtr, progressPercentage := etaTracker.AddSample(processed, 
etaTarget, now) + if etaPtr != nil { + logFields = append(logFields, "eta", etaPtr.String()) + logFields = append(logFields, "pctComplete", progressPercentage) + } + } + + log.Info("blockdb migration status", logFields...) + timeOfNextLog = now.Add(logProgressInterval) + } + } + + if iter.Error() != nil { + return fmt.Errorf("failed to iterate over chainDB: %w", iter.Error()) + } + + if err := b.setStatus(migrationCompleted); err != nil { + log.Error("failed to save completed migration status", "err", err) + } + + return nil +} + +func (b *blockDatabaseMigrator) compactBlockRange(startBlock, endBlock uint64) { + startTime := time.Now() + + // Compact block headers + startHeaderKey := blockHeaderKey(startBlock, common.Hash{}) + endHeaderKey := blockHeaderKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startHeaderKey, endHeaderKey); err != nil { + log.Error("failed to compact block headers in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + // Compact block bodies + startBodyKey := blockBodyKey(startBlock, common.Hash{}) + endBodyKey := blockBodyKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startBodyKey, endBodyKey); err != nil { + log.Error("failed to compact block bodies in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + // Compact receipts for this range + startReceiptsKey := receiptsKey(startBlock, common.Hash{}) + endReceiptsKey := receiptsKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startReceiptsKey, endReceiptsKey); err != nil { + log.Error("failed to compact receipts in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + log.Info("compaction of block range completed", + "start_block", startBlock, + "end_block", endBlock, + "duration", time.Since(startTime)) +} + +func (b *blockDatabaseMigrator) migrateBlock(blockNum uint64, hash common.Hash, bodyBytes []byte) error { + header := rawdb.ReadHeader(b.chainDB, hash, blockNum) + headerBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + encodedBlock, err := encodeBlockData(headerBytes, bodyBytes, hash) + if err != nil { + return fmt.Errorf("failed to encode block data: %w", err) + } + if err := b.blockDB.Put(blockNum, encodedBlock); err != nil { + return fmt.Errorf("failed to write block to blockDB: %w", err) + } + return nil +} + +func (b *blockDatabaseMigrator) migrateReceipts(blockNum uint64, hash common.Hash) error { + // Read raw receipt bytes directly from chainDB + receiptBytes := rawdb.ReadReceiptsRLP(b.chainDB, hash, blockNum) + if receiptBytes == nil { + // No receipts for this block, skip + return nil + } + + encodedReceipts, err := encodeReceiptData(receiptBytes, hash) + if err != nil { + return fmt.Errorf("failed to encode receipts with hash: %w", err) + } + if err := b.receiptsDB.Put(blockNum, encodedReceipts); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + + return nil +} + +// deleteBlock adds delete operations for a block to the provided batch +func (b *blockDatabaseMigrator) deleteBlock(batch ethdb.Batch, blockNum uint64, hash common.Hash) error { + headerKey := blockHeaderKey(blockNum, hash) + if err := batch.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from chainDB: %w", err) + } + rawdb.DeleteBody(batch, hash, blockNum) + rawdb.DeleteReceipts(batch, hash, blockNum) + + return nil +} + +func (b *blockDatabaseMigrator) blocksProcessed() uint64 { + 
return atomic.LoadUint64(&b.processed) +} + +func getMigrationStatus(db database.Database) (migrationStatus, error) { + var status migrationStatus + has, err := db.Has(migrationStatusKey) + if err != nil { + return status, err + } + if !has { + return status, nil + } + b, err := db.Get(migrationStatusKey) + if err != nil { + return status, err + } + if len(b) != 1 { + return status, fmt.Errorf("invalid migration status encoding length=%d", len(b)) + } + return migrationStatus(b[0]), nil +} + +func getEndBlockHeight(db database.Database) (uint64, error) { + has, err := db.Has(endBlockNumberKey) + if err != nil { + return 0, err + } + if !has { + return 0, nil + } + blockNumberBytes, err := db.Get(endBlockNumberKey) + if err != nil { + return 0, err + } + if len(blockNumberBytes) != blockNumberSize { + return 0, fmt.Errorf("invalid block number encoding length=%d", len(blockNumberBytes)) + } + return binary.BigEndian.Uint64(blockNumberBytes), nil +} + +func loadAndSaveBlockEndHeight(stateDB database.Database, chainDB ethdb.Database) (uint64, error) { + headHash := rawdb.ReadHeadHeaderHash(chainDB) + if headHash == (common.Hash{}) { + return 0, nil + } + headBlockNumber := rawdb.ReadHeaderNumber(chainDB, headHash) + if headBlockNumber == nil || *headBlockNumber == 0 { + return 0, nil + } + endHeight := *headBlockNumber + if err := stateDB.Put(endBlockNumberKey, encodeBlockNumber(endHeight)); err != nil { + return 0, fmt.Errorf("failed to save head block number %d: %w", endHeight, err) + } + log.Info("blockdb migration: saved head block number", "head_block_number", endHeight) + return endHeight, nil +} + +func shouldMigrateKey(db ethdb.Database, key []byte) bool { + if !isBodyKey(key) { + return false + } + blockNum, hash := blockNumberAndHashFromKey(key) + + // Skip genesis blocks to avoid complicating state-sync min-height handling. + if blockNum == 0 { + return false + } + + canonicalHash := rawdb.ReadCanonicalHash(db, blockNum) + return canonicalHash == hash +} + +// blockHeaderKey = headerPrefix + num (uint64 big endian) + hash +func blockHeaderKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBHeaderPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// blockBodyKey = bodyPrefix + num (uint64 big endian) + hash +func blockBodyKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBBlockBodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// receiptsKey = receiptsPrefix + num (uint64 big endian) + hash +func receiptsKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// minBlockHeightToMigrate returns the smallest block number that should be migrated. +func minBlockHeightToMigrate(db ethdb.Database) *uint64 { + iter := db.NewIterator(chainDBBlockBodyPrefix, nil) + defer iter.Release() + for iter.Next() { + key := iter.Key() + if !shouldMigrateKey(db, key) { + continue + } + blockNum, _ := blockNumberAndHashFromKey(key) + return &blockNum + } + return nil +} diff --git a/plugin/evm/database/block_database_test.go b/plugin/evm/database/block_database_test.go new file mode 100644 index 0000000000..08a3dc1e0f --- /dev/null +++ b/plugin/evm/database/block_database_test.go @@ -0,0 +1,676 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package database + +import ( + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/coreth/consensus/dummy" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/coreth/plugin/evm/customtypes" +) + +func TestMain(m *testing.M) { + customtypes.Register() + params.RegisterExtras() + os.Exit(m.Run()) +} + +// slowBlockDatabase wraps a BlockDatabase to add artificial delays +type slowBlockDatabase struct { + database.HeightIndex + shouldSlow func() bool +} + +func (s *slowBlockDatabase) Put(blockNumber uint64, encodedBlock []byte) error { + // Sleep to make migration hang for a bit + if s.shouldSlow == nil || s.shouldSlow() { + time.Sleep(100 * time.Millisecond) + } + return s.HeightIndex.Put(blockNumber, encodedBlock) +} + +var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = crypto.PubkeyToAddress(key2.PublicKey) +) + +func newDatabasesFromDir(t *testing.T, dataDir string) (*BlockDatabase, ethdb.Database) { + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(WrapDatabase(base)) + + // Create wrapped block database + blockDBPath := filepath.Join(dataDir, "blockdb") + wrapper := NewBlockDatabase(base, chainDB, blockdb.DefaultConfig(), blockDBPath, logging.NoLog{}, prometheus.NewRegistry()) + wrapper.InitWithMinHeight(1) + + return wrapper, chainDB +} + +func createBlocks(t *testing.T, numBlocks int) ([]*types.Block, []types.Receipts) { + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Number: 0, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + db, blocks, receipts, err := core.GenerateChainWithGenesis(gspec, engine, numBlocks-1, 10, func(_ int, gen *core.BlockGen) { + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &addr2, + Gas: 500000, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + + // add genesis block from db to blocks + genesisHash := rawdb.ReadCanonicalHash(db, 0) + genesisBlock := rawdb.ReadBlock(db, genesisHash, 0) + genesisReceipts := rawdb.ReadReceipts(db, genesisHash, 0, 0, params.TestChainConfig) + blocks = append([]*types.Block{genesisBlock}, blocks...) + receipts = append([]types.Receipts{genesisReceipts}, receipts...) 
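+	// The returned slices are indexed by height: blocks[i].NumberU64() == i, with +	// index 0 holding the genesis block.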
+ + return blocks, receipts +} + +func writeBlocks(db ethdb.Database, blocks []*types.Block, receipts []types.Receipts) { + for i, block := range blocks { + rawdb.WriteBlock(db, block) + if len(receipts) > 0 { + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) + } + + // ensure written blocks are canonical + rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) + } +} + +// todo: break this down into read blocks and test everything +// make sure to include reading genesis block +func TestBlockDatabase_Read(t *testing.T) { + // Test that BlockDatabase reads block data correctly + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(wrapper, blocks, receipts) + + testCases := []struct { + name string + test func(t *testing.T, block *types.Block, blockReceipts types.Receipts) + }{ + { + name: "header", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + }, + }, + { + name: "body", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + body := rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Body(), body) + }, + }, + { + name: "block", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + }, + }, + { + name: "receipts_and_logs", + test: func(t *testing.T, block *types.Block, blockReceipts types.Receipts) { + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), block.NumberU64())) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, recs) + + logs := rawdb.ReadLogs(wrapper, block.Hash(), block.NumberU64()) + expectedLogs := make([][]*types.Log, len(blockReceipts)) + for j, receipt := range blockReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, logs) + }, + }, + { + name: "block_number", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Equal(t, block.NumberU64(), *blockNumber) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for i, block := range blocks { + blockReceipts := receipts[i] + tc.test(t, block, blockReceipts) + } + }) + } +} + +func TestBlockDatabase_Delete(t *testing.T) { + // Test BlockDatabase delete operations. + // We are verifying that block header, body and receipts cannot be deleted, + // but hash to height mapping should be deleted. 
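+	// rawdb.DeleteBlock below removes only the hash-to-number mapping; the header, +	// body and receipts remain readable from the height-indexed databases.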
+ + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 4) + targetBlocks := blocks[1:] + targetReceipts := receipts[1:] + writeBlocks(wrapper, targetBlocks, targetReceipts) + + // delete block data + for i, block := range targetBlocks { + rawdb.DeleteBlock(wrapper, block.Hash(), block.NumberU64()) + + // we cannot delete header or body + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + body := rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Body(), body) + + // hash to height mapping should be deleted + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Nil(t, blockNumber) + + // Receipts and logs should not be deleted + expectedReceipts := targetReceipts[i] + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, recs) + logs := rawdb.ReadLogs(wrapper, block.Hash(), block.NumberU64()) + expectedLogs := make([][]*types.Log, len(expectedReceipts)) + for j, receipt := range expectedReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, logs) + } +} + +func TestBlockDatabase_WriteOrder_CachesUntilBoth(t *testing.T) { + // Test that partial writes (header or body only) are cached and not persisted + // until both header and body are available, then written as a single RLP block + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 2) + block := blocks[1] + + // Write header first - should be cached but not persisted + rawdb.WriteHeader(wrapper, block.Header()) + + // Verify header is not yet available (cached but not written to blockdb) + require.False(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.Nil(t, rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64())) + + // Verify underlying chainDB also has no header + require.Nil(t, rawdb.ReadHeader(chainDB, block.Hash(), block.NumberU64())) + + // Write body - should trigger writing both header and body to blockdb + rawdb.WriteBody(wrapper, block.Hash(), block.NumberU64(), block.Body()) + + // Now both should be available + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + + // Underlying chainDB should still have no header/body + require.Nil(t, rawdb.ReadHeader(chainDB, block.Hash(), block.NumberU64())) + require.Nil(t, rawdb.ReadBody(chainDB, block.Hash(), block.NumberU64())) +} + +func TestBlockDatabase_Batch(t *testing.T) { + // Test that batch operations work correctly for both writing and reading block data and receipts + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block := blocks[1] + blockReceipts := receipts[1] + + batch := wrapper.NewBatch() + + // Write header, body, and receipts to batch + rawdb.WriteBlock(batch, block) + rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), blockReceipts) + + // After writing both header and body to batch, both should be available immediately + require.True(t, rawdb.HasHeader(wrapper, 
block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + + // Receipts should also be available immediately since they're stored separately + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), block.NumberU64())) + + // Before write, header number should not be available + require.Nil(t, rawdb.ReadHeaderNumber(wrapper, block.Hash())) + + // After Write(), verify header number is available + require.NoError(t, batch.Write()) + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Equal(t, block.NumberU64(), *blockNumber) +} + +func TestBlockDatabase_SameBlockWrites(t *testing.T) { + // Test that writing the same block twice via rawdb.WriteBlock doesn't cause issues + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 1) + block := blocks[0] + + // Write block twice + rawdb.WriteBlock(wrapper, block) + rawdb.WriteBlock(wrapper, block) + + // Verify block data is still correct after duplicate writes + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) +} + +func TestBlockDatabase_DifferentBlocksSameHeight(t *testing.T) { + // Test that writing different blocks to the same height overwrites the first block + // and reading by the first block's hash returns nothing. + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block1 := blocks[1] + receipt1 := receipts[1] + + // Manually create a second block with the same height but different content + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + _, blocks2, receipts2, err := core.GenerateChainWithGenesis(gspec, engine, 1, 10, func(_ int, gen *core.BlockGen) { + gen.OffsetTime(int64(5000)) + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &addr2, + Gas: 450000, + GasTipCap: big.NewInt(5), + GasFeeCap: big.NewInt(5), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + block2 := blocks2[0] + receipt2 := receipts2[0] + + // Ensure both blocks have the same height but different hashes + require.Equal(t, block1.NumberU64(), block2.NumberU64()) + require.NotEqual(t, block1.Hash(), block2.Hash()) + + // Write two blocks with the same height + writeBlocks(wrapper, []*types.Block{block1, block2}, []types.Receipts{receipt1, receipt2}) + + // Reading by the first block's hash does not return anything + require.False(t, rawdb.HasHeader(wrapper, block1.Hash(), block1.NumberU64())) + require.False(t, rawdb.HasBody(wrapper, block1.Hash(), block1.NumberU64())) + firstHeader := rawdb.ReadHeader(wrapper, block1.Hash(), block1.NumberU64()) + require.Nil(t, firstHeader) + firstBody := rawdb.ReadBody(wrapper, block1.Hash(), block1.NumberU64()) + require.Nil(t, firstBody) + require.False(t, rawdb.HasReceipts(wrapper, block1.Hash(), block1.NumberU64())) + firstReceipts := rawdb.ReadReceipts(wrapper, block1.Hash(), block1.NumberU64(), block1.Time(), params.TestChainConfig) + require.Nil(t, firstReceipts) + + // Reading by the second block's hash returns second block data + require.True(t, 
rawdb.HasHeader(wrapper, block2.Hash(), block2.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block2.Hash(), block2.NumberU64())) + secondBlock := rawdb.ReadBlock(wrapper, block2.Hash(), block2.NumberU64()) + assertRLPEqual(t, block2, secondBlock) + require.True(t, rawdb.HasReceipts(wrapper, block2.Hash(), block2.NumberU64())) + secondReceipts := rawdb.ReadReceipts(wrapper, block2.Hash(), block2.NumberU64(), block2.Time(), params.TestChainConfig) + assertRLPEqual(t, receipt2, secondReceipts) +} + +func TestBlockDatabase_EmptyReceipts(t *testing.T) { + // Test that blocks with no transactions (empty receipts) are handled correctly + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + + // Create blocks without any transactions (empty receipts) + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + _, blocks, receipts, err := core.GenerateChainWithGenesis(gspec, engine, 3, 10, func(_ int, _ *core.BlockGen) { + // Don't add any transactions - this will create blocks with empty receipts + }) + require.NoError(t, err) + + // Verify all blocks have empty receipts + for i := range blocks { + require.Empty(t, receipts[i], "Block %d should have empty receipts", i) + } + + // write some blocks to chain db and some to wrapper and trigger migration + writeBlocks(chainDB, blocks[:2], receipts[:2]) + writeBlocks(wrapper, blocks[2:], receipts[2:]) + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + + // Verify that blocks with empty receipts are handled correctly + for _, block := range blocks { + blockNum := block.NumberU64() + + // Block data should be accessible + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), blockNum)) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), blockNum)) + + // Receipts should only be stored in the receiptsDB + require.False(t, rawdb.HasReceipts(chainDB, block.Hash(), blockNum)) + _, err := wrapper.receiptsDB.Get(blockNum) + require.NoError(t, err) + + // to be consistent with ethdb behavior, empty receipts should return true for HasReceipts + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), blockNum)) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), blockNum, block.Time(), params.TestChainConfig) + require.Empty(t, recs) + logs := rawdb.ReadLogs(wrapper, block.Hash(), blockNum) + require.Empty(t, logs) + } +} + +func TestBlockDatabase_Close_PersistsData(t *testing.T) { + // Test that Close() properly closes both databases and data persists + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 1) + block := blocks[0] + blockReceipts := receipts[0] + + // Write block and receipts + writeBlocks(wrapper, blocks, receipts) + + // Verify data is present + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, recs) + + // Close the wrapper db + require.NoError(t, wrapper.Close()) + + // Test we should no longer be able to read the block + require.False(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + _, err := wrapper.blockDB.Get(block.NumberU64()) + require.Error(t, 
err) + require.ErrorIs(t, err, database.ErrClosed) + + // Reopen the database and verify data is still present + wrapper, _ = newDatabasesFromDir(t, dataDir) + persistedBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, persistedBlock) + persistedRecs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, persistedRecs) + require.NoError(t, wrapper.Close()) +} + +func TestBlockDatabase_ReadDuringMigration(t *testing.T) { + // Test that blocks are readable during migration for both migrated and un-migrated blocks. + // This test: + // 1. Generates 21 blocks with receipts + // 2. Adds first 20 blocks and their receipts to chainDB + // 3. Creates wrapper database with migration disabled + // 4. Start migration with slow block database to control migration speed + // 5. Waits for at least 5 blocks to be migrated + // 6. Writes block 21 during migration (this is fast) + // 7. Verifies all 21 blocks and their receipts are readable via rawdb using wrapper + + dataDir := t.TempDir() + // Create initial databases using newDatabasesFromDir + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 21) + + // Add first 20 blocks to KVDB (chainDB) - these will be migrated + writeBlocks(chainDB, blocks[:20], receipts[:20]) + + // Create a slow block database to control migration speed + blockCount := 0 + slowBdb := &slowBlockDatabase{ + HeightIndex: wrapper.blockDB, + shouldSlow: func() bool { + blockCount++ + return blockCount > 5 // Slow down after 5 blocks + }, + } + + // Create and start migrator manually with slow block database + wrapper.migrator.blockDB = slowBdb + require.NoError(t, wrapper.migrator.Migrate()) + + // Wait for at least 5 blocks to be migrated + require.Eventually(t, func() bool { + return wrapper.migrator.blocksProcessed() >= 5 + }, 15*time.Second, 100*time.Millisecond) + + // Write block 21 to the wrapper database (this simulates a new block being added during migration) + writeBlocks(wrapper, blocks[20:21], receipts[20:21]) + + // Verify all 21 blocks are readable via rawdb using the wrapper + for i, block := range blocks { + blockNum := block.NumberU64() + expectedReceipts := receipts[i] + + // Test reading block header and body + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), blockNum)) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), blockNum)) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualBlock, "Block %d should be readable", blockNum) + assertRLPEqual(t, block, actualBlock) + actualHeader := rawdb.ReadHeader(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualHeader, "Block %d header should be readable", blockNum) + assertRLPEqual(t, block.Header(), actualHeader) + actualBody := rawdb.ReadBody(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualBody, "Block %d body should be readable", blockNum) + assertRLPEqual(t, block.Body(), actualBody) + + // Test reading receipts and logs + actualReceipts := rawdb.ReadReceipts(wrapper, block.Hash(), blockNum, block.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, actualReceipts) + actualLogs := rawdb.ReadLogs(wrapper, block.Hash(), blockNum) + expectedLogs := make([][]*types.Log, len(expectedReceipts)) + for j, receipt := range expectedReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, actualLogs) + + // Header number should be readable + 
actualBlockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.NotNil(t, actualBlockNumber, "Block %d number mapping should be readable", blockNum) + require.Equal(t, blockNum, *actualBlockNumber) + } + + require.NoError(t, wrapper.Close()) +} + +func TestBlockDatabase_Initialization(t *testing.T) { + blocks, _ := createBlocks(t, 10) + + testCases := []struct { + name string + stateSyncEnabled bool + chainDBBlocks []*types.Block + existingDBMinHeight *uint64 + expInitialized bool + expMinHeight uint64 + expMinHeightSet bool + }{ + { + name: "empty_chainDB_no_state_sync", + stateSyncEnabled: false, + expInitialized: true, + expMinHeight: 1, + expMinHeightSet: true, + }, + { + name: "empty_chainDB_state_sync_no_init", + stateSyncEnabled: true, + expInitialized: false, + expMinHeight: 0, + expMinHeightSet: false, + }, + { + name: "migration_needed", + stateSyncEnabled: false, + chainDBBlocks: blocks[5:10], + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "migration_needed_with_genesis", + stateSyncEnabled: false, + chainDBBlocks: append([]*types.Block{blocks[0]}, blocks[5:10]...), + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "migration_needed_state_sync", + stateSyncEnabled: true, + chainDBBlocks: blocks[5:10], + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "existing_db_created_with_min_height", + existingDBMinHeight: func() *uint64 { v := uint64(2); return &v }(), + stateSyncEnabled: false, + chainDBBlocks: blocks[5:8], + expInitialized: true, + expMinHeight: 2, + expMinHeightSet: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(WrapDatabase(base)) + blockDBPath := filepath.Join(dataDir, "blockdb") + + // Create the block database with an existing min height if needed + if tc.existingDBMinHeight != nil { + minHeight, err := getDatabaseMinHeight(base) + require.NoError(t, err) + require.Nil(t, minHeight) + wrapper := NewBlockDatabase( + base, + chainDB, + blockdb.DefaultConfig(), + blockDBPath, + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, wrapper.InitWithMinHeight(*tc.existingDBMinHeight)) + require.NoError(t, wrapper.blockDB.Close()) + require.NoError(t, wrapper.receiptsDB.Close()) + minHeight, err = getDatabaseMinHeight(base) + require.NoError(t, err) + require.Equal(t, *tc.existingDBMinHeight, *minHeight) + } + + // write chainDB blocks if needed + if len(tc.chainDBBlocks) > 0 { + writeBlocks(chainDB, tc.chainDBBlocks, []types.Receipts{}) + } + + // Create wrapper database + wrapper := NewBlockDatabase( + base, + chainDB, + blockdb.DefaultConfig(), + blockDBPath, + logging.NoLog{}, + prometheus.NewRegistry(), + ) + initialized, err := wrapper.InitWithStateSync(tc.stateSyncEnabled) + require.NoError(t, err) + require.Equal(t, tc.expInitialized, initialized) + require.Equal(t, tc.expInitialized, wrapper.initialized) + + // Verify initialization state and min height + if tc.expMinHeightSet { + require.Equal(t, tc.expMinHeight, wrapper.minHeight) + } else { + require.Equal(t, uint64(0), wrapper.minHeight) + } + + require.NoError(t, wrapper.Close()) + }) + } +} + +// Test that genesis block (block number 0) behavior works correctly. +// Genesis blocks should only exist in chainDB and not in the wrapper's block database. 
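+// newDatabasesFromDir initializes the wrapper with InitWithMinHeight(1), so height 0 +// is below the block database's minimum height and stays in chainDB.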
+func TestBlockDatabase_Genesis(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + require.True(t, wrapper.initialized) + require.Equal(t, uint64(1), wrapper.minHeight) + blocks, receipts := createBlocks(t, 10) + writeBlocks(wrapper, blocks, receipts) + + // validate the genesis block can be retrieved and that it is stored in chainDB + genesisHash := rawdb.ReadCanonicalHash(chainDB, 0) + genesisBlock := rawdb.ReadBlock(wrapper, genesisHash, 0) + assertRLPEqual(t, blocks[0], genesisBlock) + has, _ := wrapper.blockDB.Has(0) + require.False(t, has) + require.Equal(t, uint64(1), wrapper.minHeight) +} + +func assertRLPEqual(t *testing.T, expected, actual interface{}) { + t.Helper() + + expectedBytes, err := rlp.EncodeToBytes(expected) + require.NoError(t, err) + actualBytes, err := rlp.EncodeToBytes(actual) + require.NoError(t, err) + require.Equal(t, expectedBytes, actualBytes) +} diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 548072c786..0d59269f62 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -130,6 +130,7 @@ var ( metadataPrefix = []byte("metadata") warpPrefix = []byte("warp") ethDBPrefix = []byte("ethdb") + blockDBPrefix = []byte("blockdb") ) var ( @@ -312,7 +313,10 @@ func (vm *VM) Initialize( } // Initialize the database - vm.initializeDBs(db) + if err := vm.initializeDBs(db); err != nil { + return err + } + if vm.config.InspectDatabase { if err := vm.inspectDatabases(); err != nil { return err diff --git a/plugin/evm/vm_database.go b/plugin/evm/vm_database.go index 0606679f4e..311dc16777 100644 --- a/plugin/evm/vm_database.go +++ b/plugin/evm/vm_database.go @@ -4,12 +4,17 @@ package evm import ( + "errors" + "path/filepath" + "strconv" "time" "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/database/versiondb" + "github.com/ava-labs/avalanchego/x/blockdb" "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" "github.com/ava-labs/coreth/plugin/evm/database" @@ -17,12 +22,13 @@ import ( avalanchedatabase "github.com/ava-labs/avalanchego/database" ) +const ( + blockDBFolder = "blockdb" +) + // initializeDBs initializes the databases used by the VM. // coreth always uses the avalanchego provided database. -func (vm *VM) initializeDBs(db avalanchedatabase.Database) { - // Use NewNested rather than New so that the structure of the database - // remains the same regardless of the provided baseDB type. - vm.chaindb = rawdb.NewDatabase(database.WrapDatabase(prefixdb.NewNested(ethDBPrefix, db))) +func (vm *VM) initializeDBs(db avalanchedatabase.Database) error { vm.versiondb = versiondb.New(db) vm.acceptedBlockDB = prefixdb.New(acceptedPrefix, vm.versiondb) vm.metadataDB = prefixdb.New(metadataPrefix, vm.versiondb) @@ -30,6 +36,61 @@ func (vm *VM) initializeDBs(db avalanchedatabase.Database) { // that warp signatures are committed to the database atomically with // the last accepted block. vm.warpDB = prefixdb.New(warpPrefix, db) + + // chaindb must be created after acceptedBlockDB since it uses it + chaindb, err := vm.newChainDB(db) + if err != nil { + return err + } + vm.chaindb = chaindb + return nil +} + +// newChainDB creates a new chain database. +// If the block database is enabled, it wraps the chaindb with separate databases +// dedicated to storing block data. +// If the block database is disabled but was previously enabled, it returns an error.
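+// State sync is assumed to be in use when there is no last accepted block yet, +// unless config.StateSyncEnabled explicitly overrides it. Migration of existing +// chaindb block data is started only when the block database reports it is +// initialized and migration has not been disabled via config.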
+func (vm *VM) newChainDB(db avalanchedatabase.Database) (ethdb.Database, error) { + // Use NewNested rather than New so that the structure of the database + // remains the same regardless of the provided baseDB type. + chainDB := rawdb.NewDatabase(database.WrapDatabase(prefixdb.NewNested(ethDBPrefix, db))) + + // Error if block database has been enabled/created and then disabled + stateDB := prefixdb.New(blockDBPrefix, db) + created, err := database.IsBlockDatabaseCreated(stateDB) + if err != nil { + return nil, err + } + if !vm.config.BlockDatabaseEnabled { + if created { + return nil, errors.New("cannot disable block database after it has been enabled") + } + return chainDB, nil + } + + versionPath := strconv.FormatUint(blockdb.IndexFileVersion, 10) + blockDBPath := filepath.Join(vm.ctx.ChainDataDir, blockDBFolder, versionPath) + hasLastAccepted, err := vm.acceptedBlockDB.Has(lastAcceptedKey) + if err != nil { + return nil, err + } + stateSyncEnabled := !hasLastAccepted + if vm.config.StateSyncEnabled != nil { + stateSyncEnabled = *vm.config.StateSyncEnabled + } + config := blockdb.DefaultConfig().WithSyncToDisk(vm.config.BlockDatabaseSyncToDisk) + blockDB := database.NewBlockDatabase(stateDB, chainDB, config, blockDBPath, vm.ctx.Log, vm.sdkMetrics) + initialized, err := blockDB.InitWithStateSync(stateSyncEnabled) + if err != nil { + return nil, err + } + log.Info("blockDB initialized", "initialized", initialized, "stateSyncEnabled", stateSyncEnabled) + if initialized && !vm.config.BlockDatabaseMigrationDisabled { + if err := blockDB.Migrate(); err != nil { + return nil, err + } + } + return blockDB, nil } func (vm *VM) inspectDatabases() error { diff --git a/plugin/evm/vm_test.go b/plugin/evm/vm_test.go index 3edffbff12..7cfa35640a 100644 --- a/plugin/evm/vm_test.go +++ b/plugin/evm/vm_test.go @@ -1259,6 +1259,7 @@ func TestSkipChainConfigCheckCompatible(t *testing.T) { // use the block's timestamp instead of 0 since rewind to genesis // is hardcoded to be allowed in core/genesis.go.
newCTX := snowtest.Context(t, vm.ctx.ChainID) + newCTX.ChainDataDir = tvm.Ctx.ChainDataDir upgradetest.SetTimesTo(&newCTX.NetworkUpgrades, upgradetest.Latest, upgrade.UnscheduledActivationTime) upgradetest.SetTimesTo(&newCTX.NetworkUpgrades, fork+1, blk.Timestamp()) upgradetest.SetTimesTo(&newCTX.NetworkUpgrades, fork, upgrade.InitiallyActiveTime) diff --git a/plugin/evm/vm_warp_test.go b/plugin/evm/vm_warp_test.go index c4aa7fdc39..b026183298 100644 --- a/plugin/evm/vm_warp_test.go +++ b/plugin/evm/vm_warp_test.go @@ -29,7 +29,6 @@ import ( "github.com/ava-labs/avalanchego/vms/evm/predicate" "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" "github.com/ava-labs/libevm/common" - "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/crypto" "github.com/stretchr/testify/require" @@ -125,7 +124,7 @@ func testSendWarpMessage(t *testing.T, scheme string) { // Verify that the constructed block contains the expected log with an unsigned warp message in the log data ethBlock1 := blk.(*chain.BlockWrapper).Block.(*wrappedBlock).ethBlock require.Len(ethBlock1.Transactions(), 1) - receipts := rawdb.ReadReceipts(vm.chaindb, ethBlock1.Hash(), ethBlock1.NumberU64(), ethBlock1.Time(), vm.chainConfig) + receipts := vm.blockChain.GetReceiptsByHash(ethBlock1.Hash()) require.Len(receipts, 1) require.Len(receipts[0].Logs, 1) diff --git a/plugin/evm/vmtest/genesis.go b/plugin/evm/vmtest/genesis.go index 4af162ce7d..68f1c8cdb0 100644 --- a/plugin/evm/vmtest/genesis.go +++ b/plugin/evm/vmtest/genesis.go @@ -112,6 +112,7 @@ func SetupGenesis( ) { ctx := snowtest.Context(t, snowtest.CChainID) ctx.NetworkUpgrades = upgradetest.GetConfig(fork) + ctx.ChainDataDir = t.TempDir() baseDB := memdb.New() diff --git a/plugin/evm/wrapped_block.go b/plugin/evm/wrapped_block.go index 8ff47e5cba..d1e3d5d7b6 100644 --- a/plugin/evm/wrapped_block.go +++ b/plugin/evm/wrapped_block.go @@ -16,7 +16,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/libevm/common" - "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/log" "github.com/ava-labs/libevm/rlp" @@ -145,11 +144,11 @@ func (b *wrappedBlock) handlePrecompileAccept(rules extras.Rules) error { } // Read receipts from disk - receipts := rawdb.ReadReceipts(b.vm.chaindb, b.ethBlock.Hash(), b.ethBlock.NumberU64(), b.ethBlock.Time(), b.vm.chainConfig) + receipts := b.vm.blockChain.GetReceiptsByHash(b.ethBlock.Hash()) // If there are no receipts, ReadReceipts may be nil, so we check the length and confirm the ReceiptHash // is empty to ensure that missing receipts results in an error on accept. 
if len(receipts) == 0 && b.ethBlock.ReceiptHash() != types.EmptyRootHash { - return fmt.Errorf("failed to fetch receipts for accepted block with non-empty root hash (%s) (Block: %s, Height: %d)", b.ethBlock.ReceiptHash(), b.ethBlock.Hash(), b.ethBlock.NumberU64()) + return fmt.Errorf("failed to fetch receipts for verified block with non-empty root hash (%s) (Block: %s, Height: %d)", b.ethBlock.ReceiptHash(), b.ethBlock.Hash(), b.ethBlock.NumberU64()) } acceptCtx := &precompileconfig.AcceptContext{ SnowCtx: b.vm.ctx, diff --git a/sync/blocksync/syncer.go b/sync/blocksync/syncer.go index ca69a6dabc..a2826c2c08 100644 --- a/sync/blocksync/syncer.go +++ b/sync/blocksync/syncer.go @@ -13,6 +13,7 @@ import ( "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" + evmdatabase "github.com/ava-labs/coreth/plugin/evm/database" syncpkg "github.com/ava-labs/coreth/sync" statesyncclient "github.com/ava-labs/coreth/sync/client" ) @@ -75,6 +76,20 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { nextHeight := s.fromHeight blocksToFetch := s.blocksToFetch + // If the blockdb has not been initialized yet, initialize it with the first + // height the block syncer will fetch from peers. + firstBlockToFetch := nextHeight - blocksToFetch + 1 + log.Info("Checking if blockdb is initialized on block syncer", "nextHeight", nextHeight, "firstBlockToFetch", firstBlockToFetch) + if blockDB, ok := s.db.(*evmdatabase.BlockDatabase); ok { + log.Info("Initializing blockdb on block syncer", "nextHeight", nextHeight, "firstBlockToFetch", firstBlockToFetch) + if err := blockDB.InitWithMinHeight(firstBlockToFetch); err != nil { + return err + } + + // we don't need to migrate here since we can only get here when we have no + // data to migrate + } + // first, check for blocks already available on disk so we don't // request them from peers. for blocksToFetch > 0 {