Skip to content

Commit 97f4744

Browse files
authored
@peter/refactor (#122)
* refactor so sdk looks more like sdk * refactor test cases * remove sha3
1 parent f23b079 commit 97f4744

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+5586
-1449
lines changed

.github/workflows/go.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ jobs:
1414
unit-test:
1515
runs-on: ubuntu-latest
1616
steps:
17-
- uses: actions/checkout@v4
17+
- uses: actions/checkout@v5
1818

1919
- name: Set up Go
2020
uses: actions/setup-go@v4
2121
with:
22-
go-version: '1.21.5'
22+
go-version: '1.22.5'
2323

2424
- name: Build
2525
run: go build ./...

.github/workflows/tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
swap-storage: true
3030

3131
- name: Checkout sources
32-
uses: actions/checkout@v4
32+
uses: actions/checkout@v5
3333
with:
3434
submodules: recursive
3535

@@ -49,10 +49,10 @@ jobs:
4949
- name: Build
5050
run: go build
5151

52-
- name: Set up Python 3.9
52+
- name: Set up Python 3.11.9
5353
uses: actions/setup-python@v4
5454
with:
55-
python-version: '3.9'
55+
python-version: '3.11.9'
5656
cache: 'pip'
5757

5858
- name: Install dependencies

cmd/upload.go

Lines changed: 34 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@ import (
1111
"github.com/0gfoundation/0g-storage-client/common/blockchain"
1212
"github.com/0gfoundation/0g-storage-client/core"
1313
"github.com/0gfoundation/0g-storage-client/indexer"
14-
"github.com/0gfoundation/0g-storage-client/node"
1514
"github.com/0gfoundation/0g-storage-client/transfer"
1615
"github.com/ethereum/go-ethereum/common"
1716
"github.com/ethereum/go-ethereum/common/hexutil"
18-
"github.com/openweb3/web3go"
19-
"github.com/pkg/errors"
2017
"github.com/sirupsen/logrus"
2118
"github.com/spf13/cobra"
2219
)
@@ -170,94 +167,55 @@ func upload(*cobra.Command, []string) {
170167
}
171168
defer file.Close()
172169

173-
uploader, closer, err := newUploader(ctx, file.NumSegments(), uploadArgs, w3client, opt)
174-
if err != nil {
175-
logrus.WithError(err).Fatal("Failed to initialize uploader")
176-
}
177-
defer closer()
178-
uploader.WithRoutines(uploadArgs.routines)
179-
180-
_, roots, err := uploader.SplitableUpload(ctx, file, uploadArgs.fragmentSize, opt)
181-
if err != nil && opt.FullTrusted {
182-
logrus.WithError(err).Fatal("Failed to upload file with full trusted nodes")
183-
}
184-
185-
if err != nil && !opt.FullTrusted {
186-
logrus.WithError(err).Warn("Upload with full trusted nodes failed, retrying with all full trusted nodes")
187-
opt.FullTrusted = true
188-
fullUploader, fullCloser, err := newUploader(ctx, file.NumSegments(), uploadArgs, w3client, opt)
170+
var roots []common.Hash
171+
if uploadArgs.indexer != "" {
172+
indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{
173+
ProviderOption: providerOption,
174+
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
175+
Routines: uploadArgs.routines,
176+
Contract: &transfer.ContractAddress{
177+
FlowAddress: uploadArgs.flowAddress,
178+
MarketAddress: uploadArgs.marketAddress,
179+
},
180+
})
189181
if err != nil {
190-
logrus.WithError(err).Fatal("Failed to initialize uploader")
182+
logrus.WithError(err).Fatal("Failed to initialize indexer client")
191183
}
192-
defer fullCloser()
193-
fullUploader.WithRoutines(uploadArgs.routines)
194-
_, roots, err = fullUploader.SplitableUpload(ctx, file, uploadArgs.fragmentSize, opt)
184+
defer indexerClient.Close()
185+
186+
_, roots, err = indexerClient.SplitableUpload(ctx, w3client, file, uploadArgs.fragmentSize, opt)
195187
if err != nil {
196-
logrus.WithError(err).Fatal("Failed to upload file with full trusted nodes")
188+
logrus.WithError(err).Fatal("Failed to upload file")
197189
}
198-
}
199-
200-
if len(roots) == 1 {
201-
logrus.Infof("file uploaded, root = %v", roots[0])
202190
} else {
203-
s := make([]string, len(roots))
204-
for i, root := range roots {
205-
s[i] = root.String()
206-
}
207-
logrus.Infof("file uploaded in %v fragments, roots = %v", len(roots), strings.Join(s, ","))
208-
}
209-
}
210-
211-
func newUploader(ctx context.Context, segNum uint64, args uploadArgument, w3client *web3go.Client, opt transfer.UploadOption) (*transfer.Uploader, func(), error) {
212-
var contractConfig *transfer.UploaderContractConfig
213-
if args.flowAddress != "" {
214-
if !common.IsHexAddress(args.flowAddress) {
215-
return nil, nil, errors.Errorf("invalid flow address: %s", args.flowAddress)
216-
}
217-
flowAddr := common.HexToAddress(args.flowAddress)
218-
contractConfig = &transfer.UploaderContractConfig{
219-
FlowAddress: &flowAddr,
220-
}
221-
if args.marketAddress != "" {
222-
if !common.IsHexAddress(args.marketAddress) {
223-
return nil, nil, errors.Errorf("invalid market address: %s", args.marketAddress)
224-
}
225-
marketAddr := common.HexToAddress(args.marketAddress)
226-
contractConfig.MarketAddress = &marketAddr
227-
}
228-
} else if args.marketAddress != "" {
229-
return nil, nil, errors.New("market-address requires flow-address")
230-
}
231-
232-
if args.indexer != "" {
233-
indexerClient, err := indexer.NewClient(args.indexer, indexer.IndexerClientOption{
191+
uploader, closer, err := transfer.NewUploaderFromConfig(ctx, w3client, transfer.UploaderConfig{
192+
Nodes: uploadArgs.node,
234193
ProviderOption: providerOption,
235194
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
195+
Contact: &transfer.ContractAddress{
196+
FlowAddress: uploadArgs.flowAddress,
197+
MarketAddress: uploadArgs.marketAddress,
198+
},
199+
Routines: uploadArgs.routines,
236200
})
237201
if err != nil {
238-
return nil, nil, errors.WithMessage(err, "failed to initialize indexer client")
202+
logrus.WithError(err).Fatal("Failed to initialize uploader")
239203
}
204+
defer closer()
240205

241-
up, err := indexerClient.NewUploaderFromIndexerNodesWithContractConfig(ctx, segNum, w3client, opt.ExpectedReplica, nil, opt.Method, opt.FullTrusted, contractConfig)
206+
_, roots, err = uploader.SplitableUpload(ctx, file, uploadArgs.fragmentSize, opt)
242207
if err != nil {
243-
return nil, nil, err
208+
logrus.WithError(err).Fatal("Failed to upload file")
244209
}
245-
246-
return up, indexerClient.Close, nil
247210
}
248211

249-
clients := node.MustNewZgsClients(args.node, nil, providerOption)
250-
closer := func() {
251-
for _, client := range clients {
252-
client.Close()
212+
if len(roots) == 1 {
213+
logrus.Infof("file uploaded, root = %v", roots[0])
214+
} else {
215+
s := make([]string, len(roots))
216+
for i, root := range roots {
217+
s[i] = root.String()
253218
}
219+
logrus.Infof("file uploaded in %v fragments, roots = %v", len(roots), strings.Join(s, ","))
254220
}
255-
256-
up, err := transfer.NewUploaderWithContractConfig(ctx, w3client, &transfer.SelectedNodes{Trusted: clients}, contractConfig, zg_common.LogOption{Logger: logrus.StandardLogger()})
257-
if err != nil {
258-
closer()
259-
return nil, nil, err
260-
}
261-
262-
return up, closer, nil
263221
}

cmd/upload_dir.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cmd
33
import (
44
"context"
55

6+
zg_common "github.com/0gfoundation/0g-storage-client/common"
67
"github.com/0gfoundation/0g-storage-client/common/blockchain"
78
"github.com/0gfoundation/0g-storage-client/transfer"
89
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -55,12 +56,20 @@ func uploadDir(*cobra.Command, []string) {
5556
FullTrusted: uploadDirArgs.fullTrusted,
5657
}
5758

58-
uploader, closer, err := newUploader(ctx, 0, uploadDirArgs, w3client, opt)
59+
uploader, closer, err := transfer.NewUploaderFromConfig(ctx, w3client, transfer.UploaderConfig{
60+
Nodes: uploadDirArgs.node,
61+
ProviderOption: providerOption,
62+
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
63+
Contact: &transfer.ContractAddress{
64+
FlowAddress: uploadDirArgs.flowAddress,
65+
MarketAddress: uploadDirArgs.marketAddress,
66+
},
67+
Routines: uploadDirArgs.routines,
68+
})
5969
if err != nil {
6070
logrus.WithError(err).Fatal("Failed to initialize uploader")
6171
}
6272
defer closer()
63-
uploader.WithRoutines(uploadArgs.routines)
6473

6574
txnHash, rootHash, err := uploader.UploadDir(ctx, uploadDirArgs.file, opt)
6675
if err != nil {

gateway/local_apis.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,11 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) {
123123
return nil, api.ErrValidation.WithData("node index out of bound")
124124
}
125125

126-
uploader, err := transfer.NewUploader(context.Background(), nil, &transfer.SelectedNodes{Trusted: []*node.ZgsClient{allClients[input.Node]}}, zg_common.LogOption{Logger: logrus.StandardLogger()})
126+
uploader, err := transfer.NewUploaderWithContractConfig(context.Background(), nil, &transfer.SelectedNodes{
127+
Trusted: []*node.ZgsClient{allClients[input.Node]},
128+
}, transfer.UploaderConfig{
129+
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
130+
})
127131
if err != nil {
128132
return nil, errors.WithMessage(err, "Failed to create uploader")
129133
}

indexer/client.go

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,22 @@ type IndexerClientOption struct {
3838
ProviderOption providers.Option
3939
LogOption common.LogOption // log option when uploading data
4040
FullTrusted bool // whether to use full trusted nodes
41+
Routines int // number of routines for uploader
42+
Contract *transfer.ContractAddress
4143
}
4244

4345
// NewClient create new indexer client, url is indexer service url
44-
func NewClient(url string, option ...IndexerClientOption) (*Client, error) {
45-
var opt IndexerClientOption
46-
if len(option) > 0 {
47-
opt = option[0]
48-
}
46+
func NewClient(url string, option IndexerClientOption) (*Client, error) {
4947

50-
client, err := rpc.NewClient(url, opt.ProviderOption)
48+
client, err := rpc.NewClient(url, option.ProviderOption)
5149
if err != nil {
5250
return nil, err
5351
}
5452

5553
return &Client{
5654
Client: client,
57-
option: opt,
58-
logger: common.NewLogger(opt.LogOption),
55+
option: option,
56+
logger: common.NewLogger(option.LogOption),
5957
}, nil
6058
}
6159

@@ -116,41 +114,69 @@ func (c *Client) SelectNodes(ctx context.Context, expectedReplica uint, dropped
116114
}, nil
117115
}
118116

119-
// NewUploaderFromIndexerNodes return an uploader with selected storage nodes from indexer service.
120-
func (c *Client) NewUploaderFromIndexerNodes(ctx context.Context, segNum uint64, w3Client *web3go.Client, expectedReplica uint, dropped []string, method string, fullTrusted bool) (*transfer.Uploader, error) {
121-
return c.NewUploaderFromIndexerNodesWithContractConfig(ctx, segNum, w3Client, expectedReplica, dropped, method, fullTrusted, nil)
122-
}
123-
124117
// NewUploaderFromIndexerNodesWithContractConfig returns an uploader with selected storage nodes and optional contract config.
125-
func (c *Client) NewUploaderFromIndexerNodesWithContractConfig(ctx context.Context, segNum uint64, w3Client *web3go.Client, expectedReplica uint, dropped []string, method string, fullTrusted bool, contractConfig *transfer.UploaderContractConfig) (*transfer.Uploader, error) {
118+
func (c *Client) NewUploaderFromIndexerNodes(ctx context.Context, segNum uint64, w3Client *web3go.Client, expectedReplica uint, dropped []string, method string, fullTrusted bool) (*transfer.Uploader, error) {
126119
selected, err := c.SelectNodes(ctx, expectedReplica, dropped, method, fullTrusted)
127120
if err != nil {
128121
return nil, err
129122
}
130123

131124
c.logger.Infof("get storage nodes from indexer (trusted: %v, discovered: %v)", len(selected.Trusted), len(selected.Discovered))
132-
return transfer.NewUploaderWithContractConfig(ctx, w3Client, selected, contractConfig, c.option.LogOption)
125+
uploader, err := transfer.NewUploaderWithContractConfig(ctx, w3Client, selected, transfer.UploaderConfig{
126+
Routines: c.option.Routines,
127+
LogOption: c.option.LogOption,
128+
Contact: c.option.Contract,
129+
})
130+
if err != nil {
131+
return nil, err
132+
}
133+
return uploader, nil
133134
}
134135

135-
// Upload submit data to 0g storage contract, then transfer the data to the storage nodes selected from indexer service.
136-
func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core.IterableData, option ...transfer.UploadOption) (eth_common.Hash, error) {
137-
expectedReplica := uint(1)
136+
// SplitableUpload submits data and retries on node errors. If FullTrusted is false,
137+
// it tries once and falls back to full trusted nodes.
138+
func (c *Client) SplitableUpload(ctx context.Context, w3Client *web3go.Client, data core.IterableData, fragmentSize int64, option ...transfer.UploadOption) ([]eth_common.Hash, []eth_common.Hash, error) {
139+
var opt transfer.UploadOption
138140
if len(option) > 0 {
139-
expectedReplica = max(expectedReplica, option[0].ExpectedReplica)
141+
opt = option[0]
140142
}
143+
expectedReplica := max(uint(1), opt.ExpectedReplica)
144+
maxRetry := opt.NRetries
145+
if maxRetry <= 0 {
146+
maxRetry = 3
147+
}
148+
141149
dropped := make([]string, 0)
150+
attempts := 0
151+
142152
for {
143-
uploader, err := c.NewUploaderFromIndexerNodes(ctx, data.NumSegments(), w3Client, expectedReplica, dropped, option[0].Method, option[0].FullTrusted)
153+
uploader, err := c.NewUploaderFromIndexerNodes(ctx, data.NumSegments(), w3Client, expectedReplica, dropped, opt.Method, opt.FullTrusted)
144154
if err != nil {
145-
return eth_common.Hash{}, err
155+
return nil, nil, err
156+
}
157+
158+
txHashes, roots, err := uploader.SplitableUpload(ctx, data, fragmentSize, opt)
159+
if err == nil {
160+
return txHashes, roots, nil
146161
}
147-
txHash, _, err := uploader.Upload(ctx, data, option...)
162+
163+
if !opt.FullTrusted {
164+
opt.FullTrusted = true
165+
c.logger.WithError(err).Warn("Upload failed, retrying with full trusted nodes")
166+
} else {
167+
attempts += 1
168+
}
169+
148170
var rpcError *node.RPCError
149171
if errors.As(err, &rpcError) {
150172
dropped = append(dropped, rpcError.URL)
151173
c.logger.Infof("dropped problematic node and retry: %v", rpcError.Error())
152174
} else {
153-
return txHash, err
175+
c.logger.WithError(err).Warn("Upload failed, retrying")
176+
}
177+
178+
if attempts >= maxRetry {
179+
return txHashes, roots, err
154180
}
155181
}
156182
}

kv/batcher.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) (co
5151
}
5252

5353
// upload file
54-
uploader, err := transfer.NewUploader(ctx, b.w3Client, b.clients, zg_common.LogOption{Logger: b.logger})
54+
uploader, err := transfer.NewUploaderWithContractConfig(ctx, b.w3Client, b.clients, transfer.UploaderConfig{
55+
LogOption: zg_common.LogOption{Logger: b.logger},
56+
})
5557
if err != nil {
5658
return common.Hash{}, err
5759
}

requirements.txt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
jsonrpcclient==4.0.3
22
pyyaml==6.0.1
3-
pysha3==1.0.2
4-
coincurve==18.0.0
5-
eth-utils==3.0.0
6-
py-ecc==7.0.0
7-
web3==6.14.0
3+
pycryptodome==3.20.0
4+
coincurve==21.0.0
5+
eth-utils==5.0.0
6+
py-ecc==8.0.0
7+
web3==7.14.0
88
eth_tester
9-
cffi==1.16.0
10-
rtoml==0.10.0
9+
cffi==2.0.0
10+
rtoml==0.13.0

0 commit comments

Comments
 (0)