Skip to content

Commit 642c8e9

Browse files
mmsqeAlex | Interchain Labs
andauthored
fix: notify new block for mempool in time to avoid insufficient funds error (cosmos#471)
* fix: avoid insufficient funds error when fund and send tx within same block get SpendableCoin from sdk ctx to reflect uncommitted balance change * wrong hash * fix test * cleanup doc * notify * revert * cleanup * doc * unsubscribe * lint * cleanup * align shutdown timeout to avoid stuck on closing JSON-RPC * cleanup * Apply suggestions from code review * Apply suggestions from code review * lint * lint --------- Co-authored-by: Alex | Interchain Labs <[email protected]>
1 parent 2cf6593 commit 642c8e9

File tree

12 files changed

+98
-35
lines changed

12 files changed

+98
-35
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
### BUG FIXES
88

9+
- [\#471](https://github.com/cosmos/evm/pull/471) Notify new block for mempool in time.
10+
911
### IMPROVEMENTS
1012

13+
- [\#467](https://github.com/cosmos/evm/pull/467) Replace GlobalEVMMempool by passing to JSONRPC on initiate.
1114
- [\#352](https://github.com/cosmos/evm/pull/352) Remove the creation of a Geth EVM instance, stateDB during the AnteHandler balance check.
12-
- [\#467](https://github.com/cosmos/evm/pull/467) Ensure SetGlobalEVMMempool is thread-safe and only sets global mempool instance once.
1315

1416
### FEATURES
1517

evmd/ante/evm_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ func newMonoEVMAnteHandler(options ante.HandlerOptions) sdk.AnteHandler {
1818
),
1919
ante.NewTxListenerDecorator(options.PendingTxListener),
2020
}
21-
21+
2222
return sdk.ChainAnteDecorators(decorators...)
2323
}

evmd/app.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package evmd
22

33
import (
44
"encoding/json"
5+
"errors"
56
"fmt"
67
"io"
78
"os"
@@ -1145,6 +1146,22 @@ func (app *EVMD) SetClientCtx(clientCtx client.Context) {
11451146
app.clientCtx = clientCtx
11461147
}
11471148

1149+
// Close unsubscribes from the CometBFT event bus (if set) and closes the underlying BaseApp.
1150+
func (app *EVMD) Close() error {
1151+
var err error
1152+
if m, ok := app.GetMempool().(*evmmempool.ExperimentalEVMMempool); ok {
1153+
err = m.Close()
1154+
}
1155+
err = errors.Join(err, app.BaseApp.Close())
1156+
msg := "Application gracefully shutdown"
1157+
if err == nil {
1158+
app.Logger().Info(msg)
1159+
} else {
1160+
app.Logger().Error(msg, "error", err)
1161+
}
1162+
return err
1163+
}
1164+
11481165
// AutoCliOpts returns the autocli options for the app.
11491166
func (app *EVMD) AutoCliOpts() autocli.AppOptions {
11501167
modules := make(map[string]appmodule.AppModule, 0)

mempool/README.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,6 @@ if evmtypes.GetChainConfig() != nil {
8282
)
8383
app.EVMMempool = evmMempool
8484

85-
// Set the global mempool for RPC access
86-
if err := evmmempool.SetGlobalEVMMempool(evmMempool); err != nil {
87-
panic(err)
88-
}
89-
9085
// Replace BaseApp mempool
9186
app.SetMempool(evmMempool)
9287

@@ -103,6 +98,23 @@ if evmtypes.GetChainConfig() != nil {
10398
)
10499
app.SetPrepareProposal(abciProposalHandler.PrepareProposalHandler())
105100
}
101+
102+
// Close unsubscribes from the CometBFT event bus (if set) and closes the underlying BaseApp.
103+
func (app *EVMD) Close() error {
104+
var err error
105+
if m, ok := app.GetMempool().(*evmmempool.ExperimentalEVMMempool); ok {
106+
err = m.Close()
107+
}
108+
err = errors.Join(err, app.BaseApp.Close())
109+
msg := "Application gracefully shutdown"
110+
if err == nil {
111+
app.Logger().Info(msg)
112+
} else {
113+
app.Logger().Error(msg, "error", err)
114+
}
115+
return err
116+
}
117+
106118
```
107119

108120
### Configuration Options
@@ -211,7 +223,7 @@ ERROR unable to publish transaction nonce=40 expected=12: invalid sequence
211223
ERROR unable to publish transaction nonce=41 expected=12: invalid sequence
212224
```
213225

214-
**Real-World Testing**: The [`tests/systemtests/Counter/script/SimpleSends.s.sol`](../../tests/systemtests/Counter/script/SimpleSends.s.sol) script demonstrates typical Ethereum tooling behavior - it sends 10 sequential transactions in a batch, which naturally arrive out of order and create nonce gaps. With the default Cosmos mempool, this script would fail with sequence errors. With the EVM mempool, all transactions are queued locally and promoted as gaps are filled, allowing the script to succeed.
226+
**Real-World Testing**: The [`tests/systemtests/Counter/script/SimpleSends.s.sol`](../tests/systemtests/Counter/script/SimpleSends.s.sol) script demonstrates typical Ethereum tooling behavior - it sends 10 sequential transactions in a batch, which naturally arrive out of order and create nonce gaps. With the default Cosmos mempool, this script would fail with sequence errors. With the EVM mempool, all transactions are queued locally and promoted as gaps are filled, allowing the script to succeed.
215227

216228
### Design Principles
217229

mempool/iterator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -340,17 +340,18 @@ func (i *EVMMempoolIterator) convertEVMToSDKTx(nextEVMTx *txpool.LazyTransaction
340340
}
341341

342342
msgEthereumTx := &msgtypes.MsgEthereumTx{}
343+
hash := nextEVMTx.Tx.Hash()
343344
if err := msgEthereumTx.FromSignedEthereumTx(nextEVMTx.Tx, ethtypes.LatestSignerForChainID(i.chainID)); err != nil {
344-
i.logger.Error("failed to convert signed Ethereum transaction", "error", err, "tx_hash", nextEVMTx.Tx.Hash().Hex())
345+
i.logger.Error("failed to convert signed Ethereum transaction", "error", err, "tx_hash", hash)
345346
return nil // Return nil for invalid tx instead of panicking
346347
}
347348

348349
cosmosTx, err := msgEthereumTx.BuildTx(i.txConfig.NewTxBuilder(), i.bondDenom)
349350
if err != nil {
350-
i.logger.Error("failed to build Cosmos transaction from EVM transaction", "error", err, "tx_hash", nextEVMTx.Tx.Hash().Hex())
351+
i.logger.Error("failed to build Cosmos transaction from EVM transaction", "error", err, "tx_hash", hash)
351352
return nil
352353
}
353354

354-
i.logger.Debug("successfully converted EVM transaction to Cosmos transaction", "tx_hash", nextEVMTx.Tx.Hash().Hex())
355+
i.logger.Debug("successfully converted EVM transaction to Cosmos transaction", "tx_hash", hash)
355356
return cosmosTx
356357
}

mempool/mempool.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import (
99
ethtypes "github.com/ethereum/go-ethereum/core/types"
1010
"github.com/holiman/uint256"
1111

12+
cmttypes "github.com/cometbft/cometbft/types"
13+
1214
"github.com/cosmos/evm/mempool/miner"
1315
"github.com/cosmos/evm/mempool/txpool"
1416
"github.com/cosmos/evm/mempool/txpool/legacypool"
17+
"github.com/cosmos/evm/rpc/stream"
1518
"github.com/cosmos/evm/x/precisebank/types"
1619
evmtypes "github.com/cosmos/evm/x/vm/types"
1720

18-
errorsmod "cosmossdk.io/errors"
1921
"cosmossdk.io/log"
2022
"cosmossdk.io/math"
2123

@@ -27,6 +29,8 @@ import (
2729

2830
var _ sdkmempool.ExtMempool = &ExperimentalEVMMempool{}
2931

32+
const SubscriberName = "evm"
33+
3034
type (
3135
// ExperimentalEVMMempool is a unified mempool that manages both EVM and Cosmos SDK transactions.
3236
// It provides a single interface for transaction insertion, selection, and removal while
@@ -54,6 +58,8 @@ type (
5458

5559
/** Concurrency **/
5660
mtx sync.Mutex
61+
62+
eventBus *cmttypes.EventBus
5763
}
5864
)
5965

@@ -202,22 +208,18 @@ func (m *ExperimentalEVMMempool) Insert(goCtx context.Context, tx sdk.Tx) error
202208
blockHeight := ctx.BlockHeight()
203209

204210
m.logger.Debug("inserting transaction into mempool", "block_height", blockHeight)
205-
206-
if blockHeight < 2 {
207-
return errorsmod.Wrap(sdkerrors.ErrInvalidHeight, "Mempool is not ready. Please wait for block 1 to finalize.")
208-
}
209-
210211
ethMsg, err := m.getEVMMessage(tx)
211212
if err == nil {
212213
// Insert into EVM pool
213-
m.logger.Debug("inserting EVM transaction", "tx_hash", ethMsg.Hash)
214+
hash := ethMsg.Hash()
215+
m.logger.Debug("inserting EVM transaction", "tx_hash", hash)
214216
ethTxs := []*ethtypes.Transaction{ethMsg.AsTransaction()}
215217
errs := m.txPool.Add(ethTxs, true)
216218
if len(errs) > 0 && errs[0] != nil {
217-
m.logger.Error("failed to insert EVM transaction", "error", errs[0], "tx_hash", ethMsg.Hash)
219+
m.logger.Error("failed to insert EVM transaction", "error", errs[0], "tx_hash", hash)
218220
return errs[0]
219221
}
220-
m.logger.Debug("EVM transaction inserted successfully", "tx_hash", ethMsg.Hash)
222+
m.logger.Debug("EVM transaction inserted successfully", "tx_hash", hash)
221223
return nil
222224
}
223225

@@ -300,11 +302,12 @@ func (m *ExperimentalEVMMempool) Remove(tx sdk.Tx) error {
300302
// We should not do this with EVM transactions because removing them causes the subsequent ones to
301303
// be dequeued as temporarily invalid, only to be requeued a block later.
302304
// The EVM mempool handles removal based on account nonce automatically.
305+
hash := msg.Hash()
303306
if m.shouldRemoveFromEVMPool(tx) {
304-
m.logger.Debug("manually removing EVM transaction", "tx_hash", msg.Hash())
305-
m.legacyTxPool.RemoveTx(msg.Hash(), false, true)
307+
m.logger.Debug("manually removing EVM transaction", "tx_hash", hash)
308+
m.legacyTxPool.RemoveTx(hash, false, true)
306309
} else {
307-
m.logger.Debug("skipping manual removal of EVM transaction, leaving to mempool to handle", "tx_hash", msg.Hash)
310+
m.logger.Debug("skipping manual removal of EVM transaction, leaving to mempool to handle", "tx_hash", hash)
308311
}
309312
return nil
310313
}
@@ -372,6 +375,31 @@ func (m *ExperimentalEVMMempool) SelectBy(goCtx context.Context, i [][]byte, f f
372375
}
373376
}
374377

378+
// SetEventBus sets CometBFT event bus to listen for new block header event.
379+
func (m *ExperimentalEVMMempool) SetEventBus(eventBus *cmttypes.EventBus) {
380+
if m.eventBus != nil {
381+
m.eventBus.Unsubscribe(context.Background(), SubscriberName, stream.NewBlockHeaderEvents) //nolint: errcheck
382+
}
383+
m.eventBus = eventBus
384+
sub, err := eventBus.Subscribe(context.Background(), SubscriberName, stream.NewBlockHeaderEvents)
385+
if err != nil {
386+
panic(err)
387+
}
388+
go func() {
389+
for range sub.Out() {
390+
m.GetBlockchain().NotifyNewBlock()
391+
}
392+
}()
393+
}
394+
395+
// Close unsubscribes from the CometBFT event bus.
396+
func (m *ExperimentalEVMMempool) Close() error {
397+
if m.eventBus != nil {
398+
return m.eventBus.Unsubscribe(context.Background(), SubscriberName, stream.NewBlockHeaderEvents)
399+
}
400+
return nil
401+
}
402+
375403
// getEVMMessage validates that the transaction contains exactly one message and returns it if it's an EVM message.
376404
// Returns an error if the transaction has no messages, multiple messages, or the single message is not an EVM transaction.
377405
func (m *ExperimentalEVMMempool) getEVMMessage(tx sdk.Tx) (*evmtypes.MsgEthereumTx, error) {

rpc/backend/sign_tx.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/ethereum/go-ethereum/crypto"
1414
"github.com/ethereum/go-ethereum/signer/core/apitypes"
1515

16+
"github.com/cosmos/evm/mempool"
1617
evmtypes "github.com/cosmos/evm/x/vm/types"
1718

1819
errorsmod "cosmossdk.io/errors"
@@ -109,7 +110,7 @@ func (b *Backend) SendTransaction(args evmtypes.TransactionArgs) (common.Hash, e
109110
}
110111
if err != nil {
111112
// Check if this is a nonce gap error that was successfully queued
112-
if strings.Contains(err.Error(), "tx nonce is higher than account nonce") {
113+
if strings.Contains(err.Error(), mempool.ErrNonceGap.Error()) {
113114
// Transaction was successfully queued due to nonce gap, return success to client
114115
b.Logger.Debug("transaction queued due to nonce gap", "hash", txHash.Hex())
115116
return txHash, nil

rpc/stream/rpc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ var (
4040
cmttypes.EventTx,
4141
sdk.EventTypeMessage,
4242
sdk.AttributeKeyModule, evmtypes.ModuleName)).String()
43-
blockEvents = cmttypes.QueryForEvent(cmttypes.EventNewBlock).String()
44-
evmTxHashKey = fmt.Sprintf("%s.%s", evmtypes.TypeMsgEthereumTx, evmtypes.AttributeKeyEthereumTxHash)
43+
blockEvents = cmttypes.QueryForEvent(cmttypes.EventNewBlock).String()
44+
evmTxHashKey = fmt.Sprintf("%s.%s", evmtypes.TypeMsgEthereumTx, evmtypes.AttributeKeyEthereumTxHash)
45+
NewBlockHeaderEvents = cmtquery.MustCompile(fmt.Sprintf("%s='%s'", cmttypes.EventTypeKey, cmttypes.EventNewBlockHeader))
4546
)
4647

4748
type RPCHeader struct {

server/json_rpc.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log/slog"
77
"net/http"
8+
"time"
89

910
"github.com/ethereum/go-ethereum/common"
1011
ethrpc "github.com/ethereum/go-ethereum/rpc"
@@ -24,6 +25,8 @@ import (
2425
"github.com/cosmos/cosmos-sdk/server"
2526
)
2627

28+
const shutdownTimeout = 5 * time.Second
29+
2730
type AppWithPendingTxStream interface {
2831
RegisterPendingTxListener(listener func(common.Hash))
2932
}
@@ -109,7 +112,9 @@ func StartJSONRPC(
109112
// The calling process canceled or closed the provided context, so we must
110113
// gracefully stop the JSON-RPC server.
111114
logger.Info("stopping JSON-RPC server...", "address", config.JSONRPC.Address)
112-
if err := httpSrv.Shutdown(context.Background()); err != nil {
115+
ctxShutdown, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
116+
defer cancel()
117+
if err := httpSrv.Shutdown(ctxShutdown); err != nil {
113118
logger.Error("failed to shutdown JSON-RPC server", "error", err.Error())
114119
}
115120
return nil

server/start.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,9 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
432432
return err
433433
}
434434

435+
if m, ok := evmApp.GetMempool().(*evmmempool.ExperimentalEVMMempool); ok {
436+
m.SetEventBus(tmNode.EventBus())
437+
}
435438
defer func() {
436439
if tmNode.IsRunning() {
437440
_ = tmNode.Stop()

0 commit comments

Comments
 (0)