From 8a20e87fcfc7b3fb4040f308397b9d12880c4616 Mon Sep 17 00:00:00 2001 From: MariusVanDerWijden Date: Thu, 24 Jul 2025 17:03:34 +0200 Subject: [PATCH 1/7] core/txpool/legacypool: move queue out of main txpool --- core/txpool/legacypool/legacypool.go | 215 +++++----------------- core/txpool/legacypool/legacypool_test.go | 67 +++---- 2 files changed, 83 insertions(+), 199 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 93a003c172b..e2ae7383559 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -23,7 +23,6 @@ import ( "math" "math/big" "slices" - "sort" "sync" "sync/atomic" "time" @@ -238,11 +237,10 @@ type LegacyPool struct { pendingNonces *noncer // Pending state tracking virtual nonces reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools - pending map[common.Address]*list // All currently processable transactions - queue map[common.Address]*list // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - all *lookup // All transactions to allow lookups - priced *pricedList // All transactions sorted by price + pending map[common.Address]*list // All currently processable transactions + queue *queue + all *lookup // All transactions to allow lookups + priced *pricedList // All transactions sorted by price reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet @@ -266,14 +264,14 @@ func New(config Config, chain BlockChain) *LegacyPool { config = (&config).sanitize() // Create the transaction pool with its initial settings + signer := types.LatestSigner(chain.Config()) pool := &LegacyPool{ config: config, chain: chain, chainconfig: chain.Config(), - signer: types.LatestSigner(chain.Config()), + signer: signer, pending: make(map[common.Address]*list), - queue: make(map[common.Address]*list), - beats: make(map[common.Address]time.Time), + queue: newQueue(config, signer), all: newLookup(), reqResetCh: make(chan *txpoolResetRequest), reqPromoteCh: make(chan *accountSet), @@ -369,15 +367,9 @@ func (pool *LegacyPool) loop() { // Handle inactive account transaction eviction case <-evict.C: pool.mu.Lock() - for addr := range pool.queue { - // Any old enough should be removed - if time.Since(pool.beats[addr]) > pool.config.Lifetime { - list := pool.queue[addr].Flatten() - for _, tx := range list { - pool.removeTx(tx.Hash(), true, true) - } - queuedEvictionMeter.Mark(int64(len(list))) - } + evicted := pool.queue.evict(false) + for _, hash := range evicted { + pool.removeTx(hash, true, true) } pool.mu.Unlock() } @@ -459,11 +451,7 @@ func (pool *LegacyPool) stats() (int, int) { for _, list := range pool.pending { pending += list.Len() } - queued := 0 - for _, list := range pool.queue { - queued += list.Len() - } - return pending, queued + return pending, pool.queue.stats() } // Content retrieves the data content of the transaction pool, returning all the @@ -476,10 +464,7 @@ func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[ for addr, list := range pool.pending { pending[addr] = list.Flatten() } - queued := make(map[common.Address][]*types.Transaction, len(pool.queue)) - for addr, list := range pool.queue { - queued[addr] = list.Flatten() - } + queued := pool.queue.content() return pending, queued } @@ -493,10 +478,7 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, if list, ok := pool.pending[addr]; ok { pending = list.Flatten() } - var queued []*types.Transaction - if list, ok := pool.queue[addr]; ok { - queued = list.Flatten() - } + queued := pool.queue.contentFrom(addr) return pending, queued } @@ -655,7 +637,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error { if pending := pool.pending[auth]; pending != nil { count += pending.Len() } - if queue := pool.queue[auth]; queue != nil { + if queue, ok := pool.queue.get(auth); ok { count += queue.Len() } if count > 1 { @@ -702,7 +684,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { // only by this subpool until all transactions are evicted var ( _, hasPending = pool.pending[from] - _, hasQueued = pool.queue[from] + _, hasQueued = pool.queue.get(from) ) if !hasPending && !hasQueued { if err := pool.reserver.Hold(from); err != nil { @@ -801,7 +783,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // Successful promotion, bump the heartbeat - pool.beats[from] = time.Now() + pool.queue.bump(from) return old != nil, nil } // New transaction isn't replacing a pending one, push into queue @@ -826,7 +808,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo } // The transaction has a nonce gap with pending list, it's only considered // as executable if transactions in queue can fill up the nonce gap. - queue, ok := pool.queue[from] + queue, ok := pool.queue.get(from) if !ok { return true } @@ -842,25 +824,12 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo // // Note, this method assumes the pool lock is held! func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) { - // Try to insert the transaction into the future queue - from, _ := types.Sender(pool.signer, tx) // already validated - if pool.queue[from] == nil { - pool.queue[from] = newList(false) - } - inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump) - if !inserted { - // An older transaction was better, discard this - queuedDiscardMeter.Mark(1) - return false, txpool.ErrReplaceUnderpriced + replaced, err := pool.queue.add(hash, tx) + if err != nil { + return false, err } - // Discard any previous transaction and mark this - if old != nil { - pool.all.Remove(old.Hash()) - pool.priced.Removed(1) - queuedReplaceMeter.Mark(1) - } else { - // Nothing was replaced, bump the queued counter - queuedGauge.Inc(1) + if replaced != nil { + pool.removeTx(*replaced, true, true) } // If the transaction isn't in lookup set but it's expected to be there, // show the error log. @@ -871,11 +840,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl pool.all.Add(tx) pool.priced.Put(tx) } - // If we never record the heartbeat, do it right now. - if _, exist := pool.beats[from]; !exist { - pool.beats[from] = time.Now() - } - return old != nil, nil + return replaced != nil, nil } // promoteTx adds a transaction to the pending (processable) list of transactions @@ -910,7 +875,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ pool.pendingNonces.set(addr, tx.Nonce()+1) // Successful promotion, bump the heartbeat - pool.beats[addr] = time.Now() + pool.queue.bump(addr) return true } @@ -1023,7 +988,7 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus { if txList := pool.pending[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil { return txpool.TxStatusPending - } else if txList := pool.queue[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil { + } else if txList, ok := pool.queue.get(from); ok && txList.txs.items[tx.Nonce()] != nil { return txpool.TxStatusQueued } return txpool.TxStatusUnknown @@ -1100,7 +1065,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo defer func() { var ( _, hasPending = pool.pending[addr] - _, hasQueued = pool.queue[addr] + _, hasQueued = pool.queue.get(addr) ) if !hasPending && !hasQueued { pool.reserver.Release(addr) @@ -1132,16 +1097,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo } } // Transaction is in the future queue - if future := pool.queue[addr]; future != nil { - if removed, _ := future.Remove(tx); removed { - // Reduce the queued counter - queuedGauge.Dec(1) - } - if future.Empty() { - delete(pool.queue, addr) - delete(pool.beats, addr) - } - } + pool.queue.removeTx(addr, tx) return 0 } @@ -1289,10 +1245,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, } } // Reset needs promote for all addresses - promoteAddrs = make([]common.Address, 0, len(pool.queue)) - for addr := range pool.queue { - promoteAddrs = append(promoteAddrs, addr) - } + promoteAddrs = append(promoteAddrs, pool.queue.addresses()...) } // Check for pending transactions for every account that sent new ones promoted := pool.promoteExecutables(promoteAddrs) @@ -1446,61 +1399,21 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction { - // Track the promoted transactions to broadcast them at once - var promoted []*types.Transaction - - // Iterate over all accounts and promote any executable transactions gasLimit := pool.currentHead.Load().GasLimit - for _, addr := range accounts { - list := pool.queue[addr] - if list == nil { - continue // Just in case someone calls with a non existing account - } - // Drop all transactions that are deemed too old (low nonce) - forwards := list.Forward(pool.currentState.GetNonce(addr)) - for _, tx := range forwards { - pool.all.Remove(tx.Hash()) - } - log.Trace("Removed old queued transactions", "count", len(forwards)) - // Drop all transactions that are too costly (low balance or out of gas) - drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit) - for _, tx := range drops { - pool.all.Remove(tx.Hash()) + promoteable, removeable := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces) + promoted := make([]*types.Transaction, 0, len(promoteable)) + + // promote all promoteable transactions + for _, tx := range promoteable { + from, _ := pool.signer.Sender(tx) + if pool.promoteTx(from, tx.Hash(), tx) { + promoted = append(promoted, tx) } - log.Trace("Removed unpayable queued transactions", "count", len(drops)) - queuedNofundsMeter.Mark(int64(len(drops))) - - // Gather all executable transactions and promote them - readies := list.Ready(pool.pendingNonces.get(addr)) - for _, tx := range readies { - hash := tx.Hash() - if pool.promoteTx(addr, hash, tx) { - promoted = append(promoted, tx) - } - } - log.Trace("Promoted queued transactions", "count", len(promoted)) - queuedGauge.Dec(int64(len(readies))) - - // Drop all transactions over the allowed limit - var caps = list.Cap(int(pool.config.AccountQueue)) - for _, tx := range caps { - hash := tx.Hash() - pool.all.Remove(hash) - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) - } - queuedRateLimitMeter.Mark(int64(len(caps))) - // Mark all the items dropped as removed - pool.priced.Removed(len(forwards) + len(drops) + len(caps)) - queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) + } - // Delete the entire queue entry if it became empty. - if list.Empty() { - delete(pool.queue, addr) - delete(pool.beats, addr) - if _, ok := pool.pending[addr]; !ok { - pool.reserver.Release(addr) - } - } + // remove all removable transactions + for _, hash := range removeable { + pool.removeTx(hash, true, true) } return promoted } @@ -1589,44 +1502,10 @@ func (pool *LegacyPool) truncatePending() { // truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit. func (pool *LegacyPool) truncateQueue() { - queued := uint64(0) - for _, list := range pool.queue { - queued += uint64(list.Len()) - } - if queued <= pool.config.GlobalQueue { - return - } - - // Sort all accounts with queued transactions by heartbeat - addresses := make(addressesByHeartbeat, 0, len(pool.queue)) - for addr := range pool.queue { - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) - } - sort.Sort(sort.Reverse(addresses)) - - // Drop transactions until the total is below the limit - for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { - addr := addresses[len(addresses)-1] - list := pool.queue[addr.address] + removed := pool.queue.truncate() - addresses = addresses[:len(addresses)-1] - - // Drop all transactions if they are less than the overflow - if size := uint64(list.Len()); size <= drop { - for _, tx := range list.Flatten() { - pool.removeTx(tx.Hash(), true, true) - } - drop -= size - queuedRateLimitMeter.Mark(int64(size)) - continue - } - // Otherwise drop only last few transactions - txs := list.Flatten() - for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - pool.removeTx(txs[i].Hash(), true, true) - drop-- - queuedRateLimitMeter.Mark(1) - } + for _, hash := range removed { + pool.removeTx(hash, true, false) } } @@ -1683,7 +1562,7 @@ func (pool *LegacyPool) demoteUnexecutables() { // Delete the entire pending entry if it became empty. if list.Empty() { delete(pool.pending, addr) - if _, ok := pool.queue[addr]; !ok { + if _, ok := pool.queue.get(addr); !ok { pool.reserver.Release(addr) } } @@ -1942,17 +1821,17 @@ func (pool *LegacyPool) Clear() { // acquire the subpool lock until the transaction addition is completed. for addr := range pool.pending { - if _, ok := pool.queue[addr]; !ok { + if _, ok := pool.queue.get(addr); !ok { pool.reserver.Release(addr) } } - for addr := range pool.queue { + for _, addr := range pool.queue.addresses() { pool.reserver.Release(addr) } pool.all.Clear() pool.priced.Reheap() pool.pending = make(map[common.Address]*list) - pool.queue = make(map[common.Address]*list) + pool.queue = newQueue(pool.config, pool.signer) pool.pendingNonces = newNoncer(pool.currentState) } diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 1ba080b749e..1a1cbb9a0cc 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -466,8 +466,8 @@ func TestQueue(t *testing.T) { if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } - if len(pool.queue) > 0 { - t.Error("expected transaction queue to be empty. is", len(pool.queue)) + if len(pool.queue.queued) > 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue.queued)) } } @@ -492,8 +492,8 @@ func TestQueue2(t *testing.T) { if len(pool.pending) != 1 { t.Error("expected pending length to be 1, got", len(pool.pending)) } - if pool.queue[from].Len() != 2 { - t.Error("expected len(queue) == 2, got", pool.queue[from].Len()) + if list, _ := pool.queue.get(from); list.Len() != 2 { + t.Error("expected len(queue) == 2, got", list.Len()) } } @@ -639,8 +639,8 @@ func TestMissingNonce(t *testing.T) { if len(pool.pending) != 0 { t.Error("expected 0 pending transactions, got", len(pool.pending)) } - if pool.queue[addr].Len() != 1 { - t.Error("expected 1 queued transaction, got", pool.queue[addr].Len()) + if list, _ := pool.queue.get(addr); list.Len() != 1 { + t.Error("expected 1 queued transaction, got", list.Len()) } if pool.all.Count() != 1 { t.Error("expected 1 total transactions, got", pool.all.Count()) @@ -712,8 +712,8 @@ func TestDropping(t *testing.T) { if pool.pending[account].Len() != 3 { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) } - if pool.queue[account].Len() != 3 { - t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) + if list, _ := pool.queue.get(account); list.Len() != 3 { + t.Errorf("queued transaction mismatch: have %d, want %d", list.Len(), 3) } if pool.all.Count() != 6 { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) @@ -722,8 +722,8 @@ func TestDropping(t *testing.T) { if pool.pending[account].Len() != 3 { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) } - if pool.queue[account].Len() != 3 { - t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) + if list, _ := pool.queue.get(account); list.Len() != 3 { + t.Errorf("queued transaction mismatch: have %d, want %d", list.Len(), 3) } if pool.all.Count() != 6 { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) @@ -741,13 +741,14 @@ func TestDropping(t *testing.T) { if _, ok := pool.pending[account].txs.items[tx2.Nonce()]; ok { t.Errorf("out-of-fund pending transaction present: %v", tx1) } - if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok { + list, _ := pool.queue.get(account) + if _, ok := list.txs.items[tx10.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; !ok { + if _, ok := list.txs.items[tx11.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account].txs.items[tx12.Nonce()]; ok { + if _, ok := list.txs.items[tx12.Nonce()]; ok { t.Errorf("out-of-fund queued transaction present: %v", tx11) } if pool.all.Count() != 4 { @@ -763,10 +764,11 @@ func TestDropping(t *testing.T) { if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; ok { t.Errorf("over-gased pending transaction present: %v", tx1) } - if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok { + list, _ = pool.queue.get(account) + if _, ok := list.txs.items[tx10.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok { + if _, ok := list.txs.items[tx11.Nonce()]; ok { t.Errorf("over-gased queued transaction present: %v", tx11) } if pool.all.Count() != 2 { @@ -820,8 +822,8 @@ func TestPostponing(t *testing.T) { if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs)) } - if len(pool.queue) != 0 { - t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0) + if len(pool.queue.addresses()) != 0 { + t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue.addresses()), 0) } if pool.all.Count() != len(txs) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) @@ -830,8 +832,8 @@ func TestPostponing(t *testing.T) { if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs)) } - if len(pool.queue) != 0 { - t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0) + if len(pool.queue.addresses()) != 0 { + t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue.addresses()), 0) } if pool.all.Count() != len(txs) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) @@ -847,7 +849,8 @@ func TestPostponing(t *testing.T) { if _, ok := pool.pending[accs[0]].txs.items[txs[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txs[0]) } - if _, ok := pool.queue[accs[0]].txs.items[txs[0].Nonce()]; ok { + list, _ := pool.queue.get(accs[0]) + if _, ok := list.txs.items[txs[0].Nonce()]; ok { t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txs[0]) } for i, tx := range txs[1:100] { @@ -855,14 +858,14 @@ func TestPostponing(t *testing.T) { if _, ok := pool.pending[accs[0]].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[accs[0]].txs.items[tx.Nonce()]; !ok { + if _, ok := list.txs.items[tx.Nonce()]; !ok { t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx) } } else { if _, ok := pool.pending[accs[0]].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[accs[0]].txs.items[tx.Nonce()]; ok { + if _, ok := list.txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx) } } @@ -872,13 +875,14 @@ func TestPostponing(t *testing.T) { if pool.pending[accs[1]] != nil { t.Errorf("invalidated account still has pending transactions") } + list, _ = pool.queue.get(accs[1]) for i, tx := range txs[100:] { if i%2 == 1 { - if _, ok := pool.queue[accs[1]].txs.items[tx.Nonce()]; !ok { + if _, ok := list.txs.items[tx.Nonce()]; !ok { t.Errorf("tx %d: valid but future transaction missing from future queue: %v", 100+i, tx) } } else { - if _, ok := pool.queue[accs[1]].txs.items[tx.Nonce()]; ok { + if _, ok := list.txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", 100+i, tx) } } @@ -963,13 +967,14 @@ func TestQueueAccountLimiting(t *testing.T) { if len(pool.pending) != 0 { t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0) } + list, _ := pool.queue.get(account) if i <= testTxPoolConfig.AccountQueue { - if pool.queue[account].Len() != int(i) { - t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i) + if list.Len() != int(i) { + t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, list.Len(), i) } } else { - if pool.queue[account].Len() != int(testTxPoolConfig.AccountQueue) { - t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), testTxPoolConfig.AccountQueue) + if list.Len() != int(testTxPoolConfig.AccountQueue) { + t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, list.Len(), testTxPoolConfig.AccountQueue) } } } @@ -1020,7 +1025,7 @@ func TestQueueGlobalLimiting(t *testing.T) { pool.addRemotesSync(txs) queued := 0 - for addr, list := range pool.queue { + for addr, list := range pool.queue.queued { if list.Len() > int(config.AccountQueue) { t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue) } @@ -1179,8 +1184,8 @@ func TestPendingLimiting(t *testing.T) { if pool.pending[account].Len() != int(i)+1 { t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, pool.pending[account].Len(), i+1) } - if len(pool.queue) != 0 { - t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0) + if len(pool.queue.addresses()) != 0 { + t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue.addresses()), 0) } } if pool.all.Count() != int(testTxPoolConfig.AccountQueue+5) { From a25e5f626c26aaa7bf060850d154ff7ef669a410 Mon Sep 17 00:00:00 2001 From: MariusVanDerWijden Date: Thu, 24 Jul 2025 17:04:53 +0200 Subject: [PATCH 2/7] core/txpool/legacypool: move queue out of main txpool --- core/txpool/legacypool/queue.go | 226 ++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 core/txpool/legacypool/queue.go diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go new file mode 100644 index 00000000000..9a583312543 --- /dev/null +++ b/core/txpool/legacypool/queue.go @@ -0,0 +1,226 @@ +package legacypool + +import ( + "sort" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +type queue struct { + config Config + signer types.Signer + queued map[common.Address]*list // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account +} + +func newQueue(config Config, signer types.Signer) *queue { + return &queue{ + signer: signer, + config: config, + queued: make(map[common.Address]*list), + beats: make(map[common.Address]time.Time), + } +} + +func (q *queue) evict(force bool) []common.Hash { + removed := make([]common.Hash, 0) + for addr, list := range q.queued { + // Any transactions old enough should be removed + if force || time.Since(q.beats[addr]) > q.config.Lifetime { + list := list.Flatten() + for _, tx := range list { + q.removeTx(addr, tx) + removed = append(removed, tx.Hash()) + } + queuedEvictionMeter.Mark(int64(len(list))) + } + } + return removed +} + +func (q *queue) stats() int { + queued := 0 + for _, list := range q.queued { + queued += list.Len() + } + return queued +} + +func (q *queue) content() map[common.Address][]*types.Transaction { + queued := make(map[common.Address][]*types.Transaction, len(q.queued)) + for addr, list := range q.queued { + queued[addr] = list.Flatten() + } + return queued +} + +func (q *queue) contentFrom(addr common.Address) []*types.Transaction { + var queued []*types.Transaction + if list, ok := q.get(addr); ok { + queued = list.Flatten() + } + return queued +} + +func (q *queue) get(addr common.Address) (*list, bool) { + l, ok := q.queued[addr] + return l, ok +} + +func (q *queue) bump(addr common.Address) { + q.beats[addr] = time.Now() +} + +func (q *queue) addresses() []common.Address { + addrs := make([]common.Address, 0, len(q.queued)) + for addr := range q.queued { + addrs = append(addrs, addr) + } + return addrs +} + +func (q queue) removeTx(addr common.Address, tx *types.Transaction) { + if future := q.queued[addr]; future != nil { + if txOld := future.txs.Get(tx.Nonce()); txOld != nil && txOld.Hash() != tx.Hash() { + // Edge case, a different transaction + // with the same nonce is in the queued, just ignore + return + } + if removed, _ := future.Remove(tx); removed { + // Reduce the queued counter + queuedGauge.Dec(1) + } + if future.Empty() { + delete(q.queued, addr) + delete(q.beats, addr) + } + } +} + +func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, error) { + // Try to insert the transaction into the future queue + from, _ := types.Sender(q.signer, tx) // already validated + if q.queued[from] == nil { + q.queued[from] = newList(false) + } + inserted, old := q.queued[from].Add(tx, q.config.PriceBump) + if !inserted { + // An older transaction was better, discard this + queuedDiscardMeter.Mark(1) + return nil, txpool.ErrReplaceUnderpriced + } + // If we never record the heartbeat, do it right now. + if _, exist := q.beats[from]; !exist { + q.beats[from] = time.Now() + } + if old == nil { + // Nothing was replaced, bump the queued counter + queuedGauge.Inc(1) + return nil, nil + } + h := old.Hash() + // Transaction was replaced, bump the replacement counter + queuedReplaceMeter.Mark(1) + return &h, nil +} + +func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash) { + // Track the promoteable transactions to broadcast them at once + var promoteable []*types.Transaction + var removeable []common.Hash + + // Iterate over all accounts and promote any executable transactions + for _, addr := range accounts { + list := q.queued[addr] + if list == nil { + continue // Just in case someone calls with a non existing account + } + // Drop all transactions that are deemed too old (low nonce) + forwards := list.Forward(currentState.GetNonce(addr)) + for _, tx := range forwards { + removeable = append(removeable, tx.Hash()) + } + log.Trace("Removing old queued transactions", "count", len(forwards)) + // Drop all transactions that are too costly (low balance or out of gas) + drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit) + for _, tx := range drops { + removeable = append(removeable, tx.Hash()) + } + log.Trace("Removing unpayable queued transactions", "count", len(drops)) + queuedNofundsMeter.Mark(int64(len(drops))) + + // Gather all executable transactions and promote them + readies := list.Ready(nonces.get(addr)) + promoteable = append(promoteable, readies...) + log.Trace("Promoting queued transactions", "count", len(promoteable)) + queuedGauge.Dec(int64(len(readies))) + + // Drop all transactions over the allowed limit + var caps = list.Cap(int(q.config.AccountQueue)) + for _, tx := range caps { + hash := tx.Hash() + removeable = append(removeable, hash) + log.Trace("Removing cap-exceeding queued transaction", "hash", hash) + } + queuedRateLimitMeter.Mark(int64(len(caps))) + queuedGauge.Dec(int64(len(removeable))) + + // Delete the entire queue entry if it became empty. + if list.Empty() { + delete(q.queued, addr) + delete(q.beats, addr) + } + } + return promoteable, removeable +} + +func (q *queue) truncate() []common.Hash { + queued := uint64(0) + for _, list := range q.queued { + queued += uint64(list.Len()) + } + if queued <= q.config.GlobalQueue { + return nil + } + + // Sort all accounts with queued transactions by heartbeat + addresses := make(addressesByHeartbeat, 0, len(q.queued)) + for addr := range q.queued { + addresses = append(addresses, addressByHeartbeat{addr, q.beats[addr]}) + } + sort.Sort(sort.Reverse(addresses)) + removed := make([]common.Hash, 0) + + // Drop transactions until the total is below the limit + for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; { + addr := addresses[len(addresses)-1] + list := q.queued[addr.address] + + addresses = addresses[:len(addresses)-1] + + // Drop all transactions if they are less than the overflow + if size := uint64(list.Len()); size <= drop { + for _, tx := range list.Flatten() { + q.removeTx(addr.address, tx) + removed = append(removed, tx.Hash()) + } + drop -= size + queuedRateLimitMeter.Mark(int64(size)) + continue + } + // Otherwise drop only last few transactions + txs := list.Flatten() + for i := len(txs) - 1; i >= 0 && drop > 0; i-- { + q.removeTx(addr.address, txs[i]) + removed = append(removed, txs[i].Hash()) + drop-- + queuedRateLimitMeter.Mark(1) + } + } + return removed +} From 07d28277afd301d90dc0bd5e61dfcfade8840597 Mon Sep 17 00:00:00 2001 From: MariusVanDerWijden Date: Thu, 24 Jul 2025 17:05:33 +0200 Subject: [PATCH 3/7] core/txpool/legacypool: move queue out of main txpool --- core/txpool/legacypool/queue.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go index 9a583312543..5ce0be960f2 100644 --- a/core/txpool/legacypool/queue.go +++ b/core/txpool/legacypool/queue.go @@ -1,3 +1,19 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + package legacypool import ( From 39ce64b8b0da1ba7833ebc6360f00e9ac0cf36a0 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 23 Sep 2025 17:58:16 +0200 Subject: [PATCH 4/7] core/txpool/legacypool: move addressesByHeartbeat to queue Signed-off-by: Csaba Kiraly --- core/txpool/legacypool/legacypool.go | 12 ------------ core/txpool/legacypool/queue.go | 12 ++++++++++++ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index e2ae7383559..d57ee02b3da 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1569,18 +1569,6 @@ func (pool *LegacyPool) demoteUnexecutables() { } } -// addressByHeartbeat is an account address tagged with its last activity timestamp. -type addressByHeartbeat struct { - address common.Address - heartbeat time.Time -} - -type addressesByHeartbeat []addressByHeartbeat - -func (a addressesByHeartbeat) Len() int { return len(a) } -func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } -func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - // accountSet is simply a set of addresses to check for existence, and a signer // capable of deriving addresses from transactions. type accountSet struct { diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go index 5ce0be960f2..8edb03056e0 100644 --- a/core/txpool/legacypool/queue.go +++ b/core/txpool/legacypool/queue.go @@ -240,3 +240,15 @@ func (q *queue) truncate() []common.Hash { } return removed } + +// addressByHeartbeat is an account address tagged with its last activity timestamp. +type addressByHeartbeat struct { + address common.Address + heartbeat time.Time +} + +type addressesByHeartbeat []addressByHeartbeat + +func (a addressesByHeartbeat) Len() int { return len(a) } +func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } +func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } From f2b434bbf954aa74b1af2705a4c3c54969b88b34 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 3 Oct 2025 12:53:10 +0200 Subject: [PATCH 5/7] core/txpool/legacypool: fix tx removal promoteExecutable already removes the txs from the queue. We just need to remove them from the lookup. Signed-off-by: Csaba Kiraly --- core/txpool/legacypool/legacypool.go | 12 +++++++++++- core/txpool/legacypool/queue.go | 6 ++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index d57ee02b3da..2ff905603bc 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1413,8 +1413,18 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T // remove all removable transactions for _, hash := range removeable { - pool.removeTx(hash, true, true) + pool.all.Remove(hash) + } + + // release all accounts that have no more transactions in the pool + for _, addr := range accounts { + _, hasPending := pool.pending[addr] + _, hasQueued := pool.queue.get(addr) + if !hasPending && !hasQueued { + pool.reserver.Release(addr) + } } + return promoted } diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go index 8edb03056e0..4b792aa200e 100644 --- a/core/txpool/legacypool/queue.go +++ b/core/txpool/legacypool/queue.go @@ -145,6 +145,12 @@ func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, erro return &h, nil } +// promoteExecutables iterates over all accounts with queued transactions, selecting +// for promotion any that are now executable. It also drops any transactions that are +// deemed too old (nonce too low) or too costly (insufficient funds or over gas limit). +// +// Returns two lists: all transactions that were removed from the queue and selected +// for promotion; all other transactions that were removed from the queue and dropped. func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash) { // Track the promoteable transactions to broadcast them at once var promoteable []*types.Transaction From 0d41d250cef9ec606382a42e1e710faed99764ad Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 3 Oct 2025 13:00:29 +0200 Subject: [PATCH 6/7] core/txpool/legacypool: improve naming Signed-off-by: Csaba Kiraly --- core/txpool/legacypool/legacypool.go | 8 ++++---- core/txpool/legacypool/queue.go | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 2ff905603bc..138f0cda66a 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1400,11 +1400,11 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // invalidated transactions (low nonce, low balance) are deleted. func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction { gasLimit := pool.currentHead.Load().GasLimit - promoteable, removeable := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces) - promoted := make([]*types.Transaction, 0, len(promoteable)) + promotable, dropped := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces) + promoted := make([]*types.Transaction, 0, len(promotable)) // promote all promoteable transactions - for _, tx := range promoteable { + for _, tx := range promotable { from, _ := pool.signer.Sender(tx) if pool.promoteTx(from, tx.Hash(), tx) { promoted = append(promoted, tx) @@ -1412,7 +1412,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T } // remove all removable transactions - for _, hash := range removeable { + for _, hash := range dropped { pool.all.Remove(hash) } diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go index 4b792aa200e..df93cc71955 100644 --- a/core/txpool/legacypool/queue.go +++ b/core/txpool/legacypool/queue.go @@ -152,9 +152,9 @@ func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, erro // Returns two lists: all transactions that were removed from the queue and selected // for promotion; all other transactions that were removed from the queue and dropped. func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash) { - // Track the promoteable transactions to broadcast them at once - var promoteable []*types.Transaction - var removeable []common.Hash + // Track the promotable transactions to broadcast them at once + var promotable []*types.Transaction + var dropped []common.Hash // Iterate over all accounts and promote any executable transactions for _, addr := range accounts { @@ -165,32 +165,32 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c // Drop all transactions that are deemed too old (low nonce) forwards := list.Forward(currentState.GetNonce(addr)) for _, tx := range forwards { - removeable = append(removeable, tx.Hash()) + dropped = append(dropped, tx.Hash()) } log.Trace("Removing old queued transactions", "count", len(forwards)) // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit) for _, tx := range drops { - removeable = append(removeable, tx.Hash()) + dropped = append(dropped, tx.Hash()) } log.Trace("Removing unpayable queued transactions", "count", len(drops)) queuedNofundsMeter.Mark(int64(len(drops))) // Gather all executable transactions and promote them readies := list.Ready(nonces.get(addr)) - promoteable = append(promoteable, readies...) - log.Trace("Promoting queued transactions", "count", len(promoteable)) + promotable = append(promotable, readies...) + log.Trace("Promoting queued transactions", "count", len(promotable)) queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit var caps = list.Cap(int(q.config.AccountQueue)) for _, tx := range caps { hash := tx.Hash() - removeable = append(removeable, hash) + dropped = append(dropped, hash) log.Trace("Removing cap-exceeding queued transaction", "hash", hash) } queuedRateLimitMeter.Mark(int64(len(caps))) - queuedGauge.Dec(int64(len(removeable))) + queuedGauge.Dec(int64(len(dropped))) // Delete the entire queue entry if it became empty. if list.Empty() { @@ -198,7 +198,7 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c delete(q.beats, addr) } } - return promoteable, removeable + return promotable, dropped } func (q *queue) truncate() []common.Hash { From 1ed0e5e95d91f60bf779f89537eb4bc2e3456a9c Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 6 Oct 2025 12:05:42 +0200 Subject: [PATCH 7/7] core/txpool/legacypool: release after truncate Signed-off-by: Csaba Kiraly --- core/txpool/legacypool/legacypool.go | 12 ++++++++++-- core/txpool/legacypool/queue.go | 14 +++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 138f0cda66a..c70e169c840 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1512,10 +1512,18 @@ func (pool *LegacyPool) truncatePending() { // truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit. func (pool *LegacyPool) truncateQueue() { - removed := pool.queue.truncate() + removed, removedAddresses := pool.queue.truncate() + // remove all removable transactions for _, hash := range removed { - pool.removeTx(hash, true, false) + pool.all.Remove(hash) + } + + for _, addr := range removedAddresses { + _, hasPending := pool.pending[addr] + if !hasPending { + pool.reserver.Release(addr) + } } } diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go index df93cc71955..a9e4f0de15c 100644 --- a/core/txpool/legacypool/queue.go +++ b/core/txpool/legacypool/queue.go @@ -201,13 +201,17 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c return promotable, dropped } -func (q *queue) truncate() []common.Hash { +// truncate drops the oldest transactions from the queue until the total +// number is below the configured limit. +// Returns the hashes of all dropped transactions, and the addresses of +// accounts that became empty due to the truncation. +func (q *queue) truncate() ([]common.Hash, []common.Address) { queued := uint64(0) for _, list := range q.queued { queued += uint64(list.Len()) } if queued <= q.config.GlobalQueue { - return nil + return nil, nil } // Sort all accounts with queued transactions by heartbeat @@ -217,6 +221,7 @@ func (q *queue) truncate() []common.Hash { } sort.Sort(sort.Reverse(addresses)) removed := make([]common.Hash, 0) + removedAddresses := make([]common.Address, 0) // Drop transactions until the total is below the limit for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; { @@ -233,6 +238,7 @@ func (q *queue) truncate() []common.Hash { } drop -= size queuedRateLimitMeter.Mark(int64(size)) + removedAddresses = append(removedAddresses, addr.address) continue } // Otherwise drop only last few transactions @@ -244,7 +250,9 @@ func (q *queue) truncate() []common.Hash { queuedRateLimitMeter.Mark(1) } } - return removed + + // no need to clear empty accounts, removeTx already does that + return removed, removedAddresses } // addressByHeartbeat is an account address tagged with its last activity timestamp.