Skip to content

Commit ddcf02b

Browse files
karalabefjl
authored andcommitted
[release/1.4.17] core/types, miner: switch over to the grouped tx sets
(cherry picked from commit affffb3)
1 parent ff697e8 commit ddcf02b

File tree

3 files changed

+98
-119
lines changed

3 files changed

+98
-119
lines changed

core/types/transaction.go

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -368,41 +368,58 @@ func (s *TxByPrice) Pop() interface{} {
368368
return x
369369
}
370370

371-
// SortByPriceAndNonce sorts the transactions by price in such a way that the
372-
// nonce orderings within a single account are maintained.
373-
//
374-
// Note, this is not as trivial as it seems from the first look as there are three
375-
// different criteria that need to be taken into account (price, nonce, account
376-
// match), which cannot be done with any plain sorting method, as certain items
377-
// cannot be compared without context.
371+
// TransactionsByPriceAndNonce represents a set of transactions that can return
372+
// transactions in a profit-maximising sorted order, while supporting removing
373+
// entire batches of transactions for non-executable accounts.
374+
type TransactionsByPriceAndNonce struct {
375+
txs map[common.Address]Transactions // Per account nonce-sorted list of transactions
376+
heads TxByPrice // Next transaction for each unique account (price heap)
377+
}
378+
379+
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
380+
// price sorted transactions in a nonce-honouring way.
378381
//
379-
// This method first sorts the separates the list of transactions into individual
380-
// sender accounts and sorts them by nonce. After the account nonce ordering is
381-
// satisfied, the results are merged back together by price, always comparing only
382-
// the head transaction from each account. This is done via a heap to keep it fast.
383-
func SortByPriceAndNonce(txs map[common.Address]Transactions) Transactions {
382+
// Note, the input map is reowned so the caller should not interact any more with
383+
// if after providng it to the constructor.
384+
func NewTransactionsByPriceAndNonce(txs map[common.Address]Transactions) *TransactionsByPriceAndNonce {
384385
// Initialize a price based heap with the head transactions
385-
byPrice := make(TxByPrice, 0, len(txs))
386+
heads := make(TxByPrice, 0, len(txs))
386387
for acc, accTxs := range txs {
387-
byPrice = append(byPrice, accTxs[0])
388+
heads = append(heads, accTxs[0])
388389
txs[acc] = accTxs[1:]
389390
}
390-
heap.Init(&byPrice)
391-
392-
// Merge by replacing the best with the next from the same account
393-
var sorted Transactions
394-
for len(byPrice) > 0 {
395-
// Retrieve the next best transaction by price
396-
best := heap.Pop(&byPrice).(*Transaction)
397-
398-
// Push in its place the next transaction from the same account
399-
acc, _ := best.From() // we only sort valid txs so this cannot fail
400-
if accTxs, ok := txs[acc]; ok && len(accTxs) > 0 {
401-
heap.Push(&byPrice, accTxs[0])
402-
txs[acc] = accTxs[1:]
403-
}
404-
// Accumulate the best priced transaction
405-
sorted = append(sorted, best)
391+
heap.Init(&heads)
392+
393+
// Assemble and return the transaction set
394+
return &TransactionsByPriceAndNonce{
395+
txs: txs,
396+
heads: heads,
397+
}
398+
}
399+
400+
// Peek returns the next transaction by price.
401+
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
402+
if len(t.heads) == 0 {
403+
return nil
404+
}
405+
return t.heads[0]
406+
}
407+
408+
// Shift replaces the current best head with the next one from the same account.
409+
func (t *TransactionsByPriceAndNonce) Shift() {
410+
acc, _ := t.heads[0].From() // we only sort valid txs so this cannot fail
411+
412+
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
413+
t.heads[0], t.txs[acc] = txs[0], txs[1:]
414+
heap.Fix(&t.heads, 0)
415+
} else {
416+
heap.Pop(&t.heads)
406417
}
407-
return sorted
418+
}
419+
420+
// Pop removes the best transaction, *not* replacing it with the next one from
421+
// the same account. This should be used when a transaction cannot be executed
422+
// and hence all subsequent ones should be discarded from the same account.
423+
func (t *TransactionsByPriceAndNonce) Pop() {
424+
heap.Pop(&t.heads)
408425
}

