Skip to content

Commit 6793ffa

Browse files
authored
Merge pull request #21300 from rjl493456442/txpool-fix-queued-evictions
core: fix queued transaction eviction
2 parents 1059221 + 5413df1 commit 6793ffa

File tree

2 files changed

+100
-8
lines changed

2 files changed

+100
-8
lines changed

core/tx_pool.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ var (
9898
queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
9999
queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
100100
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
101+
queuedEvictionMeter = metrics.NewRegisteredMeter("txpool/queued/eviction", nil) // Dropped due to lifetime
101102

102103
// General tx metrics
103104
knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
@@ -362,9 +363,11 @@ func (pool *TxPool) loop() {
362363
}
363364
// Any non-locals old enough should be removed
364365
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
365-
for _, tx := range pool.queue[addr].Flatten() {
366+
list := pool.queue[addr].Flatten()
367+
for _, tx := range list {
366368
pool.removeTx(tx.Hash(), true)
367369
}
370+
queuedEvictionMeter.Mark(int64(len(list)))
368371
}
369372
}
370373
pool.mu.Unlock()
@@ -614,6 +617,9 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
614617
pool.journalTx(from, tx)
615618
pool.queueTxEvent(tx)
616619
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
620+
621+
// Successful promotion, bump the heartbeat
622+
pool.beats[from] = time.Now()
617623
return old != nil, nil
618624
}
619625
// New transaction isn't replacing a pending one, push into queue
@@ -665,6 +671,10 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
665671
pool.all.Add(tx)
666672
pool.priced.Put(tx)
667673
}
674+
// If we never record the heartbeat, do it right now.
675+
if _, exist := pool.beats[from]; !exist {
676+
pool.beats[from] = time.Now()
677+
}
668678
return old != nil, nil
669679
}
670680

@@ -696,15 +706,13 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
696706
// An older transaction was better, discard this
697707
pool.all.Remove(hash)
698708
pool.priced.Removed(1)
699-
700709
pendingDiscardMeter.Mark(1)
701710
return false
702711
}
703712
// Otherwise discard any previous transaction and mark this
704713
if old != nil {
705714
pool.all.Remove(old.Hash())
706715
pool.priced.Removed(1)
707-
708716
pendingReplaceMeter.Mark(1)
709717
} else {
710718
// Nothing was replaced, bump the pending counter
@@ -716,9 +724,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
716724
pool.priced.Put(tx)
717725
}
718726
// Set the potentially new pending nonce and notify any subsystems of the new tx
719-
pool.beats[addr] = time.Now()
720727
pool.pendingNonces.set(addr, tx.Nonce()+1)
721728

729+
// Successful promotion, bump the heartbeat
730+
pool.beats[addr] = time.Now()
722731
return true
723732
}
724733

@@ -891,7 +900,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
891900
// If no more pending transactions are left, remove the list
892901
if pending.Empty() {
893902
delete(pool.pending, addr)
894-
delete(pool.beats, addr)
895903
}
896904
// Postpone any invalidated transactions
897905
for _, tx := range invalids {
@@ -912,6 +920,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
912920
}
913921
if future.Empty() {
914922
delete(pool.queue, addr)
923+
delete(pool.beats, addr)
915924
}
916925
}
917926
}
@@ -1229,6 +1238,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
12291238
// Delete the entire queue entry if it became empty.
12301239
if list.Empty() {
12311240
delete(pool.queue, addr)
1241+
delete(pool.beats, addr)
12321242
}
12331243
}
12341244
return promoted
@@ -1410,10 +1420,9 @@ func (pool *TxPool) demoteUnexecutables() {
14101420
}
14111421
pendingGauge.Dec(int64(len(gapped)))
14121422
}
1413-
// Delete the entire queue entry if it became empty.
1423+
// Delete the entire pending entry if it became empty.
14141424
if list.Empty() {
14151425
delete(pool.pending, addr)
1416-
delete(pool.beats, addr)
14171426
}
14181427
}
14191428
}

