diff --git a/core/txpool/errors.go b/core/txpool/errors.go index b8f0e896d262..a4ed6a14986b 100644 --- a/core/txpool/errors.go +++ b/core/txpool/errors.go @@ -38,6 +38,10 @@ var ( // with a different one without the required price bump. ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") + // ErrAccountLimitExceeded is returned if a transaction would exceed the number + // allowed by a pool for a single account. + ErrAccountLimitExceeded = errors.New("account limit exceeded") + // ErrGasLimit is returned if a transaction's requested gas limit exceeds the // maximum allowance of the current block. ErrGasLimit = errors.New("exceeds block gas limit") diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 53d7368f84a6..94016da7b7c3 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -231,6 +231,7 @@ type LegacyPool struct { locals *accountSet // Set of local transaction to exempt from eviction rules journal *journal // Journal of local transaction to back up to disk + reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account @@ -307,7 +308,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. The internal // goroutines will be spun up and the pool deemed operational afterwards. -func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header) error { +func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error { + // Set the address reserver to request exclusive access to pooled accounts + pool.reserve = reserve + // Set the basic pool parameters pool.gasTip.Store(gasTip) pool.reset(nil, head) @@ -377,7 +381,7 @@ func (pool *LegacyPool) loop() { if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { - pool.removeTx(tx.Hash(), true) + pool.removeTx(tx.Hash(), true, true) } queuedEvictionMeter.Mark(int64(len(list))) } @@ -455,7 +459,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) error { // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead drop := pool.all.RemotesBelowTip(tip) for _, tx := range drop { - pool.removeTx(tx.Hash(), false) + pool.removeTx(tx.Hash(), false, true) } pool.priced.Removed(len(drop)) } @@ -536,11 +540,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The enforceTips parameter can be used to do an extra filtering on the pending // transactions and only return those whose **effective** tip is large enough in // the next pending execution environment. -func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction { +func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction { pool.mu.Lock() defer pool.mu.Unlock() - pending := make(map[common.Address][]*types.Transaction, len(pool.pending)) + pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) for addr, list := range pool.pending { txs := list.Flatten() @@ -554,7 +558,18 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Tr } } if len(txs) > 0 { - pending[addr] = txs + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i := 0; i < len(txs); i++ { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: txs[i].Hash(), + Tx: &txpool.Transaction{Tx: txs[i]}, + Time: txs[i].Time(), + GasFeeCap: txs[i].GasFeeCap(), + GasTipCap: txs[i].GasTipCap(), + } + } + pending[addr] = lazies } } return pending @@ -617,6 +632,16 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error { State: pool.currentState, FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps + UsedAndLeftSlots: func(addr common.Address) (int, int) { + var have int + if list := pool.pending[addr]; list != nil { + have += list.Len() + } + if list := pool.queue[addr]; list != nil { + have += list.Len() + } + return have, math.MaxInt + }, ExistingExpenditure: func(addr common.Address) *big.Int { if list := pool.pending[addr]; list != nil { return list.totalcost @@ -690,13 +715,35 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e invalidTxMeter.Mark(1) return false, err } - // already validated by this point from, _ := types.Sender(pool.signer, tx) + if tx.IsSpecialTransaction() && pool.IsSigner(from) && pool.pendingNonces.get(from) == tx.Nonce() { return pool.promoteSpecialTx(from, tx, isLocal) } + // If the address is not yet known, request exclusivity to track the account + // only by this subpool until all transactions are evicted + var ( + _, hasPending = pool.pending[from] + _, hasQueued = pool.queue[from] + ) + if !hasPending && !hasQueued { + if err := pool.reserve(from, true); err != nil { + return false, err + } + defer func() { + // If the transaction is rejected by some post-validation check, remove + // the lock on the reservation set. + // + // Note, `err` here is the named error return, which will be initialized + // by a return statement before running deferred methods. Take care with + // removing or subscoping err as it will break this clause. + if err != nil { + pool.reserve(from, false) + } + }() + } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it @@ -750,7 +797,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) - dropped := pool.removeTx(tx.Hash(), false) + + sender, _ := types.Sender(pool.signer, tx) + dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc + pool.changesSinceReorg += dropped } } @@ -1113,8 +1163,14 @@ func (pool *LegacyPool) Has(hash common.Hash) bool { // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. +// +// In unreserve is false, the account will not be relinquished to the main txpool +// even if there are no more references to it. This is used to handle a race when +// a tx being added, and it evicts a previously scheduled tx from the same account, +// which could lead to a premature release of the lock. +// // Returns the number of transactions removed from the pending queue. -func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int { +func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int { // Fetch the transaction we wish to delete tx := pool.all.Get(hash) if tx == nil { @@ -1122,6 +1178,20 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int { } addr, _ := types.Sender(pool.signer, tx) // already validated during insertion + // If after deletion there are no more transactions belonging to this account, + // relinquish the address reservation. It's a bit convoluted do this, via a + // defer, but it's safer vs. the many return pathways. + if unreserve { + defer func() { + var ( + _, hasPending = pool.pending[addr] + _, hasQueued = pool.queue[addr] + ) + if !hasPending && !hasQueued { + pool.reserve(addr, false) + } + }() + } // Remove it from the list of known transactions pool.all.Remove(hash) if outofbound { @@ -1355,7 +1425,6 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { log.Debug("Skipping deep transaction reorg", "depth", depth) } else { // Reorg seems shallow enough to pull in all transactions into memory - var discarded, included types.Transactions var ( rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) @@ -1367,7 +1436,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // there's nothing to add if newNum >= oldNum { // If we reorged to a same or higher number, then it's not a case of setHead - log.Warn("Transaction pool reset with missing oldhead", + log.Warn("Transaction pool reset with missing old head", "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) return } @@ -1376,6 +1445,15 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) // We still need to update the current state s.th. the lost transactions can be readded by the user } else { + if add == nil { + // if the new head is nil, it means that something happened between + // the firing of newhead-event and _now_: most likely a + // reorg caused by sync-reversion or explicit sethead back to an + // earlier block. + log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash()) + return + } + var discarded, included types.Transactions for rem.NumberU64() > add.NumberU64() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { @@ -1402,7 +1480,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { return } } - reinject = types.TxDifference(discarded, included) + lost := make([]*types.Transaction, 0, len(discarded)) + for _, tx := range types.TxDifference(discarded, included) { + if pool.Filter(tx) { + lost = append(lost, tx) + } + } + reinject = lost } } } @@ -1497,6 +1581,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T if list.Empty() { delete(pool.queue, addr) delete(pool.beats, addr) + if _, ok := pool.pending[addr]; !ok { + pool.reserve(addr, false) + } } } return promoted @@ -1618,7 +1705,7 @@ func (pool *LegacyPool) truncateQueue() { // Drop all transactions if they are less than the overflow if size := uint64(list.Len()); size <= drop { for _, tx := range list.Flatten() { - pool.removeTx(tx.Hash(), true) + pool.removeTx(tx.Hash(), true, true) } drop -= size queuedRateLimitMeter.Mark(int64(size)) @@ -1627,7 +1714,7 @@ func (pool *LegacyPool) truncateQueue() { // Otherwise drop only last few transactions txs := list.Flatten() for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - pool.removeTx(txs[i].Hash(), true) + pool.removeTx(txs[i].Hash(), true, true) drop-- queuedRateLimitMeter.Mark(1) } @@ -1694,6 +1781,9 @@ func (pool *LegacyPool) demoteUnexecutables() { // Delete the entire pending entry if it became empty. if list.Empty() { delete(pool.pending, addr) + if _, ok := pool.queue[addr]; !ok { + pool.reserve(addr, false) + } } } } diff --git a/core/txpool/legacypool/legacypool2_test.go b/core/txpool/legacypool/legacypool2_test.go index 14c01eac6b87..cfecb8d4eb75 100644 --- a/core/txpool/legacypool/legacypool2_test.go +++ b/core/txpool/legacypool/legacypool2_test.go @@ -85,7 +85,7 @@ func TestTransactionFutureAttack(t *testing.T) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() fillPool(t, pool) pending, _ := pool.Stats() @@ -119,7 +119,7 @@ func TestTransactionFuture1559(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions @@ -152,7 +152,7 @@ func TestTransactionZAttack(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions fillPool(t, pool) @@ -226,7 +226,7 @@ func BenchmarkFutureAttack(b *testing.B) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() fillPool(b, pool) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 4a618fbcf535..44b53f977d42 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -25,6 +25,7 @@ import ( "math/big" "math/rand" "os" + "sync" "sync/atomic" "testing" "time" @@ -143,6 +144,31 @@ func dynamicFeeTx(nonce uint64, gaslimit uint64, gasFee *big.Int, tip *big.Int, return tx } +func makeAddressReserver() txpool.AddressReserver { + var ( + reserved = make(map[common.Address]struct{}) + lock sync.Mutex + ) + return func(addr common.Address, reserve bool) error { + lock.Lock() + defer lock.Unlock() + + _, exists := reserved[addr] + if reserve { + if exists { + panic("already reserved") + } + reserved[addr] = struct{}{} + return nil + } + if !exists { + panic("not reserved") + } + delete(reserved, addr) + return nil + } +} + func setupPool() (*LegacyPool, *ecdsa.PrivateKey) { return setupPoolWithConfig(params.TestChainConfig) } @@ -154,7 +180,7 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat key, _ := crypto.GenerateKey() pool := New(testTxPoolConfig, blockchain) - if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()); err != nil { + if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()); err != nil { panic(err) } // wait for the pool to initialize @@ -274,7 +300,7 @@ func TestStateChangeDuringReset(t *testing.T) { tx1 := pricedTransaction(1, 100000, big.NewInt(250000000), key) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() nonce := pool.Nonce(address) @@ -546,7 +572,7 @@ func TestChainFork(t *testing.T) { if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } - pool.removeTx(tx.Hash(), true) + pool.removeTx(tx.Hash(), true, true) // reset the pool's internal state resetState() @@ -769,7 +795,7 @@ func TestPostponing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create two test accounts to produce different gap profiles with @@ -987,7 +1013,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them (last one will be the local) @@ -1078,7 +1104,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { config.NoLocals = nolocals pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create two test accounts to ensure remotes expire but locals do not @@ -1264,7 +1290,7 @@ func TestPendingGlobalLimiting(t *testing.T) { config.GlobalSlots = config.AccountSlots * 10 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1368,7 +1394,7 @@ func TestCapClearsFromAll(t *testing.T) { config.GlobalSlots = 8 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1402,7 +1428,7 @@ func TestPendingMinimumAllowance(t *testing.T) { config.GlobalSlots = 1 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1448,7 +1474,7 @@ func TestRepricing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1697,7 +1723,7 @@ func TestRepricingKeepsLocals(t *testing.T) { blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1776,7 +1802,7 @@ func TestUnderpricing(t *testing.T) { config.GlobalQueue = 2 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1895,7 +1921,7 @@ func TestStableUnderpricing(t *testing.T) { config.AccountSlots = config.GlobalSlots - 1 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -2124,7 +2150,7 @@ func TestDeduplication(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a test account to add transactions with @@ -2191,7 +2217,7 @@ func TestReplacement(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -2403,7 +2429,7 @@ func testJournaling(t *testing.T, nolocals bool) { config.Rejournal = time.Second pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() @@ -2441,7 +2467,7 @@ func testJournaling(t *testing.T, nolocals bool) { blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool = New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) pending, queued = pool.Stats() if queued != 0 { @@ -2468,7 +2494,7 @@ func testJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool = New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) pending, queued = pool.Stats() if pending != 0 { @@ -2499,7 +2525,7 @@ func TestStatusCheck(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create the test accounts to check various transaction statuses with diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index bcb71cb320c3..bc3ecae9c226 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -18,6 +18,7 @@ package txpool import ( "math/big" + "time" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core" @@ -31,6 +32,32 @@ type Transaction struct { Tx *types.Transaction // Canonical transaction } +// LazyTransaction contains a small subset of the transaction properties that is +// enough for the miner and other APIs to handle large batches of transactions; +// and supports pulling up the entire transaction when really needed. +type LazyTransaction struct { + Pool SubPool // Transaction subpool to pull the real transaction up + Hash common.Hash // Transaction hash to pull up if needed + Tx *Transaction // Transaction if already resolved + + Time time.Time // Time when the transaction was first seen + GasFeeCap *big.Int // Maximum fee per gas the transaction may consume + GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay +} + +// Resolve retrieves the full transaction belonging to a lazy handle if it is still +// maintained by the transaction pool. +func (ltx *LazyTransaction) Resolve() *Transaction { + if ltx.Tx == nil { + ltx.Tx = ltx.Pool.Get(ltx.Hash) + } + return ltx.Tx +} + +// AddressReserver is passed by the main transaction pool to subpools, so they +// may request (and relinquish) exclusive access to certain addresses. +type AddressReserver func(addr common.Address, reserve bool) error + // SubPool represents a specialized transaction pool that lives on its own (e.g. // blob pool). Since independent of how many specialized pools we have, they do // need to be updated in lockstep and assemble into one coherent view for block @@ -48,7 +75,7 @@ type SubPool interface { // These should not be passed as a constructor argument - nor should the pools // start by themselves - in order to keep multiple subpools in lockstep with // one another. - Init(gasTip *big.Int, head *types.Header) error + Init(gasTip *big.Int, head *types.Header, reserve AddressReserver) error // Close terminates any background processing threads and releases any held // resources. @@ -76,7 +103,7 @@ type SubPool interface { // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. - Pending(enforceTips bool) map[common.Address][]*types.Transaction + Pending(enforceTips bool) map[common.Address][]*LazyTransaction // SubscribeTransactions subscribes to new transaction events. SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 109b626b00d0..2064d969da1b 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -18,13 +18,17 @@ package txpool import ( "errors" + "fmt" "maps" "math/big" + "sync" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/event" + "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/metrics" ) // TxStatus is the current status of a transaction as seen by the pool. @@ -37,6 +41,15 @@ const ( TxStatusIncluded ) +var ( + // reservationsGaugeName is the prefix of a per-subpool address reservation + // metric. + // + // This is mostly a sanity metric to ensure there's no bug that would make + // some subpool hog all the reservations due to mis-accounting. + reservationsGaugeName = "txpool/reservations" +) + // BlockChain defines the minimal set of methods needed to back a tx pool with // a chain. Exists to allow mocking the live chain out of tests. type BlockChain interface { @@ -53,9 +66,13 @@ type BlockChain interface { // They exit the pool when they are included in the blockchain or evicted due to // resource constraints. type TxPool struct { - subpools []SubPool // List of subpools for specialized transaction handling - subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown - quit chan chan error // Quit channel to tear down the head updater + subpools []SubPool // List of subpools for specialized transaction handling + + reservations map[common.Address]SubPool // Map with the account to pool reservations + reserveLock sync.Mutex // Lock protecting the account reservations + + subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown + quit chan chan error // Quit channel to tear down the head updater } // New creates a new transaction pool to gather, sort and filter inbound @@ -67,11 +84,12 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error) head := chain.CurrentBlock() pool := &TxPool{ - subpools: subpools, - quit: make(chan chan error), + subpools: subpools, + reservations: make(map[common.Address]SubPool), + quit: make(chan chan error), } for i, subpool := range subpools { - if err := subpool.Init(gasTip, head); err != nil { + if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil { for j := i - 1; j >= 0; j-- { subpools[j].Close() } @@ -82,6 +100,52 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error) return pool, nil } +// reserver is a method to create an address reservation callback to exclusively +// assign/deassign addresses to/from subpools. This can ensure that at any point +// in time, only a single subpool is able to manage an account, avoiding cross +// subpool eviction issues and nonce conflicts. +func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver { + return func(addr common.Address, reserve bool) error { + p.reserveLock.Lock() + defer p.reserveLock.Unlock() + + owner, exists := p.reservations[addr] + if reserve { + // Double reservations are forbidden even from the same pool to + // avoid subtle bugs in the long term. + if exists { + if owner == subpool { + log.Error("pool attempted to reserve already-owned address", "address", addr) + return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed + } + return errors.New("address already reserved") + } + p.reservations[addr] = subpool + if metrics.Enabled() { + m := fmt.Sprintf("%s/%d", reservationsGaugeName, id) + metrics.GetOrRegisterGauge(m, nil).Inc(1) + } + return nil + } + // Ensure subpools only attempt to unreserve their own owned addresses, + // otherwise flag as a programming error. + if !exists { + log.Error("pool attempted to unreserve non-reserved address", "address", addr) + return errors.New("address not reserved") + } + if subpool != owner { + log.Error("pool attempted to unreserve non-owned address", "address", addr) + return errors.New("address not owned") + } + delete(p.reservations, addr) + if metrics.Enabled() { + m := fmt.Sprintf("%s/%d", reservationsGaugeName, id) + metrics.GetOrRegisterGauge(m, nil).Dec(1) + } + return nil + } +} + // Close terminates the transaction pool and all its subpools. func (p *TxPool) Close() error { var errs []error @@ -239,8 +303,8 @@ func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error { // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. -func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction { - txs := make(map[common.Address][]*types.Transaction) +func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction { + txs := make(map[common.Address][]*LazyTransaction) for _, subpool := range p.subpools { maps.Copy(txs, subpool.Pending(enforceTips)) } diff --git a/core/txpool/validation.go b/core/txpool/validation.go index daf5b106a077..74e314bdd18a 100644 --- a/core/txpool/validation.go +++ b/core/txpool/validation.go @@ -134,6 +134,11 @@ type ValidationOptionsWithState struct { // nonce gaps will be ignored and permitted. FirstNonceGap func(addr common.Address) uint64 + // UsedAndLeftSlots is a mandatory callback to retrieve the number of tx slots + // used and the number still permitted for an account. New transactions will + // be rejected once the number of remaining slots reaches zero. + UsedAndLeftSlots func(addr common.Address) (int, int) + // ExistingExpenditure is a mandatory callback to retrieve the cumulative // cost of the already pooled transactions to check for overdrafts. ExistingExpenditure func(addr common.Address) *big.Int @@ -212,6 +217,12 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op if newBalance.Cmp(need) < 0 { return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, newBalance)) } + // Transaction takes a new nonce value out of the pool. Ensure it doesn't + // overflow the number of permitted transactions from a single accoun + // (i.e. max cancellable via out-of-bound transaction). + if used, left := opts.UsedAndLeftSlots(from); left <= 0 { + return fmt.Errorf("%w: pooled %d txs", ErrAccountLimitExceeded, used) + } } // Ensure sender and receiver are not in black list diff --git a/core/types/transaction.go b/core/types/transaction.go index 91d1be4904bc..d5164aaa66ed 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -18,7 +18,6 @@ package types import ( "bytes" - "container/heap" "errors" "fmt" "io" @@ -401,6 +400,19 @@ func (tx *Transaction) SetCodeAuthorizations() []SetCodeAuthorization { return setcodetx.AuthList } +// SetTime sets the decoding time of a transaction. This is used by tests to set +// arbitrary times and by persistent transaction pools when loading old txs from +// disk. +func (tx *Transaction) SetTime(t time.Time) { + tx.time = t +} + +// Time returns the time when the transaction was first seen on the network. It +// is a heuristic to prefer mining older txs vs new all other things equal. +func (tx *Transaction) Time() time.Time { + return tx.time +} + // Hash returns the transaction hash. func (tx *Transaction) Hash() common.Hash { if hash := tx.hash.Load(); hash != nil { @@ -715,128 +727,6 @@ func (s TxByNonce) Len() int { return len(s) } func (s TxByNonce) Less(i, j int) bool { return s[i].Nonce() < s[j].Nonce() } func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -// TxByPriceAndTime implements both the sort and the heap interface, making it useful -// for all at once sorting as well as individually adding and removing elements. -type TxByPriceAndTime struct { - txs Transactions - payersSwap map[common.Address]*big.Int -} - -func (s TxByPriceAndTime) Len() int { return len(s.txs) } -func (s TxByPriceAndTime) Less(i, j int) bool { - i_price := s.txs[i].GasPrice() - if s.txs[i].To() != nil { - if _, ok := s.payersSwap[*s.txs[i].To()]; ok { - i_price = common.TRC21GasPrice - } - } - - j_price := s.txs[j].GasPrice() - if s.txs[j].To() != nil { - if _, ok := s.payersSwap[*s.txs[j].To()]; ok { - j_price = common.TRC21GasPrice - } - } - - // If the prices are equal, use the time the transaction was first seen for - // deterministic sorting - cmp := i_price.Cmp(j_price) - if cmp == 0 { - return s.txs[i].time.Before(s.txs[j].time) - } - return cmp > 0 -} -func (s TxByPriceAndTime) Swap(i, j int) { s.txs[i], s.txs[j] = s.txs[j], s.txs[i] } - -func (s *TxByPriceAndTime) Push(x interface{}) { - s.txs = append(s.txs, x.(*Transaction)) -} - -func (s *TxByPriceAndTime) Pop() interface{} { - old := s.txs - n := len(old) - x := old[n-1] - old[n-1] = nil // avoid memory leak - s.txs = old[0 : n-1] - return x -} - -// TransactionsByPriceAndNonce represents a set of transactions that can return -// transactions in a profit-maximizing sorted order, while supporting removing -// entire batches of transactions for non-executable accounts. -type TransactionsByPriceAndNonce struct { - txs map[common.Address][]*Transaction // Per account nonce-sorted list of transactions - heads TxByPriceAndTime // Next transaction for each unique account (price heap) - signer Signer // Signer for the set of transactions -} - -// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve -// price sorted transactions in a nonce-honouring way. -// -// Note, the input map is reowned so the caller should not interact any more with -// if after providing it to the constructor. -// -// It also classifies special txs and normal txs -func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address][]*Transaction, payersSwap map[common.Address]*big.Int) (*TransactionsByPriceAndNonce, Transactions) { - // Initialize a price and received time based heap with the head transactions - heads := TxByPriceAndTime{} - heads.payersSwap = payersSwap - specialTxs := Transactions{} - for _, accTxs := range txs { - from, _ := Sender(signer, accTxs[0]) - var normalTxs Transactions - for _, tx := range accTxs { - if tx.IsSpecialTransaction() { - specialTxs = append(specialTxs, tx) - } else { - normalTxs = append(normalTxs, tx) - } - } - if len(normalTxs) > 0 { - heads.txs = append(heads.txs, normalTxs[0]) - // Remove the first normal transaction for this sender - txs[from] = normalTxs[1:] - } else { - // Remove the account if there are no normal transactions - delete(txs, from) - } - } - heap.Init(&heads) - - // Assemble and return the transaction set - return &TransactionsByPriceAndNonce{ - txs: txs, - heads: heads, - signer: signer, - }, specialTxs -} - -// Peek returns the next transaction by price. -func (t *TransactionsByPriceAndNonce) Peek() *Transaction { - if len(t.heads.txs) == 0 { - return nil - } - return t.heads.txs[0] -} - -// Shift replaces the current best head with the next one from the same account. -func (t *TransactionsByPriceAndNonce) Shift() { - acc, _ := Sender(t.signer, t.heads.txs[0]) - if txs, ok := t.txs[acc]; ok && len(txs) > 0 { - t.heads.txs[0], t.txs[acc] = txs[0], txs[1:] - heap.Fix(&t.heads, 0) - } else { - heap.Pop(&t.heads) - } -} - -// Pop removes the best transaction, *not* replacing it with the next one from -// the same account. This should be used when a transaction cannot be executed -// and hence all subsequent ones should be discarded from the same account. -func (t *TransactionsByPriceAndNonce) Pop() { - heap.Pop(&t.heads) -} - // copyAddressPtr copies an address. func copyAddressPtr(a *common.Address) *common.Address { if a == nil { diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 259d9db23972..f29cce28a0f5 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -25,7 +25,6 @@ import ( "math/big" "reflect" "testing" - "time" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/crypto" @@ -260,75 +259,6 @@ func TestRecipientNormal(t *testing.T) { } } -// Tests that transactions can be correctly sorted according to their price in -// decreasing order, but at the same time with increasing nonces when issued by -// the same account. -func TestTransactionPriceNonceSort(t *testing.T) { - // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 25) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - } - - signer := HomesteadSigner{} - // Generate a batch of transactions with overlapping values, but shifted nonces - groups := map[common.Address][]*Transaction{} - for start, key := range keys { - addr := crypto.PubkeyToAddress(key.PublicKey) - for i := 0; i < 25; i++ { - tx, _ := SignTx(NewTransaction(uint64(start+i), common.Address{}, big.NewInt(100), 100, big.NewInt(int64(start+i)), nil), signer, key) - groups[addr] = append(groups[addr], tx) - } - } - // Sort the transactions and cross check the nonce ordering - txset, _ := NewTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{}) - - txs := Transactions{} - for tx := txset.Peek(); tx != nil; tx = txset.Peek() { - txs = append(txs, tx) - txset.Shift() - } - if len(txs) != 25*25 { - t.Errorf("expected %d transactions, found %d", 25*25, len(txs)) - } - for i, txi := range txs { - fromi, _ := Sender(signer, txi) - - // Make sure the nonce order is valid - for j, txj := range txs[i+1:] { - fromj, _ := Sender(signer, txj) - - if fromi == fromj && txi.Nonce() > txj.Nonce() { - t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce()) - } - } - // Find the previous and next nonce of this account - prev, next := i-1, i+1 - for j := i - 1; j >= 0; j-- { - if fromj, _ := Sender(signer, txs[j]); fromi == fromj { - prev = j - break - } - } - for j := i + 1; j < len(txs); j++ { - if fromj, _ := Sender(signer, txs[j]); fromi == fromj { - next = j - break - } - } - // Make sure that in between the neighbor nonces, the transaction is correctly positioned price wise - for j := prev + 1; j < next; j++ { - fromj, _ := Sender(signer, txs[j]) - if j < i && txs[j].GasPrice().Cmp(txi.GasPrice()) < 0 { - t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", j, fromj[:4], txs[j].GasPrice(), i, fromi[:4], txi.GasPrice()) - } - if j > i && txs[j].GasPrice().Cmp(txi.GasPrice()) > 0 { - t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) > tx #%d (A=%x P=%v)", j, fromj[:4], txs[j].GasPrice(), i, fromi[:4], txi.GasPrice()) - } - } - } -} - // TestTransactionJSON tests serializing/de-serializing to/from JSON. func TestTransactionJSON(t *testing.T) { key, err := crypto.GenerateKey() @@ -371,54 +301,6 @@ func TestTransactionJSON(t *testing.T) { } } -// Tests that if multiple transactions have the same price, the ones seen earlier -// are prioritized to avoid network spam attacks aiming for a specific ordering. -func TestTransactionTimeSort(t *testing.T) { - // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - } - signer := HomesteadSigner{} - - // Generate a batch of transactions with overlapping prices, but different creation times - groups := map[common.Address][]*Transaction{} - for start, key := range keys { - addr := crypto.PubkeyToAddress(key.PublicKey) - - tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - tx.time = time.Unix(0, int64(len(keys)-start)) - - groups[addr] = append(groups[addr], tx) - } - // Sort the transactions and cross check the nonce ordering - txset, _ := NewTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{}) - - txs := Transactions{} - for tx := txset.Peek(); tx != nil; tx = txset.Peek() { - txs = append(txs, tx) - txset.Shift() - } - if len(txs) != len(keys) { - t.Errorf("expected %d transactions, found %d", len(keys), len(txs)) - } - for i, txi := range txs { - fromi, _ := Sender(signer, txi) - if i+1 < len(txs) { - next := txs[i+1] - fromNext, _ := Sender(signer, next) - - if txi.GasPrice().Cmp(next.GasPrice()) < 0 { - t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice()) - } - // Make sure time order is ascending if the txs have the same gas price - if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.time.After(next.time) { - t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.time, i+1, fromNext[:4], next.time) - } - } - } -} - // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. func TestTransactionCoding(t *testing.T) { key, err := crypto.GenerateKey() @@ -718,78 +600,3 @@ func TestIsNonEVMTx(t *testing.T) { }) } } - -// TestNewTransactionsByPriceAndNonce_SpecialSeparation uses table-driven tests to verify separation of special and normal transactions. -func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) { - signer := HomesteadSigner{} - - genNormalTx := func(nonce uint64, key *ecdsa.PrivateKey) *Transaction { - tx, _ := SignTx(NewTransaction(nonce, common.HexToAddress("0x1234567890123456789012345678901234567890"), big.NewInt(1), 21000, big.NewInt(1), nil), signer, key) - return tx - } - genSpecialTx := func(nonce uint64, key *ecdsa.PrivateKey) *Transaction { - tx, _ := SignTx(NewTransaction(nonce, common.BlockSignersBinary, big.NewInt(1), 21000, big.NewInt(1), nil), signer, key) - return tx - } - - testCases := []struct { - name string - normalCount int - specialCount int - expectNormal int - expectSpecial int - }{ - {"no transactions", 0, 0, 0, 0}, - {"only 1 normal", 1, 0, 1, 0}, - {"only 2 normal", 2, 0, 2, 0}, - {"only 3 normal", 3, 0, 3, 0}, - {"only 1 special", 0, 1, 0, 1}, - {"only 2 special", 0, 2, 0, 2}, - {"only 3 special", 0, 3, 0, 3}, - {"1 normal, 1 special", 1, 1, 1, 1}, - {"2 normal, 2 special", 2, 2, 2, 2}, - {"3 normal, 3 special", 3, 3, 3, 3}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - key, _ := crypto.GenerateKey() - addr := crypto.PubkeyToAddress(key.PublicKey) - txs := make(Transactions, 0, tc.normalCount+tc.specialCount) - for i := 0; i < tc.normalCount; i++ { - txs = append(txs, genNormalTx(uint64(i), key)) - } - for i := 0; i < tc.specialCount; i++ { - txs = append(txs, genSpecialTx(uint64(tc.normalCount+i), key)) - } - group := map[common.Address][]*Transaction{} - if len(txs) > 0 { - group[addr] = txs - } - txset, specialTxs := NewTransactionsByPriceAndNonce(signer, group, map[common.Address]*big.Int{}) - - // Check special transactions - if len(specialTxs) != tc.expectSpecial { - t.Errorf("expected %d special txs, got %d", tc.expectSpecial, len(specialTxs)) - } - for _, tx := range specialTxs { - if tx.To() == nil || *tx.To() != common.BlockSignersBinary { - t.Errorf("specialTxs contains non-special tx: %v", tx) - } - } - - // Check normal transactions - normalCount := 0 - for tx := txset.Peek(); tx != nil; tx = txset.Peek() { - if tx.To() == nil || *tx.To() == common.BlockSignersBinary { - t.Errorf("txset contains special or nil-to tx: %v", tx) - } - normalCount++ - txset.Shift() - } - if normalCount != tc.expectNormal { - t.Errorf("expected %d normal txs, got %d", tc.expectNormal, normalCount) - } - }) - } -} diff --git a/eth/api_backend.go b/eth/api_backend.go index fecd4024cd0a..aefb0bcc32db 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -312,7 +312,11 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(false) var txs types.Transactions for _, batch := range pending { - txs = append(txs, batch...) + for _, lazy := range batch { + if tx := lazy.Resolve(); tx != nil { + txs = append(txs, tx.Tx) + } + } } return txs, nil } diff --git a/eth/helper_test.go b/eth/helper_test.go index c73db31db7dc..0e29afa2a867 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -150,7 +150,7 @@ func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []err } // Pending returns all the transactions known to the pool -func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction { +func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction { p.lock.RLock() defer p.lock.RUnlock() @@ -162,7 +162,19 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Trans for _, batch := range batches { sort.Sort(types.TxByNonce(batch)) } - return batches + pending := make(map[common.Address][]*txpool.LazyTransaction) + for addr, batch := range batches { + for _, tx := range batch { + pending[addr] = append(pending[addr], &txpool.LazyTransaction{ + Hash: tx.Hash(), + Tx: &txpool.Transaction{Tx: tx}, + Time: tx.Time(), + GasFeeCap: tx.GasFeeCap(), + GasTipCap: tx.GasTipCap(), + }) + } + } + return pending } // SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and diff --git a/eth/protocol.go b/eth/protocol.go index f10e58b28016..0cd3f6c82997 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -109,7 +109,7 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. - Pending(enforceTips bool) map[common.Address][]*types.Transaction + Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction // SubscribeNewTxsEvent should return an event subscription of // NewTxsEvent and send events to the given channel. diff --git a/eth/sync.go b/eth/sync.go index f816af2d7fbc..975d29ec9a79 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -47,7 +47,11 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { var txs types.Transactions pending := pm.txpool.Pending(false) for _, batch := range pending { - txs = append(txs, batch...) + for _, lazy := range batch { + if tx := lazy.Resolve(); tx != nil { + txs = append(txs, tx.Tx) + } + } } if len(txs) == 0 { return diff --git a/miner/ordering.go b/miner/ordering.go new file mode 100644 index 000000000000..6ee17187c7c8 --- /dev/null +++ b/miner/ordering.go @@ -0,0 +1,190 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "container/heap" + "math/big" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/core/txpool" + "github.com/XinFinOrg/XDPoSChain/core/types" +) + +// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap +type txWithMinerFee struct { + tx *txpool.LazyTransaction + from common.Address + fees *big.Int +} + +// newTxWithMinerFee creates a wrapped transaction, calculating the effective +// miner gasTipCap if a base fee is provided. +// Returns error in case of a negative effective miner gasTipCap. +func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *big.Int) (*txWithMinerFee, error) { + tip := new(big.Int).Set(tx.GasTipCap) + if baseFee != nil { + if tx.GasFeeCap.Cmp(baseFee) < 0 { + return nil, types.ErrGasFeeCapTooLow + } + effectiveTip := new(big.Int).Sub(tx.GasFeeCap, baseFee) + if tip.Cmp(effectiveTip) > 0 { + tip = effectiveTip + } + } + return &txWithMinerFee{ + tx: tx, + from: from, + fees: tip, + }, nil +} + +// TxByPriceAndTime implements both the sort and the heap interface, making it useful +// for all at once sorting as well as individually adding and removing elements. +type txByPriceAndTime struct { + txs []*txWithMinerFee + payersSwap map[common.Address]*big.Int +} + +func (s txByPriceAndTime) Len() int { + return len(s.txs) +} + +func (s txByPriceAndTime) Less(i, j int) bool { + i_price := s.txs[i].fees + if tx := s.txs[i].tx.Resolve(); tx != nil && tx.Tx.To() != nil { + if _, ok := s.payersSwap[*tx.Tx.To()]; ok { + i_price = common.TRC21GasPrice + } + } + + j_price := s.txs[j].fees + if tx := s.txs[j].tx.Resolve(); tx != nil && tx.Tx.To() != nil { + if _, ok := s.payersSwap[*tx.Tx.To()]; ok { + j_price = common.TRC21GasPrice + } + } + + // If the prices are equal, use the time the transaction was first seen for + // deterministic sorting + cmp := i_price.Cmp(j_price) + if cmp == 0 { + return s.txs[i].tx.Time.Before(s.txs[j].tx.Time) + } + return cmp > 0 +} + +func (s txByPriceAndTime) Swap(i, j int) { + s.txs[i], s.txs[j] = s.txs[j], s.txs[i] +} + +func (s *txByPriceAndTime) Push(x interface{}) { + s.txs = append(s.txs, x.(*txWithMinerFee)) +} + +func (s *txByPriceAndTime) Pop() interface{} { + old := s.txs + n := len(old) + x := old[n-1] + old[n-1] = nil // avoid memory leak + s.txs = old[0 : n-1] + return x +} + +// transactionsByPriceAndNonce represents a set of transactions that can return +// transactions in a profit-maximizing sorted order, while supporting removing +// entire batches of transactions for non-executable accounts. +type transactionsByPriceAndNonce struct { + txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions + heads txByPriceAndTime // Next transaction for each unique account (price heap) + signer types.Signer // Signer for the set of transactions + baseFee *big.Int // Current base fee +} + +// newTransactionsByPriceAndNonce creates a transaction set that can retrieve +// price sorted transactions in a nonce-honouring way. +// +// Note, the input map is reowned so the caller should not interact any more with +// if after providing it to the constructor. +func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, payersSwap map[common.Address]*big.Int, baseFee *big.Int) (*transactionsByPriceAndNonce, types.Transactions) { + // Initialize a price and received time based heap with the head transactions + heads := txByPriceAndTime{ + txs: make([]*txWithMinerFee, 0, len(txs)), + payersSwap: payersSwap, + } + specialTxs := types.Transactions{} + for from, accTxs := range txs { + var normalTxs []*txpool.LazyTransaction + for _, lazyTx := range accTxs { + if tx := lazyTx.Resolve(); tx.Tx.IsSpecialTransaction() { + specialTxs = append(specialTxs, tx.Tx) + } else { + normalTxs = append(normalTxs, lazyTx) + } + } + if len(normalTxs) > 0 { + wrapped, err := newTxWithMinerFee(normalTxs[0], from, baseFee) + if err != nil { + delete(txs, from) + continue + } + heads.txs = append(heads.txs, wrapped) + // Remove the first normal transaction for this sender + txs[from] = normalTxs[1:] + } else { + // Remove the account if there are no normal transactions + delete(txs, from) + } + } + heap.Init(&heads) + + // Assemble and return the transaction set + return &transactionsByPriceAndNonce{ + txs: txs, + heads: heads, + signer: signer, + baseFee: baseFee, + }, specialTxs +} + +// Peek returns the next transaction by price. +func (t *transactionsByPriceAndNonce) Peek() *txpool.LazyTransaction { + if len(t.heads.txs) == 0 { + return nil + } + return t.heads.txs[0].tx +} + +// Shift replaces the current best head with the next one from the same account. +func (t *transactionsByPriceAndNonce) Shift() { + acc := t.heads.txs[0].from + if txs, ok := t.txs[acc]; ok && len(txs) > 0 { + if wrapped, err := newTxWithMinerFee(txs[0], acc, t.baseFee); err == nil { + t.heads.txs[0], t.txs[acc] = wrapped, txs[1:] + heap.Fix(&t.heads, 0) + return + } + } + heap.Pop(&t.heads) +} + +// Pop removes the best transaction, *not* replacing it with the next one from +// the same account. This should be used when a transaction cannot be executed +// and hence all subsequent ones should be discarded from the same account. +func (t *transactionsByPriceAndNonce) Pop() { + heap.Pop(&t.heads) +} diff --git a/miner/ordering_test.go b/miner/ordering_test.go new file mode 100644 index 000000000000..46da07c90e2e --- /dev/null +++ b/miner/ordering_test.go @@ -0,0 +1,264 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "crypto/ecdsa" + "math/big" + "math/rand" + "testing" + "time" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/core/txpool" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/crypto" +) + +func TestTransactionPriceNonceSortLegacy(t *testing.T) { + testTransactionPriceNonceSort(t, nil) +} + +func TestTransactionPriceNonceSort1559(t *testing.T) { + testTransactionPriceNonceSort(t, big.NewInt(0)) + testTransactionPriceNonceSort(t, big.NewInt(5)) + testTransactionPriceNonceSort(t, big.NewInt(50)) +} + +// Tests that transactions can be correctly sorted according to their price in +// decreasing order, but at the same time with increasing nonces when issued by +// the same account. +func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 25) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := types.LatestSignerForChainID(common.Big1) + + // Generate a batch of transactions with overlapping values, but shifted nonces + groups := map[common.Address][]*txpool.LazyTransaction{} + expectedCount := 0 + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + count := 25 + for i := 0; i < 25; i++ { + var tx *types.Transaction + gasFeeCap := rand.Intn(50) + if baseFee == nil { + tx = types.NewTx(&types.LegacyTx{ + Nonce: uint64(start + i), + To: &common.Address{}, + Value: big.NewInt(100), + Gas: 100, + GasPrice: big.NewInt(int64(gasFeeCap)), + Data: nil, + }) + } else { + tx = types.NewTx(&types.DynamicFeeTx{ + Nonce: uint64(start + i), + To: &common.Address{}, + Value: big.NewInt(100), + Gas: 100, + GasFeeCap: big.NewInt(int64(gasFeeCap)), + GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))), + Data: nil, + }) + if count == 25 && int64(gasFeeCap) < baseFee.Int64() { + count = i + } + } + tx, err := types.SignTx(tx, signer, key) + if err != nil { + t.Fatalf("failed to sign tx: %s", err) + } + groups[addr] = append(groups[addr], &txpool.LazyTransaction{ + Hash: tx.Hash(), + Tx: &txpool.Transaction{Tx: tx}, + Time: tx.Time(), + GasFeeCap: tx.GasFeeCap(), + GasTipCap: tx.GasTipCap(), + }) + } + expectedCount += count + } + // Sort the transactions and cross check the nonce ordering + txset, _ := newTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{}, baseFee) + + txs := types.Transactions{} + for tx := txset.Peek(); tx != nil; tx = txset.Peek() { + txs = append(txs, tx.Tx.Tx) + txset.Shift() + } + if len(txs) != expectedCount { + t.Errorf("expected %d transactions, found %d", expectedCount, len(txs)) + } + for i, txi := range txs { + fromi, _ := types.Sender(signer, txi) + + // Make sure the nonce order is valid + for j, txj := range txs[i+1:] { + fromj, _ := types.Sender(signer, txj) + if fromi == fromj && txi.Nonce() > txj.Nonce() { + t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce()) + } + } + // If the next tx has different from account, the price must be lower than the current one + if i+1 < len(txs) { + next := txs[i+1] + fromNext, _ := types.Sender(signer, next) + tip, err := txi.EffectiveGasTip(baseFee) + nextTip, nextErr := next.EffectiveGasTip(baseFee) + if err != nil || nextErr != nil { + t.Errorf("error calculating effective tip: %v, %v", err, nextErr) + } + if fromi != fromNext && tip.Cmp(nextTip) < 0 { + t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice()) + } + } + } +} + +// Tests that if multiple transactions have the same price, the ones seen earlier +// are prioritized to avoid network spam attacks aiming for a specific ordering. +func TestTransactionTimeSort(t *testing.T) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := types.HomesteadSigner{} + + // Generate a batch of transactions with overlapping prices, but different creation times + groups := map[common.Address][]*txpool.LazyTransaction{} + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + + tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + tx.SetTime(time.Unix(0, int64(len(keys)-start))) + + groups[addr] = append(groups[addr], &txpool.LazyTransaction{ + Hash: tx.Hash(), + Tx: &txpool.Transaction{Tx: tx}, + Time: tx.Time(), + GasFeeCap: tx.GasFeeCap(), + GasTipCap: tx.GasTipCap(), + }) + } + // Sort the transactions and cross check the nonce ordering + txset, _ := newTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{}, nil) + + txs := types.Transactions{} + for tx := txset.Peek(); tx != nil; tx = txset.Peek() { + txs = append(txs, tx.Tx.Tx) + txset.Shift() + } + if len(txs) != len(keys) { + t.Errorf("expected %d transactions, found %d", len(keys), len(txs)) + } + for i, txi := range txs { + fromi, _ := types.Sender(signer, txi) + if i+1 < len(txs) { + next := txs[i+1] + fromNext, _ := types.Sender(signer, next) + + if txi.GasPrice().Cmp(next.GasPrice()) < 0 { + t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice()) + } + // Make sure time order is ascending if the txs have the same gas price + if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.Time().After(next.Time()) { + t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.Time(), i+1, fromNext[:4], next.Time()) + } + } + } +} + +// TestNewTransactionsByPriceAndNonce_SpecialSeparation uses table-driven tests to verify separation of special and normal transactions. +func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) { + signer := types.HomesteadSigner{} + + genNormalTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction { + tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0x1234567890123456789012345678901234567890"), big.NewInt(1), 21000, big.NewInt(1), nil), signer, key) + return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()} + } + genSpecialTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction { + tx, _ := types.SignTx(types.NewTransaction(nonce, common.BlockSignersBinary, big.NewInt(1), 21000, big.NewInt(1), nil), signer, key) + return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()} + } + + testCases := []struct { + name string + normalCount int + specialCount int + expectNormal int + expectSpecial int + }{ + {"no transactions", 0, 0, 0, 0}, + {"only 1 normal", 1, 0, 1, 0}, + {"only 2 normal", 2, 0, 2, 0}, + {"only 3 normal", 3, 0, 3, 0}, + {"only 1 special", 0, 1, 0, 1}, + {"only 2 special", 0, 2, 0, 2}, + {"only 3 special", 0, 3, 0, 3}, + {"1 normal, 1 special", 1, 1, 1, 1}, + {"2 normal, 2 special", 2, 2, 2, 2}, + {"3 normal, 3 special", 3, 3, 3, 3}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + key, _ := crypto.GenerateKey() + addr := crypto.PubkeyToAddress(key.PublicKey) + txs := make([]*txpool.LazyTransaction, 0, tc.normalCount+tc.specialCount) + for i := 0; i < tc.normalCount; i++ { + txs = append(txs, genNormalTx(uint64(i), key)) + } + for i := 0; i < tc.specialCount; i++ { + txs = append(txs, genSpecialTx(uint64(tc.normalCount+i), key)) + } + group := map[common.Address][]*txpool.LazyTransaction{} + if len(txs) > 0 { + group[addr] = txs + } + txset, specialTxs := newTransactionsByPriceAndNonce(signer, group, map[common.Address]*big.Int{}, nil) + + // Check special transactions + if len(specialTxs) != tc.expectSpecial { + t.Errorf("expected %d special txs, got %d", tc.expectSpecial, len(specialTxs)) + } + for _, tx := range specialTxs { + if tx.To() == nil || *tx.To() != common.BlockSignersBinary { + t.Errorf("specialTxs contains non-special tx: %v", tx) + } + } + + // Check normal transactions + normalCount := 0 + for tx := txset.Peek(); tx != nil; tx = txset.Peek() { + resolved := tx.Resolve() + if resolved == nil || resolved.Tx.To() == nil || *resolved.Tx.To() == common.BlockSignersBinary { + t.Errorf("txset contains special or nil-to tx: %v", resolved) + } + normalCount++ + txset.Shift() + } + if normalCount != tc.expectNormal { + t.Errorf("expected %d normal txs, got %d", tc.expectNormal, normalCount) + } + }) + } +} diff --git a/miner/worker.go b/miner/worker.go index 75494d6a82db..17aad92588cc 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -36,6 +36,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/contracts" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/state" + "github.com/XinFinOrg/XDPoSChain/core/txpool" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/core/vm" "github.com/XinFinOrg/XDPoSChain/ethdb" @@ -355,13 +356,19 @@ func (w *worker) update() { // be automatically eliminated. if atomic.LoadInt32(&w.mining) == 0 { w.currentMu.Lock() - txs := make(map[common.Address][]*types.Transaction, len(ev.Txs)) + txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs)) for _, tx := range ev.Txs { acc, _ := types.Sender(w.current.signer, tx) - txs[acc] = append(txs[acc], tx) + txs[acc] = append(txs[acc], &txpool.LazyTransaction{ + Hash: tx.Hash(), + Tx: &txpool.Transaction{Tx: tx}, + Time: tx.Time(), + GasFeeCap: tx.GasFeeCap(), + GasTipCap: tx.GasTipCap(), + }) } feeCapacity := w.current.state.GetTRC21FeeCapacityFromState() - txset, specialTxs := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, feeCapacity) + txset, specialTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, feeCapacity, w.current.header.BaseFee) tcount := w.current.tcount w.current.commitTransactions(w.mux, feeCapacity, txset, specialTxs, w.chain, w.coinbase, &w.pendingLogsFeed) @@ -778,7 +785,7 @@ func (w *worker) commitNewWork() { } // won't grasp txs at checkpoint var ( - txs *types.TransactionsByPriceAndNonce + txs *transactionsByPriceAndNonce specialTxs types.Transactions tradingTxMatches []tradingstate.TxDataMatch lendingMatchingResults map[common.Hash]lendingstate.MatchingResult @@ -794,7 +801,7 @@ func (w *worker) commitNewWork() { } if !isEpochSwitchBlock { pending := w.eth.TxPool().Pending(true) - txs, specialTxs = types.NewTransactionsByPriceAndNonce(w.current.signer, pending, feeCapacity) + txs, specialTxs = newTransactionsByPriceAndNonce(w.current.signer, pending, feeCapacity, header.BaseFee) } } if atomic.LoadInt32(&w.mining) == 1 { @@ -959,7 +966,7 @@ func (w *worker) commitNewWork() { w.updateSnapshot() } -func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address, pendingLogsFeed *event.Feed) { +func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *transactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address, pendingLogsFeed *event.Feed) { gp := new(core.GasPool).AddGas(w.header.GasLimit) balanceUpdated := map[common.Address]*big.Int{} totalFeeUsed := big.NewInt(0) @@ -1070,12 +1077,15 @@ func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Addr break } // Retrieve the next transaction and abort if all done - tx := txs.Peek() - - if tx == nil { + lazyTx := txs.Peek() + if lazyTx == nil { break } - + warped := lazyTx.Resolve() + if warped == nil || warped.Tx == nil { + break + } + tx := warped.Tx to := tx.To() if w.header.Number.Uint64() >= common.BlackListHFNumber { from := tx.From()