Skip to content

Commit a25e5f6

Browse files
core/txpool/legacypool: move queue out of main txpool
1 parent 8a20e87 commit a25e5f6

File tree

1 file changed

+226
-0
lines changed

1 file changed

+226
-0
lines changed

core/txpool/legacypool/queue.go

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package legacypool
2+
3+
import (
4+
"sort"
5+
"time"
6+
7+
"github.com/ethereum/go-ethereum/common"
8+
"github.com/ethereum/go-ethereum/core/state"
9+
"github.com/ethereum/go-ethereum/core/txpool"
10+
"github.com/ethereum/go-ethereum/core/types"
11+
"github.com/ethereum/go-ethereum/log"
12+
)
13+
14+
type queue struct {
15+
config Config
16+
signer types.Signer
17+
queued map[common.Address]*list // Queued but non-processable transactions
18+
beats map[common.Address]time.Time // Last heartbeat from each known account
19+
}
20+
21+
func newQueue(config Config, signer types.Signer) *queue {
22+
return &queue{
23+
signer: signer,
24+
config: config,
25+
queued: make(map[common.Address]*list),
26+
beats: make(map[common.Address]time.Time),
27+
}
28+
}
29+
30+
func (q *queue) evict(force bool) []common.Hash {
31+
removed := make([]common.Hash, 0)
32+
for addr, list := range q.queued {
33+
// Any transactions old enough should be removed
34+
if force || time.Since(q.beats[addr]) > q.config.Lifetime {
35+
list := list.Flatten()
36+
for _, tx := range list {
37+
q.removeTx(addr, tx)
38+
removed = append(removed, tx.Hash())
39+
}
40+
queuedEvictionMeter.Mark(int64(len(list)))
41+
}
42+
}
43+
return removed
44+
}
45+
46+
func (q *queue) stats() int {
47+
queued := 0
48+
for _, list := range q.queued {
49+
queued += list.Len()
50+
}
51+
return queued
52+
}
53+
54+
func (q *queue) content() map[common.Address][]*types.Transaction {
55+
queued := make(map[common.Address][]*types.Transaction, len(q.queued))
56+
for addr, list := range q.queued {
57+
queued[addr] = list.Flatten()
58+
}
59+
return queued
60+
}
61+
62+
func (q *queue) contentFrom(addr common.Address) []*types.Transaction {
63+
var queued []*types.Transaction
64+
if list, ok := q.get(addr); ok {
65+
queued = list.Flatten()
66+
}
67+
return queued
68+
}
69+
70+
func (q *queue) get(addr common.Address) (*list, bool) {
71+
l, ok := q.queued[addr]
72+
return l, ok
73+
}
74+
75+
func (q *queue) bump(addr common.Address) {
76+
q.beats[addr] = time.Now()
77+
}
78+
79+
func (q *queue) addresses() []common.Address {
80+
addrs := make([]common.Address, 0, len(q.queued))
81+
for addr := range q.queued {
82+
addrs = append(addrs, addr)
83+
}
84+
return addrs
85+
}
86+
87+
func (q queue) removeTx(addr common.Address, tx *types.Transaction) {
88+
if future := q.queued[addr]; future != nil {
89+
if txOld := future.txs.Get(tx.Nonce()); txOld != nil && txOld.Hash() != tx.Hash() {
90+
// Edge case, a different transaction
91+
// with the same nonce is in the queued, just ignore
92+
return
93+
}
94+
if removed, _ := future.Remove(tx); removed {
95+
// Reduce the queued counter
96+
queuedGauge.Dec(1)
97+
}
98+
if future.Empty() {
99+
delete(q.queued, addr)
100+
delete(q.beats, addr)
101+
}
102+
}
103+
}
104+
105+
func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, error) {
106+
// Try to insert the transaction into the future queue
107+
from, _ := types.Sender(q.signer, tx) // already validated
108+
if q.queued[from] == nil {
109+
q.queued[from] = newList(false)
110+
}
111+
inserted, old := q.queued[from].Add(tx, q.config.PriceBump)
112+
if !inserted {
113+
// An older transaction was better, discard this
114+
queuedDiscardMeter.Mark(1)
115+
return nil, txpool.ErrReplaceUnderpriced
116+
}
117+
// If we never record the heartbeat, do it right now.
118+
if _, exist := q.beats[from]; !exist {
119+
q.beats[from] = time.Now()
120+
}
121+
if old == nil {
122+
// Nothing was replaced, bump the queued counter
123+
queuedGauge.Inc(1)
124+
return nil, nil
125+
}
126+
h := old.Hash()
127+
// Transaction was replaced, bump the replacement counter
128+
queuedReplaceMeter.Mark(1)
129+
return &h, nil
130+
}
131+
132+
func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash) {
133+
// Track the promoteable transactions to broadcast them at once
134+
var promoteable []*types.Transaction
135+
var removeable []common.Hash
136+
137+
// Iterate over all accounts and promote any executable transactions
138+
for _, addr := range accounts {
139+
list := q.queued[addr]
140+
if list == nil {
141+
continue // Just in case someone calls with a non existing account
142+
}
143+
// Drop all transactions that are deemed too old (low nonce)
144+
forwards := list.Forward(currentState.GetNonce(addr))
145+
for _, tx := range forwards {
146+
removeable = append(removeable, tx.Hash())
147+
}
148+
log.Trace("Removing old queued transactions", "count", len(forwards))
149+
// Drop all transactions that are too costly (low balance or out of gas)
150+
drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit)
151+
for _, tx := range drops {
152+
removeable = append(removeable, tx.Hash())
153+
}
154+
log.Trace("Removing unpayable queued transactions", "count", len(drops))
155+
queuedNofundsMeter.Mark(int64(len(drops)))
156+
157+
// Gather all executable transactions and promote them
158+
readies := list.Ready(nonces.get(addr))
159+
promoteable = append(promoteable, readies...)
160+
log.Trace("Promoting queued transactions", "count", len(promoteable))
161+
queuedGauge.Dec(int64(len(readies)))
162+
163+
// Drop all transactions over the allowed limit
164+
var caps = list.Cap(int(q.config.AccountQueue))
165+
for _, tx := range caps {
166+
hash := tx.Hash()
167+
removeable = append(removeable, hash)
168+
log.Trace("Removing cap-exceeding queued transaction", "hash", hash)
169+
}
170+
queuedRateLimitMeter.Mark(int64(len(caps)))
171+
queuedGauge.Dec(int64(len(removeable)))
172+
173+
// Delete the entire queue entry if it became empty.
174+
if list.Empty() {
175+
delete(q.queued, addr)
176+
delete(q.beats, addr)
177+
}
178+
}
179+
return promoteable, removeable
180+
}
181+
182+
func (q *queue) truncate() []common.Hash {
183+
queued := uint64(0)
184+
for _, list := range q.queued {
185+
queued += uint64(list.Len())
186+
}
187+
if queued <= q.config.GlobalQueue {
188+
return nil
189+
}
190+
191+
// Sort all accounts with queued transactions by heartbeat
192+
addresses := make(addressesByHeartbeat, 0, len(q.queued))
193+
for addr := range q.queued {
194+
addresses = append(addresses, addressByHeartbeat{addr, q.beats[addr]})
195+
}
196+
sort.Sort(sort.Reverse(addresses))
197+
removed := make([]common.Hash, 0)
198+
199+
// Drop transactions until the total is below the limit
200+
for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
201+
addr := addresses[len(addresses)-1]
202+
list := q.queued[addr.address]
203+
204+
addresses = addresses[:len(addresses)-1]
205+
206+
// Drop all transactions if they are less than the overflow
207+
if size := uint64(list.Len()); size <= drop {
208+
for _, tx := range list.Flatten() {
209+
q.removeTx(addr.address, tx)
210+
removed = append(removed, tx.Hash())
211+
}
212+
drop -= size
213+
queuedRateLimitMeter.Mark(int64(size))
214+
continue
215+
}
216+
// Otherwise drop only last few transactions
217+
txs := list.Flatten()
218+
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
219+
q.removeTx(addr.address, txs[i])
220+
removed = append(removed, txs[i].Hash())
221+
drop--
222+
queuedRateLimitMeter.Mark(1)
223+
}
224+
}
225+
return removed
226+
}

0 commit comments

Comments
 (0)