Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions abci/client/unsync_local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@ func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
}

func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToCheckTxRequest(req),
types.ToCheckTxResponse(res),
), nil
}

func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
if app.Callback != nil {
app.Callback(req, res)
}
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
reqres := NewReqRes(types.ToCheckTxRequest(req))

go func() {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
} else {
reqres.Response = types.ToCheckTxResponse(res)
}

if app.Callback != nil {
app.Callback(reqres.Request, reqres.Response)
}
reqres.InvokeCallback()
}()

return reqres, nil
}
Comment on lines 46 to 67
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Mark ReqRes as done, fix callback data race, and propagate the error message

  • The goroutine never decrements reqres.WaitGroup; any waiter on the ReqRes (e.g., FlushAppConn/Wait) will block indefinitely.
  • Access to app.Callback is unprotected; SetResponseCallback uses a mutex, so invoking without reading under lock can race.
  • ToExceptionResponse("") discards the underlying error. Include err.Error() to aid diagnosis.

Apply:

 func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
-	reqres := NewReqRes(types.ToCheckTxRequest(req))
+	reqres := NewReqRes(types.ToCheckTxRequest(req))

-	go func() {
-		res, err := app.Application.CheckTx(ctx, req)
-		if err != nil {
-			reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
-		} else {
-			reqres.Response = types.ToCheckTxResponse(res)
-		}
-
-		if app.Callback != nil {
-			app.Callback(reqres.Request, reqres.Response)
-		}
-		reqres.InvokeCallback()
-	}()
+	go func() {
+		// Ensure any waiter on this ReqRes unblocks.
+		defer reqres.WaitGroup.Done()
+
+		res, err := app.Application.CheckTx(ctx, req)
+		if err != nil {
+			reqres.Response = types.ToExceptionResponse(err.Error())
+		} else {
+			reqres.Response = types.ToCheckTxResponse(res)
+		}
+
+		// Read callback under lock to avoid races with SetResponseCallback.
+		app.mtx.Lock()
+		cb := app.Callback
+		app.mtx.Unlock()
+		if cb != nil {
+			cb(reqres.Request, reqres.Response)
+		}
+		// If a per-request callback is set on reqres, invoke it too.
+		reqres.InvokeCallback()
+	}()
 
 	return reqres, nil
 }

To double-check the correct way to mark a ReqRes as complete, inspect ReqRes methods:


🏁 Script executed:

#!/bin/bash
rg -n -C2 'type\s+ReqRes\s+struct' abci/client
rg -n -C2 'func\s*\(\*ReqRes\)\s*(Done|SetDone|InvokeCallback)\s*\(' abci/client

Length of output: 363


I didn’t locate NewReqRes, InvokeCallback, or a Done method for ReqRes. Let’s find their definitions across the repo:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== NewReqRes definition ==="
rg -n -C2 '^func\s\+NewReqRes' .

echo
echo "=== ReqRes methods ==="
rg -n -C2 '^func\s*\(\*?ReqRes\)\.' .

echo
echo "=== InvokeCallback usage ==="
rg -n -C2 'InvokeCallback' .

echo
echo "=== WaitGroup field usage ==="
rg -n -C2 'WaitGroup' .

Length of output: 110


Let’s locate all ReqRes usages and definitions to confirm how WaitGroup and InvokeCallback behave:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== All ReqRes references ==="
rg -n -C2 'ReqRes' .

echo
echo "=== NewReqRes occurrences ==="
rg -n 'NewReqRes' .

echo
echo "=== InvokeCallback occurrences ==="
rg -n 'InvokeCallback' .

echo
echo "=== Methods on ReqRes ==="
rg -n -C2 '^func .*ReqRes' .

Length of output: 38394


Let’s inspect unsyncLocalClient to confirm its fields and callback handling:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== unsyncLocalClient definition ==="
rg -n -C3 'type\s\+unsyncLocalClient' abci/client/unsync_local_client.go

echo
echo "=== SetResponseCallback implementation ==="
rg -n -C3 'SetResponseCallback' abci/client/unsync_local_client.go

echo
echo "=== unsyncLocalClient methods ==="
rg -n -C2 '^func\s*\(app\s+\*unsyncLocalClient\)' abci/client/unsync_local_client.go

Length of output: 166


Let’s inspect the unsyncLocalClient type and its methods to verify callback and mutex usage:


🏁 Script executed:

#!/bin/bash
set -e

echo "=== unsync_local_client.go (lines 1–80) ==="
head -n 80 abci/client/unsync_local_client.go

Length of output: 2277


