Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 47 additions & 168 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"math"
"math/big"
"slices"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -238,11 +237,10 @@ type LegacyPool struct {
pendingNonces *noncer // Pending state tracking virtual nonces
reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools

pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
pending map[common.Address]*list // All currently processable transactions
queue *queue
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand All @@ -266,14 +264,14 @@ func New(config Config, chain BlockChain) *LegacyPool {
config = (&config).sanitize()

// Create the transaction pool with its initial settings
signer := types.LatestSigner(chain.Config())
pool := &LegacyPool{
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
signer: signer,
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
queue: newQueue(config, signer),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
Expand Down Expand Up @@ -369,15 +367,9 @@ func (pool *LegacyPool) loop() {
// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Any old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true, true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
evicted := pool.queue.evict(false)
for _, hash := range evicted {
pool.removeTx(hash, true, true)
}
pool.mu.Unlock()
}
Expand Down Expand Up @@ -459,11 +451,7 @@ func (pool *LegacyPool) stats() (int, int) {
for _, list := range pool.pending {
pending += list.Len()
}
queued := 0
for _, list := range pool.queue {
queued += list.Len()
}
return pending, queued
return pending, pool.queue.stats()
}

// Content retrieves the data content of the transaction pool, returning all the
Expand All @@ -476,10 +464,7 @@ func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
queued := make(map[common.Address][]*types.Transaction, len(pool.queue))
for addr, list := range pool.queue {
queued[addr] = list.Flatten()
}
queued := pool.queue.content()
return pending, queued
}

Expand All @@ -493,10 +478,7 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
if list, ok := pool.pending[addr]; ok {
pending = list.Flatten()
}
var queued []*types.Transaction
if list, ok := pool.queue[addr]; ok {
queued = list.Flatten()
}
queued := pool.queue.contentFrom(addr)
return pending, queued
}

Expand Down Expand Up @@ -655,7 +637,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
if pending := pool.pending[auth]; pending != nil {
count += pending.Len()
}
if queue := pool.queue[auth]; queue != nil {
if queue, ok := pool.queue.get(auth); ok {
count += queue.Len()
}
if count > 1 {
Expand Down Expand Up @@ -702,7 +684,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
// only by this subpool until all transactions are evicted
var (
_, hasPending = pool.pending[from]
_, hasQueued = pool.queue[from]
_, hasQueued = pool.queue.get(from)
)
if !hasPending && !hasQueued {
if err := pool.reserver.Hold(from); err != nil {
Expand Down Expand Up @@ -801,7 +783,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
pool.queue.bump(from)
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
Expand All @@ -826,7 +808,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
}
// The transaction has a nonce gap with pending list, it's only considered
// as executable if transactions in queue can fill up the nonce gap.
queue, ok := pool.queue[from]
queue, ok := pool.queue.get(from)
if !ok {
return true
}
Expand All @@ -842,25 +824,12 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
//
// Note, this method assumes the pool lock is held!
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
return false, txpool.ErrReplaceUnderpriced
replaced, err := pool.queue.add(hash, tx)
if err != nil {
return false, err
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
if replaced != nil {
pool.removeTx(*replaced, true, true)
}
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
Expand All @@ -871,11 +840,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl
pool.all.Add(tx)
pool.priced.Put(tx)
}
// If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist {
pool.beats[from] = time.Now()
}
return old != nil, nil
return replaced != nil, nil
}

// promoteTx adds a transaction to the pending (processable) list of transactions
Expand Down Expand Up @@ -910,7 +875,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
pool.pendingNonces.set(addr, tx.Nonce()+1)

// Successful promotion, bump the heartbeat
pool.beats[addr] = time.Now()
pool.queue.bump(addr)
return true
}

Expand Down Expand Up @@ -1023,7 +988,7 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus {

if txList := pool.pending[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
return txpool.TxStatusPending
} else if txList := pool.queue[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
} else if txList, ok := pool.queue.get(from); ok && txList.txs.items[tx.Nonce()] != nil {
return txpool.TxStatusQueued
}
return txpool.TxStatusUnknown
Expand Down Expand Up @@ -1100,7 +1065,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
defer func() {
var (
_, hasPending = pool.pending[addr]
_, hasQueued = pool.queue[addr]
_, hasQueued = pool.queue.get(addr)
)
if !hasPending && !hasQueued {
pool.reserver.Release(addr)
Expand Down Expand Up @@ -1132,16 +1097,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedGauge.Dec(1)
}
if future.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
pool.queue.removeTx(addr, tx)
return 0
}

Expand Down Expand Up @@ -1289,10 +1245,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
}
// Reset needs promote for all addresses
promoteAddrs = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
}
promoteAddrs = append(promoteAddrs, pool.queue.addresses()...)
}
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
Expand Down Expand Up @@ -1446,61 +1399,21 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction

// Iterate over all accounts and promote any executable transactions
gasLimit := pool.currentHead.Load().GasLimit
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
pool.all.Remove(tx.Hash())
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
pool.all.Remove(tx.Hash())
promoteable, removeable := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces)
promoted := make([]*types.Transaction, 0, len(promoteable))

// promote all promoteable transactions
for _, tx := range promoteable {
from, _ := pool.signer.Sender(tx)
if pool.promoteTx(from, tx.Hash(), tx) {
promoted = append(promoted, tx)
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))

// Gather all executable transactions and promote them
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
promoted = append(promoted, tx)
}
}
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))

// Drop all transactions over the allowed limit
var caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}

// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
if _, ok := pool.pending[addr]; !ok {
pool.reserver.Release(addr)
}
}
// remove all removable transactions
for _, hash := range removeable {
pool.removeTx(hash, true, true)
}
return promoted
}
Expand Down Expand Up @@ -1589,44 +1502,10 @@ func (pool *LegacyPool) truncatePending() {

// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
func (pool *LegacyPool) truncateQueue() {
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued <= pool.config.GlobalQueue {
return
}

// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
sort.Sort(sort.Reverse(addresses))

// Drop transactions until the total is below the limit
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
removed := pool.queue.truncate()

addresses = addresses[:len(addresses)-1]

// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true, true)
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true, true)
drop--
queuedRateLimitMeter.Mark(1)
}
for _, hash := range removed {
pool.removeTx(hash, true, false)
}
Comment on lines 1516 to 1527
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removeTx instead of pool.all.Remove, and why is it with unreserve=false ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added patch to use pool.all.Remove, and unreserve as needed.

}

Expand Down Expand Up @@ -1683,7 +1562,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
if _, ok := pool.queue[addr]; !ok {
if _, ok := pool.queue.get(addr); !ok {
pool.reserver.Release(addr)
}
}
Expand Down Expand Up @@ -1942,17 +1821,17 @@ func (pool *LegacyPool) Clear() {
// acquire the subpool lock until the transaction addition is completed.

for addr := range pool.pending {
if _, ok := pool.queue[addr]; !ok {
if _, ok := pool.queue.get(addr); !ok {
pool.reserver.Release(addr)
}
}
for addr := range pool.queue {
for _, addr := range pool.queue.addresses() {
pool.reserver.Release(addr)
}
pool.all.Clear()
pool.priced.Reheap()
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)
pool.queue = newQueue(pool.config, pool.signer)
pool.pendingNonces = newNoncer(pool.currentState)
}

Expand Down
Loading