Skip to content
This repository was archived by the owner on Apr 18, 2024. It is now read-only.

Commit a335720

Browse files
author
Alan Shaw
authored
refactor: modularise (#8)
1 parent 1570977 commit a335720

File tree

4 files changed

+388
-356
lines changed

4 files changed

+388
-356
lines changed

client.go

Lines changed: 0 additions & 356 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,19 @@ package w3s
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76
"io"
87
"io/fs"
98
"net/http"
10-
"time"
119

12-
"github.com/alanshaw/go-carbites"
13-
"github.com/filecoin-project/go-address"
1410
bserv "github.com/ipfs/go-blockservice"
1511
"github.com/ipfs/go-cid"
1612
ds "github.com/ipfs/go-datastore"
1713
dssync "github.com/ipfs/go-datastore/sync"
1814
blockstore "github.com/ipfs/go-ipfs-blockstore"
19-
"github.com/ipfs/go-merkledag"
20-
"github.com/ipfs/ipfs-cluster/api"
21-
"github.com/ipld/go-car"
22-
peer "github.com/libp2p/go-libp2p-core/peer"
23-
"github.com/web3-storage/go-w3s-client/adder"
2415
w3http "github.com/web3-storage/go-w3s-client/http"
2516
)
2617

27-
const targetChunkSize = 1024 * 1024 * 10
28-
const iso8601 = "2006-01-02T15:04:05Z0700"
29-
3018
// Client is a HTTP API client to the web3.storage service.
3119
type Client interface {
3220
Get(context.Context, cid.Cid) (*w3http.Web3Response, error)
@@ -35,194 +23,6 @@ type Client interface {
3523
Status(context.Context, cid.Cid) (*Status, error)
3624
}
3725

38-
type PinStatus int
39-
40-
const (
41-
PinStatusPinned = PinStatus(api.TrackerStatusPinned)
42-
PinStatusPinning = PinStatus(api.TrackerStatusPinning)
43-
PinStatusPinQueued = PinStatus(api.TrackerStatusPinQueued)
44-
)
45-
46-
func (s PinStatus) String() string {
47-
if s == PinStatusPinned {
48-
return "Pinned"
49-
}
50-
if s == PinStatusPinning {
51-
return "Pinning"
52-
}
53-
if s == PinStatusPinQueued {
54-
return "PinQueued"
55-
}
56-
return "Unknown"
57-
}
58-
59-
type Pin struct {
60-
PeerID peer.ID
61-
PeerName string
62-
Region string
63-
Status PinStatus
64-
Updated time.Time
65-
}
66-
67-
type pinJson struct {
68-
PeerID string `json:"peerId"`
69-
PeerName string `json:"peerName"`
70-
Region string `json:"region"`
71-
Status string `json:"status"`
72-
Updated string `json:"updated"`
73-
}
74-
75-
func (p *Pin) UnmarshalJSON(b []byte) error {
76-
var raw pinJson
77-
err := json.Unmarshal(b, &raw)
78-
if err != nil {
79-
return err
80-
}
81-
p.PeerID, err = peer.Decode(raw.PeerID)
82-
if err != nil {
83-
return err
84-
}
85-
p.PeerName = raw.PeerName
86-
p.Region = raw.Region
87-
if raw.Status == "Pinned" {
88-
p.Status = PinStatusPinned
89-
} else if raw.Status == "Pinning" {
90-
p.Status = PinStatusPinning
91-
} else if raw.Status == "PinQueued" {
92-
p.Status = PinStatusPinQueued
93-
} else {
94-
return fmt.Errorf("unknown pin status: %s", raw.Status)
95-
}
96-
return nil
97-
}
98-
99-
type DealStatus int
100-
101-
const (
102-
DealStatusQueued DealStatus = iota
103-
DealStatusPublished
104-
DealStatusActive
105-
)
106-
107-
func (s DealStatus) String() string {
108-
return []string{"Queued", "Published", "Active"}[s]
109-
}
110-
111-
type Deal struct {
112-
DealID uint64
113-
StorageProvider address.Address
114-
Status DealStatus
115-
PieceCid cid.Cid
116-
DataCid cid.Cid
117-
DataModelSelector string
118-
Activation time.Time
119-
Created time.Time
120-
Updated time.Time
121-
}
122-
123-
type dealJson struct {
124-
DealID uint64 `json:"dealId,omitempty"`
125-
StorageProvider string `json:"storageProvider,omitempty"`
126-
Status string `json:"status"`
127-
PieceCid string `json:"pieceCid,omitempty"`
128-
DataCid string `json:"dataCid,omitempty"`
129-
DataModelSelector string `json:"dataModelSelector,omitempty"`
130-
Activation string `json:"activation,omitempty"`
131-
Created string `json:"created"`
132-
Updated string `json:"updated"`
133-
}
134-
135-
func (d *Deal) UnmarshalJSON(b []byte) error {
136-
var raw dealJson
137-
err := json.Unmarshal(b, &raw)
138-
if err != nil {
139-
return err
140-
}
141-
d.DealID = raw.DealID
142-
d.StorageProvider, err = address.NewFromString(raw.StorageProvider)
143-
if err != nil {
144-
return err
145-
}
146-
if raw.Status == "Queued" {
147-
d.Status = DealStatusQueued
148-
} else if raw.Status == "Published" {
149-
d.Status = DealStatusPublished
150-
} else if raw.Status == "Active" {
151-
d.Status = DealStatusActive
152-
} else {
153-
return fmt.Errorf("unknown deal status: %s", raw.Status)
154-
}
155-
if raw.PieceCid != "" {
156-
d.PieceCid, err = cid.Parse(raw.PieceCid)
157-
if err != nil {
158-
return err
159-
}
160-
} else {
161-
d.PieceCid = cid.Undef
162-
}
163-
if raw.DataCid != "" {
164-
d.DataCid, err = cid.Parse(raw.DataCid)
165-
if err != nil {
166-
return err
167-
}
168-
} else {
169-
d.DataCid = cid.Undef
170-
}
171-
d.DataModelSelector = raw.DataModelSelector
172-
if raw.Activation != "" {
173-
d.Activation, err = time.Parse(iso8601, raw.Activation)
174-
if err != nil {
175-
return err
176-
}
177-
}
178-
d.Created, err = time.Parse(iso8601, raw.Created)
179-
if err != nil {
180-
return err
181-
}
182-
d.Updated, err = time.Parse(iso8601, raw.Updated)
183-
if err != nil {
184-
return err
185-
}
186-
return nil
187-
}
188-
189-
// Status is IPFS pin and Filecoin deal status for a given CID.
190-
type Status struct {
191-
Cid cid.Cid
192-
DagSize uint64
193-
Created time.Time
194-
Pins []Pin
195-
Deals []Deal
196-
}
197-
198-
type statusJson struct {
199-
Cid string `json:"cid"`
200-
DagSize uint64 `json:"dagSize"`
201-
Created string `json:"created"`
202-
Pins []Pin `json:"pins"`
203-
Deals []Deal `json:"deals"`
204-
}
205-
206-
func (s *Status) UnmarshalJSON(b []byte) error {
207-
var raw statusJson
208-
err := json.Unmarshal(b, &raw)
209-
if err != nil {
210-
return err
211-
}
212-
s.Cid, err = cid.Parse(raw.Cid)
213-
if err != nil {
214-
return err
215-
}
216-
s.DagSize = raw.DagSize
217-
s.Created, err = time.Parse(iso8601, raw.Created)
218-
if err != nil {
219-
return err
220-
}
221-
s.Pins = raw.Pins
222-
s.Deals = raw.Deals
223-
return nil
224-
}
225-
22626
type clientConfig struct {
22727
token string
22828
endpoint string
@@ -257,159 +57,3 @@ func NewClient(options ...Option) (Client, error) {
25757
}
25858
return &c, nil
25959
}
260-
261-
// TODO: retry
262-
func (c *client) sendCar(ctx context.Context, r io.Reader) (cid.Cid, error) {
263-
req, err := http.NewRequestWithContext(ctx, "POST", c.cfg.endpoint+"/car", r)
264-
if err != nil {
265-
return cid.Undef, err
266-
}
267-
req.Header.Add("Content-Type", "application/car")
268-
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", c.cfg.token))
269-
req.Header.Add("X-Client", "web3.storage/go")
270-
res, err := c.hc.Do(req)
271-
if err != nil {
272-
return cid.Undef, err
273-
}
274-
if res.StatusCode != 200 {
275-
return cid.Undef, fmt.Errorf("unexpected response status: %d", res.StatusCode)
276-
}
277-
d := json.NewDecoder(res.Body)
278-
var out struct {
279-
Cid string `json:"cid"`
280-
}
281-
err = d.Decode(&out)
282-
if err != nil {
283-
return cid.Undef, err
284-
}
285-
return cid.Parse(out.Cid)
286-
}
287-
288-
func (c *client) Get(ctx context.Context, cid cid.Cid) (*w3http.Web3Response, error) {
289-
req, err := http.NewRequest("GET", fmt.Sprintf("%s/car/%s", c.cfg.endpoint, cid), nil)
290-
if err != nil {
291-
return nil, err
292-
}
293-
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", c.cfg.token))
294-
req.Header.Add("X-Client", "web3.storage/go")
295-
res, err := c.hc.Do(req)
296-
return w3http.NewWeb3Response(res, c.bsvc), err
297-
}
298-
299-
type putConfig struct {
300-
fsys fs.FS
301-
dirname string
302-
}
303-
304-
// Put uploads files to Web3.Storage. The file argument can be a single file or
305-
// a directory. If a directory is passed and the directory does NOT implement
306-
// fs.ReadDirFile then the WithDirname option should be passed (or the current
307-
// process working directory will be used).
308-
func (c *client) Put(ctx context.Context, file fs.File, options ...PutOption) (cid.Cid, error) {
309-
var cfg putConfig
310-
for _, opt := range options {
311-
if err := opt(&cfg); err != nil {
312-
return cid.Undef, err
313-
}
314-
}
315-
316-
info, err := file.Stat()
317-
if err != nil {
318-
return cid.Undef, err
319-
}
320-
321-
dag := merkledag.NewDAGService(c.bsvc)
322-
dagFmtr, err := adder.NewAdder(ctx, dag)
323-
if err != nil {
324-
return cid.Undef, err
325-
}
326-
327-
root, err := dagFmtr.Add(file, cfg.dirname, cfg.fsys)
328-
if err != nil {
329-
return cid.Undef, err
330-
}
331-
332-
// If file is a dir, do not wrap in another.
333-
if info.IsDir() {
334-
mr, err := dagFmtr.MfsRoot()
335-
if err != nil {
336-
return cid.Undef, err
337-
}
338-
rdir := mr.GetDirectory()
339-
cdir, err := rdir.Child(info.Name())
340-
if err != nil {
341-
return cid.Undef, err
342-
}
343-
cnode, err := cdir.GetNode()
344-
if err != nil {
345-
return cid.Undef, err
346-
}
347-
root = cnode.Cid()
348-
}
349-
350-
// fmt.Println("root CID", root)
351-
352-
carReader, carWriter := io.Pipe()
353-
354-
go func() {
355-
err = car.WriteCar(ctx, dag, []cid.Cid{root}, carWriter)
356-
if err != nil {
357-
carWriter.CloseWithError(err)
358-
return
359-
}
360-
carWriter.Close()
361-
}()
362-
363-
return c.PutCar(ctx, carReader)
364-
}
365-
366-
// PutCar uploads a CAR (Content Addressable Archive) to Web3.Storage.
367-
func (c *client) PutCar(ctx context.Context, car io.Reader) (cid.Cid, error) {
368-
spltr, err := carbites.Split(car, targetChunkSize, carbites.Treewalk)
369-
if err != nil {
370-
return cid.Undef, err
371-
}
372-
373-
var root cid.Cid
374-
for {
375-
r, err := spltr.Next()
376-
if err != nil {
377-
if err == io.EOF {
378-
break
379-
}
380-
return cid.Undef, err
381-
}
382-
383-
// TODO: concurrency
384-
c, err := c.sendCar(ctx, r)
385-
if err != nil {
386-
return cid.Undef, err
387-
}
388-
root = c
389-
}
390-
391-
return root, nil
392-
}
393-
394-
func (c *client) Status(ctx context.Context, cid cid.Cid) (*Status, error) {
395-
req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/%s", c.cfg.endpoint, cid), nil)
396-
if err != nil {
397-
return nil, err
398-
}
399-
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", c.cfg.token))
400-
req.Header.Add("X-Client", "web3.storage/go")
401-
res, err := c.hc.Do(req)
402-
if err != nil {
403-
return nil, err
404-
}
405-
if res.StatusCode != 200 {
406-
return nil, fmt.Errorf("unexpected response status: %d", res.StatusCode)
407-
}
408-
var s Status
409-
d := json.NewDecoder(res.Body)
410-
err = d.Decode(&s)
411-
if err != nil {
412-
return nil, err
413-
}
414-
return &s, nil
415-
}

0 commit comments

Comments
 (0)