Skip to content

Commit d689876

Browse files
authored
Fallback to serial execution if parallel execution fails (ethereum#1392)
1 parent 135878f commit d689876

File tree

2 files changed

+106
-29
lines changed

2 files changed

+106
-29
lines changed

core/blockchain.go

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"github.com/ethereum/go-ethereum/common/mclock"
4040
"github.com/ethereum/go-ethereum/common/prque"
4141
"github.com/ethereum/go-ethereum/consensus"
42-
"github.com/ethereum/go-ethereum/core/blockstm"
4342
"github.com/ethereum/go-ethereum/core/rawdb"
4443
"github.com/ethereum/go-ethereum/core/state"
4544
"github.com/ethereum/go-ethereum/core/state/snapshot"
@@ -86,12 +85,15 @@ var (
8685
blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil)
8786
triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil)
8887

89-
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
90-
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
91-
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
92-
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
93-
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
94-
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)
88+
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
89+
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
90+
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
91+
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
92+
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
93+
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)
94+
blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
95+
blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
96+
blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)
9597

9698
blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
9799
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
@@ -569,7 +571,7 @@ func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis
569571
return bc, nil
570572
}
571573

572-
func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, blockEndErr error) {
574+
func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
573575
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
574576
ctx, cancel := context.WithCancel(context.Background())
575577
defer cancel()
@@ -597,6 +599,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
597599
err error
598600
statedb *state.StateDB
599601
counter metrics.Counter
602+
parallel bool
600603
}
601604

602605
resultChan := make(chan Result, 2)
@@ -606,44 +609,58 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
606609
if bc.parallelProcessor != nil {
607610
parallelStatedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
608611
if err != nil {
609-
return nil, nil, 0, nil, err
612+
return nil, nil, 0, nil, 0, err
610613
}
611614
parallelStatedb.SetLogger(bc.logger)
612615

613616
processorCount++
614617

615618
go func() {
616619
parallelStatedb.StartPrefetcher("chain", nil)
620+
pstart := time.Now()
617621
receipts, logs, usedGas, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.vmConfig, ctx)
618-
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter}
622+
blockExecutionParallelTimer.UpdateSince(pstart)
623+
if err == nil {
624+
vstart := time.Now()
625+
err = bc.validator.ValidateState(block, parallelStatedb, receipts, usedGas, false)
626+
vtime = time.Since(vstart)
627+
}
628+
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter, true}
619629
}()
620630
}
621631

622632
if bc.processor != nil {
623633
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
624634
if err != nil {
625-
return nil, nil, 0, nil, err
635+
return nil, nil, 0, nil, 0, err
626636
}
627637
statedb.SetLogger(bc.logger)
628638

629639
processorCount++
630640

631641
go func() {
632642
statedb.StartPrefetcher("chain", nil)
643+
pstart := time.Now()
633644
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, ctx)
634-
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter}
645+
blockExecutionSerialTimer.UpdateSince(pstart)
646+
if err == nil {
647+
vstart := time.Now()
648+
err = bc.validator.ValidateState(block, statedb, receipts, usedGas, false)
649+
vtime = time.Since(vstart)
650+
}
651+
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter, false}
635652
}()
636653
}
637654

638655
result := <-resultChan
639656

640-
if _, ok := result.err.(blockstm.ParallelExecFailedError); ok {
657+
if result.parallel && result.err != nil {
641658
log.Warn("Parallel state processor failed", "err", result.err)
642-
659+
blockExecutionParallelErrorCounter.Inc(1)
643660
// If the parallel processor failed, we will fallback to the serial processor if enabled
644661
if processorCount == 2 {
645-
result.statedb.StopPrefetcher()
646662
result = <-resultChan
663+
result.statedb.StopPrefetcher()
647664
processorCount--
648665
}
649666
}
@@ -658,7 +675,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
658675
}()
659676
}
660677

661-
return result.receipts, result.logs, result.usedGas, result.statedb, result.err
678+
return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
662679
}
663680

664681
// empty returns an indicator whether the blockchain is empty.
@@ -2323,7 +2340,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
23232340

23242341
// Process block using the parent state as reference point
23252342
pstart := time.Now()
2326-
receipts, logs, usedGas, statedb, err := bc.ProcessBlock(block, parent)
2343+
receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent)
23272344
activeState = statedb
23282345

