Skip to content

Commit 3fd6e45

Browse files
andyzhang2023andyzhang2023
authored andcommitted
pevm: fallback to sequencial processor when the TxDAG is too deep (bnb-chain#251)
Co-authored-by: andyzhang2023 <andyzhang2023@gmail.com>
1 parent f8d6a95 commit 3fd6e45

File tree

8 files changed

+133
-33
lines changed

8 files changed

+133
-33
lines changed

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ var (
177177
utils.ParallelTxDAGFlag,
178178
utils.ParallelTxDAGFileFlag,
179179
utils.ParallelTxDAGSenderPrivFlag,
180+
utils.ParallelTxDATMaxDepthRatioFlag,
180181
configFileFlag,
181182
utils.LogDebugFlag,
182183
utils.LogBacktraceAtFlag,

cmd/utils/flags.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
11361136
Category: flags.VMCategory,
11371137
}
11381138

1139+
ParallelTxDATMaxDepthRatioFlag = &cli.Float64Flag{
1140+
Name: "parallel.txdag-max-depth-ratio",
1141+
Usage: "A ratio to decide whether or not to execute transactions in parallel, it will fallback to sequencial processor if the depth is larger than this value (default = 0.9)",
1142+
Value: 0.9,
1143+
Category: flags.VMCategory,
1144+
}
1145+
11391146
VMOpcodeOptimizeFlag = &cli.BoolFlag{
11401147
Name: "vm.opcode.optimize",
11411148
Usage: "enable opcode optimization",
@@ -2057,6 +2064,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
20572064
cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
20582065
}
20592066

2067+
if ctx.IsSet(ParallelTxDATMaxDepthRatioFlag.Name) {
2068+
cfg.ParallelTxDAGMaxDepthRatio = ctx.Float64(ParallelTxDATMaxDepthRatioFlag.Name)
2069+
}
2070+
20602071
if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) {
20612072
priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name)
20622073
if cfg.Miner.ParallelTxDAGSenderPriv, err = crypto.HexToECDSA(priHex); err != nil {

core/blockchain.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,6 +1760,19 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
17601760
return bc.insertChain(chain, true)
17611761
}
17621762

1763+
func (bc *BlockChain) useSerialProcessor(block *types.Block) (bool, bool) {
1764+
// findout whether or not the dependencies of the block are too deep to be processed
1765+
// if the dependencies are too deep, we will fallback to serial processing
1766+
txCount := len(block.Transactions())
1767+
_, depth := BuildTxLevels(txCount, bc.vmConfig.TxDAG)
1768+
tooDeep := float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio
1769+
isByzantium := bc.chainConfig.IsByzantium(block.Number())
1770+
1771+
txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
1772+
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep || !isByzantium
1773+
return useSerialProcessor, tooDeep
1774+
}
1775+
17631776
// insertChain is the internal implementation of InsertChain, which assumes that
17641777
// 1) chains are contiguous, and 2) The chain mutex is held.
17651778
//
@@ -1971,6 +1984,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
19711984
)
19721985

