Skip to content

Commit 2425a74

Browse files
committed
[release/1.4.18] core: add global (soft) limits on the pending transactions
(cherry picked from commit 182d9cb)
1 parent facfe40 commit 2425a74

File tree

2 files changed

+150
-5
lines changed

2 files changed

+150
-5
lines changed

core/tx_pool.go

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/ethereum/go-ethereum/event"
3131
"github.com/ethereum/go-ethereum/logger"
3232
"github.com/ethereum/go-ethereum/logger/glog"
33+
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
3334
)
3435

3536
var (
@@ -46,10 +47,12 @@ var (
4647
)
4748

4849
var (
49-
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
50-
maxQueuedInTotal = uint64(8192) // Max limit of queued transactions from all accounts
51-
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
52-
evictionInterval = time.Minute // Time interval to check for evictable transactions
50+
minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
51+
maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
52+
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
53+
maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts
54+
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
55+
evictionInterval = time.Minute // Time interval to check for evictable transactions
5356
)
5457

5558
type stateFn func() (*state.StateDB, error)
@@ -481,7 +484,6 @@ func (pool *TxPool) promoteExecutables() {
481484
}
482485
// Iterate over all accounts and promote any executable transactions
483486
queued := uint64(0)
484-
485487
for addr, list := range pool.queue {
486488
// Drop all transactions that are deemed too old (low nonce)
487489
for _, tx := range list.Forward(state.GetNonce(addr)) {
@@ -519,6 +521,59 @@ func (pool *TxPool) promoteExecutables() {
519521
delete(pool.queue, addr)
520522
}
521523
}
524+
// If the pending limit is overflown, start equalizing allowances
525+
pending := uint64(0)
526+
for _, list := range pool.pending {
527+
pending += uint64(list.Len())
528+
}
529+
if pending > maxPendingTotal {
530+
// Assemble a spam order to penalize large transactors first
531+
spammers := prque.New()
532+
for addr, list := range pool.pending {
533+
// Only evict transactions from high rollers
534+
if uint64(list.Len()) > minPendingPerAccount {
535+
// Skip local accounts as pools should maintain backlogs for themselves
536+
for _, tx := range list.txs.items {
537+
if !pool.localTx.contains(tx.Hash()) {
538+
spammers.Push(addr, float32(list.Len()))
539+
}
540+
break // Checking on transaction for locality is enough
541+
}
542+
}
543+
}
544+
// Gradually drop transactions from offenders
545+
offenders := []common.Address{}
546+
for pending > maxPendingTotal && !spammers.Empty() {
547+
// Retrieve the next offender if not local address
548+
offender, _ := spammers.Pop()
549+
offenders = append(offenders, offender.(common.Address))
550+
551+
// Equalize balances until all the same or below threshold
552+
if len(offenders) > 1 {
553+
// Calculate the equalization threshold for all current offenders
554+
threshold := pool.pending[offender.(common.Address)].Len()
555+
556+
// Iteratively reduce all offenders until below limit or threshold reached
557+
for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
558+
for i := 0; i < len(offenders)-1; i++ {
559+
list := pool.pending[offenders[i]]
560+
list.Cap(list.Len() - 1)
561+
pending--
562+
}
563+
}
564+
}
565+
}
566+
// If still above threshold, reduce to limit or min allowance
567+
if pending > maxPendingTotal && len(offenders) > 0 {
568+
for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount {
569+
for _, addr := range offenders {
570+
list := pool.pending[addr]
571+
list.Cap(list.Len() - 1)
572+
pending--
573+
}
574+
}
575+
}
576+
}
522577
// If we've queued more transactions than the hard limit, drop oldest ones
523578
if queued > maxQueuedInTotal {
524579
// Sort all accounts with queued transactions by heartbeat

core/tx_pool_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,96 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
618618
}
619619
}
620620

621+
// Tests that if the transaction count belonging to multiple accounts go above
622+
// some hard threshold, the higher transactions are dropped to prevent DOS
623+
// attacks.
624+
func TestTransactionPendingGlobalLimiting(t *testing.T) {
625+
// Reduce the queue limits to shorten test time
626+
defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
627+
maxPendingTotal = minPendingPerAccount * 10
628+
629+
// Create the pool to test the limit enforcement with
630+
db, _ := ethdb.NewMemDatabase()
631+
statedb, _ := state.New(common.Hash{}, db)
632+
633+
pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
634+
pool.resetState()
635+
636+
// Create a number of test accounts and fund them
637+
state, _ := pool.currentState()
638+
639+
keys := make([]*ecdsa.PrivateKey, 5)
640+
for i := 0; i < len(keys); i++ {
641+
keys[i], _ = crypto.GenerateKey()
642+
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
643+
}
644+
// Generate and queue a batch of transactions
645+
nonces := make(map[common.Address]uint64)
646+
647+
txs := types.Transactions{}
648+
for _, key := range keys {
649+
addr := crypto.PubkeyToAddress(key.PublicKey)
650+
for j := 0; j < int(maxPendingTotal)/len(keys)*2; j++ {
651+
txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key))
652+
nonces[addr]++
653+
}
654+
}
655+
// Import the batch and verify that limits have been enforced
656+
pool.AddBatch(txs)
657+
658+
pending := 0
659+
for _, list := range pool.pending {
660+
pending += list.Len()
661+
}
662+
if pending > int(maxPendingTotal) {
663+
t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, maxPendingTotal)
664+
}
665+
}
666+
667+
// Tests that if the transaction count belonging to multiple accounts go above
668+
// some hard threshold, if they are under the minimum guaranteed slot count then
669+
// the transactions are still kept.
670+
func TestTransactionPendingMinimumAllowance(t *testing.T) {
671+
// Reduce the queue limits to shorten test time
672+
defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
673+
maxPendingTotal = 0
674+
675+
// Create the pool to test the limit enforcement with
676+
db, _ := ethdb.NewMemDatabase()
677+
statedb, _ := state.New(common.Hash{}, db)
678+
679+
pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
680+
pool.resetState()
681+
682+
// Create a number of test accounts and fund them
683+
state, _ := pool.currentState()
684+
685+
keys := make([]*ecdsa.PrivateKey, 5)
686+
for i := 0; i < len(keys); i++ {
687+
keys[i], _ = crypto.GenerateKey()
688+
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
689+
}
690+
// Generate and queue a batch of transactions
691+
nonces := make(map[common.Address]uint64)
692+
693+
txs := types.Transactions{}
694+
for _, key := range keys {
695+
addr := crypto.PubkeyToAddress(key.PublicKey)
696+
for j := 0; j < int(minPendingPerAccount)*2; j++ {
697+
txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key))
698+
nonces[addr]++
699+
}
700+
}
701+
// Import the batch and verify that limits have been enforced
702+
pool.AddBatch(txs)
703+
704+
for addr, list := range pool.pending {
705+
if list.Len() != int(minPendingPerAccount) {
706+
t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), minPendingPerAccount)
707+
}
708+
}
709+
}
710+
621711
// Benchmarks the speed of validating the contents of the pending queue of the
622712
// transaction pool.
623713
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }

0 commit comments

Comments
 (0)