Skip to content

Commit db48d31

Browse files
committed
core: txpool stable underprice drop order, perf fixes
1 parent 7e911b8 commit db48d31

File tree

3 files changed

+101
-24
lines changed

3 files changed

+101
-24
lines changed

core/tx_list.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,9 +367,20 @@ func (l *txList) Flatten() types.Transactions {
367367
// price-sorted transactions to discard when the pool fills up.
368368
type priceHeap []*types.Transaction
369369

370-
func (h priceHeap) Len() int { return len(h) }
371-
func (h priceHeap) Less(i, j int) bool { return h[i].GasPrice().Cmp(h[j].GasPrice()) < 0 }
372-
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
370+
func (h priceHeap) Len() int { return len(h) }
371+
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
372+
373+
func (h priceHeap) Less(i, j int) bool {
374+
// Sort primarily by price, returning the cheaper one
375+
switch h[i].GasPrice().Cmp(h[j].GasPrice()) {
376+
case -1:
377+
return true
378+
case 1:
379+
return false
380+
}
381+
// If the prices match, stabilize via nonces (high nonce is worse)
382+
return h[i].Nonce() > h[j].Nonce()
383+
}
373384

374385
func (h *priceHeap) Push(x interface{}) {
375386
*h = append(*h, x.(*types.Transaction))

core/tx_pool.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (pool *TxPool) loop() {
320320
// Any non-locals old enough should be removed
321321
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
322322
for _, tx := range pool.queue[addr].Flatten() {
323-
pool.removeTx(tx.Hash())
323+
pool.removeTx(tx.Hash(), true)
324324
}
325325
}
326326
}
@@ -468,7 +468,7 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
468468

469469
pool.gasPrice = price
470470
for _, tx := range pool.priced.Cap(price, pool.locals) {
471-
pool.removeTx(tx.Hash())
471+
pool.removeTx(tx.Hash(), false)
472472
}
473473
log.Info("Transaction pool price threshold updated", "price", price)
474474
}
@@ -630,7 +630,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
630630
for _, tx := range drop {
631631
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
632632
underpricedTxCounter.Inc(1)
633-
pool.removeTx(tx.Hash())
633+
pool.removeTx(tx.Hash(), false)
634634
}
635635
}
636636
// If the transaction is replacing an already pending one, do directly
@@ -695,8 +695,10 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
695695
pool.priced.Removed()
696696
queuedReplaceCounter.Inc(1)
697697
}
698-
pool.all[hash] = tx
699-
pool.priced.Put(tx)
698+
if pool.all[hash] == nil {
699+
pool.all[hash] = tx
700+
pool.priced.Put(tx)
701+
}
700702
return old != nil, nil
701703
}
702704

@@ -862,7 +864,7 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
862864

