@@ -231,6 +231,7 @@ type LegacyPool struct {
231231 locals * accountSet // Set of local transaction to exempt from eviction rules
232232 journal * journal // Journal of local transaction to back up to disk
233233
234+ reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
234235 pending map [common.Address ]* list // All currently processable transactions
235236 queue map [common.Address ]* list // Queued but non-processable transactions
236237 beats map [common.Address ]time.Time // Last heartbeat from each known account
@@ -307,7 +308,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
307308// head to allow balance / nonce checks. The transaction journal will be loaded
308309// from disk and filtered based on the provided starting settings. The internal
309310// goroutines will be spun up and the pool deemed operational afterwards.
310- func (pool * LegacyPool ) Init (gasTip * big.Int , head * types.Header ) error {
311+ func (pool * LegacyPool ) Init (gasTip * big.Int , head * types.Header , reserve txpool.AddressReserver ) error {
312+ // Set the address reserver to request exclusive access to pooled accounts
313+ pool .reserve = reserve
314+
311315 // Set the basic pool parameters
312316 pool .gasTip .Store (gasTip )
313317
@@ -390,7 +394,7 @@ func (pool *LegacyPool) loop() {
390394 if time .Since (pool .beats [addr ]) > pool .config .Lifetime {
391395 list := pool .queue [addr ].Flatten ()
392396 for _ , tx := range list {
393- pool .removeTx (tx .Hash (), true )
397+ pool .removeTx (tx .Hash (), true , true )
394398 }
395399 queuedEvictionMeter .Mark (int64 (len (list )))
396400 }
@@ -468,7 +472,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) error {
468472 // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
469473 drop := pool .all .RemotesBelowTip (tip )
470474 for _ , tx := range drop {
471- pool .removeTx (tx .Hash (), false )
475+ pool .removeTx (tx .Hash (), false , true )
472476 }
473477 pool .priced .Removed (len (drop ))
474478 }
@@ -549,11 +553,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
549553// The enforceTips parameter can be used to do an extra filtering on the pending
550554// transactions and only return those whose **effective** tip is large enough in
551555// the next pending execution environment.
552- func (pool * LegacyPool ) Pending (enforceTips bool ) map [common.Address ][]* types. Transaction {
556+ func (pool * LegacyPool ) Pending (enforceTips bool ) map [common.Address ][]* txpool. LazyTransaction {
553557 pool .mu .Lock ()
554558 defer pool .mu .Unlock ()
555559
556- pending := make (map [common.Address ][]* types. Transaction , len (pool .pending ))
560+ pending := make (map [common.Address ][]* txpool. LazyTransaction , len (pool .pending ))
557561 for addr , list := range pool .pending {
558562 txs := list .Flatten ()
559563
@@ -567,7 +571,18 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Tr
567571 }
568572 }
569573 if len (txs ) > 0 {
570- pending [addr ] = txs
574+ lazies := make ([]* txpool.LazyTransaction , len (txs ))
575+ for i := 0 ; i < len (txs ); i ++ {
576+ lazies [i ] = & txpool.LazyTransaction {
577+ Pool : pool ,
578+ Hash : txs [i ].Hash (),
579+ Tx : & txpool.Transaction {Tx : txs [i ]},
580+ Time : txs [i ].Time (),
581+ GasFeeCap : txs [i ].GasFeeCap (),
582+ GasTipCap : txs [i ].GasTipCap (),
583+ }
584+ }
585+ pending [addr ] = lazies
571586 }
572587 }
573588 return pending
@@ -630,6 +645,16 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
630645 State : pool .currentState ,
631646
632647 FirstNonceGap : nil , // Pool allows arbitrary arrival order, don't invalidate nonce gaps
648+ UsedAndLeftSlots : func (addr common.Address ) (int , int ) {
649+ var have int
650+ if list := pool .pending [addr ]; list != nil {
651+ have += list .Len ()
652+ }
653+ if list := pool .queue [addr ]; list != nil {
654+ have += list .Len ()
655+ }
656+ return have , math .MaxInt
657+ },
633658 ExistingExpenditure : func (addr common.Address ) * big.Int {
634659 if list := pool .pending [addr ]; list != nil {
635660 return list .totalcost
@@ -703,13 +728,35 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
703728 invalidTxMeter .Mark (1 )
704729 return false , err
705730 }
706-
707731 // already validated by this point
708732 from , _ := types .Sender (pool .signer , tx )
733+
709734 if tx .IsSpecialTransaction () && pool .IsSigner (from ) && pool .pendingNonces .get (from ) == tx .Nonce () {
710735 return pool .promoteSpecialTx (from , tx , isLocal )
711736 }
712737
738+ // If the address is not yet known, request exclusivity to track the account
739+ // only by this subpool until all transactions are evicted
740+ var (
741+ _ , hasPending = pool .pending [from ]
742+ _ , hasQueued = pool .queue [from ]
743+ )
744+ if ! hasPending && ! hasQueued {
745+ if err := pool .reserve (from , true ); err != nil {
746+ return false , err
747+ }
748+ defer func () {
749+ // If the transaction is rejected by some post-validation check, remove
750+ // the lock on the reservation set.
751+ //
752+ // Note, `err` here is the named error return, which will be initialized
753+ // by a return statement before running deferred methods. Take care with
754+ // removing or subscoping err as it will break this clause.
755+ if err != nil {
756+ pool .reserve (from , false )
757+ }
758+ }()
759+ }
713760 // If the transaction pool is full, discard underpriced transactions
714761 if uint64 (pool .all .Slots ()+ numSlots (tx )) > pool .config .GlobalSlots + pool .config .GlobalQueue {
715762 // If the new transaction is underpriced, don't accept it
@@ -763,7 +810,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
763810 for _ , tx := range drop {
764811 log .Trace ("Discarding freshly underpriced transaction" , "hash" , tx .Hash (), "gasTipCap" , tx .GasTipCap (), "gasFeeCap" , tx .GasFeeCap ())
765812 underpricedTxMeter .Mark (1 )
766- dropped := pool .removeTx (tx .Hash (), false )
813+
814+ sender , _ := types .Sender (pool .signer , tx )
815+ dropped := pool .removeTx (tx .Hash (), false , sender != from ) // Don't unreserve the sender of the tx being added if last from the acc
816+
767817 pool .changesSinceReorg += dropped
768818 }
769819 }
@@ -1126,15 +1176,35 @@ func (pool *LegacyPool) Has(hash common.Hash) bool {
11261176
11271177// removeTx removes a single transaction from the queue, moving all subsequent
11281178// transactions back to the future queue.
1179+ //
1180+ // In unreserve is false, the account will not be relinquished to the main txpool
1181+ // even if there are no more references to it. This is used to handle a race when
1182+ // a tx being added, and it evicts a previously scheduled tx from the same account,
1183+ // which could lead to a premature release of the lock.
1184+ //
11291185// Returns the number of transactions removed from the pending queue.
1130- func (pool * LegacyPool ) removeTx (hash common.Hash , outofbound bool ) int {
1186+ func (pool * LegacyPool ) removeTx (hash common.Hash , outofbound bool , unreserve bool ) int {
11311187 // Fetch the transaction we wish to delete
11321188 tx := pool .all .Get (hash )
11331189 if tx == nil {
11341190 return 0
11351191 }
11361192 addr , _ := types .Sender (pool .signer , tx ) // already validated during insertion
11371193
1194+ // If after deletion there are no more transactions belonging to this account,
1195+ // relinquish the address reservation. It's a bit convoluted do this, via a
1196+ // defer, but it's safer vs. the many return pathways.
1197+ if unreserve {
1198+ defer func () {
1199+ var (
1200+ _ , hasPending = pool .pending [addr ]
1201+ _ , hasQueued = pool .queue [addr ]
1202+ )
1203+ if ! hasPending && ! hasQueued {
1204+ pool .reserve (addr , false )
1205+ }
1206+ }()
1207+ }
11381208 // Remove it from the list of known transactions
11391209 pool .all .Remove (hash )
11401210 if outofbound {
@@ -1383,7 +1453,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
13831453 // there's nothing to add
13841454 if newNum >= oldNum {
13851455 // If we reorged to a same or higher number, then it's not a case of setHead
1386- log .Warn ("Transaction pool reset with missing oldhead " ,
1456+ log .Warn ("Transaction pool reset with missing old head " ,
13871457 "old" , oldHead .Hash (), "oldnum" , oldNum , "new" , newHead .Hash (), "newnum" , newNum )
13881458 return
13891459 }
@@ -1427,7 +1497,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
14271497 return
14281498 }
14291499 }
1430- reinject = types .TxDifference (discarded , included )
1500+ lost := make ([]* types.Transaction , 0 , len (discarded ))
1501+ for _ , tx := range types .TxDifference (discarded , included ) {
1502+ if pool .Filter (tx ) {
1503+ lost = append (lost , tx )
1504+ }
1505+ }
1506+ reinject = lost
14311507 }
14321508 }
14331509 }
@@ -1522,6 +1598,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
15221598 if list .Empty () {
15231599 delete (pool .queue , addr )
15241600 delete (pool .beats , addr )
1601+ if _ , ok := pool .pending [addr ]; ! ok {
1602+ pool .reserve (addr , false )
1603+ }
15251604 }
15261605 }
15271606 return promoted
@@ -1643,7 +1722,7 @@ func (pool *LegacyPool) truncateQueue() {
16431722 // Drop all transactions if they are less than the overflow
16441723 if size := uint64 (list .Len ()); size <= drop {
16451724 for _ , tx := range list .Flatten () {
1646- pool .removeTx (tx .Hash (), true )
1725+ pool .removeTx (tx .Hash (), true , true )
16471726 }
16481727 drop -= size
16491728 queuedRateLimitMeter .Mark (int64 (size ))
@@ -1652,7 +1731,7 @@ func (pool *LegacyPool) truncateQueue() {
16521731 // Otherwise drop only last few transactions
16531732 txs := list .Flatten ()
16541733 for i := len (txs ) - 1 ; i >= 0 && drop > 0 ; i -- {
1655- pool .removeTx (txs [i ].Hash (), true )
1734+ pool .removeTx (txs [i ].Hash (), true , true )
16561735 drop --
16571736 queuedRateLimitMeter .Mark (1 )
16581737 }
@@ -1719,6 +1798,9 @@ func (pool *LegacyPool) demoteUnexecutables() {
17191798 // Delete the entire pending entry if it became empty.
17201799 if list .Empty () {
17211800 delete (pool .pending , addr )
1801+ if _ , ok := pool .queue [addr ]; ! ok {
1802+ pool .reserve (addr , false )
1803+ }
17221804 }
17231805 }
17241806}
0 commit comments