Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ var (
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")

// ErrAccountLimitExceeded is returned if a transaction would exceed the number
// allowed by a pool for a single account.
ErrAccountLimitExceeded = errors.New("account limit exceeded")

// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")
Expand Down
118 changes: 104 additions & 14 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ type LegacyPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk

reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
Expand Down Expand Up @@ -307,7 +308,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header) error {
func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts
pool.reserve = reserve

// Set the basic pool parameters
pool.gasTip.Store(gasTip)
pool.reset(nil, head)
Expand Down Expand Up @@ -377,7 +381,7 @@ func (pool *LegacyPool) loop() {
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
Expand Down Expand Up @@ -455,7 +459,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) error {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(tip)
for _, tx := range drop {
pool.removeTx(tx.Hash(), false)
pool.removeTx(tx.Hash(), false, true)
}
pool.priced.Removed(len(drop))
}
Expand Down Expand Up @@ -536,11 +540,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The enforceTips parameter can be used to do an extra filtering on the pending
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()

pending := make(map[common.Address][]*types.Transaction, len(pool.pending))
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()

Expand All @@ -554,7 +558,18 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Tr
}
}
if len(txs) > 0 {
pending[addr] = txs
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: &txpool.Transaction{Tx: txs[i]},
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
}
}
pending[addr] = lazies
}
}
return pending
Expand Down Expand Up @@ -617,6 +632,16 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
State: pool.currentState,

FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
UsedAndLeftSlots: func(addr common.Address) (int, int) {
var have int
if list := pool.pending[addr]; list != nil {
have += list.Len()
}
if list := pool.queue[addr]; list != nil {
have += list.Len()
}
return have, math.MaxInt
},
ExistingExpenditure: func(addr common.Address) *big.Int {
if list := pool.pending[addr]; list != nil {
return list.totalcost
Expand Down Expand Up @@ -690,13 +715,35 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
invalidTxMeter.Mark(1)
return false, err
}

// already validated by this point
from, _ := types.Sender(pool.signer, tx)

if tx.IsSpecialTransaction() && pool.IsSigner(from) && pool.pendingNonces.get(from) == tx.Nonce() {
return pool.promoteSpecialTx(from, tx, isLocal)
}

// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
var (
_, hasPending = pool.pending[from]
_, hasQueued = pool.queue[from]
)
if !hasPending && !hasQueued {
if err := pool.reserve(from, true); err != nil {
return false, err
}
defer func() {
// If the transaction is rejected by some post-validation check, remove
// the lock on the reservation set.
//
// Note, `err` here is the named error return, which will be initialized
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
pool.reserve(from, false)
}
}()
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
Expand Down Expand Up @@ -750,7 +797,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
dropped := pool.removeTx(tx.Hash(), false)

sender, _ := types.Sender(pool.signer, tx)
dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc

pool.changesSinceReorg += dropped
}
}
Expand Down Expand Up @@ -1113,15 +1163,35 @@ func (pool *LegacyPool) Has(hash common.Hash) bool {

// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
//
// In unreserve is false, the account will not be relinquished to the main txpool
// even if there are no more references to it. This is used to handle a race when
// a tx being added, and it evicts a previously scheduled tx from the same account,
// which could lead to a premature release of the lock.
//
// Returns the number of transactions removed from the pending queue.
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int {
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return 0
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

// If after deletion there are no more transactions belonging to this account,
// relinquish the address reservation. It's a bit convoluted do this, via a
// defer, but it's safer vs. the many return pathways.
if unreserve {
defer func() {
var (
_, hasPending = pool.pending[addr]
_, hasQueued = pool.queue[addr]
)
if !hasPending && !hasQueued {
pool.reserve(addr, false)
}
}()
}
// Remove it from the list of known transactions
pool.all.Remove(hash)
if outofbound {
Expand Down Expand Up @@ -1355,7 +1425,6 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
Expand All @@ -1367,7 +1436,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// there's nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it's not a case of setHead
log.Warn("Transaction pool reset with missing oldhead",
log.Warn("Transaction pool reset with missing old head",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
return
}
Expand All @@ -1376,6 +1445,15 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
// We still need to update the current state s.th. the lost transactions can be readded by the user
} else {
if add == nil {
// if the new head is nil, it means that something happened between
// the firing of newhead-event and _now_: most likely a
// reorg caused by sync-reversion or explicit sethead back to an
// earlier block.
log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash())
return
}
var discarded, included types.Transactions
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
Expand All @@ -1402,7 +1480,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
return
}
}
reinject = types.TxDifference(discarded, included)
lost := make([]*types.Transaction, 0, len(discarded))
for _, tx := range types.TxDifference(discarded, included) {
if pool.Filter(tx) {
lost = append(lost, tx)
}
}
reinject = lost
}
}
}
Expand Down Expand Up @@ -1497,6 +1581,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
if _, ok := pool.pending[addr]; !ok {
pool.reserve(addr, false)
}
}
}
return promoted
Expand Down Expand Up @@ -1618,7 +1705,7 @@ func (pool *LegacyPool) truncateQueue() {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
Expand All @@ -1627,7 +1714,7 @@ func (pool *LegacyPool) truncateQueue() {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
pool.removeTx(txs[i].Hash(), true, true)
drop--
queuedRateLimitMeter.Mark(1)
}
Expand Down Expand Up @@ -1694,6 +1781,9 @@ func (pool *LegacyPool) demoteUnexecutables() {
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
if _, ok := pool.queue[addr]; !ok {
pool.reserve(addr, false)
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/txpool/legacypool/legacypool2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestTransactionFutureAttack(t *testing.T) {
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
fillPool(t, pool)
pending, _ := pool.Stats()
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestTransactionFuture1559(t *testing.T) {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()

// Create a number of test accounts, fund them and make transactions
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestTransactionZAttack(t *testing.T) {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
Expand Down Expand Up @@ -226,7 +226,7 @@ func BenchmarkFutureAttack(b *testing.B) {
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
fillPool(b, pool)

Expand Down
Loading