19731986
blockProcessedInParallel := false
1987+
var (
1988+
tooDeep, useSerialProcessor bool
1989+
depth int
1990+
)
19741991
// skip block process if we already have the state, receipts and logs from mining work
19751992
if !(receiptExist && logExist && stateExist) {
19761993
// Retrieve the parent block and it's state to execute on top
@@ -1982,9 +1999,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
19821999
if bc.vmConfig.EnableParallelExec {
19832000
bc.parseTxDAG(block)
19842001
}
1985-
isByzantium := bc.chainConfig.IsByzantium(block.Number())
19862002

1987-
if bc.vmConfig.EnableParallelExec && bc.vmConfig.TxDAG != nil && bc.vmConfig.EnableTxParallelMerge && isByzantium {
2003+
useSerialProcessor, tooDeep = bc.useSerialProcessor(block)
2004+
if !useSerialProcessor {
19882005
statedb, err = state.NewParallel(parent.Root, bc.stateCache, bc.snaps)
19892006
} else {
19902007
statedb, err = state.New(parent.Root, bc.stateCache, bc.snaps)
@@ -2018,8 +2035,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
20182035

20192036
// Process block using the parent state as reference point
20202037
pstart = time.Now()
2021-
txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
2022-
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary
20232038
if useSerialProcessor {
20242039
receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
20252040
blockProcessedInParallel = false
@@ -2143,7 +2158,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
21432158
"accountUpdates", common.PrettyDuration(timers.AccountUpdates),
21442159
"storageUpdates", common.PrettyDuration(timers.StorageUpdates),
21452160
"accountHashes", common.PrettyDuration(timers.AccountHashes),
2146-
"storageHashes", common.PrettyDuration(timers.StorageHashes))
2161+
"storageHashes", common.PrettyDuration(timers.StorageHashes),
2162+
"tooDeep", tooDeep, "depth", depth,
2163+
)
21472164

21482165
// Write the block to the chain and get the status.
21492166
var (

core/parallel_state_scheduler.go

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,6 @@ func (tl TxLevel) predictTxDAG(dag types.TxDAG) {
350350

351351
func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
352352
var levels TxLevels = make(TxLevels, 0, 8)
353-
var currLevel int = 0
354-
355353
var enlargeLevelsIfNeeded = func(currLevel int, levels *TxLevels) {
356354
if len(*levels) <= currLevel {
357355
for i := len(*levels); i <= currLevel; i++ {
@@ -367,22 +365,47 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
367365
return TxLevels{all}
368366
}
369367

370-
marked := make(map[int]int, len(all))
371-
for _, tx := range all {
372-
dep := dag.TxDep(tx.txIndex)
368+
// build the levels from the DAG
369+
marked, _ := BuildTxLevels(len(all), dag)
370+
// put the transactions into the levels
371+
for txIndex, tx := range all {
372+
level := marked[txIndex]
373+
enlargeLevelsIfNeeded(level, &levels)
374+
levels[level] = append(levels[level], tx)
375+
}
376+
return levels
377+
}
378+
379+
func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int) {
380+
if dag == nil {
381+
return make(map[int]int), 0
382+
}
383+
// marked is used to record which level that each transaction should be put
384+
marked = make(map[int]int, txCount)
385+
var (
386+
// currLevelHasTx marks if the current level has any transaction
387+
currLevelHasTx bool
388+
)
389+
390+
depth, currLevelHasTx = 0, false
391+
for txIndex := 0; txIndex < txCount; txIndex++ {
392+
dep := dag.TxDep(txIndex)
373393
switch true {
374394
case dep != nil && dep.CheckFlag(types.ExcludedTxFlag),
375395
dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
376396
// excluted tx, occupies the whole level
377397
// or dependent-to-all tx, occupies the whole level, too
378-
levels = append(levels, TxLevel{tx})
379-
marked[tx.txIndex], currLevel = len(levels)-1, len(levels)
398+
if currLevelHasTx {
399+
// shift to next level if there are transactions in the current level
400+
depth++
401+
}
402+
marked[txIndex] = depth
403+
// occupy the current level
404+
depth, currLevelHasTx = depth+1, false
380405

381406
case dep == nil || len(dep.TxIndexes) == 0:
382-
// dependent on none
383-
enlargeLevelsIfNeeded(currLevel, &levels)
384-
levels[currLevel] = append(levels[currLevel], tx)
385-
marked[tx.txIndex] = currLevel
407+
// dependent on none, just put it in the current level
408+
marked[txIndex], currLevelHasTx = depth, true
386409

387410
case dep != nil && len(dep.TxIndexes) > 0:
388411
// dependent on others
@@ -395,19 +418,22 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
395418
}
396419
if prevLevel < 0 {
397420
// broken DAG, just ignored it
398-
enlargeLevelsIfNeeded(currLevel, &levels)
399-
levels[currLevel] = append(levels[currLevel], tx)
400-
marked[tx.txIndex] = currLevel
421+
marked[txIndex], currLevelHasTx = depth, true
401422
continue
402423
}
403-
enlargeLevelsIfNeeded(prevLevel+1, &levels)
404-
levels[prevLevel+1] = append(levels[prevLevel+1], tx)
405424
// record the level of this tx
406-
marked[tx.txIndex] = prevLevel + 1
425+
marked[txIndex] = prevLevel + 1
426+
if marked[txIndex] > depth {
427+
depth, currLevelHasTx = marked[txIndex], true
428+
}
407429

408430
default:
409431
panic("unexpected case")
410432
}
411433
}
412-
return levels
434+
// check if the last level has any transaction, to avoid the empty level
435+
if !currLevelHasTx {
436+
depth--
437+
}
438+
return marked, depth + 1
413439
}

core/parallel_state_scheduler_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,12 +601,44 @@ func TestNewTxLevels(t *testing.T) {
601601
assertEqual(levels([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, nil, {-2}, {-2}}), [][]uint64{{1, 2, 3}, {4}, {5}}, t)
602602

603603
// case 9: loop-back txdag
604-
assertEqual(levels([]uint64{1, 2, 3, 4}, [][]int{{1}, nil, {0}, nil}), [][]uint64{{1, 2, 4}, {3}}, t)
604+
assertEqual(levels([]uint64{1, 2, 3, 4}, [][]int{{1}, nil, {0}, nil}), [][]uint64{{1, 2}, {3, 4}}, t)
605+
606+
// case 10: nonedependent txs + execlude txs + nonedependent txs
607+
assertEqual(levels([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, {-2}, nil, nil}), [][]uint64{{1, 2}, {3}, {4, 5}}, t)
608+
}
609+
610+
func TestBuildLevels(t *testing.T) {
611+
var (
612+
marks map[int]int
613+
depth int
614+
)
615+
// case 1: 1 excluded tx + n no dependencies txs + n dependencies txs + 1 all-dependencies tx
616+
marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5, 6, 7}, [][]int{{-1}, nil, nil, nil, {0, 1}, {2}, {-2}})
617+
assertEqualMarks(marks, map[int]int{0: 0, 1: 1, 2: 1, 3: 1, 4: 2, 5: 2, 6: 3}, t)
618+
if depth != 4 {
619+
t.Fatalf("expected depth: 4, got depth: %d", depth)
620+
}
621+
// case 2: nonedependent txs + execlude txs + nonedependent txs
622+
marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, {-2}, nil, nil})
623+
assertEqualMarks(marks, map[int]int{0: 0, 1: 0, 2: 1, 3: 2, 4: 2}, t)
624+
if depth != 3 {
625+
t.Fatalf("expected depth: 3, got depth: %d", depth)
626+
}
627+
// case 3: (broken TxDAG) n dependent txs + 1 execlude tx + none dependent txs
628+
marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5}, [][]int{{1}, {2}, {-1}, nil, nil})
629+
assertEqualMarks(marks, map[int]int{0: 0, 1: 0, 2: 1, 3: 2, 4: 2}, t)
630+
if depth != 3 {
631+
t.Fatalf("expected depth: 3, got depth: %d", depth)
632+
}
605633
}
606634

