Skip to content

Commit 8a20e87

Browse files
core/txpool/legacypool: move queue out of main txpool
1 parent 264c06a commit 8a20e87

File tree

2 files changed

+83
-199
lines changed

2 files changed

+83
-199
lines changed

core/txpool/legacypool/legacypool.go

Lines changed: 47 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"math"
2424
"math/big"
2525
"slices"
26-
"sort"
2726
"sync"
2827
"sync/atomic"
2928
"time"
@@ -238,11 +237,10 @@ type LegacyPool struct {
238237
pendingNonces *noncer // Pending state tracking virtual nonces
239238
reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools
240239

241-
pending map[common.Address]*list // All currently processable transactions
242-
queue map[common.Address]*list // Queued but non-processable transactions
243-
beats map[common.Address]time.Time // Last heartbeat from each known account
244-
all *lookup // All transactions to allow lookups
245-
priced *pricedList // All transactions sorted by price
240+
pending map[common.Address]*list // All currently processable transactions
241+
queue *queue
242+
all *lookup // All transactions to allow lookups
243+
priced *pricedList // All transactions sorted by price
246244

247245
reqResetCh chan *txpoolResetRequest
248246
reqPromoteCh chan *accountSet
@@ -266,14 +264,14 @@ func New(config Config, chain BlockChain) *LegacyPool {
266264
config = (&config).sanitize()
267265

268266
// Create the transaction pool with its initial settings
267+
signer := types.LatestSigner(chain.Config())
269268
pool := &LegacyPool{
270269
config: config,
271270
chain: chain,
272271
chainconfig: chain.Config(),
273-
signer: types.LatestSigner(chain.Config()),
272+
signer: signer,
274273
pending: make(map[common.Address]*list),
275-
queue: make(map[common.Address]*list),
276-
beats: make(map[common.Address]time.Time),
274+
queue: newQueue(config, signer),
277275
all: newLookup(),
278276
reqResetCh: make(chan *txpoolResetRequest),
279277
reqPromoteCh: make(chan *accountSet),
@@ -369,15 +367,9 @@ func (pool *LegacyPool) loop() {
369367
// Handle inactive account transaction eviction
370368
case <-evict.C:
371369
pool.mu.Lock()
372-
for addr := range pool.queue {
373-
// Any old enough should be removed
374-
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
375-
list := pool.queue[addr].Flatten()
376-
for _, tx := range list {
377-
pool.removeTx(tx.Hash(), true, true)
378-
}
379-
queuedEvictionMeter.Mark(int64(len(list)))
380-
}
370+
evicted := pool.queue.evict(false)
371+
for _, hash := range evicted {
372+
pool.removeTx(hash, true, true)
381373
}
382374
pool.mu.Unlock()
383375
}
@@ -459,11 +451,7 @@ func (pool *LegacyPool) stats() (int, int) {
459451
for _, list := range pool.pending {
460452
pending += list.Len()
461453
}
462-
queued := 0
463-
for _, list := range pool.queue {
464-
queued += list.Len()
465-
}
466-
return pending, queued
454+
return pending, pool.queue.stats()
467455
}
468456

469457
// Content retrieves the data content of the transaction pool, returning all the
@@ -476,10 +464,7 @@ func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[
476464
for addr, list := range pool.pending {
477465
pending[addr] = list.Flatten()
478466
}
479-
queued := make(map[common.Address][]*types.Transaction, len(pool.queue))
480-
for addr, list := range pool.queue {
481-
queued[addr] = list.Flatten()
482-
}
467+
queued := pool.queue.content()
483468
return pending, queued
484469
}
485470

@@ -493,10 +478,7 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
493478
if list, ok := pool.pending[addr]; ok {
494479
pending = list.Flatten()
495480
}
496-
var queued []*types.Transaction
497-
if list, ok := pool.queue[addr]; ok {
498-
queued = list.Flatten()
499-
}
481+
queued := pool.queue.contentFrom(addr)
500482
return pending, queued
501483
}
502484

@@ -655,7 +637,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
655637
if pending := pool.pending[auth]; pending != nil {
656638
count += pending.Len()
657639
}
658-
if queue := pool.queue[auth]; queue != nil {
640+
if queue, ok := pool.queue.get(auth); ok {
659641
count += queue.Len()
660642
}
661643
if count > 1 {
@@ -702,7 +684,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
702684
// only by this subpool until all transactions are evicted
703685
var (
704686
_, hasPending = pool.pending[from]
705-
_, hasQueued = pool.queue[from]
687+
_, hasQueued = pool.queue.get(from)
706688
)
707689
if !hasPending && !hasQueued {
708690
if err := pool.reserver.Hold(from); err != nil {
@@ -801,7 +783,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
801783
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
802784

803785
// Successful promotion, bump the heartbeat
804-
pool.beats[from] = time.Now()
786+
pool.queue.bump(from)
805787
return old != nil, nil
806788
}
807789
// New transaction isn't replacing a pending one, push into queue
@@ -826,7 +808,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
826808
}
827809
// The transaction has a nonce gap with pending list, it's only considered
828810
// as executable if transactions in queue can fill up the nonce gap.
829-
queue, ok := pool.queue[from]
811+
queue, ok := pool.queue.get(from)
830812
if !ok {
831813
return true
832814
}
@@ -842,25 +824,12 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
842824
//
843825
// Note, this method assumes the pool lock is held!
844826
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
845-
// Try to insert the transaction into the future queue
846-
from, _ := types.Sender(pool.signer, tx) // already validated
847-
if pool.queue[from] == nil {
848-
pool.queue[from] = newList(false)
849-
}
850-
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
851-
if !inserted {
852-
// An older transaction was better, discard this
853-
queuedDiscardMeter.Mark(1)
854-
return false, txpool.ErrReplaceUnderpriced
827+
replaced, err := pool.queue.add(hash, tx)
828+
if err != nil {
829+
return false, err
855830
}
856-
// Discard any previous transaction and mark this
857-
if old != nil {
858-
pool.all.Remove(old.Hash())
859-
pool.priced.Removed(1)
860-
queuedReplaceMeter.Mark(1)
861-
} else {
862-
// Nothing was replaced, bump the queued counter
863-
queuedGauge.Inc(1)
831+
if replaced != nil {
832+
pool.removeTx(*replaced, true, true)
864833
}
865834
// If the transaction isn't in lookup set but it's expected to be there,
866835
// show the error log.
@@ -871,11 +840,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl
871840
pool.all.Add(tx)
872841
pool.priced.Put(tx)
873842
}
874-
// If we never record the heartbeat, do it right now.
875-
if _, exist := pool.beats[from]; !exist {
876-
pool.beats[from] = time.Now()
877-
}
878-
return old != nil, nil
843+
return replaced != nil, nil
879844
}
880845

881846
// promoteTx adds a transaction to the pending (processable) list of transactions
@@ -910,7 +875,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
910875
pool.pendingNonces.set(addr, tx.Nonce()+1)
911876

912877
// Successful promotion, bump the heartbeat
913-
pool.beats[addr] = time.Now()
878+
pool.queue.bump(addr)
914879
return true
915880
}
916881

@@ -1023,7 +988,7 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus {
1023988

1024989
if txList := pool.pending[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
1025990
return txpool.TxStatusPending
1026-
} else if txList := pool.queue[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
991+
} else if txList, ok := pool.queue.get(from); ok && txList.txs.items[tx.Nonce()] != nil {
1027992
return txpool.TxStatusQueued
1028993
}
1029994
return txpool.TxStatusUnknown
@@ -1100,7 +1065,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
11001065
defer func() {
11011066
var (
11021067
_, hasPending = pool.pending[addr]
1103-
_, hasQueued = pool.queue[addr]
1068+
_, hasQueued = pool.queue.get(addr)
11041069
)
11051070
if !hasPending && !hasQueued {
11061071
pool.reserver.Release(addr)
@@ -1132,16 +1097,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
11321097
}
11331098
}
11341099
// Transaction is in the future queue
1135-
if future := pool.queue[addr]; future != nil {
1136-
if removed, _ := future.Remove(tx); removed {
1137-
// Reduce the queued counter
1138-
queuedGauge.Dec(1)
1139-
}
1140-
if future.Empty() {
1141-
delete(pool.queue, addr)
1142-
delete(pool.beats, addr)
1143-
}
1144-
}
1100+
pool.queue.removeTx(addr, tx)
11451101
return 0
11461102
}
11471103

@@ -1289,10 +1245,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
12891245
}
12901246
}
12911247
// Reset needs promote for all addresses
1292-
promoteAddrs = make([]common.Address, 0, len(pool.queue))
1293-
for addr := range pool.queue {
1294-
promoteAddrs = append(promoteAddrs, addr)
1295-
}
1248+
promoteAddrs = append(promoteAddrs, pool.queue.addresses()...)
12961249
}
12971250
// Check for pending transactions for every account that sent new ones
12981251
promoted := pool.promoteExecutables(promoteAddrs)
@@ -1446,61 +1399,21 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
14461399
// future queue to the set of pending transactions. During this process, all
14471400
// invalidated transactions (low nonce, low balance) are deleted.
14481401
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
1449-
// Track the promoted transactions to broadcast them at once
1450-
var promoted []*types.Transaction
1451-
1452-
// Iterate over all accounts and promote any executable transactions
14531402
gasLimit := pool.currentHead.Load().GasLimit
1454-
for _, addr := range accounts {
1455-
list := pool.queue[addr]
1456-
if list == nil {
1457-
continue // Just in case someone calls with a non existing account
1458-
}
1459-
// Drop all transactions that are deemed too old (low nonce)
1460-
forwards := list.Forward(pool.currentState.GetNonce(addr))
1461-
for _, tx := range forwards {
1462-
pool.all.Remove(tx.Hash())
1463-
}
1464-
log.Trace("Removed old queued transactions", "count", len(forwards))
1465-
// Drop all transactions that are too costly (low balance or out of gas)
1466-
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
1467-
for _, tx := range drops {
1468-
pool.all.Remove(tx.Hash())
1403+
promoteable, removeable := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces)
1404+
promoted := make([]*types.Transaction, 0, len(promoteable))
1405+
1406+
// promote all promoteable transactions
1407+
for _, tx := range promoteable {
1408+
from, _ := pool.signer.Sender(tx)
1409+
if pool.promoteTx(from, tx.Hash(), tx) {
1410+
promoted = append(promoted, tx)
14691411
}
1470-
log.Trace("Removed unpayable queued transactions", "count", len(drops))
1471-
queuedNofundsMeter.Mark(int64(len(drops)))
1472-
1473-
// Gather all executable transactions and promote them
1474-
readies := list.Ready(pool.pendingNonces.get(addr))
1475-
for _, tx := range readies {
1476-
hash := tx.Hash()
1477-
if pool.promoteTx(addr, hash, tx) {
1478-
promoted = append(promoted, tx)
1479-
}
1480-
}
1481-
log.Trace("Promoted queued transactions", "count", len(promoted))
1482-
queuedGauge.Dec(int64(len(readies)))
1483-
1484-
// Drop all transactions over the allowed limit
1485-
var caps = list.Cap(int(pool.config.AccountQueue))
1486-
for _, tx := range caps {
1487-
hash := tx.Hash()
1488-
pool.all.Remove(hash)
1489-
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
1490-
}
1491-
queuedRateLimitMeter.Mark(int64(len(caps)))
1492-
// Mark all the items dropped as removed
1493-
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
1494-
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
1412+
}
14951413

1496-
// Delete the entire queue entry if it became empty.
1497-
if list.Empty() {
1498-
delete(pool.queue, addr)
1499-
delete(pool.beats, addr)
1500-
if _, ok := pool.pending[addr]; !ok {
1501-
pool.reserver.Release(addr)
1502-
}
1503-
}
1414+
// remove all removable transactions
1415+
for _, hash := range removeable {
1416+
pool.removeTx(hash, true, true)
15041417
}
15051418
return promoted
15061419
}
@@ -1589,44 +1502,10 @@ func (pool *LegacyPool) truncatePending() {
15891502

15901503
// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
15911504
func (pool *LegacyPool) truncateQueue() {
1592-
queued := uint64(0)
1593-
for _, list := range pool.queue {
1594-
queued += uint64(list.Len())
1595-
}
1596-
if queued <= pool.config.GlobalQueue {
1597-
return
1598-
}
1599-
1600-
// Sort all accounts with queued transactions by heartbeat
1601-
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
1602-
for addr := range pool.queue {
1603-
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
1604-
}
1605-
sort.Sort(sort.Reverse(addresses))
1606-
1607-
// Drop transactions until the total is below the limit
1608-
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
1609-
addr := addresses[len(addresses)-1]
1610-
list := pool.queue[addr.address]
1505+
removed := pool.queue.truncate()
16111506

1612-
addresses = addresses[:len(addresses)-1]
1613-
1614-
// Drop all transactions if they are less than the overflow
1615-
if size := uint64(list.Len()); size <= drop {
1616-
for _, tx := range list.Flatten() {
1617-
pool.removeTx(tx.Hash(), true, true)
1618-
}
1619-
drop -= size
1620-
queuedRateLimitMeter.Mark(int64(size))
1621-
continue
1622-
}
1623-
// Otherwise drop only last few transactions
1624-
txs := list.Flatten()
1625-
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
1626-
pool.removeTx(txs[i].Hash(), true, true)
1627-
drop--
1628-
queuedRateLimitMeter.Mark(1)
1629-
}
1507+
for _, hash := range removed {
1508+
pool.removeTx(hash, true, false)
16301509
}
16311510
}
16321511

