
Conversation

@kakysha

@kakysha kakysha commented Aug 11, 2025

doc: https://injective-labs.atlassian.net/browse/CP-341

Summary by CodeRabbit

  • New Features

    • Added a "proxy" mempool option and a node option to plug in custom mempools and broadcast streams.
    • New mempool reactor for efficient peer broadcast with concurrency limits, sync gating, and per-peer channels.
    • New transaction abstractions and a builder-style API plus a proxy wrapper to swap mempool implementations at runtime.
    • CheckTx made asynchronous: results, including errors, are delivered via async responses rather than returned synchronously.
  • Tests

    • Added end-to-end tests validating tx broadcast and ordering across nodes.

kakysha and others added 4 commits July 15, 2025 16:12
feat: add proxy mempool

refactor: use `Mempool` interface instead of `*CListMempool` in mempool.Reactor

WIP: init mempool, mempool Reactor

feat: revert `Reactor` and add `MempoolReactor`

feat: add `TxBroadcastStream` interface

chore: update comment

feat: implement `SetTxBroadcastStream` method in `ProxyMempool`

feat: add MempoolTx interface

fix: type

refactor: change reactor name to MempoolInterfaceReactor

refactor: rename GetNextTx to GetTxChannel in TxBroadcastStream interface

fix: method name

feat: add WithCustomMempoolOnProxyMempool func as node opt

feat: AddSender method to MempoolTx interface

chore(mempool): add broadcast txs message test

feat(mempool): introduce CustomMempool interface and update ProxyMempool initialization

Revert "feat(mempool): introduce CustomMempool interface and update ProxyMempool initialization"

This reverts commit a09a436.

refactor: remove senderID parameter from NewMempoolTx constructor

refactor: implement builder pattern for MempoolTx creation

chore: remove todo
@coderabbitai

coderabbitai bot commented Aug 11, 2025

Walkthrough

Adds a Proxy-based mempool and TxBroadcastStream APIs, a MempoolInterfaceReactor for p2p tx broadcast, a ProxyMempool wrapper and node wiring to inject custom mempools/streams, a builder-based MempoolTx, converts local ABCI CheckTxAsync to an asynchronous ReqRes-driven flow, and adds reactor integration tests.

Changes

  • ABCI client async CheckTx (abci/client/unsync_local_client.go): CheckTxAsync now creates and returns a ReqRes immediately, runs Application.CheckTx in a goroutine, stores success as ToCheckTxResponse or errors as ToExceptionResponse, invokes callbacks via the ReqRes, and no longer returns errors synchronously.
  • Config: mempool type (config/config.go): Adds the exported constant MempoolTypeProxy = "proxy".
  • Mempool public API (mempool/mempool.go): Adds the public interfaces MempoolTx (accessors and sender tracking) and TxBroadcastStream (GetTxChannel()).
  • Mempool tx implementation & builder (mempool/mempoolTx.go): Introduces a compile-time assertion that mempoolTx satisfies MempoolTx, atomic Height()/GasWanted() accessors, AddSender, NewMempoolTx, and a MempoolTxBuilder with With* methods and Build().
  • Mempool interface reactor (mempool/mempool_interface_reactor.go): Adds MempoolInterfaceReactor (a BaseReactor) that consumes a TxBroadcastStream and manages per-peer broadcast channels, semaphores for persistent/non-persistent peers, wait-sync gating, inbound Txs handling (CheckTx), a global broadcast routine, and per-peer routines.
  • Reactor tests (mempool/mempool_interface_reactor_test.go): Adds integration-style tests: a mock TxBroadcastStream, helpers to create reactors/switches/mempools, injection of random txs, and assertions on propagation and ordering across nodes.
  • Proxy mempool wrapper (mempool/proxy_mempool.go): Adds ProxyMempool embedding Mempool and TxBroadcastStream, with SetMempool and SetTxBroadcastStream for dynamic swapping; compile-time interface assertions included.
  • Node option for proxy mempool (node/node.go): Adds the WithCustomMempoolOnProxyMempool(customMempool, txBroadcastStream) Option to inject and wire a custom mempool/stream into a node's ProxyMempool (panics if the node's mempool is not a proxy).
  • Node setup wiring (node/setup.go): Extends createMempoolAndMempoolReactor to handle cfg.MempoolTypeProxy: creates a ProxyMempool and a MempoolInterfaceReactor (using the proxy for both mempool and stream), optionally enables TxsAvailable when waiting for txs, and sets the reactor logger.
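
For orientation, here is a compilable sketch of the new public surface described above. It is illustrative only: Tx and PeerID stand in for types.Tx and p2p.ID, the receive-only channel direction on GetTxChannel is an assumption, and the embedded Mempool interface is elided.

package mempoolsketch

// Tx and PeerID are local stand-ins for types.Tx and p2p.ID.
type Tx []byte
type PeerID string

// MempoolTx mirrors the interface added in mempool/mempool.go:
// read access to a cached transaction plus sender bookkeeping.
type MempoolTx interface {
	Height() int64
	GasWanted() int64
	Tx() Tx
	IsSender(peerID PeerID) bool
	AddSender(peerID PeerID) bool
	Senders() []PeerID
}

// TxBroadcastStream is the source of transactions for the broadcast reactor.
type TxBroadcastStream interface {
	GetTxChannel() <-chan MempoolTx
}

// ProxyMempool in this PR embeds both Mempool and TxBroadcastStream and lets
// either be swapped at runtime; only the stream half is sketched here.
type ProxyMempool struct {
	TxBroadcastStream
}

func (p *ProxyMempool) SetTxBroadcastStream(s TxBroadcastStream) {
	p.TxBroadcastStream = s
}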

Sequence Diagram(s)

sequenceDiagram
  participant Client as ABCI Client
  participant ReqRes as ReqRes
  participant App as ABCI App
  Note over Client,ReqRes: async CheckTx flow
  Client->>ReqRes: NewReqRes(CheckTxRequest)
  Client->>Client: return ReqRes immediately
  Client->>App: (goroutine) CheckTx(ctx, req)
  App-->>Client: result or error
  Client->>ReqRes: set Response (CheckTxResponse or ExceptionResponse)
  Client->>ReqRes: Done() + InvokeCallback()
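The flow above is a plain asynchronous request/response pattern. The stand-alone sketch below (a simplified stand-in, not the repository's ReqRes type) shows its shape: the caller gets a handle back immediately, the work runs in a goroutine, and a WaitGroup plus an optional callback signal completion, which is also the point the later review comments press on.

package main

import (
	"fmt"
	"sync"
)

// reqRes is a simplified stand-in for the ABCI client's ReqRes:
// a response slot plus a WaitGroup so callers can block until the
// async work finishes, and an optional callback invoked on completion.
type reqRes struct {
	sync.WaitGroup
	Response string
	callback func(string)
}

func checkTxAsync(tx string) *reqRes {
	rr := &reqRes{}
	rr.Add(1)
	go func() {
		defer rr.Done() // unblock any Wait() callers
		rr.Response = "OK: " + tx
		if rr.callback != nil {
			rr.callback(rr.Response)
		}
	}()
	return rr // returned immediately, before the work completes
}

func main() {
	rr := checkTxAsync("tx-1")
	rr.Wait() // block until the goroutine has stored the response
	fmt.Println(rr.Response)
}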
sequenceDiagram
  participant Node as Node Setup
  participant Proxy as ProxyMempool
  participant Reactor as MempoolInterfaceReactor
  Note over Node,Proxy: proxy mempool wiring
  Node->>Proxy: create ProxyMempool
  Node->>Reactor: NewMempoolInterfaceReactor(cfg, Proxy, Proxy, waitSync)
  alt WithCustomMempoolOnProxyMempool used
    Node->>Proxy: SetMempool(customMempool)
    Node->>Proxy: SetTxBroadcastStream(customStream)
  end
sequenceDiagram
  participant Stream as TxBroadcastStream
  participant Reactor as MempoolInterfaceReactor
  participant Peer as Peer (per-peer chan)
  participant MP as Mempool
  Stream-->>Reactor: tx := <-GetTxChannel()
  Reactor->>Peer: fan-out non-blocking send to per-peer chans
  Peer-->>Reactor: Receive Txs message
  Reactor->>MP: CheckTx(tx)
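The fan-out step in this last diagram is essentially a non-blocking send to each per-peer channel, so that one slow peer cannot stall the stream, plus an explicit check for a closed stream channel. A self-contained sketch of that pattern, using placeholder types rather than the reactor's real ones:

package main

import "fmt"

type mempoolTx string

// fanOut performs a non-blocking send of tx to every per-peer channel;
// peers whose buffers are full are skipped rather than blocking the stream.
func fanOut(tx mempoolTx, peerChans map[string]chan mempoolTx) {
	for peerID, ch := range peerChans {
		select {
		case ch <- tx:
		default:
			fmt.Printf("peer %s channel full, skipping broadcast of %q\n", peerID, tx)
		}
	}
}

func main() {
	peers := map[string]chan mempoolTx{
		"peer-a": make(chan mempoolTx, 1),
		"peer-b": make(chan mempoolTx, 1),
	}
	txStream := make(chan mempoolTx, 4)
	txStream <- "tx-1"
	close(txStream)

	// Drain the stream, handling the closed-channel case explicitly
	// (the pattern one of the review comments below recommends for the reactor).
	for {
		tx, ok := <-txStream
		if !ok {
			break
		}
		fanOut(tx, peers)
	}
	fmt.Println(<-peers["peer-a"], <-peers["peer-b"])
}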

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60–90 minutes

Poem

I nibble code and plant a stream,
A proxy burrow feeds the team.
Goroutines hum, callbacks hop,
Semaphores guard each eager stop.
Tiny txs hop — the network beams. 🥕

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check: ✅ Passed. The title "CP-341 Async CheckTx and Unified Mempool" succinctly and accurately captures the main changes in the PR (converting CheckTx to an async path and introducing a unified/proxy mempool with related interfaces and a reactor), and it is concise, specific, and relevant to the modified files.
  • Docstring Coverage: ✅ Passed. No functions found in the changes; docstring coverage check skipped.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2a6bcc9 and 799a71d.

📒 Files selected for processing (1)
  • abci/client/unsync_local_client.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • abci/client/unsync_local_client.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: test_apps
  • GitHub Check: tests (05)
  • GitHub Check: tests (03)
  • GitHub Check: tests (04)
  • GitHub Check: tests (01)
  • GitHub Check: e2e-test
  • GitHub Check: lint

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions

github-actions bot commented Aug 11, 2025

Hey there and thank you for opening this pull request! 👋🏼

We require pull request titles to follow the Conventional Commits specification and it looks like your proposed title needs to be adjusted.

Details:

No release type found in pull request title "[CP-341] Async CheckTx and Unified Mempool". Add a prefix to indicate what kind of release this pull request corresponds to. For reference, see https://www.conventionalcommits.org/

Available types:
 - feat
 - fix
 - build
 - chore
 - ci
 - docs
 - refactor
 - perf
 - test
 - revert
 - spec
 - merge

General format: type(scope): msg
Breaking change: type(scope)!: msg
Multi-scope change: type: msg
Types: feat, fix, build, chore, ci, docs, refactor, perf, test, revert, spec, merge.
Example: fix(cmd/cometbft/commands/debug): execute p.Signal only when p is not nil

@kakysha kakysha changed the title [TEMP PR for visibility] CP-341/parallel checktx proxy mempool [CP-341] Async CheckTx and Unified Mempool Aug 17, 2025
@kakysha kakysha marked this pull request as ready for review August 17, 2025 18:17

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (7)
mempool/mempool.go (1)

185-201: Reduce duplication: make MempoolTx extend Entry and add AddSender

Entry already defines Tx, Height, GasWanted, IsSender, and Senders. Having MempoolTx repeat them increases maintenance overhead. Prefer interface embedding to extend Entry with AddSender.

Apply:

-// MempoolTx defines the interface for a transaction in the mempool
-//
-// It provides methods to access transaction properties and sender information
-//
-type MempoolTx interface {
-	Height() int64
-	GasWanted() int64
-	Tx() types.Tx
-	IsSender(peerID p2p.ID) bool
-	AddSender(peerID p2p.ID) bool
-	Senders() []p2p.ID
-}
+// MempoolTx represents a mempool entry with the ability to register a sender.
+// It extends Entry with mutating sender registration.
+type MempoolTx interface {
+	Entry
+	AddSender(peerID p2p.ID) bool
+}
mempool/proxy_mempool.go (1)

13-19: Provide a constructor and guard against nil underlying implementations

Right now, if ProxyMempool is used before SetMempool/SetTxBroadcastStream are called, any promoted method call (e.g., CheckTx or GetTxChannel) will panic due to a nil embedded interface. A simple constructor makes the wiring harder to misuse, and you can optionally add defensive checks where appropriate.

Apply this diff to add a constructor:

 func (m *ProxyMempool) SetTxBroadcastStream(stream TxBroadcastStream) {
 	m.TxBroadcastStream = stream
 }
+
+// NewProxyMempool constructs a ProxyMempool pre-wired with the given components.
+// Note: passing nil for either argument is allowed, but calling promoted methods
+// on a nil underlying interface will panic. Prefer setting both before use.
+func NewProxyMempool(mp Mempool, stream TxBroadcastStream) *ProxyMempool {
+	return &ProxyMempool{
+		Mempool:           mp,
+		TxBroadcastStream: stream,
+	}
+}

Additionally, please verify that node/setup initializes the ProxyMempool with concrete Mempool and TxBroadcastStream before handing it to any consumers (reactors, etc.).
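
To make the hazard concrete, here is a tiny runnable example (generic names, not this PR's types) of why a promoted call through a nil embedded interface panics:

package main

import "fmt"

type Pinger interface{ Ping() string }

// proxy embeds the interface; until it is set, the field is nil.
type proxy struct {
	Pinger
}

func main() {
	p := &proxy{}
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // promoted call on a nil interface panics
		}
	}()
	fmt.Println(p.Ping()) // panics: invalid memory address or nil pointer dereference
}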

mempool/mempool_interface_reactor_test.go (1)

168-176: Wait on CheckTx response before inspecting it

ReqRes embeds a sync.WaitGroup (see abci/client/client.go), so you can call reqres.Wait() to block until the async callback completes. Update the test to wait for the ABCI response before reading res:

-       reqres, err := mempool.CheckTx(tx, senderID)
-       res := reqres.Response.GetCheckTx()
-
-       if res.IsErr() {
-           t.Logf("CheckTx callback failed for tx %X: %s, code: %d, log: %s, info: %s",
-               tx, res.Log, res.Code, res.Log, res.Info)
-       }
-       require.NoError(t, err, "mempool.CheckTx returned an error for tx %X. Error: %v", tx, err)
-       require.EqualValuesf(t, abci.CodeTypeOK, res.Code,
-           "CheckTx callback response code is not OK for tx %X. Got %d", tx, res.Code)
+       reqres, err := mempool.CheckTx(tx, senderID)
+       require.NoError(t, err,
+           "mempool.CheckTx returned an error for tx %X. Error: %v", tx, err)
+       reqres.Wait() // block until the async callback signals completion
+       res := reqres.Response.GetCheckTx()
+       require.NotNil(t, res, "expected CheckTx response for tx %X", tx)
+       if res.IsErr() {
+           t.Logf("CheckTx callback failed for tx %X: %s, code: %d, log: %s, info: %s",
+               tx, res.Log, res.Code, res.Log, res.Info)
+       }
+       require.EqualValuesf(t, abci.CodeTypeOK, res.Code,
+           "CheckTx callback response code is not OK for tx %X. Got %d", tx, res.Code)

This ensures you’re not racing on the async callback before asserting the response.

mempool/mempoolTx.go (2)

29-35: Convenience builder is fine; minor note on semantics of AddSender return value

NewMempoolTx delegating to the builder is clean. Note: AddSender returns true when the sender already exists (per addSender doc). This is slightly counterintuitive naming-wise, but consistent with your existing private method behavior. Consider clarifying this in the exported AddSender comment to avoid misinterpretation by callers.


115-126: Builder Build sets senders map; timestamp/lane/seq left unset by design

Build populates height, gasWanted, tx, and initial senders. Leaving timestamp/lane/seq unset is acceptable if they’re non-essential here; otherwise consider extending the builder with WithTimestamp/WithLane/WithSeq for future-proofing.

mempool/mempool_interface_reactor.go (2)

69-73: Comment vs implementation mismatch: SetLogger does not set the mempool’s logger

The comment says it sets the logger on the reactor and the underlying mempool, but only memR.Logger is set. Either adjust the comment or set the mempool’s logger via a best-effort type assertion.

Apply this diff to honor the comment without changing the Mempool interface:

 func (memR *MempoolInterfaceReactor) SetLogger(l log.Logger) {
 	memR.Logger = l
+	// Best-effort: set the mempool's logger if it supports SetLogger(log.Logger).
+	if ml, ok := any(memR.mempool).(interface{ SetLogger(log.Logger) }); ok {
+		ml.SetLogger(l)
+	}
 }
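
The assertion above is the general "optional capability" idiom: probe for a narrow interface and call it only if the concrete value supports it. A stand-alone illustration with toy types:

package main

import "fmt"

// loggerSetter is the narrow, optional capability we probe for.
type loggerSetter interface{ SetLogger(prefix string) }

type verboseStore struct{ prefix string }

func (s *verboseStore) SetLogger(prefix string) { s.prefix = prefix }

type silentStore struct{}

// attachLogger forwards the logger only to values that opt in.
func attachLogger(v any, prefix string) {
	if ls, ok := v.(loggerSetter); ok {
		ls.SetLogger(prefix)
		fmt.Printf("logger set on %T\n", v)
		return
	}
	fmt.Printf("no SetLogger on %T\n", v)
}

func main() {
	attachLogger(&verboseStore{}, "mempool")
	attachLogger(silentStore{}, "mempool")
}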

267-311: Handle closed txStream channel to avoid a silent busy loop

When txChan is closed, reading from it yields the zero value (nil) repeatedly and the loop keeps spinning. Handle the ok flag to exit cleanly.

Apply this diff:

-        case tx := <-txChan:
-            if tx != nil {
+        case tx, ok := <-txChan:
+            if !ok {
+                memR.Logger.Info("txStream channel closed; stopping broadcast routine")
+                return
+            }
+            if tx != nil {
                 memR.peerBroadcastChannels.Range(func(key, value interface{}) bool {
                     peerID := key.(p2p.ID)
                     ch := value.(chan MempoolTx)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1615241 and 8df4e19.

📒 Files selected for processing (9)
  • abci/client/unsync_local_client.go (1 hunks)
  • config/config.go (1 hunks)
  • mempool/mempool.go (1 hunks)
  • mempool/mempoolTx.go (4 hunks)
  • mempool/mempool_interface_reactor.go (1 hunks)
  • mempool/mempool_interface_reactor_test.go (1 hunks)
  • mempool/proxy_mempool.go (1 hunks)
  • node/node.go (1 hunks)
  • node/setup.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (8)
mempool/mempool.go (2)
types/tx.go (1)
  • Tx (22-22)
p2p/key.go (1)
  • ID (16-16)
node/node.go (2)
mempool/mempool.go (2)
  • Mempool (27-111)
  • TxBroadcastStream (198-201)
mempool/proxy_mempool.go (1)
  • ProxyMempool (5-8)
abci/client/unsync_local_client.go (5)
abci/client/client.go (2)
  • NewReqRes (85-94)
  • Callback (66-66)
abci/types/messages.go (3)
  • ToCheckTxRequest (50-54)
  • ToExceptionResponse (130-134)
  • ToCheckTxResponse (154-158)
abci/types/mocks/application.go (1)
  • Application (13-15)
abci/types/application.go (1)
  • Application (9-35)
abci/types/types.go (2)
  • Response (53-53)
  • Request (13-13)
node/setup.go (5)
config/config.go (1)
  • MempoolTypeProxy (52-52)
mempool/proxy_mempool.go (1)
  • ProxyMempool (5-8)
mempool/mempool_interface_reactor.go (1)
  • NewMempoolInterfaceReactor (45-62)
mempool/mempool.go (1)
  • Mempool (27-111)
rpc/core/env.go (1)
  • Consensus (38-44)
mempool/mempool_interface_reactor_test.go (11)
mempool/mempool.go (2)
  • MempoolTx (187-194)
  • Mempool (27-111)
types/tx.go (1)
  • Tx (22-22)
mempool/clist_mempool.go (2)
  • CListMempool (33-76)
  • NewCListMempool (101-156)
internal/test/config.go (1)
  • ResetTestRoot (12-14)
mempool/mempool_interface_reactor.go (2)
  • MempoolInterfaceReactor (23-42)
  • NewMempoolInterfaceReactor (45-62)
p2p/switch.go (1)
  • Switch (72-98)
abci/example/kvstore/kvstore.go (1)
  • NewInMemoryApplication (99-101)
proxy/client.go (1)
  • NewLocalClientCreator (43-48)
p2p/test_util.go (2)
  • MakeConnectedSwitches (76-83)
  • Connect2Switches (122-145)
p2p/key.go (1)
  • ID (16-16)
mempool/mempoolTx.go (1)
  • NewMempoolTxBuilder (86-90)
mempool/proxy_mempool.go (1)
mempool/mempool.go (2)
  • Mempool (27-111)
  • TxBroadcastStream (198-201)
mempool/mempool_interface_reactor.go (7)
config/config.go (1)
  • MempoolConfig (867-948)
mempool/mempool.go (4)
  • Mempool (27-111)
  • TxBroadcastStream (198-201)
  • MempoolChannel (14-14)
  • MempoolTx (187-194)
p2p/base_reactor.go (1)
  • NewBaseReactor (53-58)
p2p/key.go (1)
  • ID (16-16)
p2p/types.go (1)
  • Envelope (17-21)
types/tx.go (1)
  • Tx (22-22)
mempool/errors.go (2)
  • ErrTxInCache (12-12)
  • ErrMempoolIsFull (64-69)
mempool/mempoolTx.go (3)
mempool/mempool.go (1)
  • MempoolTx (187-194)
types/tx.go (1)
  • Tx (22-22)
p2p/key.go (1)
  • ID (16-16)
🪛 GitHub Check: tests (03)
mempool/mempool_interface_reactor_test.go

[failure] 110-110:
not enough arguments in call to NewMempoolInterfaceReactor


[failure] 110-110:
invalid operation: NewMempoolInterfaceReactor(conf.Mempool, mempools[i], txStreams[i]) (value of type *MempoolInterfaceReactor) is not an interface

🪛 GitHub Check: lint
mempool/mempool_interface_reactor_test.go

[failure] 110-110:
not enough arguments in call to NewMempoolInterfaceReactor


[failure] 110-110:
invalid operation: NewMempoolInterfaceReactor(conf.Mempool, mempools[i], txStreams[i]) (value of type *MempoolInterfaceReactor) is not an interface

🪛 GitHub Actions: Test
mempool/mempool_interface_reactor_test.go

[error] 110-110: invalid operation: NewMempoolInterfaceReactor(conf.Mempool, mempools[i], txStreams[i]) (value of type *MempoolInterfaceReactor) is not an interface

🪛 GitHub Actions: Lint, format and check the code for typos
mempool/mempool_interface_reactor_test.go

[error] 110-110: invalid operation: NewMempoolInterfaceReactor(conf.Mempool, mempools[i], txStreams[i]) (value of type *MempoolInterfaceReactor) is not an interface

🔇 Additional comments (5)
mempool/proxy_mempool.go (1)

10-12: LGTM: compile-time interface assertions are correct

Asserting ProxyMempool implements both Mempool and TxBroadcastStream is appropriate and helps catch interface drift at compile time.

mempool/mempool_interface_reactor_test.go (1)

282-286: PeerState interface exists & is implemented by reactorTestPeerState
The PeerState interface with GetHeight() int64 is declared in mempool/reactor.go (lines 313–315), and your reactorTestPeerState in mempool_interface_reactor_test.go implements that method. The runtime type assertion will succeed—no changes needed.

Likely an incorrect or invalid review comment.

mempool/mempoolTx.go (1)

12-14: LGTM: compile-time assertion ensures interface adherence

Asserting mempoolTx implements MempoolTx provides good safety for the expanded API surface.

mempool/mempool_interface_reactor.go (2)

104-159: Semaphore gating logic looks correct and bounded; defer release lifetime is appropriate

The per-peer goroutine acquires semaphores with a timeout and holds the slot for the duration of the broadcast routine. This matches the intended “active gossip connections” semantics.


220-265: Remove redundant interface suggestion

The PeerState interface is already defined in mempool/reactor.go (lines 313–315), so there’s no need to add a local definition:

  • mempool/reactor.go:313–315
    // PeerState describes the state of a peer.
    type PeerState interface {
        GetHeight() int64
    }

Likely an incorrect or invalid review comment.

Comment on lines 46 to 64
 func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
-	res, err := app.Application.CheckTx(ctx, req)
-	if err != nil {
-		return nil, err
-	}
-	return app.callback(
-		types.ToCheckTxRequest(req),
-		types.ToCheckTxResponse(res),
-	), nil
-}
-
-func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
-	if app.Callback != nil {
-		app.Callback(req, res)
-	}
-	rr := newLocalReqRes(req, res)
-	rr.callbackInvoked = true
-	return rr
+	reqres := NewReqRes(types.ToCheckTxRequest(req))
+
+	go func() {
+		res, err := app.Application.CheckTx(ctx, req)
+		if err != nil {
+			reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
+		} else {
+			reqres.Response = types.ToCheckTxResponse(res)
+		}
+
+		if app.Callback != nil {
+			app.Callback(reqres.Request, reqres.Response)
+		}
+		reqres.InvokeCallback()
+	}()
+
+	return reqres, nil
 }


💡 Verification agent

🧩 Analysis chain

Mark ReqRes as done, fix callback data race, and propagate the error message

  • The goroutine never decrements reqres.WaitGroup; any waiter on the ReqRes (e.g., FlushAppConn/Wait) will block indefinitely.
  • Access to app.Callback is unprotected; SetResponseCallback uses a mutex, so invoking without reading under lock can race.
  • ToExceptionResponse("") discards the underlying error. Include err.Error() to aid diagnosis.

Apply:

 func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
-	reqres := NewReqRes(types.ToCheckTxRequest(req))
+	reqres := NewReqRes(types.ToCheckTxRequest(req))

-	go func() {
-		res, err := app.Application.CheckTx(ctx, req)
-		if err != nil {
-			reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
-		} else {
-			reqres.Response = types.ToCheckTxResponse(res)
-		}
-
-		if app.Callback != nil {
-			app.Callback(reqres.Request, reqres.Response)
-		}
-		reqres.InvokeCallback()
-	}()
+	go func() {
+		// Ensure any waiter on this ReqRes unblocks.
+		defer reqres.WaitGroup.Done()
+
+		res, err := app.Application.CheckTx(ctx, req)
+		if err != nil {
+			reqres.Response = types.ToExceptionResponse(err.Error())
+		} else {
+			reqres.Response = types.ToCheckTxResponse(res)
+		}
+
+		// Read callback under lock to avoid races with SetResponseCallback.
+		app.mtx.Lock()
+		cb := app.Callback
+		app.mtx.Unlock()
+		if cb != nil {
+			cb(reqres.Request, reqres.Response)
+		}
+		// If a per-request callback is set on reqres, invoke it too.
+		reqres.InvokeCallback()
+	}()
 
 	return reqres, nil
 }

To double-check the correct way to mark a ReqRes as complete, inspect ReqRes methods:


🏁 Script executed:

#!/bin/bash
rg -n -C2 'type\s+ReqRes\s+struct' abci/client
rg -n -C2 'func\s*\(\*ReqRes\)\s*(Done|SetDone|InvokeCallback)\s*\(' abci/client

Length of output: 363


I didn’t locate NewReqRes, InvokeCallback, or a Done method for ReqRes. Let’s find their definitions across the repo:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== NewReqRes definition ==="
rg -n -C2 '^func\s\+NewReqRes' .

echo
echo "=== ReqRes methods ==="
rg -n -C2 '^func\s*\(\*?ReqRes\)\.' .

echo
echo "=== InvokeCallback usage ==="
rg -n -C2 'InvokeCallback' .

echo
echo "=== WaitGroup field usage ==="
rg -n -C2 'WaitGroup' .

Length of output: 110


Let’s locate all ReqRes usages and definitions to confirm how WaitGroup and InvokeCallback behave:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== All ReqRes references ==="
rg -n -C2 'ReqRes' .

echo
echo "=== NewReqRes occurrences ==="
rg -n 'NewReqRes' .

echo
echo "=== InvokeCallback occurrences ==="
rg -n 'InvokeCallback' .

echo
echo "=== Methods on ReqRes ==="
rg -n -C2 '^func .*ReqRes' .

Length of output: 38394


Let’s inspect unsyncLocalClient to confirm its fields and callback handling:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== unsyncLocalClient definition ==="
rg -n -C3 'type\s\+unsyncLocalClient' abci/client/unsync_local_client.go

echo
echo "=== SetResponseCallback implementation ==="
rg -n -C3 'SetResponseCallback' abci/client/unsync_local_client.go

echo
echo "=== unsyncLocalClient methods ==="
rg -n -C2 '^func\s*\(app\s+\*unsyncLocalClient\)' abci/client/unsync_local_client.go

Length of output: 166


Let’s inspect the unsyncLocalClient type and its methods to verify callback and mutex usage:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== unsync_local_client.go (lines 1–80) ==="
head -n 80 abci/client/unsync_local_client.go

Length of output: 2277


Fix Request-Response synchronization and callback races in unsyncLocalClient.CheckTxAsync

  • Ensure any waiter on the returned *ReqRes is unblocked by calling Done().
  • Propagate the real error message in the exception response.
  • Read and invoke the client‐level callback under the mutex to prevent races with SetResponseCallback.

File: abci/client/unsync_local_client.go

 func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
-   reqres := NewReqRes(types.ToCheckTxRequest(req))
+   reqres := NewReqRes(types.ToCheckTxRequest(req))

-   go func() {
-       res, err := app.Application.CheckTx(ctx, req)
-       if err != nil {
-           reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
-       } else {
-           reqres.Response = types.ToCheckTxResponse(res)
-       }
-
-       if app.Callback != nil {
-           app.Callback(reqres.Request, reqres.Response)
-       }
-       reqres.InvokeCallback()
-   }()
+   go func() {
+       // Unblock any Wait() calls on this ReqRes.
+       defer reqres.Done()
+
+       res, err := app.Application.CheckTx(ctx, req)
+       if err != nil {
+           reqres.Response = types.ToExceptionResponse(err.Error())
+       } else {
+           reqres.Response = types.ToCheckTxResponse(res)
+       }
+
+       // Safely grab the client‐level callback under lock.
+       app.mtx.Lock()
+       cb := app.Callback
+       app.mtx.Unlock()
+       if cb != nil {
+           cb(reqres.Request, reqres.Response)
+       }
+       // Invoke any per‐ReqRes callback.
+       reqres.InvokeCallback()
+   }()
 
   return reqres, nil
 }
🤖 Prompt for AI Agents
In abci/client/unsync_local_client.go around lines 46 to 64, the goroutine in
CheckTxAsync must (1) ensure any waiter on the returned *ReqRes is unblocked by
calling reqres.Done() before exiting, (2) propagate the actual error text into
the exception response (e.g. types.ToExceptionResponse(err.Error()) or similar)
instead of an empty string, and (3) read and invoke app.Callback under the same
mutex used by SetResponseCallback to avoid races (acquire the callback mutex,
capture the callback variable, release the mutex, then invoke it synchronously
with reqres.Request and reqres.Response). Ensure Done() is called in all code
paths (use defer or explicit calls) and keep response assignment consistent
before invoking the callback.

Comment on lines 51 to 54
	MempoolTypeFlood = "flood"
	MempoolTypeProxy = "proxy"
	MempoolTypeNop   = "nop"
)


💡 Verification agent

🧩 Analysis chain

Allow the new "proxy" mempool type in validation; otherwise node won't start

You added MempoolTypeProxy, but ValidateBasic still rejects any type other than "flood" and "nop". This will prevent nodes from booting with type="proxy" despite having a reactor path wired in node/setup.go.

Update MempoolConfig.ValidateBasic to accept the proxy type:

// in func (cfg *MempoolConfig) ValidateBasic() error
switch cfg.Type {
case MempoolTypeFlood, MempoolTypeNop, MempoolTypeProxy:
case "": // allow empty string to be backwards compatible
default:
	return fmt.Errorf("unknown mempool type: %q", cfg.Type)
}

Also consider updating the config docs/toml template to mention "proxy" as a valid mempool type.

Run this to find and update any config templates referencing mempool types:


🏁 Script executed:

#!/bin/bash
rg -nP --type=go -C2 '(MempoolTypeFlood|MempoolTypeNop)' config | sed -n '1,200p'
rg -nP 'mempool.*type' -C2 config
rg -nP --glob '!**/vendor/**' -C2 'proxy' config

Length of output: 6268


Allow “proxy” as a valid mempool type

The ValidateBasic method currently only permits "flood" and "nop", causing any node configured with Type = "proxy" to fail startup. We need to update validation, add a unit test, and document the new option in the TOML template.

• config/config.go
  – In func (cfg *MempoolConfig) ValidateBasic(), include MempoolTypeProxy in the allowed cases. Replace:

    switch cfg.Type {
    case MempoolTypeFlood, MempoolTypeNop:
    case "": // allow empty string…
    default:
    	return fmt.Errorf("unknown mempool type: %q", cfg.Type)
    }

    with:

    switch cfg.Type {
    case MempoolTypeFlood, MempoolTypeNop, MempoolTypeProxy:
    case "": // allow empty string…
    default:
    	return fmt.Errorf("unknown mempool type: %q", cfg.Type)
    }

• config/config_test.go
  – Add a test asserting that ValidateBasic() succeeds when cfg.Mempool.Type = config.MempoolTypeProxy (see the sketch just after this list).

• config/config.toml.tpl
  – Update the mempool section comment to list "proxy" as a valid type. For example:

    -# mempool_type = "flood" # options: "flood", "nop"
    +# mempool_type = "flood" # options: "flood", "nop", "proxy"
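
A hedged sketch of the suggested unit test; the import path and the DefaultMempoolConfig helper are assumed from upstream CometBFT and may need adjusting for this fork:

package config_test

import (
	"testing"

	"github.com/cometbft/cometbft/config"
	"github.com/stretchr/testify/require"
)

// Assumes ValidateBasic has been extended to accept MempoolTypeProxy.
func TestMempoolConfigProxyTypeIsValid(t *testing.T) {
	cfg := config.DefaultMempoolConfig()
	cfg.Type = config.MempoolTypeProxy
	require.NoError(t, cfg.ValidateBasic())
}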

🤖 Prompt for AI Agents
In config/config.go around lines 51-54, the mempool type validation only allows
"flood" and "nop" so add MempoolTypeProxy to the allowed cases in
MempoolConfig.ValidateBasic(); also add a unit test in config/config_test.go
that sets cfg.Mempool.Type = config.MempoolTypeProxy and asserts ValidateBasic()
returns no error; finally update config/config.toml.tpl mempool section comment
to list "proxy" as a valid type so documentation matches the code.

Comment on lines +148 to +158
func WithCustomMempoolOnProxyMempool(customMempool mempl.Mempool, txBroadcastStream mempl.TxBroadcastStream) Option {
	return func(n *Node) {
		proxyMempool, ok := n.mempool.(*mempl.ProxyMempool)
		if !ok {
			panic("mempool is not a proxy mempool")
		}

		proxyMempool.SetMempool(customMempool)
		proxyMempool.SetTxBroadcastStream(txBroadcastStream)
	}
}


🛠️ Refactor suggestion

Enable TxsAvailable on the injected mempool when consensus waits for txs

Since ProxyMempool is configured later via this option, enable the TxsAvailable channel on the actual underlying mempool here (instead of in setup), so consensus can block on tx availability if configured.

Apply:

 func WithCustomMempoolOnProxyMempool(customMempool mempl.Mempool, txBroadcastStream mempl.TxBroadcastStream) Option {
 	return func(n *Node) {
 		proxyMempool, ok := n.mempool.(*mempl.ProxyMempool)
 		if !ok {
 			panic("mempool is not a proxy mempool")
 		}
 
 		proxyMempool.SetMempool(customMempool)
 		proxyMempool.SetTxBroadcastStream(txBroadcastStream)
+		// Mirror setup behavior: if consensus is configured to wait for txs,
+		// ensure the underlying mempool exposes the TxsAvailable channel.
+		if n.config.Consensus.WaitForTxs() {
+			customMempool.EnableTxsAvailable()
+		}
 	}
 }

Optional: consider avoiding a hard panic and instead log an error or make this a no-op to keep node boot resilient if the option is misused.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

 func WithCustomMempoolOnProxyMempool(customMempool mempl.Mempool, txBroadcastStream mempl.TxBroadcastStream) Option {
 	return func(n *Node) {
 		proxyMempool, ok := n.mempool.(*mempl.ProxyMempool)
 		if !ok {
 			panic("mempool is not a proxy mempool")
 		}
 
 		proxyMempool.SetMempool(customMempool)
 		proxyMempool.SetTxBroadcastStream(txBroadcastStream)
+		// Mirror setup behavior: if consensus is configured to wait for txs,
+		// ensure the underlying mempool exposes the TxsAvailable channel.
+		if n.config.Consensus.WaitForTxs() {
+			customMempool.EnableTxsAvailable()
+		}
 	}
 }
🤖 Prompt for AI Agents
In node/node.go around lines 148 to 158, the option that injects a custom
mempool into a ProxyMempool currently sets the underlying mempool but does not
enable its TxsAvailable channel, so consensus cannot block on tx availability
when the proxy is configured later; modify this function to detect the
underlying mempool (after SetMempool) and call the method to enable or
initialize its TxsAvailable channel (e.g., EnableTxsAvailable or equivalent) on
the actual injected mempool so consensus can wait for transactions, and replace
the hard panic on type mismatch with either a logged error or a no-op to avoid
crashing node bootstrap if the option is misused.

Comment on lines +325 to +339
	case cfg.MempoolTypeProxy:
		mp := &mempl.ProxyMempool{}
		reactor := mempl.NewMempoolInterfaceReactor(
			config.Mempool,
			mp,
			mp,
			waitSync,
		)
		if config.Consensus.WaitForTxs() {
			mp.EnableTxsAvailable()
		}
		reactor.SetLogger(logger)

		return mp, reactor



⚠️ Potential issue

Avoid calling methods on an uninitialized ProxyMempool; this will panic

mp is instantiated as &mempl.ProxyMempool{} without an underlying Mempool set. Because ProxyMempool embeds the Mempool interface, calling mp.EnableTxsAvailable() dereferences a nil interface and will panic if WaitForTxs is true. The underlying mempool and tx stream are provided later via options.

Remove the EnableTxsAvailable call here and enable it when wiring the custom mempool (see suggested change in node/node.go):

 	case cfg.MempoolTypeProxy:
 		mp := &mempl.ProxyMempool{}
 		reactor := mempl.NewMempoolInterfaceReactor(
 			config.Mempool,
 			mp,
 			mp,
 			waitSync,
 		)
-		if config.Consensus.WaitForTxs() {
-			mp.EnableTxsAvailable()
-		}
 		reactor.SetLogger(logger)
 
 		return mp, reactor
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

 	case cfg.MempoolTypeProxy:
 		mp := &mempl.ProxyMempool{}
 		reactor := mempl.NewMempoolInterfaceReactor(
 			config.Mempool,
 			mp,
 			mp,
 			waitSync,
 		)
-		if config.Consensus.WaitForTxs() {
-			mp.EnableTxsAvailable()
-		}
 		reactor.SetLogger(logger)
 
 		return mp, reactor
🤖 Prompt for AI Agents
In node/setup.go around lines 325 to 339, remove the call to
mp.EnableTxsAvailable() because mp is created as &mempl.ProxyMempool{} with no
underlying Mempool set and calling methods on its nil embedded interface can
panic; instead, leave ProxyMempool initialization and reactor setup as-is
(including reactor.SetLogger(logger)) and move enabling of TxsAvailable to the
point where the real mempool is wired (e.g., in node/node.go when the custom
mempool is attached to the proxy), calling EnableTxsAvailable on the
fully-initialized ProxyMempool after its underlying mempool/tx stream has been
set.