core/tx_pool_test.go

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func validateTxPoolInternals(pool *TxPool) error {
109109
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
110110
return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
111111
}
112+
112113
// Ensure the next nonce to assign is the correct one
113114
for addr, txs := range pool.pending {
114115
// Find the last transaction
@@ -868,7 +869,7 @@ func TestTransactionQueueTimeLimitingNoLocals(t *testing.T) {
868869
func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
869870
// Reduce the eviction interval to a testable amount
870871
defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
871-
evictionInterval = time.Second
872+
evictionInterval = time.Millisecond * 100
872873

873874
// Create the pool to test the non-expiration enforcement
874875
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
@@ -905,6 +906,22 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
905906
if err := validateTxPoolInternals(pool); err != nil {
906907
t.Fatalf("pool internal state corrupted: %v", err)
907908
}
909+
910+
// Allow the eviction interval to run
911+
time.Sleep(2 * evictionInterval)
912+
913+
// Transactions should not be evicted from the queue yet since lifetime duration has not passed
914+
pending, queued = pool.Stats()
915+
if pending != 0 {
916+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
917+
}
918+
if queued != 2 {
919+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
920+
}
921+
if err := validateTxPoolInternals(pool); err != nil {
922+
t.Fatalf("pool internal state corrupted: %v", err)
923+
}
924+
908925
// Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains
909926
time.Sleep(2 * config.Lifetime)
910927

@@ -924,6 +941,72 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
924941
if err := validateTxPoolInternals(pool); err != nil {
925942
t.Fatalf("pool internal state corrupted: %v", err)
926943
}
944+
945+
// remove current transactions and increase nonce to prepare for a reset and cleanup
946+
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
947+
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
948+
<-pool.requestReset(nil, nil)
949+
950+
// make sure queue, pending are cleared
951+
pending, queued = pool.Stats()
952+
if pending != 0 {
953+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
954+
}
955+
if queued != 0 {
956+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
957+
}
958+
if err := validateTxPoolInternals(pool); err != nil {
959+
t.Fatalf("pool internal state corrupted: %v", err)
960+
}
961+
962+
// Queue gapped transactions
963+
if err := pool.AddLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil {
964+
t.Fatalf("failed to add remote transaction: %v", err)
965+
}
966+
if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil {
967+
t.Fatalf("failed to add remote transaction: %v", err)
968+
}
969+
time.Sleep(5 * evictionInterval) // A half lifetime pass
970+
971+
// Queue executable transactions, the life cycle should be restarted.
972+
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
973+
t.Fatalf("failed to add remote transaction: %v", err)
974+
}
975+
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
976+
t.Fatalf("failed to add remote transaction: %v", err)
977+
}
978+
time.Sleep(6 * evictionInterval)
979+
980+
// All gapped transactions shouldn't be kicked out
981+
pending, queued = pool.Stats()
982+
if pending != 2 {
983+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
984+
}
985+
if queued != 2 {
986+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
987+
}
988+
if err := validateTxPoolInternals(pool); err != nil {
989+
t.Fatalf("pool internal state corrupted: %v", err)
990+
}
991+
992+
// The whole life time pass after last promotion, kick out stale transactions
993+
time.Sleep(2 * config.Lifetime)
994+
pending, queued = pool.Stats()
995+
if pending != 2 {
996+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
997+
}
998+
if nolocals {
999+
if queued != 0 {
1000+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
1001+
}
1002+
} else {
1003+
if queued != 1 {
1004+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
1005+
}
1006+
}
1007+
if err := validateTxPoolInternals(pool); err != nil {
1008+
t.Fatalf("pool internal state corrupted: %v", err)
1009+
}
9271010
}
9281011

9291012
// Tests that even if the transaction count belonging to a single account goes

0 commit comments

Comments
 (0)