Skip to content

Commit c374447

Browse files
villanuevawillkaralabe
authored andcommitted
core: fix queued transaction eviction
Solves issue#20582. Non-executable transactions should not be evicted on each tick if there are no promote transactions or if a pending/reset empties the pending list. Tests and logging expanded to handle these cases in the future. core/tx_pool: use a ts for each tx in the queue, but only update the heartbeat on promotion or pending replaced queuedTs proper naming
1 parent 6315b6f commit c374447

File tree

2 files changed

+157
-12
lines changed

2 files changed

+157
-12
lines changed

core/tx_pool.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ var (
9898
queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
9999
queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
100100
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
101+
queuedEvictionMeter = metrics.NewRegisteredMeter("txpool/queued/eviction", nil) // Dropped due to lifetime
101102

102103
// General tx metrics
103104
knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
@@ -231,11 +232,12 @@ type TxPool struct {
231232
locals *accountSet // Set of local transaction to exempt from eviction rules
232233
journal *txJournal // Journal of local transaction to back up to disk
233234

234-
pending map[common.Address]*txList // All currently processable transactions
235-
queue map[common.Address]*txList // Queued but non-processable transactions
236-
beats map[common.Address]time.Time // Last heartbeat from each known account
237-
all *txLookup // All transactions to allow lookups
238-
priced *txPricedList // All transactions sorted by price
235+
pending map[common.Address]*txList // All currently processable transactions
236+
queue map[common.Address]*txList // Queued but non-processable transactions
237+
beats map[common.Address]time.Time // Last heartbeat from each known account
238+
queuedTs map[common.Hash]time.Time // Timestamp for when queued transactions were added
239+
all *txLookup // All transactions to allow lookups
240+
priced *txPricedList // All transactions sorted by price
239241

240242
chainHeadCh chan ChainHeadEvent
241243
chainHeadSub event.Subscription
@@ -266,6 +268,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
266268
pending: make(map[common.Address]*txList),
267269
queue: make(map[common.Address]*txList),
268270
beats: make(map[common.Address]time.Time),
271+
queuedTs: make(map[common.Hash]time.Time),
269272
all: newTxLookup(),
270273
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
271274
reqResetCh: make(chan *txpoolResetRequest),
@@ -363,7 +366,10 @@ func (pool *TxPool) loop() {
363366
// Any non-locals old enough should be removed
364367
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
365368
for _, tx := range pool.queue[addr].Flatten() {
366-
pool.removeTx(tx.Hash(), true)
369+
if time.Since(pool.queuedTs[tx.Hash()]) > pool.config.Lifetime {
370+
queuedEvictionMeter.Mark(1)
371+
pool.removeTx(tx.Hash(), true)
372+
}
367373
}
368374
}
369375
}
@@ -616,6 +622,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
616622
pool.all.Add(tx)
617623
pool.priced.Put(tx)
618624
pool.journalTx(from, tx)
625+
pool.beats[from] = time.Now()
619626
pool.queueTxEvent(tx)
620627
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
621628
return old != nil, nil
@@ -658,16 +665,20 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
658665
}
659666
// Discard any previous transaction and mark this
660667
if old != nil {
661-
pool.all.Remove(old.Hash())
668+
old_hash := old.Hash()
669+
pool.all.Remove(old_hash)
662670
pool.priced.Removed(1)
671+
delete(pool.queuedTs, old_hash)
663672
queuedReplaceMeter.Mark(1)
664673
} else {
665674
// Nothing was replaced, bump the queued counter
666675
queuedGauge.Inc(1)
676+
pool.queuedTs[hash] = time.Now()
667677
}
668678
if pool.all.Get(hash) == nil {
669679
pool.all.Add(tx)
670680
pool.priced.Put(tx)
681+
pool.queuedTs[hash] = time.Now()
671682
}
672683
return old != nil, nil
673684
}
@@ -700,15 +711,14 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
700711
// An older transaction was better, discard this
701712
pool.all.Remove(hash)
702713
pool.priced.Removed(1)
703-
714+
delete(pool.queuedTs, hash)
704715
pendingDiscardMeter.Mark(1)
705716
return false
706717
}
707718
// Otherwise discard any previous transaction and mark this
708719
if old != nil {
709720
pool.all.Remove(old.Hash())
710721
pool.priced.Removed(1)
711-
712722
pendingReplaceMeter.Mark(1)
713723
} else {
714724
// Nothing was replaced, bump the pending counter
@@ -721,6 +731,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
721731
}
722732
// Set the potentially new pending nonce and notify any subsystems of the new tx
723733
pool.beats[addr] = time.Now()
734+
delete(pool.queuedTs, hash)
724735
pool.pendingNonces.set(addr, tx.Nonce()+1)
725736