23292346
if err != nil {
@@ -2338,18 +2355,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
23382355
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
23392356
}
23402357
// BOR
2341-
ptime := time.Since(pstart)
2342-
2343-
vstart := time.Now()
2344-
2345-
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil {
2346-
bc.reportBlock(block, receipts, err)
2347-
followupInterrupt.Store(true)
2348-
2349-
return it.index, err
2350-
}
2358+
ptime := time.Since(pstart) - vtime
23512359

2352-
vtime := time.Since(vstart)
23532360
proctime := time.Since(start) // processing + validation
23542361

23552362
// Update the metrics touched during block processing and validation

core/blockchain_test.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package core
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"math/big"
@@ -170,7 +171,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
170171
return err
171172
}
172173

173-
receipts, _, usedGas, statedb, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())
174+
receipts, _, usedGas, statedb, _, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())
174175

175176
if err != nil {
176177
blockchain.reportBlock(block, receipts, err)
@@ -216,6 +217,75 @@ func testParallelBlockChainImport(t *testing.T, scheme string) {
216217
}
217218
}
218219

220+
type AlwaysFailParallelStateProcessor struct {
221+
}
222+
223+
func (p *AlwaysFailParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
224+
return nil, nil, 0, errors.New("always fail")
225+
}
226+
227+
type SlowSerialStateProcessor struct {
228+
s Processor
229+
}
230+
231+
func NewSlowSerialStateProcessor(s Processor) *SlowSerialStateProcessor {
232+
return &SlowSerialStateProcessor{s: s}
233+
}
234+
235+
func (p *SlowSerialStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
236+
time.Sleep(100 * time.Millisecond)
237+
return p.s.Process(block, statedb, cfg, interruptCtx)
238+
}
239+
240+
func TestSuccessfulBlockImportParallelFailed(t *testing.T) {
241+
t.Parallel()
242+
243+
testSuccessfulBlockImportParallelFailed(t, rawdb.HashScheme)
244+
testSuccessfulBlockImportParallelFailed(t, rawdb.PathScheme)
245+
}
246+
247+
func testSuccessfulBlockImportParallelFailed(t *testing.T, scheme string) {
248+
// Create a new blockchain with 10 initial blocks
249+
db, _, blockchain, err := newCanonical(ethash.NewFaker(), 10, true, scheme)
250+
blockchain.parallelProcessor = &AlwaysFailParallelStateProcessor{}
251+
blockchain.processor = NewSlowSerialStateProcessor(blockchain.processor)
252+
if err != nil {
253+
t.Fatalf("failed to create canonical chain: %v", err)
254+
}
255+
defer blockchain.Stop()
256+
257+
// Create valid blocks to import
258+
block := blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash())
259+
blocks := makeBlockChain(blockchain.chainConfig, block, 5, ethash.NewFaker(), db, canonicalSeed)
260+
261+
// Import the blocks
262+
n, err := blockchain.InsertChain(blocks)
263+
if err != nil {
264+
t.Fatalf("failed to import valid blocks: %v", err)
265+
}
266+
267+
// Verify all blocks were imported
268+
if n != len(blocks) {
269+
t.Errorf("imported %d blocks, wanted %d", n, len(blocks))
270+
}
271+
272+
// Verify the last block is properly linked
273+
if blockchain.CurrentBlock().Hash() != blocks[len(blocks)-1].Hash() {
274+
t.Errorf("current block hash mismatch: got %x, want %x",
275+
blockchain.CurrentBlock().Hash(),
276+
blocks[len(blocks)-1].Hash())
277+
}
278+
279+
// Verify block numbers are sequential
280+
for i, block := range blocks {
281+
expectedNumber := uint64(11 + i) // 10 initial blocks + new blocks
282+
if block.NumberU64() != expectedNumber {
283+
t.Errorf("block %d has wrong number: got %d, want %d",
284+
i, block.NumberU64(), expectedNumber)
285+
}
286+
}
287+
}
288+
219289
// testHeaderChainImport tries to process a chain of header, writing them into
220290
// the database if successful.
221291
func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {

0 commit comments

Comments
 (0)