Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ type EVMMempoolConfig struct {
LegacyPoolConfig *legacypool.Config
CosmosPoolConfig *sdkmempool.PriorityNonceMempoolConfig[math.Int]
AnteHandler sdk.AnteHandler
BroadCastTxFn func(txs []*ethtypes.Transaction) error
// Block gas limit from consensus parameters
BlockGasLimit uint64
MinTip *uint256.Int
Expand Down Expand Up @@ -134,19 +133,6 @@ func NewExperimentalEVMMempool(

legacyPool := legacypool.New(legacyConfig, blockchain)

// Set up broadcast function using clientCtx
if config.BroadCastTxFn != nil {
legacyPool.BroadcastTxFn = config.BroadCastTxFn
} else {
// Create default broadcast function using clientCtx.
// The EVM mempool will broadcast transactions when it promotes them
// from queued into pending, noting their readiness to be executed.
legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error {
logger.Debug("broadcasting EVM transactions", "tx_count", len(txs))
return broadcastEVMTransactions(clientCtx, txConfig, txs)
}
}

txPool, err := txpool.New(uint64(0), blockchain, []txpool.SubPool{legacyPool})
if err != nil {
panic(err)
Expand Down Expand Up @@ -617,35 +603,6 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, i [][]byte)
return orderedEVMPendingTxes, cosmosPendingTxes
}

// broadcastEVMTransactions converts Ethereum transactions to Cosmos SDK format and broadcasts them.
// This function wraps EVM transactions in MsgEthereumTx messages and submits them to the network
// using the provided client context. It handles encoding and error reporting for each transaction.
func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig, ethTxs []*ethtypes.Transaction) error {
for _, ethTx := range ethTxs {
msg := &evmtypes.MsgEthereumTx{}
msg.FromEthereumTx(ethTx)

txBuilder := txConfig.NewTxBuilder()
if err := txBuilder.SetMsgs(msg); err != nil {
return fmt.Errorf("failed to set msg in tx builder: %w", err)
}

txBytes, err := txConfig.TxEncoder()(txBuilder.GetTx())
if err != nil {
return fmt.Errorf("failed to encode transaction: %w", err)
}

res, err := clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return fmt.Errorf("failed to broadcast transaction %s: %w", ethTx.Hash().Hex(), err)
}
if res.Code != 0 {
return fmt.Errorf("transaction %s rejected by mempool: code=%d, log=%s", ethTx.Hash().Hex(), res.Code, res.RawLog)
}
}
return nil
}

func recheckTxFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler) legacypool.RecheckTxFnFactory {
return func(chain legacypool.BlockChain) legacypool.RecheckTxFn {
bc, ok := chain.(*Blockchain)
Expand Down
13 changes: 0 additions & 13 deletions mempool/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ type LegacyPool struct {

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

BroadcastTxFn func(txs []*types.Transaction) error

RecheckTxFnFactory RecheckTxFnFactory

// OnTxPromoted is called when a tx is promoted from queued to pending (may
Expand Down Expand Up @@ -1394,17 +1392,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// on site where the tx is inserted into the pending queue, not just
// when handling events.

// On successful transaction, broadcast the transaction through the Comet Mempool
// Two inefficiencies:
// 1. The transactions might have already been broadcasted, demoted, and repromoted
// a. tx_nonces_for_account: [1,2,3,4,5,6], [1,2,3] pass, [4] fails, [5,6] get demoted, [4] gets reinserted, [4,5,6] get re-promoted and thus rebroadcasted
// 2. The transaction will pass through Comet, into the appside mempool, and attempted to be reinserted
// It will not, because there is a check, but the attempt is there.
if pool.BroadcastTxFn != nil {
if err := pool.BroadcastTxFn(txs); err != nil {
log.Error("Failed to broadcast transactions", "err", err, "count", len(txs))
}
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
}
}
Expand Down
34 changes: 17 additions & 17 deletions mempool/txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txpool_test
import (
"math"
"math/big"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -98,43 +99,42 @@ func TestTxPoolCosmosReorg(t *testing.T) {
// wait for newHeadCh to be initialized
<-waitForSubscription

// override broadcast fn to wait until we advance the chain a few blocks
broadcastGuard := make(chan struct{})
legacyPool.BroadcastTxFn = func(txs []*types.Transaction) error {
<-broadcastGuard
return nil
// override recheck fn to wait until we advance the chain a few blocks
recheckGuard := make(chan struct{})
once := sync.Once{} // only want to simulate the slow reorg once
legacyPool.RecheckTxFnFactory = func(_ legacypool.BlockChain) legacypool.RecheckTxFn {
return func(_ *types.Transaction) error {
once.Do(func() {
<-recheckGuard
})
return nil
}
}

// add tx1 to the pool so that the blocking broadcast fn will be called,
// add tx1 to the pool so that the blocking recheck fn will be called,
// simulating a slow runReorg
tx1, _ := types.SignTx(types.NewTransaction(1, common.Address{}, big.NewInt(100), 100_000, big.NewInt(int64(gasTip)+1), nil), signer, key)
errs := pool.Add([]*types.Transaction{tx1}, false)
for _, err := range errs {
require.NoError(t, err)
}

// broadcast fn is now blocking, waiting for broadcastGuard
// recheck fn is now blocking, waiting for recheckGuard

// during this time, we will simulate advancing the chain multiple times by
// sending headers on the newHeadCh
newHeadCh <- core.ChainHeadEvent{Header: height1Header}
newHeadCh <- core.ChainHeadEvent{Header: height2Header}
newHeadCh <- core.ChainHeadEvent{Header: height3Header}

// now that we have advanced the headers, unblock the broadcast fn
broadcastGuard <- struct{}{}
// now that we have advanced the headers, unblock the recheck fn
recheckGuard <- struct{}{}

// a runReorg call will now be scheduled with oldHead=genesis and
// newHead=height3

time.Sleep(500 * time.Millisecond)

// push another tx to make sure that runReorg was processed with the above
// headers
legacyPool.BroadcastTxFn = func(txs []*types.Transaction) error { return nil }
tx2, _ := types.SignTx(types.NewTransaction(2, common.Address{}, big.NewInt(100), 100_000, big.NewInt(int64(gasTip)+1), nil), signer, key)
errs = pool.Add([]*types.Transaction{tx2}, false)
for _, err := range errs {
require.NoError(t, err)
}
// sync the pool to make sure that runReorg has processed the above headers
require.NoError(t, pool.Sync())
}
Loading