726737
return true
@@ -895,7 +906,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
895906
// If no more pending transactions are left, remove the list
896907
if pending.Empty() {
897908
delete(pool.pending, addr)
898-
delete(pool.beats, addr)
899909
}
900910
// Postpone any invalidated transactions
901911
for _, tx := range invalids {
@@ -913,6 +923,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
913923
if removed, _ := future.Remove(tx); removed {
914924
// Reduce the queued counter
915925
queuedGauge.Dec(1)
926+
delete(pool.queuedTs, hash)
916927
}
917928
if future.Empty() {
918929
delete(pool.queue, addr)
@@ -1191,13 +1202,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
11911202
for _, tx := range forwards {
11921203
hash := tx.Hash()
11931204
pool.all.Remove(hash)
1205+
delete(pool.queuedTs, hash)
11941206
}
11951207
log.Trace("Removed old queued transactions", "count", len(forwards))
11961208
// Drop all transactions that are too costly (low balance or out of gas)
11971209
drops, _ := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
11981210
for _, tx := range drops {
11991211
hash := tx.Hash()
12001212
pool.all.Remove(hash)
1213+
delete(pool.queuedTs, hash)
12011214
}
12021215
log.Trace("Removed unpayable queued transactions", "count", len(drops))
12031216
queuedNofundsMeter.Mark(int64(len(drops)))
@@ -1220,6 +1233,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
12201233
for _, tx := range caps {
12211234
hash := tx.Hash()
12221235
pool.all.Remove(hash)
1236+
delete(pool.queuedTs, hash)
12231237
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
12241238
}
12251239
queuedRateLimitMeter.Mark(int64(len(caps)))
@@ -1414,7 +1428,7 @@ func (pool *TxPool) demoteUnexecutables() {
14141428
}
14151429
pendingGauge.Dec(int64(len(gapped)))
14161430
}
1417-
// Delete the entire queue entry if it became empty.
1431+
// Delete the entire pending entry if it became empty.
14181432
if list.Empty() {
14191433
delete(pool.pending, addr)
14201434
delete(pool.beats, addr)

core/tx_pool_test.go

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ func validateTxPoolInternals(pool *TxPool) error {
109109
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
110110
return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
111111
}
112+
if queued != len(pool.queuedTs) {
113+
return fmt.Errorf("total queued transaction count %d != %d queuedTs length", queued, len(pool.queuedTs))
114+
}
115+
112116
// Ensure the next nonce to assign is the correct one
113117
for addr, txs := range pool.pending {
114118
// Find the last transaction
@@ -868,7 +872,7 @@ func TestTransactionQueueTimeLimitingNoLocals(t *testing.T) {
868872
func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
869873
// Reduce the eviction interval to a testable amount
870874
defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
871-
evictionInterval = time.Second
875+
evictionInterval = time.Millisecond * 100
872876

873877
// Create the pool to test the non-expiration enforcement
874878
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
@@ -905,6 +909,22 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
905909
if err := validateTxPoolInternals(pool); err != nil {
906910
t.Fatalf("pool internal state corrupted: %v", err)
907911
}
912+
913+
// Allow the eviction interval to run
914+
time.Sleep(2 * evictionInterval)
915+
916+
// Transactions should not be evicted from the queue yet since lifetime duration has not passed
917+
pending, queued = pool.Stats()
918+
if pending != 0 {
919+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
920+
}
921+
if queued != 2 {
922+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
923+
}
924+
if err := validateTxPoolInternals(pool); err != nil {
925+
t.Fatalf("pool internal state corrupted: %v", err)
926+
}
927+
908928
// Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains
909929
time.Sleep(2 * config.Lifetime)
910930

@@ -924,6 +944,117 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
924944
if err := validateTxPoolInternals(pool); err != nil {
925945
t.Fatalf("pool internal state corrupted: %v", err)
926946
}
947+
948+
// remove current transactions and increase nonce to prepare for a reset and cleanup
949+
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
950+
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
951+
952+
<-pool.requestReset(nil, nil)
953+
954+
// make sure queue, pending are cleared
955+
pending, queued = pool.Stats()
956+
if pending != 0 {
957+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
958+
}
959+
if queued != 0 {
960+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
961+
}
962+
if err := validateTxPoolInternals(pool); err != nil {
963+
t.Fatalf("pool internal state corrupted: %v", err)
964+
}
965+
966+
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
967+
t.Fatalf("failed to add remote transaction: %v", err)
968+
}
969+
if err := pool.AddLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil {
970+
t.Fatalf("failed to add remote transaction: %v", err)
971+
}
972+
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
973+
t.Fatalf("failed to add remote transaction: %v", err)
974+
}
975+
if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil {
976+
t.Fatalf("failed to add remote transaction: %v", err)
977+
}
978+
979+
// wait a short amount of time to add an additional future queued item to test proper eviction when
980+
// pending is removed
981+
time.Sleep(2 * evictionInterval)
982+
if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(1), remote)); err != nil {
983+
t.Fatalf("failed to add remote transaction: %v", err)
984+
}
985+
986+
// Make sure future queue and pending have transactions
987+
pending, queued = pool.Stats()
988+
if pending != 2 {
989+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
990+
}
991+
if queued != 3 {
992+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
993+
}
994+
if err := validateTxPoolInternals(pool); err != nil {
995+
t.Fatalf("pool internal state corrupted: %v", err)
996+
}
997+
998+
// Trigger a reset to make sure queued items are not evicted
999+
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 3)
1000+
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 3)
1001+
<-pool.requestReset(nil, nil)
1002+
1003+
// Wait for eviction to run
1004+
time.Sleep(evictionInterval * 2)
1005+
1006+
// a pool reset, empty pending list, or demotion of pending transactions should maintain
1007+
// queued transactions for non locals and locals alike if the lifetime duration has not passed yet
1008+
pending, queued = pool.Stats()
1009+
if pending != 0 {
1010+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
1011+
}
1012+
if queued != 3 {
1013+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
1014+
}
1015+
if err := validateTxPoolInternals(pool); err != nil {
1016+
t.Fatalf("pool internal state corrupted: %v", err)
1017+
}
1018+
1019+
// Wait for the lifetime to run for all transactions except the one that was added later
1020+
time.Sleep(evictionInterval * 7)
1021+
pending, queued = pool.Stats()
1022+
if pending != 0 {
1023+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
1024+
}
1025+
if nolocals {
1026+
if queued != 1 {
1027+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
1028+
}
1029+
} else {
1030+
if queued != 2 {
1031+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
1032+
}
1033+
}
1034+
1035+
if err := validateTxPoolInternals(pool); err != nil {
1036+
t.Fatalf("pool internal state corrupted: %v", err)
1037+
}
1038+
1039+
// lifetime should pass for the final transaction
1040+
time.Sleep(evictionInterval * 2)
1041+
1042+
pending, queued = pool.Stats()
1043+
if pending != 0 {
1044+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
1045+
}
1046+
if nolocals {
1047+
if queued != 0 {
1048+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
1049+
}
1050+
} else {
1051+
if queued != 1 {
1052+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
1053+
}
1054+
}
1055+
if err := validateTxPoolInternals(pool); err != nil {
1056+
t.Fatalf("pool internal state corrupted: %v", err)
1057+
}
9271058
}
9281059

9291060
// Tests that even if the transaction count belonging to a single account goes

0 commit comments

Comments
 (0)