Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 95 additions & 125 deletions client/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type ChainClient interface {
SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*txtypes.SimulateResponse, error)
AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error)

// Build signed tx with given accNum and accSeq, useful for offline siging
// If simulate is set to false, initialGas will be used
Expand Down Expand Up @@ -681,35 +682,6 @@ func (c *chainClient) GetAccount(ctx context.Context, address string) (*authtype
return res, err
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, msgs...)

if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed synchronously broadcast messages:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) GetFeeDiscountInfo(ctx context.Context, account string) (*exchangetypes.QueryFeeDiscountAccountInfoResponse, error) {
req := &exchangetypes.QueryFeeDiscountAccountInfoRequest{
Account: account,
Expand Down Expand Up @@ -746,36 +718,6 @@ func (c *chainClient) SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*t
return simRes, nil
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) BuildSignedTx(clientCtx client.Context, accNum, accSeq, initialGas uint64, msgs ...sdk.Msg) ([]byte, error) {
txf := NewTxFactory(clientCtx).WithSequence(accSeq).WithAccountNumber(accNum).WithGas(initialGas)
return c.buildSignedTx(clientCtx, txf, msgs...)
Expand Down Expand Up @@ -890,57 +832,23 @@ func (c *chainClient) AsyncBroadcastSignedTx(txBytes []byte) (*txtypes.Broadcast
func (c *chainClient) broadcastTx(
clientCtx client.Context,
txf tx.Factory,
await bool,
broadcastMode txtypes.BroadcastMode,
msgs ...sdk.Msg,
) (*txtypes.BroadcastTxResponse, error) {
) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
txBytes, err := c.buildSignedTx(clientCtx, txf, msgs...)
if err != nil {
err = errors.Wrap(err, "failed to build signed Tx")
return nil, err
return nil, nil, err
}

req := txtypes.BroadcastTxRequest{
TxBytes: txBytes,
Mode: txtypes.BroadcastMode_BROADCAST_MODE_SYNC,
Mode: broadcastMode,
}

res, err := common.ExecuteCall(context.Background(), c.network.ChainCookieAssistant, c.txClient.BroadcastTx, &req)
if err != nil || res.TxResponse.Code != 0 || !await {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, txBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}
return &req, res, err

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// QueueBroadcastMsg enqueues a list of messages. Messages will added to the queue
Expand Down Expand Up @@ -970,37 +878,20 @@ func (c *chainClient) runBatchBroadcast() {
msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)

submitBatch := func(toSubmit []sdk.Msg) {
c.syncMux.Lock()
defer c.syncMux.Unlock()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("broadcastTx with nonce", sequence)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to broadcast messages batch:", string(resJSON))
return
}
}
res, err := c.SyncBroadcastMsg(toSubmit...)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle the error returned by SyncBroadcastMsg

At line 881, the error returned by c.SyncBroadcastMsg(toSubmit...) is assigned to err but not used. Ignoring errors can lead to silent failures and unexpected behaviors. It's important to check and handle errors appropriately.

Apply this diff to handle the error:

 func (c *chainClient) runBatchBroadcast() {
     // ... existing code ...

     submitBatch := func(toSubmit []sdk.Msg) {
-        res, err := c.SyncBroadcastMsg(toSubmit...)
+        res, err := c.SyncBroadcastMsg(toSubmit...)

+        if err != nil {
+            c.logger.WithError(err).Errorln("failed to broadcast messages batch")
+            return
+        }

         if res.TxResponse.Code != 0 {
             err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
             log.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: pre-commit

[error] 881-881: Ineffectual assignment to err variable. The error value is assigned but never used.


if res.TxResponse.Code != 0 {
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
log.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
if err != nil {
c.logger.WithError(err)
} else {
log.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
if res.TxResponse.Code != 0 {
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
c.logger.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
} else {
c.logger.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
}
Comment on lines +881 to +891
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check for nil response before accessing fields

In the submitBatch function within runBatchBroadcast, there is a risk of a nil pointer dereference if res is nil when res.TxResponse.Code is accessed. Ensure that res and res.TxResponse are not nil before accessing their fields.

Apply this diff to prevent potential panic:

if err != nil {
    c.logger.WithError(err).Errorln("failed to broadcast messages batch")
    return
} else {
-   if res.TxResponse.Code != 0 {
+   if res == nil || res.TxResponse == nil || res.TxResponse.Code != 0 {
        err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
        c.logger.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
    } else {
        c.logger.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
    }
}

Committable suggestion skipped: line range outside the PR's diff.

}

log.Debugln("gas wanted: ", c.gasWanted)
c.logger.Debugln("gas wanted: ", c.gasWanted)
}

for {
Expand Down Expand Up @@ -2651,3 +2542,82 @@ func (c *chainClient) FetchVouchersForAddress(ctx context.Context, address strin
func (c *chainClient) GetNetwork() common.Network {
return c.network
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
req, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_SYNC, msgs...)

if err != nil || res.TxResponse.Code != 0 {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := c.ctx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, req.TxBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
_, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_ASYNC, msgs...)
return res, err
}

// BroadcastMsg submits a group of messages in one transaction to the chain
// The function uses the broadcast mode specified with the broadcastMode parameter
func (c *chainClient) BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
req, res, err := c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
c.logger.Debugln("retrying broadcastTx with nonce", sequence)
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, nil, err
}
}

return req, res, nil
}
Loading
Loading