core/types/transaction_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,16 @@ func TestTransactionPriceNonceSort(t *testing.T) {
137137
}
138138
}
139139
// Sort the transactions and cross check the nonce ordering
140-
txs := SortByPriceAndNonce(groups)
140+
txset := NewTransactionsByPriceAndNonce(groups)
141+
142+
txs := Transactions{}
143+
for {
144+
if tx := txset.Peek(); tx != nil {
145+
txs = append(txs, tx)
146+
txset.Shift()
147+
}
148+
break
149+
}
141150
for i, txi := range txs {
142151
fromi, _ := txi.From()
143152

miner/worker.go

Lines changed: 40 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,16 @@ type uint64RingBuffer struct {
6363
// environment is the workers current environment and holds
6464
// all of the current state information
6565
type Work struct {
66-
config *core.ChainConfig
67-
state *state.StateDB // apply state changes here
68-
ancestors *set.Set // ancestor set (used for checking uncle parent validity)
69-
family *set.Set // family set (used for checking uncle invalidity)
70-
uncles *set.Set // uncle set
71-
tcount int // tx count in cycle
72-
ignoredTransactors *set.Set
73-
lowGasTransactors *set.Set
74-
ownedAccounts *set.Set
75-
lowGasTxs types.Transactions
76-
failedTxs types.Transactions
77-
localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion)
66+
config *core.ChainConfig
67+
state *state.StateDB // apply state changes here
68+
ancestors *set.Set // ancestor set (used for checking uncle parent validity)
69+
family *set.Set // family set (used for checking uncle invalidity)
70+
uncles *set.Set // uncle set
71+
tcount int // tx count in cycle
72+
ownedAccounts *set.Set
73+
lowGasTxs types.Transactions
74+
failedTxs types.Transactions
75+
localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion)
7876

7977
Block *types.Block // the new block
8078

@@ -236,7 +234,12 @@ func (self *worker) update() {
236234
// Apply transaction to the pending state if we're not mining
237235
if atomic.LoadInt32(&self.mining) == 0 {
238236
self.currentMu.Lock()
239-
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
237+
238+
acc, _ := ev.Tx.From()
239+
txs := map[common.Address]types.Transactions{acc: types.Transactions{ev.Tx}}
240+
txset := types.NewTransactionsByPriceAndNonce(txs)
241+
242+
self.current.commitTransactions(self.mux, txset, self.gasPrice, self.chain)
240243
self.currentMu.Unlock()
241244
}
242245
}
@@ -384,8 +387,6 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error
384387

385388
// Keep track of transactions which return errors so they can be removed
386389
work.tcount = 0
387-
work.ignoredTransactors = set.New()
388-
work.lowGasTransactors = set.New()
389390
work.ownedAccounts = accountAddressesSet(accounts)
390391
if self.current != nil {
391392
work.localMinedBlocks = self.current.localMinedBlocks
@@ -494,43 +495,8 @@ func (self *worker) commitNewWork() {
494495
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
495496
core.ApplyDAOHardFork(work.state)
496497
}
497-
498-
/* //approach 1
499-
transactions := self.eth.TxPool().GetTransactions()
500-
sort.Sort(types.TxByNonce(transactions))
501-
*/
502-
503-
//approach 2
504-
transactions := types.SortByPriceAndNonce(self.eth.TxPool().Pending())
505-
506-
/* // approach 3
507-
// commit transactions for this run.
508-
txPerOwner := make(map[common.Address]types.Transactions)
509-
// Sort transactions by owner
510-
for _, tx := range self.eth.TxPool().GetTransactions() {
511-
from, _ := tx.From() // we can ignore the sender error
512-
txPerOwner[from] = append(txPerOwner[from], tx)
513-
}
514-
var (
515-
singleTxOwner types.Transactions
516-
multiTxOwner types.Transactions
517-
)
518-
// Categorise transactions by
519-
// 1. 1 owner tx per block
520-
// 2. multi txs owner per block
521-
for _, txs := range txPerOwner {
522-
if len(txs) == 1 {
523-
singleTxOwner = append(singleTxOwner, txs[0])
524-
} else {
525-
multiTxOwner = append(multiTxOwner, txs...)
526-
}
527-
}
528-
sort.Sort(types.TxByPrice(singleTxOwner))
529-
sort.Sort(types.TxByNonce(multiTxOwner))
530-
transactions := append(singleTxOwner, multiTxOwner...)
531-
*/
532-
533-
work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain)
498+
txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending())
499+
work.commitTransactions(self.mux, txs, self.gasPrice, self.chain)
534500

