Skip to content

Commit 9439f95

Browse files
wangdayong228Pana
andauthored
Optimize bulk sender (#196)
* optimize bulk sender * fill estimate result for correct estimated txs * merge master to optimize-bulk-sender (#192) * Update README.md * remove offset&limit from LogFilter (#189) * update changelog * add util for sign tx by private key (#191) Co-authored-by: Pana <pana.wang@outlook.com> * handle every action error in nil case after bulk execute * set nonce after estimate to avoid setting nonce of tx estimated error * remove * populate could use pendingNonce or txNonce * support specify if use tx pending nonce when bulk populate * use map to enable new internal contracts for multi client * add test * fix typo * fix typo Co-authored-by: Pana <pana.wang@outlook.com>
1 parent fd589e9 commit 9439f95

File tree

9 files changed

+244
-53
lines changed

9 files changed

+244
-53
lines changed

cfxclient/bulk/bulk_sender.go

Lines changed: 87 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,39 @@ import (
1414
// BulkSender used for bulk send unsigned tranactions in one request to improve efficiency,
1515
// it will auto populate missing fields and nonce of unsigned transactions in queue before send.
1616
type BulkSender struct {
17-
signalbeCaller sdk.ClientOperator
18-
unsignedTxs []*types.UnsignedTransaction
17+
signableCaller sdk.ClientOperator
18+
unsignedTxs []*types.UnsignedTransaction
19+
bulkEstimateErrors *ErrBulkEstimate
20+
isPopulated bool
1921
}
2022

21-
// NewBuckSender creates new bulk sender instance
22-
func NewBuckSender(signableClient sdk.Client) *BulkSender {
23+
// NewBulkSender creates new bulk sender instance
24+
func NewBulkSender(signableClient sdk.Client) *BulkSender {
2325
return &BulkSender{
24-
signalbeCaller: &signableClient,
26+
signableCaller: &signableClient,
2527
}
2628
}
2729

2830
// AppendTransaction append unsigned transaction to queue
29-
func (b *BulkSender) AppendTransaction(tx types.UnsignedTransaction) *BulkSender {
30-
b.unsignedTxs = append(b.unsignedTxs, &tx)
31+
func (b *BulkSender) AppendTransaction(tx *types.UnsignedTransaction) *BulkSender {
32+
b.unsignedTxs = append(b.unsignedTxs, tx)
3133
return b
3234
}
3335

3436
// PopulateTransactions fill missing fields and nonce for unsigned transactions in queue
35-
func (b *BulkSender) PopulateTransactions() error {
37+
// default use pending nonce
38+
func (b *BulkSender) PopulateTransactions(usePendingNonce ...bool) ([]*types.UnsignedTransaction, error) {
39+
isUsePendingNonce := true
40+
if len(usePendingNonce) > 0 {
41+
isUsePendingNonce = usePendingNonce[0]
42+
}
43+
3644
defaultAccount, chainID, networkId, gasPrice, epochHeight, err := b.getChainInfos()
3745
if err != nil {
38-
return errors.WithStack(err)
46+
return nil, errors.WithStack(err)
3947
}
4048
if defaultAccount == nil {
41-
return errors.Wrap(err, "failed to pupulate, no account found")
49+
return nil, errors.Wrap(err, "failed to pupulate, no account found")
4250
}
4351

4452
for _, utx := range b.unsignedTxs {
@@ -47,14 +55,22 @@ func (b *BulkSender) PopulateTransactions() error {
4755
}
4856
}
4957

58+
estimateErrs, err := b.populateGasAndStorage()
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
// set nonce
5064
userUsedNoncesMap := b.gatherUsedNonces()
51-
// fill nonce
52-
userNextNonceCache, err := b.gatherInitNextNonces()
65+
userNextNonceCache, err := b.gatherInitNextNonces(isUsePendingNonce)
5366
if err != nil {
54-
return errors.WithStack(err)
67+
return nil, errors.WithStack(err)
5568
}
69+
for i, utx := range b.unsignedTxs {
70+
if estimateErrs != nil && (*estimateErrs)[i] != nil {
71+
continue
72+
}
5673

57-
for _, utx := range b.unsignedTxs {
5874
utx.From.CompleteByNetworkID(networkId)
5975
utx.To.CompleteByNetworkID(networkId)
6076

@@ -87,12 +103,19 @@ func (b *BulkSender) PopulateTransactions() error {
87103

88104
}
89105
}
90-
return b.populateGasAndStorage()
106+
107+
// return results, estimatErrs
108+
b.isPopulated = true
109+
if estimateErrs != nil {
110+
b.bulkEstimateErrors = estimateErrs
111+
return b.unsignedTxs, b.bulkEstimateErrors
112+
}
113+
return b.unsignedTxs, nil
91114
}
92115

93-
func (b *BulkSender) populateGasAndStorage() error {
116+
func (b *BulkSender) populateGasAndStorage() (*ErrBulkEstimate, error) {
94117
estimatPtrs, errPtrs := make([]*types.Estimate, len(b.unsignedTxs)), make([]*error, len(b.unsignedTxs))
95-
bulkCaller := NewBulkCaller(b.signalbeCaller)
118+
bulkCaller := NewBulkCaller(b.signableCaller)
96119
for i, utx := range b.unsignedTxs {
97120
if utx.StorageLimit != nil && utx.Gas != nil {
98121
continue
@@ -105,16 +128,26 @@ func (b *BulkSender) populateGasAndStorage() error {
105128

106129
err := bulkCaller.Execute()
107130
if err != nil {
108-
return errors.WithStack(err)
131+
return nil, errors.WithStack(err)
132+
}
133+
134+
estimateErrors := ErrBulkEstimate{}
135+
for i, e := range errPtrs {
136+
// not estimate because of both StorageLimit and Gas have values
137+
if e == nil || *e == nil {
138+
continue
139+
}
140+
estimateErrors[i] = &ErrEstimate{*e}
109141
}
110142

111143
for i, utx := range b.unsignedTxs {
112-
if utx.StorageLimit != nil && utx.Gas != nil {
144+
145+
if _, ok := estimateErrors[i]; ok {
113146
continue
114147
}
115148

116-
if *errPtrs[i] != nil {
117-
return errors.WithMessagef(*errPtrs[i], "failed to estimate %vth transaction %+v", i, utx)
149+
if utx.StorageLimit != nil && utx.Gas != nil {
150+
continue
118151
}
119152

120153
if utx.Gas == nil {
@@ -125,7 +158,11 @@ func (b *BulkSender) populateGasAndStorage() error {
125158
utx.StorageLimit = types.NewUint64(estimatPtrs[i].StorageCollateralized.ToInt().Uint64())
126159
}
127160
}
128-
return nil
161+
162+
if len(estimateErrors) > 0 {
163+
return &estimateErrors, nil
164+
}
165+
return nil, nil
129166
}
130167

131168
func (b *BulkSender) gatherUsedNonces() map[string]map[string]bool {
@@ -142,10 +179,10 @@ func (b *BulkSender) gatherUsedNonces() map[string]map[string]bool {
142179
return result
143180
}
144181

145-
func (b *BulkSender) gatherInitNextNonces() (map[string]*big.Int, error) {
182+
func (b *BulkSender) gatherInitNextNonces(usePendingNonce bool) (map[string]*big.Int, error) {
146183
result := make(map[string]*big.Int)
147184

148-
bulkCaller := NewBulkCaller(b.signalbeCaller)
185+
bulkCaller := NewBulkCaller(b.signableCaller)
149186
isUserCached := make(map[string]bool)
150187
poolNextNonces, poolNextNonceErrs := make(map[string]*hexutil.Big), make(map[string]*error)
151188
nextNonces, nextNonceErrs := make(map[string]*hexutil.Big), make(map[string]*error)
@@ -169,7 +206,7 @@ func (b *BulkSender) gatherInitNextNonces() (map[string]*big.Int, error) {
169206
continue
170207
}
171208

172-
if *poolNextNonceErrs[user] == nil {
209+
if *poolNextNonceErrs[user] == nil && usePendingNonce {
173210
result[utx.From.String()] = poolNextNonces[user].ToInt()
174211
continue
175212
}
@@ -200,7 +237,7 @@ func (b *BulkSender) getChainInfos() (
200237
epochHeight *hexutil.Uint64,
201238
err error,
202239
) {
203-
_client := b.signalbeCaller
240+
_client := b.signableCaller
204241

205242
_defaultAccount, err := _client.GetAccountManager().GetDefault()
206243
if err != nil {
@@ -232,40 +269,52 @@ func (b *BulkSender) getChainInfos() (
232269
// Clear clear batch elems and errors in queue for new bulk call action
233270
func (b *BulkSender) Clear() {
234271
b.unsignedTxs = b.unsignedTxs[:0]
272+
b.isPopulated = false
273+
}
274+
275+
func (b *BulkSender) IsPopulated() bool {
276+
return b.isPopulated
235277
}
236278

237279
// SignAndSend signs and sends all unsigned transactions in queue by rpc call "batch" on one request
238280
// and returns the result of sending transactions.
239-
// If there is any error on rpc "batch", it will be returned with batchErr not nil.
281+
// If there is any error on rpc "batch", it will be returned with err not nil.
240282
// If there is no error on rpc "batch", it will return the txHashes or txErrors of sending transactions.
241-
func (b *BulkSender) SignAndSend() (txHashes []*types.Hash, txErrors []error, batchErr error) {
283+
func (b *BulkSender) SignAndSend() (txHashes []*types.Hash, txErrors []error, err error) {
284+
if !b.IsPopulated() {
285+
_, err := b.PopulateTransactions()
286+
if err != nil {
287+
return nil, nil, err
288+
}
289+
}
290+
242291
rawTxs := make([][]byte, len(b.unsignedTxs))
243292

244293
for i, utx := range b.unsignedTxs {
245294
var err error
246-
rawTxs[i], err = b.signalbeCaller.GetAccountManager().SignTransaction(*utx)
295+
rawTxs[i], err = b.signableCaller.GetAccountManager().SignTransaction(*utx)
247296
if err != nil {
248297
return nil, nil, errors.Wrapf(err, "failed to encode the %vth transaction: %+v", i, utx)
249298
}
250299
}
251300

252301
// send
253-
bulkCaller := NewBulkCaller(b.signalbeCaller)
302+
bulkCaller := NewBulkCaller(b.signableCaller)
254303
hashes := make([]*types.Hash, len(rawTxs))
255-
errs := make([]*error, len(rawTxs))
304+
txErrs := make([]*error, len(rawTxs))
256305
for i, rawTx := range rawTxs {
257-
hashes[i], errs[i] = bulkCaller.Cfx().SendRawTransaction(rawTx)
306+
hashes[i], txErrs[i] = bulkCaller.Cfx().SendRawTransaction(rawTx)
258307
}
259308

260-
batchErr = bulkCaller.Execute()
261-
if batchErr != nil {
262-
return nil, nil, errors.Wrapf(batchErr, "failed to batch send transactions")
309+
err = bulkCaller.Execute()
310+
if err != nil {
311+
return nil, nil, errors.Wrapf(err, "failed to batch send transactions")
263312
}
264313

265-
errorVals := make([]error, len(errs))
266-
for i, err := range errs {
314+
errorVals := make([]error, len(txErrs))
315+
for i, err := range txErrs {
267316
errorVals[i] = *err
268317
}
269318

270-
return hashes, errorVals, batchErr
319+
return hashes, errorVals, err
271320
}

cfxclient/bulk/bulk_sender_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package bulk
2+
3+
import (
4+
"fmt"
5+
"math/big"
6+
"testing"
7+
8+
client "github.com/Conflux-Chain/go-conflux-sdk"
9+
sdk "github.com/Conflux-Chain/go-conflux-sdk"
10+
"github.com/Conflux-Chain/go-conflux-sdk/types"
11+
"github.com/Conflux-Chain/go-conflux-sdk/types/cfxaddress"
12+
"github.com/Conflux-Chain/go-conflux-sdk/utils"
13+
"github.com/status-im/keycard-go/hexutils"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func _initClinetForTest() *sdk.Client {
18+
_client, err := client.NewClient("https://test.confluxrpc.com", client.ClientOption{
19+
KeystorePath: "keystore",
20+
})
21+
if err != nil {
22+
panic(err)
23+
}
24+
if len(_client.AccountManager.List()) == 0 {
25+
_client.AccountManager.Create("")
26+
}
27+
return _client
28+
}
29+
30+
// nil, nil and will err, nil, nil and will err, nil => 0, nil, 1, nil, 2
31+
// nil, 3 , nil, nil, nil => 0, 3, 1, 2, 4
32+
func TestNonceCorrectWhenBulkSendPopulate(t *testing.T) {
33+
34+
bulkSender := NewBulkSender(*_initClinetForTest())
35+
36+
user := cfxaddress.MustNew("cfxtest:aaskvgxcfej371g4ecepx9an78ngpejvcekupe69t3")
37+
// value := types.NewBigInt(10000)
38+
usdt := cfxaddress.MustNew("cfxtest:acepe88unk7fvs18436178up33hb4zkuf62a9dk1gv")
39+
dtatOfTransfer1000 := hexutils.HexToBytes("32f289cf00000000000000000000000088c27bd05a7a58bafed6797efa0cce4e1d55302f")
40+
41+
bulkSender.
42+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil}). // correct
43+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &usdt, dtatOfTransfer1000}). // fail
44+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil}). // correct
45+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &usdt, dtatOfTransfer1000}). // fail
46+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil}) // correct
47+
populated, err := bulkSender.PopulateTransactions(false)
48+
fmt.Printf("%v\n", utils.PrettyJSON(populated))
49+
fmt.Printf("error %+v\n", err)
50+
assert.True(t, err != nil)
51+
assert.True(t, populated != nil)
52+
assert.True(t, populated[0].Nonce.ToInt().Cmp(big.NewInt(0)) == 0)
53+
assert.True(t, populated[1].Nonce == nil)
54+
assert.True(t, populated[2].Nonce.ToInt().Cmp(big.NewInt(1)) == 0)
55+
assert.True(t, populated[3].Nonce == nil)
56+
assert.True(t, populated[4].Nonce.ToInt().Cmp(big.NewInt(2)) == 0)
57+
58+
bulkSender.Clear()
59+
bulkSender.
60+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil}).
61+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user, Nonce: types.NewBigInt(3)}, &usdt, nil}).
62+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil}).
63+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil}).
64+
AppendTransaction(&types.UnsignedTransaction{types.UnsignedTransactionBase{From: &user}, &user, nil})
65+
populated, err = bulkSender.PopulateTransactions(false)
66+
fmt.Printf("%v\n", utils.PrettyJSON(populated))
67+
fmt.Printf("error %+v\n", err)
68+
assert.True(t, err != nil)
69+
assert.True(t, populated != nil)
70+
assert.True(t, populated[0].Nonce.ToInt().Cmp(big.NewInt(0)) == 0)
71+
assert.True(t, populated[1].Nonce.ToInt().Cmp(big.NewInt(3)) == 0)
72+
assert.True(t, populated[2].Nonce.ToInt().Cmp(big.NewInt(1)) == 0)
73+
assert.True(t, populated[3].Nonce.ToInt().Cmp(big.NewInt(2)) == 0)
74+
assert.True(t, populated[4].Nonce.ToInt().Cmp(big.NewInt(4)) == 0)
75+
}

cfxclient/bulk/errors.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package bulk
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
)
7+
8+
type ErrBulkEstimate map[int]*ErrEstimate
9+
10+
func (e ErrBulkEstimate) Error() string {
11+
msgs := []string{}
12+
for k, v := range e {
13+
msg := fmt.Sprintf("%v: %v", k, v.Error())
14+
msgs = append(msgs, msg)
15+
}
16+
return strings.Join(msgs, "\n")
17+
}
18+
19+
type ErrEstimate struct {
20+
Inner error
21+
}
22+
23+
func (e ErrEstimate) Error() string {
24+
return fmt.Sprintf("estimate error: %v", e.Inner.Error())
25+
}

changeLog.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
Go-conflux-sdk Change Log
22
============
3-
v1.4.1
4-
------------
5-
- Support cfx_getParamsFromVote
63

74
v1.4.0
85
------------

contract_meta/internal_contract/admin_control.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@ type AdminControl struct {
1616
sdk.Contract
1717
}
1818

19-
var adminControl *AdminControl
19+
var adminControlMap sync.Map
2020
var adminControlMu sync.Mutex
2121

2222
// NewAdminControl gets the AdminControl contract object
2323
func NewAdminControl(client sdk.ClientOperator) (ac AdminControl, err error) {
24-
if adminControl == nil {
24+
netId, err := client.GetNetworkID()
25+
if err != nil {
26+
return AdminControl{}, err
27+
}
28+
val, ok := adminControlMap.Load(netId)
29+
if !ok {
2530
adminControlMu.Lock()
2631
defer adminControlMu.Unlock()
2732
abi := getAdminControlAbi()
@@ -33,9 +38,11 @@ func NewAdminControl(client sdk.ClientOperator) (ac AdminControl, err error) {
3338
if e != nil {
3439
return ac, errors.Wrap(e, "failed to new admin control contract")
3540
}
36-
adminControl = &AdminControl{Contract: *contract}
41+
42+
val = AdminControl{Contract: *contract}
43+
adminControlMap.Store(netId, val)
3744
}
38-
return *adminControl, nil
45+
return val.(AdminControl), nil
3946
}
4047

4148
// Destroy destroies contract `contractAddr`.

0 commit comments

Comments
 (0)