Skip to content

Commit 2713706

Browse files
authored
Merge pull request #454 from fairDataSociety/staging
Staging
2 parents 80361a1 + 66b719e commit 2713706

File tree

17 files changed

+341
-123
lines changed

17 files changed

+341
-123
lines changed

cmd/dfs/cmd/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ can consume it.`,
206206

207207
ctx, cancel := context.WithCancel(cmd.Context())
208208
defer cancel()
209-
// datadir will be removed in some future version. it is kept for migration purpose only
210-
hdlr, err := api.New(ctx, beeApi, cookieDomain, postageBlockId, corsOrigins, ensConfig, subscriptionConfig, logger)
209+
210+
hdlr, err := api.New(ctx, beeApi, cookieDomain, postageBlockId, corsOrigins, ensConfig, nil, logger)
211211
if err != nil {
212212
logger.Error(err.Error())
213213
return err

pkg/blockstore/bee/client.go

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@ const (
4646
healthUrl = "/health"
4747
chunkUploadDownloadUrl = "/chunks"
4848
bytesUploadDownloadUrl = "/bytes"
49+
bzzUrl = "/bzz"
4950
tagsUrl = "/tags"
5051
pinsUrl = "/pins/"
5152
_ = pinsUrl
5253
swarmPinHeader = "Swarm-Pin"
5354
swarmEncryptHeader = "Swarm-Encrypt"
5455
swarmPostageBatchId = "Swarm-Postage-Batch-Id"
5556
//swarmDeferredUploadHeader = "Swarm-Deferred-Upload"
56-
swarmTagHeader = "Swarm-Tag"
57+
swarmTagHeader = "Swarm-Tag"
58+
contentTypeHeader = "Content-Type"
5759
)
5860

5961
// Client is a bee http client that satisfies blockstore.Client
@@ -462,6 +464,110 @@ func (s *Client) DownloadBlob(address []byte) ([]byte, int, error) {
462464
return respData, response.StatusCode, nil
463465
}
464466

467+
// UploadBzz uploads a file through bzz api
468+
func (s *Client) UploadBzz(data []byte, fileName string) (address []byte, err error) {
469+
to := time.Now()
470+
471+
fullUrl := s.url + bzzUrl + "?name=" + fileName
472+
req, err := http.NewRequest(http.MethodPost, fullUrl, bytes.NewBuffer(data))
473+
if err != nil {
474+
return nil, err
475+
}
476+
477+
req.Header.Set(swarmPostageBatchId, s.postageBlockId)
478+
req.Header.Set(contentTypeHeader, "application/json")
479+
480+
response, err := s.client.Do(req)
481+
if err != nil {
482+
return nil, err
483+
}
484+
defer response.Body.Close()
485+
486+
req.Close = true
487+
488+
respData, err := io.ReadAll(response.Body)
489+
if err != nil {
490+
return nil, errors.New("error downloading bzz")
491+
}
492+
493+
if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated {
494+
var beeErr *beeError
495+
err = json.Unmarshal(respData, &beeErr)
496+
if err != nil {
497+
return nil, errors.New(string(respData))
498+
}
499+
return nil, errors.New(beeErr.Message)
500+
}
501+
502+
var resp bytesPostResponse
503+
err = json.Unmarshal(respData, &resp)
504+
505+
fields := logrus.Fields{
506+
"reference": resp.Reference.String(),
507+
"size": len(respData),
508+
"duration": time.Since(to).String(),
509+
}
510+
s.logger.WithFields(fields).Log(logrus.DebugLevel, "upload bzz: ")
511+
512+
// add the data and ref if it is not in cache
513+
if !s.inBlockCache(s.downloadBlockCache, resp.Reference.String()) {
514+
s.addToBlockCache(s.downloadBlockCache, resp.Reference.String(), data)
515+
}
516+
return resp.Reference.Bytes(), nil
517+
}
518+
519+
// DownloadBzz downloads bzz data from the Swarm network.
520+
func (s *Client) DownloadBzz(address []byte) ([]byte, int, error) {
521+
to := time.Now()
522+
523+
// return the data if this address is already in cache
524+
addrString := swarm.NewAddress(address).String()
525+
if s.inBlockCache(s.downloadBlockCache, addrString) {
526+
return s.getFromBlockCache(s.downloadBlockCache, addrString), 200, nil
527+
}
528+
529+
fullUrl := s.url + bzzUrl + "/" + addrString
530+
req, err := http.NewRequest(http.MethodGet, fullUrl, http.NoBody)
531+
if err != nil {
532+
return nil, http.StatusNotFound, err
533+
}
534+
535+
response, err := s.client.Do(req)
536+
if err != nil {
537+
return nil, http.StatusNotFound, err
538+
}
539+
defer response.Body.Close()
540+
541+
req.Close = true
542+
543+
respData, err := io.ReadAll(response.Body)
544+
if err != nil {
545+
return nil, response.StatusCode, errors.New("error downloading bzz")
546+
}
547+
548+
if response.StatusCode != http.StatusOK {
549+
var beeErr *beeError
550+
err = json.Unmarshal(respData, &beeErr)
551+
if err != nil {
552+
return nil, response.StatusCode, errors.New(string(respData))
553+
}
554+
return nil, response.StatusCode, errors.New(beeErr.Message)
555+
}
556+
557+
fields := logrus.Fields{
558+
"reference": addrString,
559+
"size": len(respData),
560+
"duration": time.Since(to).String(),
561+
}
562+
s.logger.WithFields(fields).Log(logrus.DebugLevel, "download bzz: ")
563+
564+
// add the data and ref if it is not in cache
565+
if !s.inBlockCache(s.downloadBlockCache, addrString) {
566+
s.addToBlockCache(s.downloadBlockCache, addrString, respData)
567+
}
568+
return respData, response.StatusCode, nil
569+
}
570+
465571
// DeleteReference unpins a reference so that it will be garbage collected by the Swarm network.
466572
func (s *Client) DeleteReference(address []byte) error {
467573
// TODO uncomment after unpinning is fixed

pkg/blockstore/bee/mock/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ func (m *BeeClient) DownloadBlob(address []byte) ([]byte, int, error) {
132132
return nil, http.StatusInternalServerError, fmt.Errorf("error downloading data")
133133
}
134134

135+
// UploadBzz downloads data to bzz api from the Swarm network.
136+
func (m *BeeClient) UploadBzz(_ []byte, _ string) ([]byte, error) {
137+
return nil, nil
138+
}
139+
140+
// DownloadBzz downloads bzz data from the Swarm network.
141+
func (m *BeeClient) DownloadBzz(_ []byte) ([]byte, int, error) {
142+
return nil, 0, nil
143+
}
144+
135145
// DeleteReference unpins chunk in swarm
136146
func (m *BeeClient) DeleteReference(address []byte) error {
137147
m.storerMu.Lock()

pkg/blockstore/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ type Client interface {
2828
UploadSOC(owner string, id string, signature string, data []byte) (address []byte, err error)
2929
UploadChunk(ch swarm.Chunk) (address []byte, err error)
3030
UploadBlob(data []byte, tag uint32, encrypt bool) (address []byte, err error)
31+
UploadBzz(data []byte, fileName string) (address []byte, err error)
3132
DownloadChunk(ctx context.Context, address []byte) (data []byte, err error)
3233
DownloadBlob(address []byte) (data []byte, respCode int, err error)
34+
DownloadBzz(address []byte) (data []byte, respCode int, err error)
3335
DeleteReference(address []byte) error
3436
CreateTag(address []byte) (uint32, error)
3537
GetTag(tag uint32) (int64, int64, int64, error)

pkg/contracts/config.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,7 @@ func TestnetConfig() (*ENSConfig, *SubscriptionConfig) {
2525
PublicResolverAddress: "0xbfeCC6c32B224F7D0026ac86506Fe40A9607BD14",
2626
ProviderDomain: "fds",
2727
}
28-
29-
s := &SubscriptionConfig{
30-
DataHubAddress: "0x1949beB6CC2db0241Dd625dcaC09891DF5c3756b",
31-
}
32-
return e, s
28+
return e, nil
3329
}
3430

3531
// PlayConfig defines the configuration for fdp-play

pkg/contracts/datahub/Datahub.go

Lines changed: 63 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/dfs/pod_api.go

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ func (a *API) RequestSubscription(sessionId string, subHash [32]byte) error {
551551
if err != nil {
552552
return err
553553
}
554+
554555
return ui.GetPod().RequestSubscription(subHash, nameHash)
555556
}
556557

@@ -636,7 +637,7 @@ type SubscriptionInfo struct {
636637
}
637638

638639
// GetSubscriptions
639-
func (a *API) GetSubscriptions(sessionId string) ([]*SubscriptionInfo, error) {
640+
func (a *API) GetSubscriptions(sessionId string) ([]SubscriptionInfo, error) {
640641
// get the loggedin user information
641642
ui := a.users.GetLoggedInUserInfo(sessionId)
642643
if ui == nil {
@@ -647,26 +648,34 @@ func (a *API) GetSubscriptions(sessionId string) ([]*SubscriptionInfo, error) {
647648
return nil, errNilSubManager
648649
}
649650

650-
subscriptions, err := ui.GetPod().GetSubscriptions()
651+
nameHash, err := a.users.GetNameHash(ui.GetUserName())
651652
if err != nil {
652653
return nil, err
653654
}
654655

655-
subs := []*SubscriptionInfo{}
656-
for _, item := range subscriptions {
656+
subscriptions, err := ui.GetPod().GetSubscriptions(nameHash)
657+
if err != nil {
658+
return nil, err
659+
}
660+
661+
subs := make([]SubscriptionInfo, len(subscriptions))
662+
for i, item := range subscriptions {
657663
info, err := ui.GetPod().GetSubscribablePodInfo(item.SubHash)
658664
if err != nil {
659665
return subs, err
660666
}
661-
sub := &SubscriptionInfo{
667+
var infoLocation = make([]byte, 32)
668+
copy(infoLocation, item.UnlockKeyLocation[:])
669+
sub := SubscriptionInfo{
662670
SubHash: item.SubHash,
663671
PodName: info.PodName,
664672
PodAddress: info.PodAddress,
665-
InfoLocation: item.UnlockKeyLocation[:],
673+
InfoLocation: infoLocation,
666674
ValidTill: item.ValidTill.Int64(),
667675
Category: info.Category,
668676
}
669-
subs = append(subs, sub)
677+
678+
subs[i] = sub
670679
}
671680

672681
return subs, nil
@@ -678,31 +687,37 @@ func (a *API) GetSubscribablePodInfo(sessionId string, subHash [32]byte) (*rpc.S
678687
if ui == nil {
679688
return nil, ErrUserNotLoggedIn
680689
}
681-
690+
if a.sm == nil {
691+
return nil, errNilSubManager
692+
}
682693
return a.sm.GetSubscribablePodInfo(subHash)
683694
}
684695

685696
// OpenSubscribedPod
686-
func (a *API) OpenSubscribedPod(sessionId string, subHash [32]byte) (*pod.Info, error) {
697+
func (a *API) OpenSubscribedPod(sessionId string, subHash [32]byte, infoLocation string) (*pod.Info, error) {
698+
// get the loggedin user information
699+
ui := a.users.GetLoggedInUserInfo(sessionId)
700+
if ui == nil {
701+
return nil, ErrUserNotLoggedIn
702+
}
687703

704+
if a.sm == nil {
705+
return nil, errNilSubManager
706+
}
688707
sub, err := a.sm.GetSub(subHash)
689708
if err != nil {
690709
return nil, err
691710
}
692711

712+
subHashString := utils.Encode(subHash[:])
713+
693714
_, ownerPublicKey, err := a.users.GetUserInfoFromENS(sub.FdpSellerNameHash)
694715
if err != nil {
695716
return nil, err
696717
}
697718

698-
// get the loggedin user information
699-
ui := a.users.GetLoggedInUserInfo(sessionId)
700-
if ui == nil {
701-
return nil, ErrUserNotLoggedIn
702-
}
703-
704719
// open the pod
705-
pi, err := ui.GetPod().OpenSubscribedPod(subHash, ownerPublicKey)
720+
pi, err := ui.GetPod().OpenSubscribedPodFromReference(infoLocation, ownerPublicKey)
706721
if err != nil {
707722
return nil, err
708723
}
@@ -711,7 +726,7 @@ func (a *API) OpenSubscribedPod(sessionId string, subHash [32]byte) (*pod.Info,
711726
return nil, err
712727
}
713728
// Add podName in the login user session
714-
ui.AddPodName(pi.GetPodName(), pi)
729+
ui.AddPodName("0x"+subHashString, pi)
715730
return pi, nil
716731
}
717732

@@ -722,7 +737,9 @@ func (a *API) GetSubscribablePods(sessionId string) ([]datahub.DataHubSub, error
722737
if ui == nil {
723738
return nil, ErrUserNotLoggedIn
724739
}
725-
740+
if a.sm == nil {
741+
return nil, errNilSubManager
742+
}
726743
return ui.GetPod().GetMarketplace()
727744
}
728745

@@ -733,6 +750,8 @@ func (a *API) GetSubsRequests(sessionId string) ([]datahub.DataHubSubRequest, er
733750
if ui == nil {
734751
return nil, ErrUserNotLoggedIn
735752
}
736-
753+
if a.sm == nil {
754+
return nil, errNilSubManager
755+
}
737756
return ui.GetPod().GetSubRequests()
738757
}

pkg/dfs/user_api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (a *API) ConnectPortableAccountWithWallet(userName, passPhrase, addressHex,
8585
}
8686

8787
// LoginWithWallet is a controller function which calls the users login function.
88-
func (a *API) LoginWithWallet(addressHex, signature, sessionId string) (*user.Info, error) {
88+
func (a *API) LoginWithWallet(addressHex, signature, sessionId string) (*user.Info, string, error) {
8989
return a.users.LoginWithWallet(addressHex, signature, a.client, a.tm, a.sm, sessionId)
9090
}
9191

pkg/dir/ls.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (d *Directory) ListDir(dirNameWithPath, podPassword string) ([]Entry, []str
7575
lsTask := newLsTask(d, dirTopic, dirPath, podPassword, listEntries, mtx, wg)
7676
_, err := d.syncManager.Go(lsTask)
7777
if err != nil {
78-
return nil, nil, fmt.Errorf("list dir : %v", err)
78+
wg.Done()
7979
}
8080
} else if strings.HasPrefix(fileOrDirName, "_F_") {
8181
fileName := strings.TrimPrefix(fileOrDirName, "_F_")

pkg/dir/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (d *Directory) SyncDirectoryAsync(ctx context.Context, dirNameWithPath, pod
8585
syncTask := newSyncTask(d, filePath, podPassword, wg)
8686
_, err = d.syncManager.Go(syncTask)
8787
if err != nil { // skipcq: TCV-001
88-
return err
88+
wg.Done()
8989
}
9090
} else if strings.HasPrefix(fileOrDirName, "_D_") {
9191
dirName := strings.TrimPrefix(fileOrDirName, "_D_")

0 commit comments

Comments
 (0)