535501
self.eth.TxPool().RemoveBatch(work.lowGasTxs)
536502
self.eth.TxPool().RemoveBatch(work.failedTxs)
@@ -591,64 +557,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
591557
return nil
592558
}
593559

594-
func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
560+
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, gasPrice *big.Int, bc *core.BlockChain) {
595561
gp := new(core.GasPool).AddGas(env.header.GasLimit)
596562

597563
var coalescedLogs vm.Logs
598-
for _, tx := range transactions {
564+
for {
565+
// Retrieve the next transaction and abort if all done
566+
tx := txs.Peek()
567+
if tx == nil {
568+
break
569+
}
599570
// Error may be ignored here. The error has already been checked
600571
// during transaction acceptance is the transaction pool.
601572
from, _ := tx.From()
602573

603-
// Check if it falls within margin. Txs from owned accounts are always processed.
574+
// Ignore any transactions (and accounts subsequently) with low gas limits
604575
if tx.GasPrice().Cmp(gasPrice) < 0 && !env.ownedAccounts.Has(from) {
605-
// ignore the transaction and transactor. We ignore the transactor
606-
// because nonce will fail after ignoring this transaction so there's
607-
// no point
608-
env.lowGasTransactors.Add(from)
609-
610-
glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4])
611-
}
576+
// Pop the current low-priced transaction without shifting in the next from the account
577+
glog.V(logger.Info).Infof("Transaction (%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4])
612578

613-
// Continue with the next transaction if the transaction sender is included in
614-
// the low gas tx set. This will also remove the tx and all sequential transaction
615-
// from this transactor
616-
if env.lowGasTransactors.Has(from) {
617-
// add tx to the low gas set. This will be removed at the end of the run
618-
// owned accounts are ignored
619-
if !env.ownedAccounts.Has(from) {
620-
env.lowGasTxs = append(env.lowGasTxs, tx)
621-
}
622-
continue
623-
}
579+
env.lowGasTxs = append(env.lowGasTxs, tx)
580+
txs.Pop()
624581

625-
// Move on to the next transaction when the transactor is in ignored transactions set
626-
// This may occur when a transaction hits the gas limit. When a gas limit is hit and
627-
// the transaction is processed (that could potentially be included in the block) it
628-
// will throw a nonce error because the previous transaction hasn't been processed.
629-
// Therefor we need to ignore any transaction after the ignored one.
630-
if env.ignoredTransactors.Has(from) {
631582
continue
632583
}
633-
584+
// Start executing the transaction
634585
env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
635586

636587
err, logs := env.commitTransaction(tx, bc, gp)
637588
switch {
638589
case core.IsGasLimitErr(err):
639-
// ignore the transactor so no nonce errors will be thrown for this account
640-
// next time the worker is run, they'll be picked up again.
641-
env.ignoredTransactors.Add(from)
590+
// Pop the current out-of-gas transaction without shifting in the next from the account
642591
glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
592+
txs.Pop()
643593

644594
case err != nil:
595+
// Pop the current failed transaction without shifting in the next from the account
596+
glog.V(logger.Detail).Infof("Transaction (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
645597
env.failedTxs = append(env.failedTxs, tx)
646-
if glog.V(logger.Detail) {
647-
glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
648-
}
598+
txs.Pop()
599+
649600
default:
650-
env.tcount++
601+
// Everything ok, collect the logs and shift in the next transaction from the same account
651602
coalescedLogs = append(coalescedLogs, logs...)
603+
env.tcount++
604+
txs.Shift()
652605
}
653606
}
654607
if len(coalescedLogs) > 0 || env.tcount > 0 {

0 commit comments

Comments
 (0)