Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 95 additions & 125 deletions client/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type ChainClient interface {
SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*txtypes.SimulateResponse, error)
AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error)

// Build signed tx with given accNum and accSeq, useful for offline siging
// If simulate is set to false, initialGas will be used
Expand Down Expand Up @@ -681,35 +682,6 @@ func (c *chainClient) GetAccount(ctx context.Context, address string) (*authtype
return res, err
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, msgs...)

if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed synchronously broadcast messages:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) GetFeeDiscountInfo(ctx context.Context, account string) (*exchangetypes.QueryFeeDiscountAccountInfoResponse, error) {
req := &exchangetypes.QueryFeeDiscountAccountInfoRequest{
Account: account,
Expand Down Expand Up @@ -746,36 +718,6 @@ func (c *chainClient) SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*t
return simRes, nil
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) BuildSignedTx(clientCtx client.Context, accNum, accSeq, initialGas uint64, msgs ...sdk.Msg) ([]byte, error) {
txf := NewTxFactory(clientCtx).WithSequence(accSeq).WithAccountNumber(accNum).WithGas(initialGas)
return c.buildSignedTx(clientCtx, txf, msgs...)
Expand Down Expand Up @@ -890,57 +832,23 @@ func (c *chainClient) AsyncBroadcastSignedTx(txBytes []byte) (*txtypes.Broadcast
func (c *chainClient) broadcastTx(
clientCtx client.Context,
txf tx.Factory,
await bool,
broadcastMode txtypes.BroadcastMode,
msgs ...sdk.Msg,
) (*txtypes.BroadcastTxResponse, error) {
) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
txBytes, err := c.buildSignedTx(clientCtx, txf, msgs...)
if err != nil {
err = errors.Wrap(err, "failed to build signed Tx")
return nil, err
return nil, nil, err
}

req := txtypes.BroadcastTxRequest{
TxBytes: txBytes,
Mode: txtypes.BroadcastMode_BROADCAST_MODE_SYNC,
Mode: broadcastMode,
}

res, err := common.ExecuteCall(context.Background(), c.network.ChainCookieAssistant, c.txClient.BroadcastTx, &req)
if err != nil || res.TxResponse.Code != 0 || !await {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, txBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}
return &req, res, err

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// QueueBroadcastMsg enqueues a list of messages. Messages will added to the queue
Expand Down Expand Up @@ -970,37 +878,20 @@ func (c *chainClient) runBatchBroadcast() {
msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)

submitBatch := func(toSubmit []sdk.Msg) {
c.syncMux.Lock()
defer c.syncMux.Unlock()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("broadcastTx with nonce", sequence)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to broadcast messages batch:", string(resJSON))
return
}
}
res, err := c.SyncBroadcastMsg(toSubmit...)

if res.TxResponse.Code != 0 {
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
log.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
if err != nil {
c.logger.WithError(err)
} else {
log.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
if res.TxResponse.Code != 0 {
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
c.logger.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
} else {
c.logger.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
}
}

log.Debugln("gas wanted: ", c.gasWanted)
c.logger.Debugln("gas wanted: ", c.gasWanted)
}

for {
Expand Down Expand Up @@ -2651,3 +2542,82 @@ func (c *chainClient) FetchVouchersForAddress(ctx context.Context, address strin
func (c *chainClient) GetNetwork() common.Network {
return c.network
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
req, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_SYNC, msgs...)

if err != nil || res.TxResponse.Code != 0 {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := c.ctx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, req.TxBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
_, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_ASYNC, msgs...)
return res, err
}

// BroadcastMsg submits a group of messages in one transaction to the chain
// The function uses the broadcast mode specified with the broadcastMode parameter
func (c *chainClient) BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
req, res, err := c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
c.logger.Debugln("retrying broadcastTx with nonce", sequence)
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, nil, err
}
}

return req, res, nil
}
Loading
Loading