Skip to content

Commit 4e36b1e

Browse files
bas-vkfjl
authored and committed
core: bugfix state change race condition in txpool (#3412)
The transaction pool keeps track of the current nonce in its local pendingState. When a new block comes in, the pendingState is reset. During the reset, the pool fetches the current state multiple times through the currentState callback. If a second block comes in during the reset, it's possible that the state changes partway through the reset. If that block holds transactions that are currently in the pool, the local pendingState that is used to determine nonces can get out of sync.
1 parent 0fe35b9 commit 4e36b1e

File tree

12 files changed

+171
-56
lines changed

12 files changed

+171
-56
lines changed

core/tx_pool.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (pool *TxPool) resetState() {
176176
// any transactions that have been included in the block or
177177
// have been invalidated because of another transaction (e.g.
178178
// higher gas price)
179-
pool.demoteUnexecutables()
179+
pool.demoteUnexecutables(currentState)
180180

181181
// Update all accounts to the latest known pending nonce
182182
for addr, list := range pool.pending {
@@ -185,7 +185,7 @@ func (pool *TxPool) resetState() {
185185
}
186186
// Check the queue and move transactions over to the pending if possible
187187
// or remove those that have become invalid
188-
pool.promoteExecutables()
188+
pool.promoteExecutables(currentState)
189189
}
190190

191191
func (pool *TxPool) Stop() {
@@ -196,8 +196,12 @@ func (pool *TxPool) Stop() {
196196
}
197197

198198
func (pool *TxPool) State() *state.ManagedState {
199-
pool.mu.RLock()
200-
defer pool.mu.RUnlock()
199+
pool.mu.Lock()
200+
defer pool.mu.Unlock()
201+
202+
if pool.pendingState == nil {
203+
pool.resetState()
204+
}
201205

202206
return pool.pendingState
203207
}
@@ -237,21 +241,26 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
237241
// Pending retrieves all currently processable transactions, grouped by origin
238242
// account and sorted by nonce. The returned transaction set is a copy and can be
239243
// freely modified by calling code.
240-
func (pool *TxPool) Pending() map[common.Address]types.Transactions {
244+
func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
241245
pool.mu.Lock()
242246
defer pool.mu.Unlock()
243247

248+
state, err := pool.currentState()
249+
if err != nil {
250+
return nil, err
251+
}
252+
244253
// check queue first
245-
pool.promoteExecutables()
254+
pool.promoteExecutables(state)
246255

247256
// invalidate any txs
248-
pool.demoteUnexecutables()
257+
pool.demoteUnexecutables(state)
249258

250259
pending := make(map[common.Address]types.Transactions)
251260
for addr, list := range pool.pending {
252261
pending[addr] = list.Flatten()
253262
}
254-
return pending
263+
return pending, nil
255264
}
256265

257266
// SetLocal marks a transaction as local, skipping gas price
@@ -410,13 +419,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
410419
if err := pool.add(tx); err != nil {
411420
return err
412421
}
413-
pool.promoteExecutables()
422+
423+
state, err := pool.currentState()
424+
if err != nil {
425+
return err
426+
}
427+
428+
pool.promoteExecutables(state)
414429

415430
return nil
416431
}
417432

418433
// AddBatch attempts to queue a batch of transactions.
419-
func (pool *TxPool) AddBatch(txs []*types.Transaction) {
434+
func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
420435
pool.mu.Lock()
421436
defer pool.mu.Unlock()
422437

@@ -425,7 +440,15 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) {
425440
glog.V(logger.Debug).Infoln("tx error:", err)
426441
}
427442
}
428-
pool.promoteExecutables()
443+
444+
state, err := pool.currentState()
445+
if err != nil {
446+
return err
447+
}
448+
449+
pool.promoteExecutables(state)
450+
451+
return nil
429452
}
430453

