4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -104,7 +104,7 @@ jobs:
path: avalanchego
token: ${{ secrets.AVALANCHE_PAT }}
- name: Run e2e tests
uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@84e9aebcfbc04602865f4c0a3e8b46a27409a3f5
uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@ca04567537150e2d71e6f1ef4fae606ade0bdc89
with:
run: ./scripts/run_task.sh test-e2e-ci
prometheus_url: ${{ secrets.PROMETHEUS_URL || '' }}
@@ -132,7 +132,7 @@ jobs:
ref: ${{ github.event.inputs.avalanchegoBranch }}
path: avalanchego
- name: Run Warp E2E Tests
uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@84e9aebcfbc04602865f4c0a3e8b46a27409a3f5
uses: ava-labs/avalanchego/.github/actions/run-monitored-tmpnet-cmd@ca04567537150e2d71e6f1ef4fae606ade0bdc89
with:
run: ./scripts/run_task.sh test-e2e-warp-ci
artifact_prefix: warp
110 changes: 78 additions & 32 deletions core/blockchain.go
@@ -108,6 +108,7 @@ var (
blockTrieOpsTimer = metrics.GetOrRegisterCounter("chain/block/trie", nil)
blockValidationTimer = metrics.GetOrRegisterCounter("chain/block/validations/state", nil)
blockWriteTimer = metrics.GetOrRegisterCounter("chain/block/writes", nil)
blockAcceptTimer = metrics.GetOrRegisterCounter("chain/block/accepts", nil)

acceptorQueueGauge = metrics.GetOrRegisterGauge("chain/acceptor/queue/size", nil)
acceptorWorkTimer = metrics.GetOrRegisterCounter("chain/acceptor/work", nil)
@@ -137,11 +138,12 @@ var (
)

const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
badBlockLimit = 10
bodyCacheLimit = 256
blockCacheLimit = 256
verifiedBlockCacheLimit = 1024
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
badBlockLimit = 10

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
@@ -293,12 +295,14 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
stateManager TrieWriter
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
stateManager TrieWriter
verifiedBlockCache FIFOCache[common.Hash, *types.Block] // cache for verified but not accepted blocks
verifiedReceiptsCache FIFOCache[common.Hash, types.Receipts] // cache for verified but not accepted receipts

hc *HeaderChain
rmLogsFeed event.Feed
@@ -412,21 +416,23 @@ func NewBlockChain(
log.Info("")

bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
badBlocks: lru.NewCache[common.Hash, *badBlock](badBlockLimit),
engine: engine,
vmConfig: vmConfig,
senderCacher: NewTxSenderCacher(runtime.NumCPU()),
acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit),
quit: make(chan struct{}),
acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize),
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
badBlocks: lru.NewCache[common.Hash, *badBlock](badBlockLimit),
verifiedBlockCache: NewFIFOCache[common.Hash, *types.Block](verifiedBlockCacheLimit),
verifiedReceiptsCache: NewFIFOCache[common.Hash, types.Receipts](verifiedBlockCacheLimit),
engine: engine,
vmConfig: vmConfig,
senderCacher: NewTxSenderCacher(runtime.NumCPU()),
acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit),
quit: make(chan struct{}),
acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize),
}
bc.stateCache = extstate.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
@@ -591,6 +597,17 @@ func (bc *BlockChain) warmAcceptedCaches() {
log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime))
}

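// writeAcceptedBlockAndReceipts persists an accepted block (and its cached
// receipts, if present) to the database and evicts both entries from the
// verified caches.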
func (bc *BlockChain) writeAcceptedBlockAndReceipts(b *types.Block) {
rawdb.WriteBlock(bc.db, b)

if receipts, ok := bc.verifiedReceiptsCache.Get(b.Hash()); ok {
rawdb.WriteReceipts(bc.db, b.Hash(), b.NumberU64(), receipts)
}

bc.verifiedBlockCache.Remove(b.Hash())
bc.verifiedReceiptsCache.Remove(b.Hash())
}

// startAcceptor starts processing items on the [acceptorQueue]. If a [nil]
// object is placed on the [acceptorQueue], the [startAcceptor] will exit.
func (bc *BlockChain) startAcceptor() {
@@ -724,10 +741,26 @@ func (bc *BlockChain) loadLastState(lastAcceptedHash common.Hash) error {
if head == (common.Hash{}) {
return errors.New("could not read head block hash")
}
// Make sure the entire head block is available
headBlock := bc.GetBlockByHash(head)
var headBlock *types.Block
headBlock = bc.GetBlockByHash(head)
if headBlock == nil {
return fmt.Errorf("could not load head block %s", head.Hex())
// Fallback: if the recorded head block is missing (or its block data is not available),
// reset the head markers to the provided last accepted block.
// This can happen because the head markers are updated on verify while block data is
// only persisted on accept, so a crash after verify but before accept leaves the
// head block missing from disk.
lastAccepted := bc.GetBlockByHash(lastAcceptedHash)
if lastAccepted == nil {
return fmt.Errorf("could not load last accepted block to repair head markers")
}
batch := bc.db.NewBatch()
rawdb.WriteCanonicalHash(batch, lastAccepted.Hash(), lastAccepted.NumberU64())
rawdb.WriteHeadBlockHash(batch, lastAccepted.Hash())
rawdb.WriteHeadHeaderHash(batch, lastAccepted.Hash())
if err := batch.Write(); err != nil {
log.Crit("Failed to repair head markers", "err", err)
}
headBlock = lastAccepted
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(headBlock.Header())
@@ -1086,6 +1119,8 @@ func (bc *BlockChain) Accept(block *types.Block) error {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

start := time.Now()

// The parent of [block] must be the last accepted block.
if bc.lastAccepted.Hash() != block.ParentHash() {
return fmt.Errorf(
@@ -1107,6 +1142,9 @@ }
}
}

// Ensure the newly accepted block is persisted on disk before it becomes the last accepted block.
bc.writeAcceptedBlockAndReceipts(block)

// Enqueue block in the acceptor
bc.lastAccepted = block
bc.addAcceptorQueue(block)
@@ -1128,6 +1166,8 @@ func (bc *BlockChain) Accept(block *types.Block) error {
latestGasCapacityGauge.Update(int64(s.Gas.Capacity))
latestGasTargetGauge.Update(int64(s.Target()))
}

blockAcceptTimer.Inc(time.Since(start).Milliseconds())
return nil
}

@@ -1155,6 +1195,8 @@ func (bc *BlockChain) Reject(block *types.Block) error {

// Remove the block from the block cache (ignore return value of whether it was in the cache)
_ = bc.blockCache.Remove(block.Hash())
bc.verifiedBlockCache.Remove(block.Hash())
bc.verifiedReceiptsCache.Remove(block.Hash())

return nil
}
@@ -1213,13 +1255,17 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, parentRoot common
// writeBlockWithState writes the block and all associated state to the database,
// but it expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, parentRoot common.Hash, receipts []*types.Receipt, state *state.StateDB) error {
// Only cache the verified block here; it is not persisted until acceptance,
// which avoids storing non-accepted blocks in the database.
bc.verifiedBlockCache.Put(block.Hash(), block)
// Also cache the receipts of verified but not yet accepted blocks.
bc.verifiedReceiptsCache.Put(block.Hash(), receipts)

// Irrelevant of the canonical status, write the block itself to the database.
//
// Note all the components of block(hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
blockBatch := bc.db.NewBatch()
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
rawdb.WriteHeaderNumber(blockBatch, block.Hash(), block.NumberU64())
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
@@ -1453,7 +1499,7 @@ func (bc *BlockChain) collectUnflattenedLogs(b *types.Block, removed bool) [][]*
if excessBlobGas != nil {
blobGasPrice = eip4844.CalcBlobFee(*excessBlobGas)
}
receipts := rawdb.ReadRawReceipts(bc.db, b.Hash(), b.NumberU64())
receipts := bc.GetReceiptsByHash(b.Hash())
if err := receipts.DeriveFields(bc.chainConfig, b.Hash(), b.NumberU64(), b.Time(), b.BaseFee(), blobGasPrice, b.Transactions()); err != nil {
log.Error("Failed to derive block receipts fields", "hash", b.Hash(), "number", b.NumberU64(), "err", err)
}
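The net effect of the blockchain.go changes above is that verified blocks and receipts are held only in the new FIFO caches until consensus accepts them; the first durable write now happens in Accept rather than at verification time. Below is a minimal, self-contained sketch of that lifecycle, not the PR's actual code: fifoCache is a simplified stand-in for the cache type used by the chain, and the writeToDisk callback is hypothetical.

package sketch

// fifoCache is a simplified stand-in for the FIFO cache holding verified,
// not-yet-accepted data; the real type evicts its oldest entries at a size limit.
type fifoCache[K comparable, V any] struct {
	entries map[K]V
}

func newFIFOCache[K comparable, V any]() *fifoCache[K, V] {
	return &fifoCache[K, V]{entries: make(map[K]V)}
}

func (c *fifoCache[K, V]) Put(k K, v V)      { c.entries[k] = v }
func (c *fifoCache[K, V]) Get(k K) (V, bool) { v, ok := c.entries[k]; return v, ok }
func (c *fifoCache[K, V]) Remove(k K)        { delete(c.entries, k) }

// onVerify caches a block that passed verification; nothing is written to disk yet.
func onVerify(verified *fifoCache[string, []byte], hash string, block []byte) {
	verified.Put(hash, block)
}

// onAccept performs the first durable write for the block and drops it from the cache.
func onAccept(verified *fifoCache[string, []byte], hash string, writeToDisk func(string, []byte)) {
	if blk, ok := verified.Get(hash); ok {
		writeToDisk(hash, blk)
	}
	verified.Remove(hash)
}

// onReject discards the cached block; a rejected block never reaches the database.
func onReject(verified *fifoCache[string, []byte], hash string) {
	verified.Remove(hash)
}

Keeping unaccepted blocks purely in memory means a crash between verify and accept can leave the head markers pointing at a block that was never persisted, which is exactly the case the loadLastState fallback above repairs.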
3 changes: 3 additions & 0 deletions core/blockchain_ext_test.go
@@ -1652,6 +1652,9 @@ func ReexecCorruptedStateTest(t *testing.T, create ReexecTestFunc) {

// Simulate a crash by updating the acceptor tip
blockchain.writeBlockAcceptedIndices(chain[1])

// Need to write the accepted block to disk, since blocks are no longer persisted on verify.
rawdb.WriteBlock(blockchain.db, chain[1])
blockchain.Stop()

// Restart blockchain with existing state
34 changes: 33 additions & 1 deletion core/blockchain_reader.go
@@ -61,18 +61,32 @@ func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool {
// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (bc *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
return bc.hc.GetHeader(hash, number)
if block, ok := bc.verifiedBlockCache.Get(hash); ok {
return block.Header()
}
return bc.hc.GetHeader(hash, number)
}

// GetHeaderByHash retrieves a block header from the database by hash, caching it if
// found.
func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
if block, ok := bc.verifiedBlockCache.Get(hash); ok {
return block.Header()
}
return bc.hc.GetHeaderByHash(hash)
}

// GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
hash := rawdb.ReadCanonicalHash(bc.db, number)
if block, ok := bc.verifiedBlockCache.Get(hash); ok {
return block.Header()
}
return bc.hc.GetHeaderByNumber(number)
}

@@ -83,6 +97,9 @@ func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
if cached, ok := bc.bodyCache.Get(hash); ok {
return cached
}
if block, ok := bc.verifiedBlockCache.Get(hash); ok {
return block.Body()
}
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return nil
@@ -101,6 +118,9 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {
if bc.blockCache.Contains(hash) {
return true
}
if _, ok := bc.verifiedBlockCache.Get(hash); ok {
return true
}
if !bc.HasHeader(hash, number) {
return false
}
@@ -112,6 +132,9 @@ func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool {
if !bc.HasBlock(hash, number) {
return false
}
if _, ok := bc.verifiedReceiptsCache.Get(hash); ok {
return true
}
if bc.receiptsCache.Contains(hash) {
return true
}
@@ -125,6 +148,9 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
if block, ok := bc.blockCache.Get(hash); ok {
return block
}
if block, ok := bc.verifiedBlockCache.Get(hash); ok {
return block
}
block := rawdb.ReadBlock(bc.db, hash, number)
if block == nil {
return nil
@@ -174,6 +200,12 @@ func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*type

// GetReceiptsByHash retrieves the receipts for all transactions in a given block.
func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
// Check verified receipts cache first
if receipts, ok := bc.verifiedReceiptsCache.Get(hash); ok {
return receipts
}

// Check main receipts cache
if receipts, ok := bc.receiptsCache.Get(hash); ok {
return receipts
}
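Because verified blocks and receipts now live only in memory until acceptance, the readers in blockchain_reader.go consult the verified caches in addition to the existing LRU caches before touching the database; the exact order of the checks varies per getter in the diff. The following is a rough sketch of that layered lookup, using hypothetical stand-ins for the caches and the raw database read rather than the chain's real types.

package sketch

// lookupBlock illustrates the layered read path: verified-but-not-accepted cache,
// then the in-memory cache of accepted blocks, then persistent storage.
// All three accessors are hypothetical stand-ins.
func lookupBlock(
	hash string,
	verified func(string) ([]byte, bool), // cache of verified, not yet accepted blocks
	cached func(string) ([]byte, bool),   // LRU cache of recently used accepted blocks
	readDisk func(string) ([]byte, bool), // database, which only contains accepted blocks
) ([]byte, bool) {
	if blk, ok := verified(hash); ok {
		return blk, true
	}
	if blk, ok := cached(hash); ok {
		return blk, true
	}
	return readDisk(hash)
}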