Skip to content

Commit 8c78a80

Browse files
committed
core: fix race conditions in txpool (ethereum#23474)
1 parent e46f41d commit 8c78a80

File tree

3 files changed

+26
-13
lines changed

3 files changed

+26
-13
lines changed

core/tx_list.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"math"
2222
"math/big"
2323
"sort"
24+
"sync"
25+
"sync/atomic"
2426

2527
"github.com/XinFinOrg/XDPoSChain/common"
2628
"github.com/XinFinOrg/XDPoSChain/core/types"
@@ -450,9 +452,10 @@ func (h *priceHeap) Pop() interface{} {
450452
// in txpool but only interested in the remote part. It means only remote transactions
451453
// will be considered for tracking, sorting, eviction, etc.
452454
type txPricedList struct {
453-
all *txLookup // Pointer to the map of all transactions
454-
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
455-
stales int // Number of stale price points to (re-heap trigger)
455+
all *txLookup // Pointer to the map of all transactions
456+
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
457+
stales int64 // Number of stale price points to (re-heap trigger)
458+
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
456459
}
457460

458461
// newTxPricedList creates a new price-sorted transaction heap.
@@ -476,8 +479,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
476479
// the heap if a large enough ratio of transactions go stale.
477480
func (l *txPricedList) Removed(count int) {
478481
// Bump the stale counter, but exit if still too low (< 25%)
479-
l.stales += count
480-
if l.stales <= len(*l.remotes)/4 {
482+
stales := atomic.AddInt64(&l.stales, int64(count))
483+
if int(stales) <= len(*l.remotes)/4 {
481484
return
482485
}
483486
// Seems we've reached a critical number of stale transactions, reheap
@@ -515,7 +518,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
515518
for len(*l.remotes) > 0 {
516519
head := []*types.Transaction(*l.remotes)[0]
517520
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
518-
l.stales--
521+
atomic.AddInt64(&l.stales, -1)
519522
heap.Pop(l.remotes)
520523
continue
521524
}
@@ -541,7 +544,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
541544
// Discard stale transactions if found during cleanup
542545
tx := heap.Pop(l.remotes).(*types.Transaction)
543546
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
544-
l.stales--
547+
atomic.AddInt64(&l.stales, -1)
545548
continue
546549
}
547550
// Non stale transaction found, discard it
@@ -560,9 +563,12 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
560563

561564
// Reheap forcibly rebuilds the heap based on the current remote transaction set.
562565
func (l *txPricedList) Reheap() {
566+
l.reheapMu.Lock()
567+
defer l.reheapMu.Unlock()
563568
reheap := make(priceHeap, 0, l.all.RemoteCount())
564569

565-
l.stales, l.remotes = 0, &reheap
570+
atomic.StoreInt64(&l.stales, 0)
571+
l.remotes = &reheap
566572
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
567573
*l.remotes = append(*l.remotes, tx)
568574
return true

core/tx_pool.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math/big"
2424
"sort"
2525
"sync"
26+
"sync/atomic"
2627
"time"
2728

2829
"github.com/XinFinOrg/XDPoSChain/common"
@@ -282,6 +283,7 @@ type TxPool struct {
282283
reorgDoneCh chan chan struct{}
283284
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
284285
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
286+
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
285287

286288
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
287289
IsSigner func(address common.Address) bool
@@ -314,6 +316,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
314316
queueTxEventCh: make(chan *types.Transaction),
315317
reorgDoneCh: make(chan chan struct{}),
316318
reorgShutdownCh: make(chan struct{}),
319+
initDoneCh: make(chan struct{}),
317320
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
318321
trc21FeeCapacity: map[common.Address]*big.Int{},
319322
}
@@ -368,6 +371,8 @@ func (pool *TxPool) loop() {
368371
defer evict.Stop()
369372
defer journal.Stop()
370373

374+
// Notify tests that the init phase is done
375+
close(pool.initDoneCh)
371376
for {
372377
select {
373378
// Handle ChainHeadEvent
@@ -386,8 +391,8 @@ func (pool *TxPool) loop() {
386391
case <-report.C:
387392
pool.mu.RLock()
388393
pending, queued := pool.stats()
389-
stales := pool.priced.stales
390394
pool.mu.RUnlock()
395+
stales := int(atomic.LoadInt64(&pool.priced.stales))
391396

392397
if pending != prevPending || queued != prevQueued || stales != prevStales {
393398
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)

core/tx_pool_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ import (
2222
"math/big"
2323
"math/rand"
2424
"os"
25+
"sync/atomic"
2526
"testing"
2627
"time"
2728

29+
"github.com/XinFinOrg/XDPoSChain/common"
2830
"github.com/XinFinOrg/XDPoSChain/consensus"
2931
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
30-
31-
"github.com/XinFinOrg/XDPoSChain/common"
3232
"github.com/XinFinOrg/XDPoSChain/core/state"
3333
"github.com/XinFinOrg/XDPoSChain/core/types"
3434
"github.com/XinFinOrg/XDPoSChain/crypto"
@@ -69,7 +69,7 @@ func (bc *testBlockChain) Config() *params.ChainConfig {
6969

7070
func (bc *testBlockChain) CurrentBlock() *types.Block {
7171
return types.NewBlock(&types.Header{
72-
GasLimit: bc.gasLimit,
72+
GasLimit: atomic.LoadUint64(&bc.gasLimit),
7373
}, nil, nil, nil)
7474
}
7575

@@ -110,6 +110,8 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
110110
key, _ := crypto.GenerateKey()
111111
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
112112

113+
// wait for the pool to initialize
114+
<-pool.initDoneCh
113115
return pool, key
114116
}
115117

@@ -572,7 +574,7 @@ func TestTransactionDropping(t *testing.T) {
572574
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
573575
}
574576
// Reduce the block gas limit, check that invalidated transactions are dropped
575-
pool.chain.(*testBlockChain).gasLimit = 100
577+
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
576578
<-pool.requestReset(nil, nil)
577579

578580
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {

0 commit comments

Comments
 (0)