From df4aea65ef15073f8d55f4e5db658b59e8724836 Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Fri, 5 Dec 2025 15:37:23 -0800 Subject: [PATCH 1/3] Use bstm in demote unexec --- mempool/mempool.go | 45 +++++++++++++++++++++++++ mempool/txpool/legacypool/legacypool.go | 33 +++++++++++++----- 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index c14265535..38a9a7eea 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "github.com/ethereum/go-ethereum/core/types" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/holiman/uint256" @@ -21,7 +22,9 @@ import ( "cosmossdk.io/log" "cosmossdk.io/math" + storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/blockstm" "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -87,6 +90,9 @@ type EVMMempoolConfig struct { // If false, comet-bft also operates its own clist-mempool. If true, then the mempool expects exclusive // handling of transactions via ABCI.InsertTx & ABCI.ReapTxs. OperateExclusively bool + // Stores is optional. Only required for using BlockSTM to execute checktx in parallel + // It should consist of all registered store keys in the configured chain. + Stores map[storetypes.StoreKey]int } // NewExperimentalEVMMempool creates a new unified mempool for EVM and Cosmos transactions. @@ -210,6 +216,7 @@ func NewExperimentalEVMMempool( // like a small, we should refactor this into something thats easier to // reason about for callers and the legacypool itself. legacyPool.RecheckTxFnFactory = recheckTxFactory(txConfig, config.AnteHandler) + legacyPool.RecheckTxRunnerFactory = recheckTxRunnerFactory(txConfig, config.AnteHandler, config.Stores, legacyPool.RecheckTxFnFactory) // Once we have validated that the tx is valid (and can be promoted, set it // to be reaped) @@ -648,6 +655,44 @@ func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig return nil } +func recheckTxRunnerFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler, stores map[storetypes.StoreKey]int, recheckTxFnFactory legacypool.RecheckTxFnFactory) legacypool.RecheckTxRunnerFactory { + return func(chain legacypool.BlockChain) legacypool.RecheckTxRunner { + bc, ok := chain.(*Blockchain) + if !ok { + panic("unexpected type for BlockChain, expected *mempool.Blockchain") + } + ctx, err := bc.GetLatestContext() + if err != nil { + // TODO: we probably dont want to panic here, but for POC im saying + // this is ok, the only real other option here is to nuke the + // entire mempool, or force another recheck but we cant be sure + // that will also not fail here + panic(fmt.Errorf("getting latest context from blockchain: %w", err)) + } + + return func(t types.Transactions) []error { + blockSize := len(t) + if blockSize == 0 { + return nil + } + results := make([]error, blockSize) + // TODO fixme executors + blockstm.ExecuteBlockWithEstimates(ctx, blockSize, stores, ctx.MultiStore(), 20, nil, + func(txn blockstm.TxnIndex, ms blockstm.MultiStore) { + var memTx *types.Transaction + if t != nil { + memTx = t[txn] + } + fn := recheckTxFnFactory(chain) + err = fn(memTx) + results[txn] = fn(memTx) + }, + ) + return results + } + } +} + func recheckTxFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler) legacypool.RecheckTxFnFactory { return func(chain legacypool.BlockChain) legacypool.RecheckTxFn { bc, ok := chain.(*Blockchain) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 81e259bad..c007f47f1 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -211,8 +211,10 @@ func (config *Config) sanitize() Config { } type ( - RecheckTxFn func(t *types.Transaction) error - RecheckTxFnFactory func(chain BlockChain) RecheckTxFn + RecheckTxFn func(t *types.Transaction) error + RecheckTxFnFactory func(chain BlockChain) RecheckTxFn + RecheckTxRunner func(t types.Transactions) []error + RecheckTxRunnerFactory func(chain BlockChain) RecheckTxRunner ) // LegacyPool contains all currently known transactions. Transactions @@ -270,7 +272,8 @@ type LegacyPool struct { BroadcastTxFn func(txs []*types.Transaction) error - RecheckTxFnFactory RecheckTxFnFactory + RecheckTxFnFactory RecheckTxFnFactory + RecheckTxRunnerFactory RecheckTxRunnerFactory // OnTxPromoted is called when a tx is promoted from queued to pending (may // be called multiple times per tx) @@ -1692,12 +1695,26 @@ func (pool *LegacyPool) demoteUnexecutables() { // dependency on DeliverTx and allow it to use any fn in parallel. var recheckInvalids []*types.Transaction var recheckDrops []*types.Transaction - if pool.RecheckTxFnFactory != nil { + // Try to execute in parallel + if pool.RecheckTxRunnerFactory != nil || pool.RecheckTxFnFactory != nil { recheckStart := time.Now() - recheckFn := pool.RecheckTxFnFactory(pool.chain) - recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { - return recheckFn(tx) != nil - }) + if pool.RecheckTxRunnerFactory != nil { + recheckRunner := pool.RecheckTxRunnerFactory(pool.chain) + inputTxs := list.Flatten() + recheckResults := recheckRunner(inputTxs) + for i, tx := range inputTxs { + if recheckResults[i] != nil { + recheckDrops = append(recheckDrops, tx) + } else { + recheckInvalids = append(recheckInvalids, tx) + } + } + } else if pool.RecheckTxFnFactory != nil { + recheckFn := pool.RecheckTxFnFactory(pool.chain) + recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { + return recheckFn(tx) != nil + }) + } for _, tx := range recheckDrops { hash := tx.Hash() pool.all.Remove(hash) From ee1814a2d1dfb824e1fc1c595b41666f61f44dce Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Fri, 5 Dec 2025 17:01:38 -0800 Subject: [PATCH 2/3] Correct logic --- mempool/mempool.go | 9 ++++++--- mempool/mempool_test.go | 2 ++ mempool/txpool/legacypool/legacypool.go | 12 ++++++------ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 38a9a7eea..ff96ff3db 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -216,7 +216,7 @@ func NewExperimentalEVMMempool( // like a small, we should refactor this into something thats easier to // reason about for callers and the legacypool itself. legacyPool.RecheckTxFnFactory = recheckTxFactory(txConfig, config.AnteHandler) - legacyPool.RecheckTxRunnerFactory = recheckTxRunnerFactory(txConfig, config.AnteHandler, config.Stores, legacyPool.RecheckTxFnFactory) + legacyPool.RecheckTxRunnerFactory = recheckTxRunnerFactory(config.Stores, legacyPool.RecheckTxFnFactory) // Once we have validated that the tx is valid (and can be promoted, set it // to be reaped) @@ -655,7 +655,7 @@ func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig return nil } -func recheckTxRunnerFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler, stores map[storetypes.StoreKey]int, recheckTxFnFactory legacypool.RecheckTxFnFactory) legacypool.RecheckTxRunnerFactory { +func recheckTxRunnerFactory(stores map[storetypes.StoreKey]int, recheckTxFnFactory legacypool.RecheckTxFnFactory) legacypool.RecheckTxRunnerFactory { return func(chain legacypool.BlockChain) legacypool.RecheckTxRunner { bc, ok := chain.(*Blockchain) if !ok { @@ -677,7 +677,7 @@ func recheckTxRunnerFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandle } results := make([]error, blockSize) // TODO fixme executors - blockstm.ExecuteBlockWithEstimates(ctx, blockSize, stores, ctx.MultiStore(), 20, nil, + err = blockstm.ExecuteBlockWithEstimates(ctx, blockSize, stores, ctx.MultiStore(), 20, nil, func(txn blockstm.TxnIndex, ms blockstm.MultiStore) { var memTx *types.Transaction if t != nil { @@ -688,6 +688,9 @@ func recheckTxRunnerFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandle results[txn] = fn(memTx) }, ) + if err != nil { + panic(fmt.Errorf("mempool panic running antehandlers via bstm %w", err)) + } return results } } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 3409618a7..4a4ad740f 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -86,6 +86,7 @@ func TestMempool_ReapPromoteDemotePromote(t *testing.T) { return nil } } + legacyPool.RecheckTxRunnerFactory = nil // sync the pool to make sure the above happens require.NoError(t, mp.GetTxPool().Sync()) @@ -171,6 +172,7 @@ func TestMempool_ReapPromoteDemoteReap(t *testing.T) { return nil } } + legacyPool.RecheckTxRunnerFactory = nil // sync the pool to make sure the above happens require.NoError(t, mp.GetTxPool().Sync()) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index c007f47f1..c22f43c6c 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -272,7 +272,7 @@ type LegacyPool struct { BroadcastTxFn func(txs []*types.Transaction) error - RecheckTxFnFactory RecheckTxFnFactory + RecheckTxFnFactory RecheckTxFnFactory RecheckTxRunnerFactory RecheckTxRunnerFactory // OnTxPromoted is called when a tx is promoted from queued to pending (may @@ -1702,13 +1702,13 @@ func (pool *LegacyPool) demoteUnexecutables() { recheckRunner := pool.RecheckTxRunnerFactory(pool.chain) inputTxs := list.Flatten() recheckResults := recheckRunner(inputTxs) + resultsMap := make(map[common.Hash]error) for i, tx := range inputTxs { - if recheckResults[i] != nil { - recheckDrops = append(recheckDrops, tx) - } else { - recheckInvalids = append(recheckInvalids, tx) - } + resultsMap[tx.Hash()] = recheckResults[i] } + recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { + return resultsMap[tx.Hash()] != nil + }) } else if pool.RecheckTxFnFactory != nil { recheckFn := pool.RecheckTxFnFactory(pool.chain) recheckDrops, recheckInvalids = list.Filter(func(tx *types.Transaction) bool { From bc81cd397b638ebb5ae138fd3bec484553f0d1ed Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Fri, 5 Dec 2025 17:06:10 -0800 Subject: [PATCH 3/3] Fix double import --- mempool/mempool.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index ff96ff3db..432cb89ad 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -6,7 +6,6 @@ import ( "fmt" "sync" - "github.com/ethereum/go-ethereum/core/types" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/holiman/uint256" @@ -670,7 +669,7 @@ func recheckTxRunnerFactory(stores map[storetypes.StoreKey]int, recheckTxFnFacto panic(fmt.Errorf("getting latest context from blockchain: %w", err)) } - return func(t types.Transactions) []error { + return func(t ethtypes.Transactions) []error { blockSize := len(t) if blockSize == 0 { return nil @@ -679,7 +678,7 @@ func recheckTxRunnerFactory(stores map[storetypes.StoreKey]int, recheckTxFnFacto // TODO fixme executors err = blockstm.ExecuteBlockWithEstimates(ctx, blockSize, stores, ctx.MultiStore(), 20, nil, func(txn blockstm.TxnIndex, ms blockstm.MultiStore) { - var memTx *types.Transaction + var memTx *ethtypes.Transaction if t != nil { memTx = t[txn] }