Skip to content

Commit 9ec0bce

Browse files
committed
feat: implement blob/retrieve capability
1 parent afb0092 commit 9ec0bce

File tree

9 files changed

+400
-2
lines changed

9 files changed

+400
-2
lines changed

pkg/fx/retrieval/provider.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ var Module = fx.Module("retrieval",
1616
fx.Provide(
1717
fx.Annotate(
1818
NewRetrievalService,
19+
fx.As(new(ucan.BlobRetrievalService)),
1920
fx.As(new(ucan.SpaceContentRetrievalService)),
2021
),
2122
),

pkg/fx/retrieval/ucan/handlers/provider.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ var log = logging.Logger("retrieval/ucan")
2121

2222
var Module = fx.Module("retrieval/ucan/handlers",
2323
fx.Provide(
24+
fx.Annotate(
25+
ucan.BlobRetrieve,
26+
fx.ResultTags(`group:"ucan_retrieval_options"`),
27+
),
2428
fx.Annotate(
2529
ucan.SpaceContentRetrieve,
2630
fx.ResultTags(`group:"ucan_retrieval_options"`),

pkg/pdp/service/piece_read.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@ func (p *PDPService) ReadPiece(ctx context.Context, piece cid.Cid, options ...ty
2626
}
2727
}()
2828

29+
getOptions := []blobstore.GetOption{}
30+
if cfg.ByteRange.Start > 0 || cfg.ByteRange.End != nil {
31+
getOptions = append(getOptions, blobstore.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
32+
}
33+
2934
// TODO(forrest): Nice to have in follow on is attempting to map the `piece` arg to a PieceCIDV2, then
3035
// performing the query to blobstore with that CID. allowing the read pieces with the cid they allocated them using
31-
obj, err := p.blobstore.Get(ctx, piece.Hash(), blobstore.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
36+
obj, err := p.blobstore.Get(ctx, piece.Hash(), getOptions...)
3237

3338
if err != nil {
3439
if errors.Is(err, store.ErrNotFound) {

pkg/pdp/store/adapter/blobgetter.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ func (bga *BlobGetterAdapter) Get(ctx context.Context, digest multihash.Multihas
5252
if err != nil {
5353
return nil, fmt.Errorf("finding piece link for %s: %w", digestutil.Format(digest), err)
5454
}
55-
res, err := bga.pieceReader.ReadPiece(ctx, pieceLink.Link().(cidlink.Link).Cid, types.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
55+
readOptions := []types.ReadPieceOption{}
56+
if cfg.ByteRange.Start > 0 || cfg.ByteRange.End != nil {
57+
readOptions = append(readOptions, types.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
58+
}
59+
res, err := bga.pieceReader.ReadPiece(ctx, pieceLink.Link().(cidlink.Link).Cid, readOptions...)
5660
if err != nil {
5761
return nil, fmt.Errorf("reading piece: %w", err)
5862
}

pkg/service/retrieval/ucan.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
func NewUCANServer(retrievalService Service, options ...retrieval.Option) (server.ServerView[retrieval.Service], error) {
1010
options = append(
1111
options,
12+
ucan.BlobRetrieve(retrievalService),
1213
ucan.SpaceContentRetrieve(retrievalService),
1314
)
1415

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package ucan
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
9+
"github.com/storacha/go-libstoracha/capabilities/blob"
10+
"github.com/storacha/go-libstoracha/capabilities/space/content"
11+
"github.com/storacha/go-libstoracha/digestutil"
12+
"github.com/storacha/go-ucanto/core/invocation"
13+
"github.com/storacha/go-ucanto/core/receipt/fx"
14+
"github.com/storacha/go-ucanto/core/result"
15+
"github.com/storacha/go-ucanto/core/result/failure"
16+
"github.com/storacha/go-ucanto/did"
17+
"github.com/storacha/go-ucanto/principal"
18+
"github.com/storacha/go-ucanto/server"
19+
"github.com/storacha/go-ucanto/server/retrieval"
20+
"github.com/storacha/go-ucanto/ucan"
21+
"github.com/storacha/piri/pkg/store"
22+
"github.com/storacha/piri/pkg/store/blobstore"
23+
)
24+
25+
// InvalidResourceErrorName is the name given to an error where the resource did
26+
// not match the service DID.
27+
const InvalidResourceErrorName = "InvalidResource"
28+
29+
type BlobRetrievalService interface {
30+
ID() principal.Signer
31+
Blobs() blobstore.BlobGetter
32+
}
33+
34+
func BlobRetrieve(service BlobRetrievalService) retrieval.Option {
35+
return retrieval.WithServiceMethod(
36+
blob.RetrieveAbility,
37+
retrieval.Provide(
38+
blob.Retrieve,
39+
func(ctx context.Context, cap ucan.Capability[blob.RetrieveCaveats], inv invocation.Invocation, iCtx server.InvocationContext, request retrieval.Request) (result.Result[blob.RetrieveOk, failure.IPLDBuilderFailure], fx.Effects, retrieval.Response, error) {
40+
resource, err := did.Parse(cap.With())
41+
if err != nil {
42+
return nil, nil, retrieval.Response{}, fmt.Errorf("parsing resource DID: %w", err)
43+
}
44+
if resource != service.ID().DID() {
45+
return result.Error[blob.RetrieveOk, failure.IPLDBuilderFailure](blob.RetrieveError{
46+
ErrorName: InvalidResourceErrorName,
47+
Message: fmt.Sprintf("resource is %s not %s", resource, service.ID().DID()),
48+
}), nil, retrieval.Response{}, nil
49+
}
50+
51+
nb := cap.Nb()
52+
digest := nb.Blob.Digest
53+
digestStr := digestutil.Format(digest)
54+
55+
log := log.With(
56+
"client", inv.Issuer().DID().String(),
57+
"ability", blob.RetrieveAbility,
58+
"digest", digestStr,
59+
)
60+
61+
obj, err := service.Blobs().Get(ctx, digest)
62+
if err != nil {
63+
if errors.Is(err, store.ErrNotFound) {
64+
log.Debugw("blob not found", "status", http.StatusNotFound)
65+
notFoundErr := content.NewNotFoundError(fmt.Sprintf("blob not found: %s", digestStr))
66+
res := result.Error[blob.RetrieveOk, failure.IPLDBuilderFailure](notFoundErr)
67+
resp := retrieval.NewResponse(http.StatusNotFound, nil, nil)
68+
return res, nil, resp, nil
69+
}
70+
log.Errorw("getting blob", "error", err)
71+
return nil, nil, retrieval.Response{}, fmt.Errorf("getting blob: %w", err)
72+
}
73+
74+
res := result.Ok[blob.RetrieveOk, failure.IPLDBuilderFailure](blob.RetrieveOk{})
75+
status := http.StatusOK
76+
contentLength := obj.Size()
77+
headers := http.Header{}
78+
headers.Set("Content-Length", fmt.Sprintf("%d", contentLength))
79+
headers.Set("Content-Type", "application/octet-stream")
80+
headers.Set("Cache-Control", "public, max-age=29030400, immutable")
81+
headers.Set("Etag", fmt.Sprintf(`"%s"`, digestStr))
82+
headers.Set("Vary", "Accept-Encoding")
83+
log.Debugw("serving bytes", "status", status, "size", contentLength)
84+
resp := retrieval.NewResponse(status, headers, obj.Body())
85+
return res, nil, resp, nil
86+
},
87+
),
88+
)
89+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package ucan
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"net/http"
7+
"testing"
8+
9+
logging "github.com/ipfs/go-log/v2"
10+
"github.com/multiformats/go-multihash"
11+
blobcaps "github.com/storacha/go-libstoracha/capabilities/blob"
12+
"github.com/storacha/go-libstoracha/capabilities/space/content"
13+
"github.com/storacha/go-libstoracha/testutil"
14+
"github.com/storacha/go-ucanto/client"
15+
rclient "github.com/storacha/go-ucanto/client/retrieval"
16+
"github.com/storacha/go-ucanto/core/delegation"
17+
"github.com/storacha/go-ucanto/core/invocation"
18+
"github.com/storacha/go-ucanto/core/ipld"
19+
"github.com/storacha/go-ucanto/core/receipt"
20+
"github.com/storacha/go-ucanto/core/result"
21+
fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel"
22+
"github.com/storacha/go-ucanto/principal"
23+
"github.com/storacha/go-ucanto/server/retrieval"
24+
"github.com/storacha/go-ucanto/transport/headercar"
25+
"github.com/storacha/go-ucanto/ucan"
26+
"github.com/storacha/piri/pkg/store/blobstore"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
type blobRetrievalService struct {
31+
id principal.Signer
32+
blobs blobstore.BlobGetter
33+
}
34+
35+
func (brs *blobRetrievalService) ID() principal.Signer {
36+
return brs.id
37+
}
38+
39+
func (brs *blobRetrievalService) Blobs() blobstore.BlobGetter {
40+
return brs.blobs
41+
}
42+
43+
func TestBlobRetrieve(t *testing.T) {
44+
logging.SetLogLevel("retrieval/ucan", "DEBUG")
45+
alice := testutil.Alice
46+
proof, err := delegation.Delegate(
47+
testutil.Service,
48+
alice,
49+
[]ucan.Capability[ucan.NoCaveats]{
50+
ucan.NewCapability(
51+
blobcaps.RetrieveAbility,
52+
testutil.Service.DID().String(),
53+
ucan.NoCaveats{},
54+
),
55+
},
56+
)
57+
require.NoError(t, err)
58+
59+
randBytes := testutil.RandomBytes(t, 32)
60+
blob := struct {
61+
bytes []byte
62+
digest multihash.Multihash
63+
}{randBytes, testutil.MultihashFromBytes(t, randBytes)}
64+
65+
testCases := []struct {
66+
name string
67+
agent ucan.Signer
68+
resource ucan.Resource
69+
proof delegation.Delegation
70+
blobs [][]byte
71+
caveats blobcaps.RetrieveCaveats
72+
expectStatus int
73+
expectHeaders http.Header
74+
expectBody []byte
75+
assertError func(ipld.Node)
76+
}{
77+
{
78+
name: "not found when missing blob",
79+
agent: alice,
80+
resource: testutil.Service.DID().String(),
81+
proof: proof,
82+
blobs: [][]byte{},
83+
caveats: blobcaps.RetrieveCaveats{
84+
Blob: blobcaps.Blob{Digest: blob.digest},
85+
},
86+
expectStatus: http.StatusNotFound,
87+
expectBody: []byte{},
88+
assertError: func(n ipld.Node) {
89+
x, err := ipld.Rebind[content.NotFoundError](n, content.NotFoundErrorType())
90+
require.NoError(t, err)
91+
require.Equal(t, content.NotFoundErrorName, x.Name())
92+
},
93+
},
94+
{
95+
name: "bad proof",
96+
agent: alice,
97+
resource: testutil.Service.DID().String(),
98+
proof: testutil.Must(
99+
delegation.Delegate(
100+
testutil.Bob,
101+
alice,
102+
[]ucan.Capability[ucan.NoCaveats]{
103+
ucan.NewCapability(
104+
blobcaps.RetrieveAbility,
105+
testutil.Service.DID().String(),
106+
ucan.NoCaveats{},
107+
),
108+
},
109+
),
110+
)(t),
111+
blobs: [][]byte{blob.bytes},
112+
caveats: blobcaps.RetrieveCaveats{
113+
Blob: blobcaps.Blob{Digest: blob.digest},
114+
},
115+
expectStatus: http.StatusOK,
116+
expectBody: []byte{},
117+
assertError: func(n ipld.Node) {
118+
x, err := ipld.Rebind[fdm.FailureModel](n, fdm.FailureType())
119+
require.NoError(t, err)
120+
require.Equal(t, "Unauthorized", *x.Name)
121+
},
122+
},
123+
{
124+
name: "wrong resource",
125+
agent: alice,
126+
resource: testutil.Mallory.DID().String(),
127+
proof: testutil.Must(
128+
delegation.Delegate(
129+
testutil.Mallory,
130+
alice,
131+
[]ucan.Capability[ucan.NoCaveats]{
132+
ucan.NewCapability(
133+
blobcaps.RetrieveAbility,
134+
testutil.Mallory.DID().String(),
135+
ucan.NoCaveats{},
136+
),
137+
},
138+
),
139+
)(t),
140+
blobs: [][]byte{blob.bytes},
141+
caveats: blobcaps.RetrieveCaveats{
142+
Blob: blobcaps.Blob{Digest: blob.digest},
143+
},
144+
expectStatus: http.StatusOK,
145+
expectBody: []byte{},
146+
assertError: func(n ipld.Node) {
147+
x, err := ipld.Rebind[fdm.FailureModel](n, fdm.FailureType())
148+
require.NoError(t, err)
149+
require.Equal(t, InvalidResourceErrorName, *x.Name)
150+
},
151+
},
152+
}
153+
154+
for _, tc := range testCases {
155+
t.Run(tc.name, func(t *testing.T) {
156+
blobs := blobstore.NewMapBlobstore()
157+
for _, b := range tc.blobs {
158+
digest, err := multihash.Sum(b, multihash.SHA2_256, -1)
159+
require.NoError(t, err)
160+
err = blobs.Put(t.Context(), digest, uint64(len(b)), bytes.NewReader(b))
161+
require.NoError(t, err)
162+
}
163+
164+
service := blobRetrievalService{testutil.Service, blobs}
165+
server, err := retrieval.NewServer(testutil.Service, BlobRetrieve(&service))
166+
require.NoError(t, err)
167+
168+
inv, err := invocation.Invoke(
169+
tc.agent,
170+
testutil.Service,
171+
blobcaps.Retrieve.New(tc.resource, tc.caveats),
172+
delegation.WithProof(delegation.FromDelegation(tc.proof)),
173+
)
174+
require.NoError(t, err)
175+
176+
codecOpt := client.WithOutboundCodec(headercar.NewOutboundCodec())
177+
conn, err := client.NewConnection(testutil.Service, server, codecOpt)
178+
require.NoError(t, err)
179+
180+
xres, hres, err := rclient.Execute(t.Context(), inv, conn)
181+
require.NoError(t, err)
182+
183+
require.Equal(t, tc.expectStatus, hres.Status())
184+
for k, v := range tc.expectHeaders {
185+
require.Equal(t, v, hres.Headers().Values(k))
186+
}
187+
require.Equal(t, tc.expectBody, testutil.Must(io.ReadAll(hres.Body()))(t))
188+
189+
rcptLink, ok := xres.Get(inv.Link())
190+
require.True(t, ok)
191+
192+
rcpt, err := receipt.NewAnyReceiptReader().Read(rcptLink, xres.Blocks())
193+
require.NoError(t, err)
194+
195+
_, x := result.Unwrap(rcpt.Out())
196+
if tc.assertError != nil {
197+
tc.assertError(x)
198+
} else {
199+
require.Nil(t, x)
200+
}
201+
})
202+
}
203+
}

pkg/service/retrieval/ucan/space_content_retrieve.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func SpaceContentRetrieve(retrievalService SpaceContentRetrievalService) retriev
4848

4949
log := log.With(
5050
"client", inv.Issuer().DID().String(),
51+
"ability", content.RetrieveAbility,
5152
"space", space.String(),
5253
"digest", digestStr,
5354
"range", fmt.Sprintf("%d-%d", start, end),

0 commit comments

Comments
 (0)