Fix Request-Response synchronization and callback races in unsyncLocalClient.CheckTxAsync

  • Ensure any waiter on the returned *ReqRes is unblocked by calling Done().
  • Propagate the real error message in the exception response.
  • Read and invoke the client‐level callback under the mutex to prevent races with SetResponseCallback.

File: abci/client/unsync_local_client.go

 func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
-   reqres := NewReqRes(types.ToCheckTxRequest(req))
+   reqres := NewReqRes(types.ToCheckTxRequest(req))

-   go func() {
-       res, err := app.Application.CheckTx(ctx, req)
-       if err != nil {
-           reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
-       } else {
-           reqres.Response = types.ToCheckTxResponse(res)
-       }
-
-       if app.Callback != nil {
-           app.Callback(reqres.Request, reqres.Response)
-       }
-       reqres.InvokeCallback()
-   }()
+   go func() {
+       // Unblock any Wait() calls on this ReqRes.
+       defer reqres.Done()
+
+       res, err := app.Application.CheckTx(ctx, req)
+       if err != nil {
+           reqres.Response = types.ToExceptionResponse(err.Error())
+       } else {
+           reqres.Response = types.ToCheckTxResponse(res)
+       }
+
+       // Safely grab the client‐level callback under lock.
+       app.mtx.Lock()
+       cb := app.Callback
+       app.mtx.Unlock()
+       if cb != nil {
+           cb(reqres.Request, reqres.Response)
+       }
+       // Invoke any per‐ReqRes callback.
+       reqres.InvokeCallback()
+   }()
 
   return reqres, nil
 }
🤖 Prompt for AI Agents
In abci/client/unsync_local_client.go around lines 46 to 64, the goroutine in
CheckTxAsync must (1) ensure any waiter on the returned *ReqRes is unblocked by
calling reqres.Done() before exiting, (2) propagate the actual error text into
the exception response (e.g. types.ToExceptionResponse(err.Error()) or similar)
instead of an empty string, and (3) read and invoke app.Callback under the same
mutex used by SetResponseCallback to avoid races (acquire the callback mutex,
capture the callback variable, release the mutex, then invoke it synchronously
with reqres.Request and reqres.Response). Ensure Done() is called in all code
paths (use defer or explicit calls) and keep response assignment consistent
before invoking the callback.


// -------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
v2 = "v2"

MempoolTypeFlood = "flood"
MempoolTypeProxy = "proxy"
MempoolTypeNop = "nop"
)
Comment on lines 51 to 54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Allow the new "proxy" mempool type in validation; otherwise node won't start

You added MempoolTypeProxy, but ValidateBasic still rejects any type other than "flood" and "nop". This will prevent nodes from booting with type="proxy" despite having a reactor path wired in node/setup.go.

Update MempoolConfig.ValidateBasic to accept the proxy type:

// in func (cfg *MempoolConfig) ValidateBasic() error
switch cfg.Type {
case MempoolTypeFlood, MempoolTypeNop, MempoolTypeProxy:
case "": // allow empty string to be backwards compatible
default:
	return fmt.Errorf("unknown mempool type: %q", cfg.Type)
}

Also consider updating the config docs/toml template to mention "proxy" as a valid mempool type.

Run this to find and update any config templates referencing mempool types:


🏁 Script executed:

#!/bin/bash
rg -nP --type=go -C2 '(MempoolTypeFlood|MempoolTypeNop)' config | sed -n '1,200p'
rg -nP 'mempool.*type' -C2 config
rg -nP --glob '!**/vendor/**' -C2 'proxy' config

Length of output: 6268


Allow “proxy” as a valid mempool type

The ValidateBasic method currently only permits "flood" and "nop", causing any node configured with Type = "proxy" to fail startup. We need to update validation, add a unit test, and document the new option in the TOML template.

