Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

"cosmossdk.io/log"
"cosmossdk.io/math"
storetypes "cosmossdk.io/store/types"

"github.com/cosmos/cosmos-sdk/blockstm"
"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -87,6 +89,9 @@ type EVMMempoolConfig struct {
// If false, comet-bft also operates its own clist-mempool. If true, then the mempool expects exclusive
// handling of transactions via ABCI.InsertTx & ABCI.ReapTxs.
OperateExclusively bool
// Stores is optional. Only required for using BlockSTM to execute checktx in parallel
// It should consist of all registered store keys in the configured chain.
Stores map[storetypes.StoreKey]int
}

// NewExperimentalEVMMempool creates a new unified mempool for EVM and Cosmos transactions.
Expand Down Expand Up @@ -210,6 +215,7 @@ func NewExperimentalEVMMempool(
// like a small, we should refactor this into something thats easier to
// reason about for callers and the legacypool itself.
legacyPool.RecheckTxFnFactory = recheckTxFactory(txConfig, config.AnteHandler)
legacyPool.RecheckTxRunnerFactory = recheckTxRunnerFactory(config.Stores, legacyPool.RecheckTxFnFactory)

// Once we have validated that the tx is valid (and can be promoted, set it
// to be reaped)
Expand Down Expand Up @@ -648,6 +654,47 @@ func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig
return nil
}

func recheckTxRunnerFactory(stores map[storetypes.StoreKey]int, recheckTxFnFactory legacypool.RecheckTxFnFactory) legacypool.RecheckTxRunnerFactory {
return func(chain legacypool.BlockChain) legacypool.RecheckTxRunner {
bc, ok := chain.(*Blockchain)
if !ok {
panic("unexpected type for BlockChain, expected *mempool.Blockchain")
}
ctx, err := bc.GetLatestContext()
if err != nil {
// TODO: we probably dont want to panic here, but for POC im saying
// this is ok, the only real other option here is to nuke the
// entire mempool, or force another recheck but we cant be sure
// that will also not fail here
panic(fmt.Errorf("getting latest context from blockchain: %w", err))
}

return func(t ethtypes.Transactions) []error {
blockSize := len(t)
if blockSize == 0 {
return nil
}
results := make([]error, blockSize)
// TODO fixme executors
err = blockstm.ExecuteBlockWithEstimates(ctx, blockSize, stores, ctx.MultiStore(), 20, nil,
func(txn blockstm.TxnIndex, ms blockstm.MultiStore) {
var memTx *ethtypes.Transaction
if t != nil {
memTx = t[txn]
}
fn := recheckTxFnFactory(chain)
err = fn(memTx)
results[txn] = fn(memTx)
},
)
if err != nil {
panic(fmt.Errorf("mempool panic running antehandlers via bstm %w", err))
}
return results
}
}
}

func recheckTxFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler) legacypool.RecheckTxFnFactory {
return func(chain legacypool.BlockChain) legacypool.RecheckTxFn {
bc, ok := chain.(*Blockchain)
Expand Down
2 changes: 2 additions & 0 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestMempool_ReapPromoteDemotePromote(t *testing.T) {
return nil
}
}
legacyPool.RecheckTxRunnerFactory = nil

// sync the pool to make sure the above happens
require.NoError(t, mp.GetTxPool().Sync())
Expand Down Expand Up @@ -171,6 +172,7 @@ func TestMempool_ReapPromoteDemoteReap(t *testing.T) {
return nil
}
}
legacyPool.RecheckTxRunnerFactory = nil

// sync the pool to make sure the above happens
require.NoError(t, mp.GetTxPool().Sync())
Expand Down
33 changes: 25 additions & 8 deletions mempool/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,10 @@ func (config *Config) sanitize() Config {
}

type (
RecheckTxFn func(t *types.Transaction) error
RecheckTxFnFactory func(chain BlockChain) RecheckTxFn
RecheckTxFn func(t *types.Transaction) error
RecheckTxFnFactory func(chain BlockChain) RecheckTxFn
RecheckTxRunner func(t types.Transactions) []error
RecheckTxRunnerFactory func(chain BlockChain) RecheckTxRunner
)

// LegacyPool contains all currently known transactions. Transactions
Expand Down Expand Up @@ -270,7 +272,8 @@ type LegacyPool struct {

BroadcastTxFn func(txs []*types.Transaction) error

RecheckTxFnFactory RecheckTxFnFactory
RecheckTxFnFactory RecheckTxFnFactory
RecheckTxRunnerFactory RecheckTxRunnerFactory

// OnTxPromoted is called when a tx is promoted from queued to pending (may
// be called multiple times per tx)
Expand Down Expand Up @@ -1692,12 +1695,26 @@ func (pool *LegacyPool) demoteUnexecutables() {
// dependency on DeliverTx and allow it to use any fn in parallel.
var recheckInvalids []*types.Transaction
var recheckDrops []*types.Transaction
if pool.RecheckTxFnFactory != nil {
// Try to execute in parallel
if pool.RecheckTxRunnerFactory != nil || pool.RecheckTxFnFactory != nil {
recheckStart := time.Now()
recheckFn := pool.RecheckTxFnFactory(pool.chain)
recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool {
return recheckFn(tx) != nil
})
if pool.RecheckTxRunnerFactory != nil {
recheckRunner := pool.RecheckTxRunnerFactory(pool.chain)
inputTxs := list.Flatten()
recheckResults := recheckRunner(inputTxs)
resultsMap := make(map[common.Hash]error)
for i, tx := range inputTxs {
resultsMap[tx.Hash()] = recheckResults[i]
}
recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool {
return resultsMap[tx.Hash()] != nil
})
} else if pool.RecheckTxFnFactory != nil {
recheckFn := pool.RecheckTxFnFactory(pool.chain)
recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool {
return recheckFn(tx) != nil
})
}
for _, tx := range recheckDrops {
hash := tx.Hash()
pool.all.Remove(hash)
Expand Down
Loading