Skip to content

Commit 16cd550

Browse files
authored
core/txpool: consensus/dbft: use static legacy pool instance in dbft (#466)
1 parent 84177b7 commit 16cd550

File tree

2 files changed

+106
-62
lines changed

2 files changed

+106
-62
lines changed

consensus/dbft/dbft.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ type DBFT struct {
223223
validatorsCache *lru.Cache[uint64, []common.Address]
224224
// dkgIndexCache is a cache for storing the index array of the ordered validators
225225
dkgIndexCache *lru.Cache[uint64, []int]
226+
// staticPool is a legacy pool instance for decrypted transaction verification,
227+
// which is initialized once per height at postBlock callback. It should be reset
228+
// before any reusage at the same height.
229+
staticPool *legacypool.LegacyPool
226230

227231
// The fields below are for testing only
228232
fakeDiff bool // Skip difficulty verifications
@@ -910,9 +914,11 @@ func (c *DBFT) verifyPreBlockCb(b dbft.PreBlock[common.Hash]) bool {
910914
}
911915
ethBlock := dbftBlock.ToEthBlock()
912916

913-
localPool := c.newLocalPool(parent)
914-
defer localPool.CloseSilently()
915-
errs := localPool.Add(dbftBlock.transactions, false, false)
917+
// If is the first view of the block, then initialize static pool with fresh parent.
918+
if c.dbft.Context.ViewNumber == 0 {
919+
c.initStaticPool(parent)
920+
}
921+
errs := c.staticPool.Add(dbftBlock.transactions, false, false)
916922
for i, err := range errs {
917923
if err != nil {
918924
log.Warn("proposed PreBlock has invalid transaction",
@@ -923,6 +929,7 @@ func (c *DBFT) verifyPreBlockCb(b dbft.PreBlock[common.Hash]) bool {
923929
return false
924930
}
925931
}
932+
c.staticPool.ResetStatic()
926933

927934
state, receipts, gasUsed, err := c.chain.VerifyBlock(ethBlock, true)
928935
if err != nil {
@@ -1122,15 +1129,14 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
11221129
j int
11231130
txx = make([]*types.Transaction, len(pre.transactions))
11241131
parent = c.chain.GetHeader(pre.header.ParentHash, pre.header.Number.Uint64()-1)
1125-
localPool = c.newLocalPool(parent)
11261132
fallbackToEnvelope = func(i int, incrementJ bool, reason string) bool {
11271133
if len(reason) != 0 {
11281134
log.Info("Falling back to envelope",
11291135
"envelope hash", pre.transactions[i].Hash(),
11301136
"envelope index", i,
11311137
"reason", reason)
11321138
}
1133-
errs := localPool.Add([]*types.Transaction{pre.transactions[i]}, false, false)
1139+
errs := c.staticPool.Add([]*types.Transaction{pre.transactions[i]}, false, false)
11341140
if errs[0] != nil {
11351141
log.Info("Falling back to original set of transactions",
11361142
"envelope hash", pre.transactions[i].Hash(),
@@ -1147,7 +1153,6 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
11471153
return true
11481154
}
11491155
)
1150-
defer localPool.CloseSilently()
11511156

11521157
// If we're backup, then pre.finalReceipts are filled in during PreBlock verification;
11531158
// if we're primary, then reuse receipts got after PrepareRequest construction since
@@ -1156,7 +1161,8 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
11561161
if ctx.IsPrimary() && pre.finalState == nil {
11571162
receipts = c.sealingReceipts
11581163
}
1159-
1164+
// No need to reinitialize static pool since it was already initialized in verifyPreBlockCb.
1165+
// Reset the static pool after processing all transactions.
11601166
for i := range pre.transactions {
11611167
var isEnvelope = j < len(pre.envelopesData) && pre.envelopesData[j].index == i
11621168
if !isEnvelope || // pre.transactions[i] is not an envelope, use it as-is.
@@ -1192,7 +1198,7 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
11921198
break
11931199
}
11941200
}
1195-
errs := localPool.Add([]*types.Transaction{decryptedTx}, false, false)
1201+
errs := c.staticPool.Add([]*types.Transaction{decryptedTx}, false, false)
11961202
if errs[0] != nil {
11971203
if fallbackToEnvelope(i, true, fmt.Sprintf("decrypted transaction has pool conflicts: %s", errs[0].Error())) {
11981204
continue
@@ -1205,6 +1211,7 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
12051211
hasDecryptedTxs = true
12061212
}
12071213
pre.finalTransactions = txx
1214+
c.staticPool.ResetStatic()
12081215
}
12091216

12101217
// Use cached processing results if no new transactions were included into
@@ -1250,10 +1257,10 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
12501257
return nil
12511258
}
12521259

1253-
// newLocalPool returns an initialized instance of LegacyPool with default config
1260+
// newStaticPool returns an initialized instance of LegacyPool with default config
12541261
// except that locals are prohibited and journal is not stored.
1255-
func (c *DBFT) newLocalPool(parent *types.Header) *legacypool.LegacyPool {
1256-
p := legacypool.New(legacypool.Config{
1262+
func newStaticPool(chain ChainHeaderReader) *legacypool.LegacyPool {
1263+
return legacypool.NewStatic(legacypool.Config{
12571264
Locals: nil,
12581265
NoLocals: true,
12591266
Journal: "",
@@ -1267,10 +1274,12 @@ func (c *DBFT) newLocalPool(parent *types.Header) *legacypool.LegacyPool {
12671274
Lifetime: legacypool.DefaultConfig.Lifetime,
12681275
ReannounceTimeThreshold: legacypool.DefaultConfig.ReannounceTimeThreshold,
12691276
ReannounceRemotes: false,
1270-
}, c.chain)
1271-
p.Init(legacypool.DefaultConfig.PriceLimit, parent, func(addr common.Address, reserve bool) error { return nil })
1277+
}, chain)
1278+
}
12721279

1273-
return p
1280+
// initStaticPool initializes the static pool with the provided parent header.
1281+
func (c *DBFT) initStaticPool(parent *types.Header) {
1282+
c.staticPool.InitStatic(legacypool.DefaultConfig.PriceLimit, parent, func(addr common.Address, reserve bool) error { return nil })
12741283
}
12751284

12761285
// validateDecryptedTx checks the validity of the transaction to determine whether the outer envelope transaction should be replaced.
@@ -2032,6 +2041,7 @@ func (c *DBFT) Start(chain ChainHeaderWriter) {
20322041
if c.dbftStarted.CompareAndSwap(false, true) {
20332042
c.chain = chain
20342043
c.blockQueue.chain = chain
2044+
c.staticPool = newStaticPool(c.chain)
20352045

20362046
// Subscribe for minted blocks prior to accessing current chain header.
20372047
// Sealing proposal awaiting may take some time during which new blocks may

core/txpool/legacypool/legacypool.go

Lines changed: 82 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -278,37 +278,36 @@ func New(config Config, chain BlockChain) *LegacyPool {
278278
// Sanitize the input to ensure no vulnerable gas prices are set
279279
config = (&config).sanitize()
280280

281-
// Create the transaction pool with its initial settings
282-
pool := &LegacyPool{
283-
config: config,
284-
chain: chain,
285-
chainconfig: chain.Config(),
286-
signer: types.LatestSigner(chain.Config()),
287-
pending: make(map[common.Address]*list),
288-
cached: make(map[common.Address]*list),
289-
queue: make(map[common.Address]*list),
290-
beats: make(map[common.Address]time.Time),
291-
all: newLookup(),
292-
reqResetCh: make(chan *txpoolResetRequest),
293-
reqPromoteCh: make(chan *accountSet),
294-
queueTxEventCh: make(chan *types.Transaction),
295-
reorgDoneCh: make(chan chan struct{}),
296-
reorgShutdownCh: make(chan struct{}),
297-
initDoneCh: make(chan struct{}),
298-
}
299-
pool.locals = newAccountSet(pool.signer)
300-
for _, addr := range config.Locals {
301-
log.Info("Setting new local account", "address", addr)
302-
pool.locals.add(addr)
303-
}
304-
pool.priced = newPricedList(pool.all)
305-
281+
// Create the transaction pool and its channels
282+
pool := NewStatic(config, chain)
283+
pool.reqResetCh = make(chan *txpoolResetRequest)
284+
pool.reqPromoteCh = make(chan *accountSet)
285+
pool.queueTxEventCh = make(chan *types.Transaction)
286+
pool.reorgDoneCh = make(chan chan struct{})
287+
pool.reorgShutdownCh = make(chan struct{})
288+
pool.initDoneCh = make(chan struct{})
306289
if !config.NoLocals && config.Journal != "" {
307290
pool.journal = newTxJournal(config.Journal)
308291
}
309292
return pool
310293
}
311294

295+
// NewStatic creates a new transaction pool without any channel
296+
// and journal. A pool created in this way should only be inited
297+
// with InitStatic.
298+
func NewStatic(config Config, chain BlockChain) *LegacyPool {
299+
// Create the transaction pool with its initial settings
300+
pool := &LegacyPool{
301+
config: config,
302+
chain: chain,
303+
chainconfig: chain.Config(),
304+
signer: types.LatestSigner(chain.Config()),
305+
}
306+
pool.setEmptyLists()
307+
pool.setInitialLocals()
308+
return pool
309+
}
310+
312311
// Filter returns whether the given transaction can be consumed by the legacy
313312
// pool, specifically, whether it is a Legacy, AccessList or Dynamic transaction.
314313
func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
@@ -325,6 +324,32 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
325324
// from disk and filtered based on the provided starting settings. The internal
326325
// goroutines will be spun up and the pool deemed operational afterwards.
327326
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
327+
// Do basic initializations about reserver, gas price and state
328+
pool.InitStatic(gasTip, head, reserve)
329+
330+
// Start the reorg loop early, so it can handle requests generated during
331+
// journal loading.
332+
pool.wg.Add(1)
333+
go pool.scheduleReorgLoop()
334+
335+
// If local transactions and journaling is enabled, load from disk
336+
if pool.journal != nil {
337+
if err := pool.journal.load(pool.addLocals); err != nil {
338+
log.Warn("Failed to load transaction journal", "err", err)
339+
}
340+
if err := pool.journal.rotate(pool.local()); err != nil {
341+
log.Warn("Failed to rotate transaction journal", "err", err)
342+
}
343+
}
344+
pool.wg.Add(1)
345+
go pool.loop()
346+
return nil
347+
}
348+
349+
// InitStatic sets the gas price needed to keep a transaction in the pool
350+
// and the chain head to allow balance / nonce checks. This method doesn't
351+
// start loops or load journals, so the pool can be released automatically.
352+
func (pool *LegacyPool) InitStatic(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
328353
// Set the address reserver to request exclusive access to pooled accounts
329354
pool.reserve = reserve
330355

@@ -344,24 +369,34 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
344369
pool.currentHead.Store(head)
345370
pool.currentState = statedb
346371
pool.pendingNonces = newNoncer(statedb)
372+
return nil
373+
}
347374

348-
// Start the reorg loop early, so it can handle requests generated during
349-
// journal loading.
350-
pool.wg.Add(1)
351-
go pool.scheduleReorgLoop()
375+
// ResetStatic reverts any transaction insertion to a pool created by NewStatic
376+
// and inited by InitStatic, so that the pool can be reused as only initialized.
377+
func (pool *LegacyPool) ResetStatic() {
378+
pool.setEmptyLists()
379+
pool.setInitialLocals()
380+
pool.pendingNonces = newNoncer(pool.currentState)
381+
}
352382

353-
// If local transactions and journaling is enabled, load from disk
354-
if pool.journal != nil {
355-
if err := pool.journal.load(pool.addLocals); err != nil {
356-
log.Warn("Failed to load transaction journal", "err", err)
357-
}
358-
if err := pool.journal.rotate(pool.local()); err != nil {
359-
log.Warn("Failed to rotate transaction journal", "err", err)
360-
}
383+
// setEmptyLists sets all transaction records of the pool to empty.
384+
func (pool *LegacyPool) setEmptyLists() {
385+
pool.pending = make(map[common.Address]*list)
386+
pool.cached = make(map[common.Address]*list)
387+
pool.queue = make(map[common.Address]*list)
388+
pool.beats = make(map[common.Address]time.Time)
389+
pool.all = newLookup()
390+
pool.priced = newPricedList(pool.all)
391+
}
392+
393+
// setInitialLocals resets the local accounts of the pool to initially configured.
394+
func (pool *LegacyPool) setInitialLocals() {
395+
pool.locals = newAccountSet(pool.signer)
396+
for _, addr := range pool.config.Locals {
397+
log.Info("Setting new local account", "address", addr)
398+
pool.locals.add(addr)
361399
}
362-
pool.wg.Add(1)
363-
go pool.loop()
364-
return nil
365400
}
366401

367402
// loop is the transaction pool's main event loop, waiting for and reacting to
@@ -475,21 +510,16 @@ func (pool *LegacyPool) loop() {
475510
}
476511
}
477512

478-
// CloseSilently is the same as Close(), but doesn't log. Intended to be used
479-
// for short-lived verification pools (not the main node tx pool).
480-
func (pool *LegacyPool) CloseSilently() {
513+
// Close terminates the transaction pool. If the pool is created through NewStatic
514+
// and inited through InitStatic, then this operation is not necessary.
515+
func (pool *LegacyPool) Close() error {
481516
// Terminate the pool reorger and return
482517
close(pool.reorgShutdownCh)
483518
pool.wg.Wait()
484519

485520
if pool.journal != nil {
486521
pool.journal.close()
487522
}
488-
}
489-
490-
// Close terminates the transaction pool.
491-
func (pool *LegacyPool) Close() error {
492-
pool.CloseSilently()
493523
log.Info("Transaction pool stopped")
494524
return nil
495525
}
@@ -1185,6 +1215,10 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
11851215
errs[nilSlot] = err
11861216
nilSlot++
11871217
}
1218+
// Return if no channel is initialized, ref NewStatic and InitStatic
1219+
if pool.reqPromoteCh == nil {
1220+
return errs
1221+
}
11881222
// Reorg the pool internals if needed and return
11891223
done := pool.requestPromoteExecutables(dirtyAddrs)
11901224
if sync {

0 commit comments

Comments
 (0)