diff --git a/mempool/mempool.go b/mempool/mempool.go index e82e64001..23a883d04 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -80,7 +80,6 @@ type EVMMempoolConfig struct { LegacyPoolConfig *legacypool.Config CosmosPoolConfig *sdkmempool.PriorityNonceMempoolConfig[math.Int] AnteHandler sdk.AnteHandler - BroadCastTxFn func(txs []*ethtypes.Transaction) error // Block gas limit from consensus parameters BlockGasLimit uint64 MinTip *uint256.Int @@ -134,19 +133,6 @@ func NewExperimentalEVMMempool( legacyPool := legacypool.New(legacyConfig, blockchain) - // Set up broadcast function using clientCtx - if config.BroadCastTxFn != nil { - legacyPool.BroadcastTxFn = config.BroadCastTxFn - } else { - // Create default broadcast function using clientCtx. - // The EVM mempool will broadcast transactions when it promotes them - // from queued into pending, noting their readiness to be executed. - legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error { - logger.Debug("broadcasting EVM transactions", "tx_count", len(txs)) - return broadcastEVMTransactions(clientCtx, txConfig, txs) - } - } - txPool, err := txpool.New(uint64(0), blockchain, []txpool.SubPool{legacyPool}) if err != nil { panic(err) @@ -617,35 +603,6 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, i [][]byte) return orderedEVMPendingTxes, cosmosPendingTxes } -// broadcastEVMTransactions converts Ethereum transactions to Cosmos SDK format and broadcasts them. -// This function wraps EVM transactions in MsgEthereumTx messages and submits them to the network -// using the provided client context. It handles encoding and error reporting for each transaction. -func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig, ethTxs []*ethtypes.Transaction) error { - for _, ethTx := range ethTxs { - msg := &evmtypes.MsgEthereumTx{} - msg.FromEthereumTx(ethTx) - - txBuilder := txConfig.NewTxBuilder() - if err := txBuilder.SetMsgs(msg); err != nil { - return fmt.Errorf("failed to set msg in tx builder: %w", err) - } - - txBytes, err := txConfig.TxEncoder()(txBuilder.GetTx()) - if err != nil { - return fmt.Errorf("failed to encode transaction: %w", err) - } - - res, err := clientCtx.BroadcastTxSync(txBytes) - if err != nil { - return fmt.Errorf("failed to broadcast transaction %s: %w", ethTx.Hash().Hex(), err) - } - if res.Code != 0 { - return fmt.Errorf("transaction %s rejected by mempool: code=%d, log=%s", ethTx.Hash().Hex(), res.Code, res.RawLog) - } - } - return nil -} - func recheckTxFactory(txConfig client.TxConfig, anteHandler sdk.AnteHandler) legacypool.RecheckTxFnFactory { return func(chain legacypool.BlockChain) legacypool.RecheckTxFn { bc, ok := chain.(*Blockchain) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 81e259bad..4ad4bf3fa 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -268,8 +268,6 @@ type LegacyPool struct { changesSinceReorg int // A counter for how many drops we've performed in-between reorg. - BroadcastTxFn func(txs []*types.Transaction) error - RecheckTxFnFactory RecheckTxFnFactory // OnTxPromoted is called when a tx is promoted from queued to pending (may @@ -1394,17 +1392,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // on site where the tx is inserted into the pending queue, not just // when handling events. - // On successful transaction, broadcast the transaction through the Comet Mempool - // Two inefficiencies: - // 1. The transactions might have already been broadcasted, demoted, and repromoted - // a. tx_nonces_for_account: [1,2,3,4,5,6], [1,2,3] pass, [4] fails, [5,6] get demoted, [4] gets reinserted, [4,5,6] get re-promoted and thus rebroadcasted - // 2. The transaction will pass through Comet, into the appside mempool, and attempted to be reinserted - // It will not, because there is a check, but the attempt is there. - if pool.BroadcastTxFn != nil { - if err := pool.BroadcastTxFn(txs); err != nil { - log.Error("Failed to broadcast transactions", "err", err, "count", len(txs)) - } - } pool.txFeed.Send(core.NewTxsEvent{Txs: txs}) } } diff --git a/mempool/txpool/txpool_test.go b/mempool/txpool/txpool_test.go index e80805ba6..7f9fc1d7c 100644 --- a/mempool/txpool/txpool_test.go +++ b/mempool/txpool/txpool_test.go @@ -3,6 +3,7 @@ package txpool_test import ( "math" "math/big" + "sync" "testing" "time" @@ -98,14 +99,19 @@ func TestTxPoolCosmosReorg(t *testing.T) { // wait for newHeadCh to be initialized <-waitForSubscription - // override broadcast fn to wait until we advance the chain a few blocks - broadcastGuard := make(chan struct{}) - legacyPool.BroadcastTxFn = func(txs []*types.Transaction) error { - <-broadcastGuard - return nil + // override recheck fn to wait until we advance the chain a few blocks + recheckGuard := make(chan struct{}) + once := sync.Once{} // only want to simulate the slow reorg once + legacyPool.RecheckTxFnFactory = func(_ legacypool.BlockChain) legacypool.RecheckTxFn { + return func(_ *types.Transaction) error { + once.Do(func() { + <-recheckGuard + }) + return nil + } } - // add tx1 to the pool so that the blocking broadcast fn will be called, + // add tx1 to the pool so that the blocking recheck fn will be called, // simulating a slow runReorg tx1, _ := types.SignTx(types.NewTransaction(1, common.Address{}, big.NewInt(100), 100_000, big.NewInt(int64(gasTip)+1), nil), signer, key) errs := pool.Add([]*types.Transaction{tx1}, false) @@ -113,7 +119,7 @@ func TestTxPoolCosmosReorg(t *testing.T) { require.NoError(t, err) } - // broadcast fn is now blocking, waiting for broadcastGuard + // recheck fn is now blocking, waiting for recheckGuard // during this time, we will simulate advancing the chain multiple times by // sending headers on the newHeadCh @@ -121,20 +127,14 @@ func TestTxPoolCosmosReorg(t *testing.T) { newHeadCh <- core.ChainHeadEvent{Header: height2Header} newHeadCh <- core.ChainHeadEvent{Header: height3Header} - // now that we have advanced the headers, unblock the broadcast fn - broadcastGuard <- struct{}{} + // now that we have advanced the headers, unblock the recheck fn + recheckGuard <- struct{}{} // a runReorg call will now be scheduled with oldHead=genesis and // newHead=height3 time.Sleep(500 * time.Millisecond) - // push another tx to make sure that runReorg was processed with the above - // headers - legacyPool.BroadcastTxFn = func(txs []*types.Transaction) error { return nil } - tx2, _ := types.SignTx(types.NewTransaction(2, common.Address{}, big.NewInt(100), 100_000, big.NewInt(int64(gasTip)+1), nil), signer, key) - errs = pool.Add([]*types.Transaction{tx2}, false) - for _, err := range errs { - require.NoError(t, err) - } + // sync the pool to make sure that runReorg has processed the above headers + require.NoError(t, pool.Sync()) }