@@ -20,6 +20,7 @@ import (
20
20
"errors"
21
21
"fmt"
22
22
"math/big"
23
+ "sort"
23
24
"sync"
24
25
"time"
25
26
44
45
ErrNegativeValue = errors .New ("Negative value" )
45
46
)
46
47
47
- const (
48
- maxQueued = 64 // max limit of queued txs per address
48
+ var (
49
+ maxQueuedPerAccount = uint64 (64 ) // Max limit of queued transactions per address
50
+ maxQueuedInTotal = uint64 (65536 ) // Max limit of queued transactions from all accounts
51
+ maxQueuedLifetime = 3 * time .Hour // Max amount of time transactions from idle accounts are queued
52
+ evictionInterval = time .Minute // Time interval to check for evictable transactions
49
53
)
50
54
51
55
type stateFn func () (* state.StateDB , error )
@@ -71,8 +75,10 @@ type TxPool struct {
71
75
pending map [common.Address ]* txList // All currently processable transactions
72
76
queue map [common.Address ]* txList // Queued but non-processable transactions
73
77
all map [common.Hash ]* types.Transaction // All transactions to allow lookups
78
+ beats map [common.Address ]time.Time // Last heartbeat from each known account
74
79
75
- wg sync.WaitGroup // for shutdown sync
80
+ wg sync.WaitGroup // for shutdown sync
81
+ quit chan struct {}
76
82
77
83
homestead bool
78
84
}
@@ -83,17 +89,20 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
83
89
pending : make (map [common.Address ]* txList ),
84
90
queue : make (map [common.Address ]* txList ),
85
91
all : make (map [common.Hash ]* types.Transaction ),
92
+ beats : make (map [common.Address ]time.Time ),
86
93
eventMux : eventMux ,
87
94
currentState : currentStateFn ,
88
95
gasLimit : gasLimitFn ,
89
96
minGasPrice : new (big.Int ),
90
97
pendingState : nil ,
91
98
localTx : newTxSet (),
92
99
events : eventMux .Subscribe (ChainHeadEvent {}, GasPriceChanged {}, RemovedTransactionEvent {}),
100
+ quit : make (chan struct {}),
93
101
}
94
102
95
- pool .wg .Add (1 )
103
+ pool .wg .Add (2 )
96
104
go pool .eventLoop ()
105
+ go pool .expirationLoop ()
97
106
98
107
return pool
99
108
}
@@ -154,6 +163,7 @@ func (pool *TxPool) resetState() {
154
163
155
164
func (pool * TxPool ) Stop () {
156
165
pool .events .Unsubscribe ()
166
+ close (pool .quit )
157
167
pool .wg .Wait ()
158
168
glog .V (logger .Info ).Infoln ("Transaction pool stopped" )
159
169
}
@@ -290,7 +300,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
290
300
if pool .all [hash ] != nil {
291
301
return fmt .Errorf ("Known transaction: %x" , hash [:4 ])
292
302
}
293
- // Otherwise ensure basic validation passes nd queue it up
303
+ // Otherwise ensure basic validation passes and queue it up
294
304
if err := pool .validateTx (tx ); err != nil {
295
305
return err
296
306
}
@@ -308,7 +318,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
308
318
return nil
309
319
}
310
320
311
- // enqueueTx inserts a new transction into the non-executable transaction queue.
321
+ // enqueueTx inserts a new transaction into the non-executable transaction queue.
312
322
//
313
323
// Note, this method assumes the pool lock is held!
314
324
func (pool * TxPool ) enqueueTx (hash common.Hash , tx * types.Transaction ) {
@@ -355,6 +365,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
355
365
pool .all [hash ] = tx // Failsafe to work around direct pending inserts (tests)
356
366
357
367
// Set the potentially new pending nonce and notify any subsystems of the new tx
368
+ pool .beats [addr ] = time .Now ()
358
369
pool .pendingState .SetNonce (addr , list .last + 1 )
359
370
go pool .eventMux .Post (TxPreEvent {tx })
360
371
}
@@ -412,8 +423,8 @@ func (pool *TxPool) RemoveBatch(txs types.Transactions) {
412
423
}
413
424
}
414
425
415
- // removeTx iterates removes a single transaction from the queue, moving all
416
- // subsequent transactions back to the future queue.
426
+ // removeTx removes a single transaction from the queue, moving all subsequent
427
+ // transactions back to the future queue.
417
428
func (pool * TxPool ) removeTx (hash common.Hash ) {
418
429
// Fetch the transaction we wish to delete
419
430
tx , ok := pool .all [hash ]
@@ -431,6 +442,8 @@ func (pool *TxPool) removeTx(hash common.Hash) {
431
442
// If no more transactions are left, remove the list and reset the nonce
432
443
if pending .Empty () {
433
444
delete (pool .pending , addr )
445
+ delete (pool .beats , addr )
446
+
434
447
pool .pendingState .SetNonce (addr , tx .Nonce ())
435
448
} else {
436
449
// Otherwise update the nonce and postpone any invalidated transactions
@@ -465,6 +478,8 @@ func (pool *TxPool) promoteExecutables() {
465
478
return
466
479
}
467
480
// Iterate over all accounts and promote any executable transactions
481
+ queued := uint64 (0 )
482
+
468
483
for addr , list := range pool .queue {
469
484
// Drop all transactions that are deemed too old (low nonce)
470
485
for _ , tx := range list .Forward (state .GetNonce (addr )) {
@@ -489,17 +504,51 @@ func (pool *TxPool) promoteExecutables() {
489
504
pool .promoteTx (addr , tx .Hash (), tx )
490
505
}
491
506
// Drop all transactions over the allowed limit
492
- for _ , tx := range list .Cap (maxQueued ) {
507
+ for _ , tx := range list .Cap (int ( maxQueuedPerAccount ) ) {
493
508
if glog .V (logger .Core ) {
494
509
glog .Infof ("Removed cap-exceeding queued transaction: %v" , tx )
495
510
}
496
511
delete (pool .all , tx .Hash ())
497
512
}
513
+ queued += uint64 (list .Len ())
514
+
498
515
// Delete the entire queue entry if it became empty.
499
516
if list .Empty () {
500
517
delete (pool .queue , addr )
501
518
}
502
519
}
520
+ // If we've queued more transactions than the hard limit, drop oldest ones
521
+ if queued > maxQueuedInTotal {
522
+ // Sort all accounts with queued transactions by heartbeat
523
+ addresses := make (addresssByHeartbeat , 0 , len (pool .queue ))
524
+ for addr , _ := range pool .queue {
525
+ addresses = append (addresses , addressByHeartbeat {addr , pool .beats [addr ]})
526
+ }
527
+ sort .Sort (addresses )
528
+
529
+ // Drop transactions until the total is below the limit
530
+ for drop := queued - maxQueuedInTotal ; drop > 0 ; {
531
+ addr := addresses [len (addresses )- 1 ]
532
+ list := pool .queue [addr .address ]
533
+
534
+ addresses = addresses [:len (addresses )- 1 ]
535
+
536
+ // Drop all transactions if they are less than the overflow
537
+ if size := uint64 (list .Len ()); size <= drop {
538
+ for _ , tx := range list .Flatten () {
539
+ pool .removeTx (tx .Hash ())
540
+ }
541
+ drop -= size
542
+ continue
543
+ }
544
+ // Otherwise drop only last few transactions
545
+ txs := list .Flatten ()
546
+ for i := len (txs ) - 1 ; i >= 0 && drop > 0 ; i -- {
547
+ pool .removeTx (txs [i ].Hash ())
548
+ drop --
549
+ }
550
+ }
551
+ }
503
552
}
504
553
505
554
// demoteUnexecutables removes invalid and processed transactions from the pools
@@ -540,10 +589,51 @@ func (pool *TxPool) demoteUnexecutables() {
540
589
// Delete the entire queue entry if it became empty.
541
590
if list .Empty () {
542
591
delete (pool .pending , addr )
592
+ delete (pool .beats , addr )
543
593
}
544
594
}
545
595
}
546
596
597
+ // expirationLoop is a loop that periodically iterates over all accounts with
598
+ // queued transactions and drop all that have been inactive for a prolonged amount
599
+ // of time.
600
+ func (pool * TxPool ) expirationLoop () {
601
+ defer pool .wg .Done ()
602
+
603
+ evict := time .NewTicker (evictionInterval )
604
+ defer evict .Stop ()
605
+
606
+ for {
607
+ select {
608
+ case <- evict .C :
609
+ pool .mu .Lock ()
610
+ for addr := range pool .queue {
611
+ if time .Since (pool .beats [addr ]) > maxQueuedLifetime {
612
+ for _ , tx := range pool .queue [addr ].Flatten () {
613
+ pool .removeTx (tx .Hash ())
614
+ }
615
+ }
616
+ }
617
+ pool .mu .Unlock ()
618
+
619
+ case <- pool .quit :
620
+ return
621
+ }
622
+ }
623
+ }
624
+
625
+ // addressByHeartbeat is an account address tagged with its last activity timestamp.
626
+ type addressByHeartbeat struct {
627
+ address common.Address
628
+ heartbeat time.Time
629
+ }
630
+
631
+ type addresssByHeartbeat []addressByHeartbeat
632
+
633
+ func (a addresssByHeartbeat ) Len () int { return len (a ) }
634
+ func (a addresssByHeartbeat ) Less (i , j int ) bool { return a [i ].heartbeat .Before (a [j ].heartbeat ) }
635
+ func (a addresssByHeartbeat ) Swap (i , j int ) { a [i ], a [j ] = a [j ], a [i ] }
636
+
547
637
// txSet represents a set of transaction hashes in which entries
548
638
// are automatically dropped after txSetDuration time
549
639
type txSet struct {
0 commit comments