431454
// Get returns a transaction if it is contained in the pool
@@ -499,17 +522,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
499522
// promoteExecutables moves transactions that have become processable from the
500523
// future queue to the set of pending transactions. During this process, all
501524
// invalidated transactions (low nonce, low balance) are deleted.
502-
func (pool *TxPool) promoteExecutables() {
503-
// Init delayed since tx pool could have been started before any state sync
504-
if pool.pendingState == nil {
505-
pool.resetState()
506-
}
507-
// Retrieve the current state to allow nonce and balance checking
508-
state, err := pool.currentState()
509-
if err != nil {
510-
glog.Errorf("Could not get current state: %v", err)
511-
return
512-
}
525+
func (pool *TxPool) promoteExecutables(state *state.StateDB) {
513526
// Iterate over all accounts and promote any executable transactions
514527
queued := uint64(0)
515528
for addr, list := range pool.queue {
@@ -645,13 +658,7 @@ func (pool *TxPool) promoteExecutables() {
645658
// demoteUnexecutables removes invalid and processed transactions from the pools
646659
// executable/pending queue and any subsequent transactions that become unexecutable
647660
// are moved back into the future queue.
648-
func (pool *TxPool) demoteUnexecutables() {
649-
// Retrieve the current state to allow nonce and balance checking
650-
state, err := pool.currentState()
651-
if err != nil {
652-
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
653-
return
654-
}
661+
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
655662
// Iterate over all accounts and demote any non-executable transactions
656663
for addr, list := range pool.pending {
657664
nonce := state.GetNonce(addr)

core/tx_pool_test.go

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,80 @@ func deriveSender(tx *types.Transaction) (common.Address, error) {
5151
return types.Sender(types.HomesteadSigner{}, tx)
5252
}
5353

54+
// This test simulates a scenario where a new block is imported during a
55+
// state reset and tests whether the pending state is in sync with the
56+
// block head event that initiated the resetState().
57+
func TestStateChangeDuringPoolReset(t *testing.T) {
58+
var (
59+
db, _ = ethdb.NewMemDatabase()
60+
key, _ = crypto.GenerateKey()
61+
address = crypto.PubkeyToAddress(key.PublicKey)
62+
mux = new(event.TypeMux)
63+
statedb, _ = state.New(common.Hash{}, db)
64+
trigger = false
65+
)
66+
67+
// setup pool with 2 transactions in it
68+
statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
69+
70+
tx0 := transaction(0, big.NewInt(100000), key)
71+
tx1 := transaction(1, big.NewInt(100000), key)
72+
73+
// stateFunc is used multiple times to reset the pending state.
74+
// when trigger is true it will create a state that indicates
75+
// that tx0 and tx1 are included in the chain.
76+
stateFunc := func() (*state.StateDB, error) {
77+
// delay "state change" by one. The tx pool fetches the
78+
// state multiple times and by delaying it a bit we simulate
79+
// a state change between those fetches.
80+
stdb := statedb
81+
if trigger {
82+
statedb, _ = state.New(common.Hash{}, db)
83+
// simulate that the new head block included tx0 and tx1
84+
statedb.SetNonce(address, 2)
85+
statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
86+
trigger = false
87+
}
88+
return stdb, nil
89+
}
90+
91+
gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) }
92+
93+
txpool := NewTxPool(testChainConfig(), mux, stateFunc, gasLimitFunc)
94+
txpool.resetState()
95+
96+
nonce := txpool.State().GetNonce(address)
97+
if nonce != 0 {
98+
t.Fatalf("Invalid nonce, want 0, got %d", nonce)
99+
}
100+
101+
txpool.AddBatch(types.Transactions{tx0, tx1})
102+
103+
nonce = txpool.State().GetNonce(address)
104+
if nonce != 2 {
105+
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
106+
}
107+
108+
// trigger state change in the background
109+
trigger = true
110+
111+
txpool.resetState()
112+
113+
pendingTx, err := txpool.Pending()
114+
if err != nil {
115+
t.Fatalf("Could not fetch pending transactions: %v", err)
116+
}
117+
118+
for addr, txs := range pendingTx {
119+
t.Logf("%0x: %d\n", addr, len(txs))
120+
}
121+
122+
nonce = txpool.State().GetNonce(address)
123+
if nonce != 2 {
124+
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
125+
}
126+
}
127+
54128
func TestInvalidTransactions(t *testing.T) {
55129
pool, key := setupTxPool()
56130

@@ -97,9 +171,10 @@ func TestTransactionQueue(t *testing.T) {
97171
from, _ := deriveSender(tx)
98172
currentState, _ := pool.currentState()
99173
currentState.AddBalance(from, big.NewInt(1000))
174+
pool.resetState()
100175
pool.enqueueTx(tx.Hash(), tx)
101176

102-
pool.promoteExecutables()
177+
pool.promoteExecutables(currentState)
103178
if len(pool.pending) != 1 {
104179
t.Error("expected valid txs to be 1 is", len(pool.pending))
105180
}
@@ -108,7 +183,7 @@ func TestTransactionQueue(t *testing.T) {
108183
from, _ = deriveSender(tx)
109184
currentState.SetNonce(from, 2)
110185
pool.enqueueTx(tx.Hash(), tx)
111-
pool.promoteExecutables()
186+
pool.promoteExecutables(currentState)
112187
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
113188
t.Error("expected transaction to be in tx pool")
114189
}
@@ -124,11 +199,13 @@ func TestTransactionQueue(t *testing.T) {
124199
from, _ = deriveSender(tx1)
125200
currentState, _ = pool.currentState()
126201
currentState.AddBalance(from, big.NewInt(1000))
202+
pool.resetState()
203+
127204
pool.enqueueTx(tx1.Hash(), tx1)
128205
pool.enqueueTx(tx2.Hash(), tx2)
129206
pool.enqueueTx(tx3.Hash(), tx3)
130207

131-
pool.promoteExecutables()
208+
pool.promoteExecutables(currentState)
132209

133210
if len(pool.pending) != 1 {
134211
t.Error("expected tx pool to be 1, got", len(pool.pending))
@@ -225,7 +302,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
225302
if err := pool.add(tx2); err != nil {
226303
t.Error("didn't expect error", err)
227304
}
228-
pool.promoteExecutables()
305+
state, _ := pool.currentState()
306+
pool.promoteExecutables(state)
229307
if pool.pending[addr].Len() != 1 {
230308
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
231309
}
@@ -236,7 +314,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
236314
if err := pool.add(tx3); err != nil {
237315
t.Error("didn't expect error", err)
238316
}
239-
pool.promoteExecutables()
317+
pool.promoteExecutables(state)
240318
if pool.pending[addr].Len() != 1 {
241319
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
242320
}
@@ -295,6 +373,7 @@ func TestRemovedTxEvent(t *testing.T) {
295373
from, _ := deriveSender(tx)
296374
currentState, _ := pool.currentState()
297375
currentState.AddBalance(from, big.NewInt(1000000000000))
376+
pool.resetState()
298377
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
299378
pool.eventMux.Post(ChainHeadEvent{nil})
300379
if pool.pending[from].Len() != 1 {
@@ -452,6 +531,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
452531

453532
state, _ := pool.currentState()
454533
state.AddBalance(account, big.NewInt(1000000))
534+
pool.resetState()
455535

456536
// Keep queuing up transactions and make sure all above a limit are dropped
457537
for i := uint64(1); i <= maxQueuedPerAccount+5; i++ {
@@ -564,6 +644,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
564644

565645
state, _ := pool.currentState()
566646
state.AddBalance(account, big.NewInt(1000000))
647+
pool.resetState()
567648

568649
// Keep queuing up transactions and make sure all above a limit are dropped
569650
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
@@ -733,7 +814,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
733814
// Benchmark the speed of pool validation
734815
b.ResetTimer()
735816
for i := 0; i < b.N; i++ {
736-
pool.demoteUnexecutables()
817+
pool.demoteUnexecutables(state)
737818
}
738819
}
739820

@@ -757,7 +838,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
757838
// Benchmark the speed of pool validation
758839
b.ResetTimer()
759840
for i := 0; i < b.N; i++ {
760-
pool.promoteExecutables()
841+
pool.promoteExecutables(state)
761842
}
762843
}
763844

eth/api_backend.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,20 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
131131
b.eth.txPool.Remove(txHash)
132132
}
133133

134-
func (b *EthApiBackend) GetPoolTransactions() types.Transactions {
134+
func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
135135
b.eth.txMu.Lock()
136136
defer b.eth.txMu.Unlock()
137137

138+
pending, err := b.eth.txPool.Pending()
139+
if err != nil {
140+
return nil, err
141+
}
142+
138143
var txs types.Transactions
139-
for _, batch := range b.eth.txPool.Pending() {
144+
for _, batch := range pending {
140145
txs = append(txs, batch...)
141146
}
142-
return txs
147+
return txs, nil
143148
}
144149

145150
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {

eth/helper_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,20 @@ type testTxPool struct {
9393

9494
// AddBatch appends a batch of transactions to the pool, and notifies any
9595
// listeners if the addition channel is non nil
96-
func (p *testTxPool) AddBatch(txs []*types.Transaction) {
96+
func (p *testTxPool) AddBatch(txs []*types.Transaction) error {
9797
p.lock.Lock()
9898
defer p.lock.Unlock()
9999

100100
p.pool = append(p.pool, txs...)
101101
if p.added != nil {
102102
p.added <- txs
103103
}
104+
105+
return nil
104106
}
105107

106108
// Pending returns all the transactions known to the pool
107-
func (p *testTxPool) Pending() map[common.Address]types.Transactions {
109+
func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
108110
p.lock.RLock()
109111
defer p.lock.RUnlock()
110112

@@ -116,7 +118,7 @@ func (p *testTxPool) Pending() map[common.Address]types.Transactions {
116118
for _, batch := range batches {
117119
sort.Sort(types.TxByNonce(batch))
118120
}
119-
return batches
121+
return batches, nil
120122
}
121123

122124
// newTestTransaction create a new dummy transaction.

eth/protocol.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@ var errorToString = map[int]string{
9898

9999
type txPool interface {
100100
// AddBatch should add the given transactions to the pool.
101-
AddBatch([]*types.Transaction)
101+
AddBatch([]*types.Transaction) error
102102

103103
// Pending should return pending transactions.
104104
// The slice should be modifiable by the caller.
105-
Pending() map[common.Address]types.Transactions
105+
Pending() (map[common.Address]types.Transactions, error)
106106
}
107107

108108
// statusData is the network packet for the status message.

eth/sync.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ type txsync struct {
4646
// syncTransactions starts sending all currently pending transactions to the given peer.
4747
func (pm *ProtocolManager) syncTransactions(p *peer) {
4848
var txs types.Transactions
49-
for _, batch := range pm.txpool.Pending() {
49+
pending, _ := pm.txpool.Pending()
50+
for _, batch := range pending {
5051
txs = append(txs, batch...)
5152
}
5253
if len(txs) == 0 {

0 commit comments

Comments
 (0)