607635
func TestMultiLevel(t *testing.T) {
608636
// case 7: 1 excluded tx + n no dependencies txs + n dependencies txs + 1 all-dependencies tx
609-
assertEqual(levels([]uint64{1, 2, 3, 4, 5, 6, 7, 8}, [][]int{nil, nil, nil, {0}, nil, {1}, nil, {2}}), [][]uint64{{1, 2, 3, 5, 7}, {4, 6, 8}}, t)
637+
assertEqual(levels([]uint64{1, 2, 3, 4, 5, 6, 7, 8}, [][]int{nil, nil, nil, {0}, nil, {1}, nil, {2}}), [][]uint64{{1, 2, 3}, {4, 5, 6, 7, 8}}, t)
638+
}
639+
640+
func levelMarks(nonces []uint64, txdag [][]int) (map[int]int, int) {
641+
return BuildTxLevels(len(nonces), int2txdag(txdag))
610642
}
611643

612644
func levels(nonces []uint64, txdag [][]int) TxLevels {
@@ -650,6 +682,17 @@ func int2txdag(txdag [][]int) types.TxDAG {
650682
return &dag
651683
}
652684

685+
func assertEqualMarks(actual map[int]int, expected map[int]int, t *testing.T) {
686+
if len(actual) != len(expected) {
687+
t.Fatalf("expected %d marks, got %d marks", len(expected), len(actual))
688+
}
689+
for i, mark := range actual {
690+
if expected[i] != mark {
691+
t.Fatalf("expected mark[%d]: %d, got mark[%d]: %d", i, expected[i], i, mark)
692+
}
693+
}
694+
}
695+
653696
func assertEqual(actual TxLevels, expected [][]uint64, t *testing.T) {
654697
if len(actual) != len(expected) {
655698
t.Fatalf("expected %d levels, got %d levels", len(expected), len(actual))

core/vm/interpreter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Config struct {
4141
TxDAG types.TxDAG
4242
EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode
4343
EnableTxParallelMerge bool // Whether to enable parallel merge in parallel mode
44+
TxDAGMaxDepthRatio float64
4445
}
4546

4647
// ScopeContext contains the things that are per-call, such as stack and memory,

eth/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
242242
EnableTxParallelMerge: config.ParallelTxParallelMerge,
243243
ParallelTxNum: config.ParallelTxNum,
244244
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
245+
TxDAGMaxDepthRatio: config.ParallelTxDAGMaxDepthRatio,
245246
}
246247
cacheConfig = &core.CacheConfig{
247248
TrieCleanLimit: config.TrieCleanCache,

eth/ethconfig/config.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,14 @@ type Config struct {
219219
RollupDisableTxPoolAdmission bool
220220
RollupHaltOnIncompatibleProtocolVersion string
221221

222-
ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
223-
ParallelTxNum int // Number of slot for transaction execution
224-
EnableOpcodeOptimizing bool
225-
EnableParallelTxDAG bool
226-
ParallelTxDAGFile string
227-
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
228-
ParallelTxParallelMerge bool
229-
222+
ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
223+
ParallelTxNum int // Number of slot for transaction execution
224+
EnableOpcodeOptimizing bool
225+
EnableParallelTxDAG bool
226+
ParallelTxDAGFile string
227+
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
228+
ParallelTxParallelMerge bool
229+
ParallelTxDAGMaxDepthRatio float64
230230
}
231231

232232
// CreateConsensusEngine creates a consensus engine for the given chain config.

0 commit comments

Comments
 (0)