Skip to content

Commit d4dce43

Browse files
authored
Merge pull request #20081 from karalabe/txpool-lockless-dedup
core: dedup known transactions without global lock, track metrics
2 parents 8d41e88 + 056183c commit d4dce43

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

core/tx_pool.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ var (
9797
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
9898

9999
// General tx metrics
100-
validMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
100+
knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
101+
validTxMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
101102
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
102103
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
103104

@@ -564,16 +565,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
564565
hash := tx.Hash()
565566
if pool.all.Get(hash) != nil {
566567
log.Trace("Discarding already known transaction", "hash", hash)
568+
knownTxMeter.Mark(1)
567569
return false, fmt.Errorf("known transaction: %x", hash)
568570
}
569-
570571
// If the transaction fails basic validation, discard it
571572
if err := pool.validateTx(tx, local); err != nil {
572573
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
573574
invalidTxMeter.Mark(1)
574575
return false, err
575576
}
576-
577577
// If the transaction pool is full, discard underpriced transactions
578578
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
579579
// If the new transaction is underpriced, don't accept it
@@ -590,7 +590,6 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
590590
pool.removeTx(tx.Hash(), false)
591591
}
592592
}
593-
594593
// Try to replace an existing transaction in the pending pool
595594
from, _ := types.Sender(pool.signer, tx) // already validated
596595
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
@@ -613,13 +612,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
613612
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
614613
return old != nil, nil
615614
}
616-
617615
// New transaction isn't replacing a pending one, push into queue
618616
replaced, err = pool.enqueueTx(hash, tx)
619617
if err != nil {
620618
return false, err
621619
}
622-
623620
// Mark local addresses and journal local transactions
624621
if local {
625622
if !pool.locals.contains(from) {
@@ -768,11 +765,18 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
768765

769766
// addTxs attempts to queue a batch of transactions if they are valid.
770767
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
768+
// Filter out known ones without obtaining the pool lock or recovering signatures
769+
for i := 0; i < len(txs); i++ {
770+
if pool.all.Get(txs[i].Hash()) != nil {
771+
knownTxMeter.Mark(1)
772+
txs = append(txs[:i], txs[i+1:]...)
773+
i--
774+
}
775+
}
771776
// Cache senders in transactions before obtaining lock (pool.signer is immutable)
772777
for _, tx := range txs {
773778
types.Sender(pool.signer, tx)
774779
}
775-
776780
pool.mu.Lock()
777781
errs, dirtyAddrs := pool.addTxsLocked(txs, local)
778782
pool.mu.Unlock()
@@ -796,7 +800,7 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error,
796800
dirty.addTx(tx)
797801
}
798802
}
799-
validMeter.Mark(int64(len(dirty.accounts)))
803+
validTxMeter.Mark(int64(len(dirty.accounts)))
800804
return errs, dirty
801805
}
802806

0 commit comments

Comments
 (0)