863865
// removeTx removes a single transaction from the queue, moving all subsequent
864866
// transactions back to the future queue.
865-
func (pool *TxPool) removeTx(hash common.Hash) {
867+
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
866868
// Fetch the transaction we wish to delete
867869
tx, ok := pool.all[hash]
868870
if !ok {
@@ -872,8 +874,9 @@ func (pool *TxPool) removeTx(hash common.Hash) {
872874

873875
// Remove it from the list of known transactions
874876
delete(pool.all, hash)
875-
pool.priced.Removed()
876-
877+
if outofbound {
878+
pool.priced.Removed()
879+
}
877880
// Remove the transaction from the pending lists and reset the account nonce
878881
if pending := pool.pending[addr]; pending != nil {
879882
if removed, invalids := pending.Remove(tx); removed {
@@ -1052,7 +1055,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
10521055
// Drop all transactions if they are less than the overflow
10531056
if size := uint64(list.Len()); size <= drop {
10541057
for _, tx := range list.Flatten() {
1055-
pool.removeTx(tx.Hash())
1058+
pool.removeTx(tx.Hash(), true)
10561059
}
10571060
drop -= size
10581061
queuedRateLimitCounter.Inc(int64(size))
@@ -1061,7 +1064,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
10611064
// Otherwise drop only last few transactions
10621065
txs := list.Flatten()
10631066
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
1064-
pool.removeTx(txs[i].Hash())
1067+
pool.removeTx(txs[i].Hash(), true)
10651068
drop--
10661069
queuedRateLimitCounter.Inc(1)
10671070
}

core/tx_pool_test.go

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,10 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
209209

210210
pool.lockedReset(nil, nil)
211211

212-
pendingTx, err := pool.Pending()
212+
_, err := pool.Pending()
213213
if err != nil {
214214
t.Fatalf("Could not fetch pending transactions: %v", err)
215215
}
216-
217-
for addr, txs := range pendingTx {
218-
t.Logf("%0x: %d\n", addr, len(txs))
219-
}
220-
221216
nonce = pool.State().GetNonce(address)
222217
if nonce != 2 {
223218
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
@@ -350,7 +345,7 @@ func TestTransactionChainFork(t *testing.T) {
350345
if _, err := pool.add(tx, false); err != nil {
351346
t.Error("didn't expect error", err)
352347
}
353-
pool.removeTx(tx.Hash())
348+
pool.removeTx(tx.Hash(), true)
354349

355350
// reset the pool's internal state
356351
resetState()
@@ -1388,13 +1383,13 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
13881383
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
13891384
}
13901385
// Ensure that adding high priced transactions drops cheap ones, but not own
1391-
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
1386+
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
13921387
t.Fatalf("failed to add well priced transaction: %v", err)
13931388
}
1394-
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil {
1389+
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2
13951390
t.Fatalf("failed to add well priced transaction: %v", err)
13961391
}
1397-
if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil {
1392+
if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
13981393
t.Fatalf("failed to add well priced transaction: %v", err)
13991394
}
14001395
pending, queued = pool.Stats()
@@ -1404,7 +1399,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
14041399
if queued != 2 {
14051400
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
14061401
}
1407-
if err := validateEvents(events, 2); err != nil {
1402+
if err := validateEvents(events, 1); err != nil {
14081403
t.Fatalf("additional event firing failed: %v", err)
14091404
}
14101405
if err := validateTxPoolInternals(pool); err != nil {
@@ -1430,6 +1425,74 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
14301425
}
14311426
}
14321427

1428+
// Tests that more expensive transactions push out cheap ones from the pool, but
1429+
// without producing instability by creating gaps that start jumping transactions
1430+
// back and forth between queued/pending.
1431+
func TestTransactionPoolStableUnderpricing(t *testing.T) {
1432+
t.Parallel()
1433+
1434+
// Create the pool to test the pricing enforcement with
1435+
db, _ := ethdb.NewMemDatabase()
1436+
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
1437+
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
1438+
1439+
config := testTxPoolConfig
1440+
config.GlobalSlots = 128
1441+
config.GlobalQueue = 0
1442+
1443+
pool := NewTxPool(config, params.TestChainConfig, blockchain)
1444+
defer pool.Stop()
1445+
1446+
// Keep track of transaction events to ensure all executables get announced
1447+
events := make(chan TxPreEvent, 32)
1448+
sub := pool.txFeed.Subscribe(events)
1449+
defer sub.Unsubscribe()
1450+
1451+
// Create a number of test accounts and fund them
1452+
keys := make([]*ecdsa.PrivateKey, 2)
1453+
for i := 0; i < len(keys); i++ {
1454+
keys[i], _ = crypto.GenerateKey()
1455+
pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
1456+
}
1457+
// Fill up the entire queue with the same transaction price points
1458+
txs := types.Transactions{}
1459+
for i := uint64(0); i < config.GlobalSlots; i++ {
1460+
txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0]))
1461+
}
1462+
pool.AddRemotes(txs)
1463+
1464+
pending, queued := pool.Stats()
1465+
if pending != int(config.GlobalSlots) {
1466+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots)
1467+
}
1468+
if queued != 0 {
1469+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
1470+
}
1471+
if err := validateEvents(events, int(config.GlobalSlots)); err != nil {
1472+
t.Fatalf("original event firing failed: %v", err)
1473+
}
1474+
if err := validateTxPoolInternals(pool); err != nil {
1475+
t.Fatalf("pool internal state corrupted: %v", err)
1476+
}
1477+
// Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap
1478+
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
1479+
t.Fatalf("failed to add well priced transaction: %v", err)
1480+
}
1481+
pending, queued = pool.Stats()
1482+
if pending != int(config.GlobalSlots) {
1483+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots)
1484+
}
1485+
if queued != 0 {
1486+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
1487+
}
1488+
if err := validateEvents(events, 1); err != nil {
1489+
t.Fatalf("additional event firing failed: %v", err)
1490+
}
1491+
if err := validateTxPoolInternals(pool); err != nil {
1492+
t.Fatalf("pool internal state corrupted: %v", err)
1493+
}
1494+
}
1495+
14331496
// Tests that the pool rejects replacement transactions that don't meet the minimum
14341497
// price bump required.
14351498
func TestTransactionReplacement(t *testing.T) {

0 commit comments

Comments
 (0)