diff --git a/mempool/mempool.go b/mempool/mempool.go index c14265535..432cb89ad 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -21,7 +21,9 @@ import ( "cosmossdk.io/log" "cosmossdk.io/math" + storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/blockstm" "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -87,6 +89,9 @@ type EVMMempoolConfig struct { // If false, comet-bft also operates its own clist-mempool. If true, then the mempool expects exclusive // handling of transactions via ABCI.InsertTx & ABCI.ReapTxs. OperateExclusively bool + // Stores is optional. Only required for using BlockSTM to execute checktx in parallel + // It should consist of all registered store keys in the configured chain. + Stores map[storetypes.StoreKey]int } // NewExperimentalEVMMempool creates a new unified mempool for EVM and Cosmos transactions. @@ -210,6 +215,7 @@ func NewExperimentalEVMMempool( // like a small, we should refactor this into something thats easier to // reason about for callers and the legacypool itself. legacyPool.RecheckTxFnFactory = recheckTxFactory(txConfig, config.AnteHandler) + legacyPool.RecheckTxRunnerFactory = recheckTxRunnerFactory(config.Stores, legacyPool.RecheckTxFnFactory) // Once we have validated that the tx is valid (and can be promoted, set it // to be reaped) @@ -648,6 +654,47 @@ func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig return nil } +func recheckTxRunnerFactory(stores map[storetypes.StoreKey]int, recheckTxFnFactory legacypool.RecheckTxFnFactory) legacypool.RecheckTxRunnerFactory { + return func(chain legacypool.BlockChain) legacypool.RecheckTxRunner { + bc, ok := chain.(*Blockchain) + if !ok { + panic("unexpected type for BlockChain, expected *mempool.Blockchain") + } + ctx, err := bc.GetLatestContext() + if err != nil { + // TODO: we probably dont want to panic here, but for POC im saying + // this is ok, the only real other option here is to nuke the + // entire mempool, or force another recheck but we cant be sure + // that will also not fail here + panic(fmt.Errorf("getting latest context from blockchain: %w", err)) + } + + return func(t ethtypes.Transactions) []error { + blockSize := len(t) + if blockSize == 0 { + return nil + } + results := make([]error, blockSize) + // TODO fixme executors + err = blockstm.ExecuteBlockWithEstimates(ctx, blockSize, stores, ctx.MultiStore(), 20, nil, + func(txn blockstm.TxnIndex, ms blockstm.MultiStore) { + var memTx *ethtypes.Transaction + if t != nil { + memTx = t[txn] + } + fn := recheckTxFnFactory(chain) + err = fn(memTx) + results[txn] = fn(memTx) + }, + ) + if err != nil { + panic(fmt.Errorf("mempool panic running antehandlers via bstm %w", err)) + } + return results + } + } +} + func recheckTxFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler) legacypool.RecheckTxFnFactory { return func(chain legacypool.BlockChain) legacypool.RecheckTxFn { bc, ok := chain.(*Blockchain) diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 3409618a7..4a4ad740f 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -86,6 +86,7 @@ func TestMempool_ReapPromoteDemotePromote(t *testing.T) { return nil } } + legacyPool.RecheckTxRunnerFactory = nil // sync the pool to make sure the above happens require.NoError(t, mp.GetTxPool().Sync()) @@ -171,6 +172,7 @@ func TestMempool_ReapPromoteDemoteReap(t *testing.T) { return nil } } + legacyPool.RecheckTxRunnerFactory = nil // sync the pool to make sure the above happens require.NoError(t, mp.GetTxPool().Sync()) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 81e259bad..c22f43c6c 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -211,8 +211,10 @@ func (config *Config) sanitize() Config { } type ( - RecheckTxFn func(t *types.Transaction) error - RecheckTxFnFactory func(chain BlockChain) RecheckTxFn + RecheckTxFn func(t *types.Transaction) error + RecheckTxFnFactory func(chain BlockChain) RecheckTxFn + RecheckTxRunner func(t types.Transactions) []error + RecheckTxRunnerFactory func(chain BlockChain) RecheckTxRunner ) // LegacyPool contains all currently known transactions. Transactions @@ -270,7 +272,8 @@ type LegacyPool struct { BroadcastTxFn func(txs []*types.Transaction) error - RecheckTxFnFactory RecheckTxFnFactory + RecheckTxFnFactory RecheckTxFnFactory + RecheckTxRunnerFactory RecheckTxRunnerFactory // OnTxPromoted is called when a tx is promoted from queued to pending (may // be called multiple times per tx) @@ -1692,12 +1695,26 @@ func (pool *LegacyPool) demoteUnexecutables() { // dependency on DeliverTx and allow it to use any fn in parallel. var recheckInvalids []*types.Transaction var recheckDrops []*types.Transaction - if pool.RecheckTxFnFactory != nil { + // Try to execute in parallel + if pool.RecheckTxRunnerFactory != nil || pool.RecheckTxFnFactory != nil { recheckStart := time.Now() - recheckFn := pool.RecheckTxFnFactory(pool.chain) - recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { - return recheckFn(tx) != nil - }) + if pool.RecheckTxRunnerFactory != nil { + recheckRunner := pool.RecheckTxRunnerFactory(pool.chain) + inputTxs := list.Flatten() + recheckResults := recheckRunner(inputTxs) + resultsMap := make(map[common.Hash]error) + for i, tx := range inputTxs { + resultsMap[tx.Hash()] = recheckResults[i] + } + recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { + return resultsMap[tx.Hash()] != nil + }) + } else if pool.RecheckTxFnFactory != nil { + recheckFn := pool.RecheckTxFnFactory(pool.chain) + recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { + return recheckFn(tx) != nil + }) + } for _, tx := range recheckDrops { hash := tx.Hash() pool.all.Remove(hash)