@@ -1683,7 +1562,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
16831562
// Delete the entire pending entry if it became empty.
16841563
if list.Empty() {
16851564
delete(pool.pending, addr)
1686-
if _, ok := pool.queue[addr]; !ok {
1565+
if _, ok := pool.queue.get(addr); !ok {
16871566
pool.reserver.Release(addr)
16881567
}
16891568
}
@@ -1942,17 +1821,17 @@ func (pool *LegacyPool) Clear() {
19421821
// acquire the subpool lock until the transaction addition is completed.
19431822

19441823
for addr := range pool.pending {
1945-
if _, ok := pool.queue[addr]; !ok {
1824+
if _, ok := pool.queue.get(addr); !ok {
19461825
pool.reserver.Release(addr)
19471826
}
19481827
}
1949-
for addr := range pool.queue {
1828+
for _, addr := range pool.queue.addresses() {
19501829
pool.reserver.Release(addr)
19511830
}
19521831
pool.all.Clear()
19531832
pool.priced.Reheap()
19541833
pool.pending = make(map[common.Address]*list)
1955-
pool.queue = make(map[common.Address]*list)
1834+
pool.queue = newQueue(pool.config, pool.signer)
19561835
pool.pendingNonces = newNoncer(pool.currentState)
19571836
}
19581837

0 commit comments

Comments
 (0)