• config/config.go
– In func (cfg *MempoolConfig) ValidateBasic(), include MempoolTypeProxy in the allowed cases.
• Replace:
go switch cfg.Type { case MempoolTypeFlood, MempoolTypeNop: case "": // allow empty string… default: return fmt.Errorf("unknown mempool type: %q", cfg.Type) }
with:
go switch cfg.Type { case MempoolTypeFlood, MempoolTypeNop, MempoolTypeProxy: case "": // allow empty string… default: return fmt.Errorf("unknown mempool type: %q", cfg.Type) }

• config/config_test.go
– Add a test asserting that ValidateBasic() succeeds when cfg.Mempool.Type = config.MempoolTypeProxy.

• config/config.toml.tpl
– Update the mempool section comment to list "proxy" as a valid type. For example:
diff # mempool_type = "flood" # options: "flood", "nop" + # mempool_type = "flood" # options: "flood", "nop", "proxy"

🤖 Prompt for AI Agents
In config/config.go around lines 51-54, the mempool type validation only allows
"flood" and "nop" so add MempoolTypeProxy to the allowed cases in
MempoolConfig.ValidateBasic(); also add a unit test in config/config_test.go
that sets cfg.Mempool.Type = config.MempoolTypeProxy and asserts ValidateBasic()
returns no error; finally update config/config.toml.tpl mempool section comment
to list "proxy" as a valid type so documentation matches the code.


Expand Down
18 changes: 18 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,21 @@ type Iterator interface {
// WaitNextCh returns a channel on which to wait for the next available entry.
WaitNextCh() <-chan Entry
}

// MempoolTx defines the interface for a transaction in the mempool
// It provides methods to access transaction properties and sender information
type MempoolTx interface {
Height() int64
GasWanted() int64
Tx() types.Tx
IsSender(peerID p2p.ID) bool
AddSender(peerID p2p.ID) bool
Senders() []p2p.ID
}

// TxBroadcastStream defines the interface for streaming transactions to broadcast.
// It provides a channel that will receive transactions to be broadcasted to peers.
type TxBroadcastStream interface {
// GetTxChannel returns a channel that will receive transactions to broadcast.
GetTxChannel() <-chan MempoolTx
}
69 changes: 67 additions & 2 deletions mempool/mempoolTx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
)

// mempoolTx is an entry in the mempool.
var _ MempoolTx = (*mempoolTx)(nil)

// mempoolTx is an entry in the mempool
type mempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
Expand All @@ -19,10 +22,17 @@ type mempoolTx struct {
timestamp time.Time // time when entry was created

// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> struct{}
// senders: PeerID -> bool
senders sync.Map
}

// NewMempoolTx creates a new mempoolTx using the builder pattern
func NewMempoolTx(tx types.Tx) MempoolTx {
return NewMempoolTxBuilder().
WithTx(tx).
Build()
}

func (memTx *mempoolTx) Tx() types.Tx {
return memTx.tx
}
Expand All @@ -32,14 +42,18 @@ func (memTx *mempoolTx) Height() int64 {
}

func (memTx *mempoolTx) GasWanted() int64 {
return memTx.gasWanted
return atomic.LoadInt64(&memTx.gasWanted)
}

func (memTx *mempoolTx) IsSender(peerID p2p.ID) bool {
_, ok := memTx.senders.Load(peerID)
return ok
}

func (memTx *mempoolTx) AddSender(peerID p2p.ID) bool {
return memTx.addSender(peerID)
}

// Add the peer ID to the list of senders. Return true iff it exists already in the list.
func (memTx *mempoolTx) addSender(peerID p2p.ID) bool {
if len(peerID) == 0 {
Expand All @@ -59,3 +73,54 @@ func (memTx *mempoolTx) Senders() []p2p.ID {
})
return senders
}

// MempoolTxBuilder is a builder for creating mempoolTx instances
type MempoolTxBuilder struct {
height int64
gasWanted int64
tx types.Tx
senders []p2p.ID
}

// NewMempoolTxBuilder creates a new builder for mempoolTx
func NewMempoolTxBuilder() *MempoolTxBuilder {
return &MempoolTxBuilder{
senders: make([]p2p.ID, 0),
}
}

// WithHeight sets the height for the mempoolTx
func (b *MempoolTxBuilder) WithHeight(height int64) *MempoolTxBuilder {
b.height = height
return b
}

// WithGasWanted sets the gas wanted for the mempoolTx
func (b *MempoolTxBuilder) WithGasWanted(gasWanted int64) *MempoolTxBuilder {
b.gasWanted = gasWanted
return b
}

// WithTx sets the transaction for the mempoolTx
func (b *MempoolTxBuilder) WithTx(tx types.Tx) *MempoolTxBuilder {
b.tx = tx
return b
}

func (b *MempoolTxBuilder) WithSender(sender p2p.ID) *MempoolTxBuilder {
b.senders = append(b.senders, sender)
return b
}

// Build creates the final mempoolTx instance
func (b *MempoolTxBuilder) Build() MempoolTx {
memTx := &mempoolTx{
height: b.height,
gasWanted: b.gasWanted,
tx: b.tx,
}
for _, sender := range b.senders {
memTx.senders.Store(sender, struct{}{})
}
return memTx
}
Loading
Loading