diff --git a/pkg/fx/principalresolver/provider.go b/pkg/fx/principalresolver/provider.go index 504f7e05..f0a3b19f 100644 --- a/pkg/fx/principalresolver/provider.go +++ b/pkg/fx/principalresolver/provider.go @@ -2,6 +2,7 @@ package principalresolver import ( "fmt" + "strings" "time" "github.com/storacha/go-ucanto/did" @@ -27,10 +28,14 @@ var Module = fx.Module("principalresolver", func NewPrincipalResolver(cfg app.AppConfig) (validator.PrincipalResolver, error) { services := make([]did.DID, 0, 2) if idxSvc := cfg.UCANService.Services.Indexer.Connection; idxSvc != nil { - services = append(services, idxSvc.ID().DID()) + if strings.HasPrefix(idxSvc.ID().DID().String(), "did:web:") { + services = append(services, idxSvc.ID().DID()) + } } if uplSvc := cfg.UCANService.Services.Upload.Connection; uplSvc != nil { - services = append(services, uplSvc.ID().DID()) + if strings.HasPrefix(uplSvc.ID().DID().String(), "did:web:") { + services = append(services, uplSvc.ID().DID()) + } } hr, err := principalresolver.NewHTTPResolver(services) if err != nil { diff --git a/pkg/fx/retrieval/provider.go b/pkg/fx/retrieval/provider.go index c611746e..fef778d8 100644 --- a/pkg/fx/retrieval/provider.go +++ b/pkg/fx/retrieval/provider.go @@ -16,6 +16,7 @@ var Module = fx.Module("retrieval", fx.Provide( fx.Annotate( NewRetrievalService, + fx.As(new(ucan.BlobRetrievalService)), fx.As(new(ucan.SpaceContentRetrievalService)), ), ), diff --git a/pkg/fx/retrieval/ucan/handlers/provider.go b/pkg/fx/retrieval/ucan/handlers/provider.go index 4742cb99..4585cdcb 100644 --- a/pkg/fx/retrieval/ucan/handlers/provider.go +++ b/pkg/fx/retrieval/ucan/handlers/provider.go @@ -21,6 +21,10 @@ var log = logging.Logger("retrieval/ucan") var Module = fx.Module("retrieval/ucan/handlers", fx.Provide( + fx.Annotate( + ucan.BlobRetrieve, + fx.ResultTags(`group:"ucan_retrieval_options"`), + ), fx.Annotate( ucan.SpaceContentRetrieve, fx.ResultTags(`group:"ucan_retrieval_options"`), diff --git a/pkg/pdp/service/piece_read.go b/pkg/pdp/service/piece_read.go index edc2c948..cb9413da 100644 --- a/pkg/pdp/service/piece_read.go +++ b/pkg/pdp/service/piece_read.go @@ -26,9 +26,14 @@ func (p *PDPService) ReadPiece(ctx context.Context, piece cid.Cid, options ...ty } }() + var getOptions []blobstore.GetOption + if cfg.ByteRange.Start > 0 || cfg.ByteRange.End != nil { + getOptions = append(getOptions, blobstore.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End)) + } + // TODO(forrest): Nice to have in follow on is attempting to map the `piece` arg to a PieceCIDV2, then // performing the query to blobstore with that CID. allowing the read pieces with the cid they allocated them using - obj, err := p.blobstore.Get(ctx, piece.Hash(), blobstore.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End)) + obj, err := p.blobstore.Get(ctx, piece.Hash(), getOptions...) if err != nil { if errors.Is(err, store.ErrNotFound) { diff --git a/pkg/pdp/store/adapter/blobgetter.go b/pkg/pdp/store/adapter/blobgetter.go index 2cc057be..4caf58ea 100644 --- a/pkg/pdp/store/adapter/blobgetter.go +++ b/pkg/pdp/store/adapter/blobgetter.go @@ -52,7 +52,11 @@ func (bga *BlobGetterAdapter) Get(ctx context.Context, digest multihash.Multihas if err != nil { return nil, fmt.Errorf("finding piece link for %s: %w", digestutil.Format(digest), err) } - res, err := bga.pieceReader.ReadPiece(ctx, pieceLink.Link().(cidlink.Link).Cid, types.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End)) + var readOptions []types.ReadPieceOption + if cfg.ByteRange.Start > 0 || cfg.ByteRange.End != nil { + readOptions = append(readOptions, types.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End)) + } + res, err := bga.pieceReader.ReadPiece(ctx, pieceLink.Link().(cidlink.Link).Cid, readOptions...) if err != nil { return nil, fmt.Errorf("reading piece: %w", err) } diff --git a/pkg/service/retrieval/handlers/spacecontent/retrieve.go b/pkg/service/retrieval/handlers/spacecontent/retrieve.go new file mode 100644 index 00000000..fd9af49b --- /dev/null +++ b/pkg/service/retrieval/handlers/spacecontent/retrieve.go @@ -0,0 +1,87 @@ +package spacecontent + +import ( + "context" + "errors" + "fmt" + "net/http" + + logging "github.com/ipfs/go-log/v2" + "github.com/multiformats/go-multihash" + "github.com/storacha/go-libstoracha/capabilities/space/content" + "github.com/storacha/go-libstoracha/digestutil" + "github.com/storacha/go-ucanto/core/invocation" + "github.com/storacha/go-ucanto/core/result" + "github.com/storacha/go-ucanto/core/result/failure" + "github.com/storacha/go-ucanto/server/retrieval" + "github.com/storacha/piri/pkg/store" + "github.com/storacha/piri/pkg/store/blobstore" +) + +var log = logging.Logger("retrieval/handlers/spacecontent") + +func Retrieve( + ctx context.Context, + blobs blobstore.BlobGetter, + inv invocation.Invocation, + digest multihash.Multihash, + byteRange blobstore.Range, +) (result.Result[content.RetrieveOk, failure.IPLDBuilderFailure], retrieval.Response, error) { + digestStr := digestutil.Format(digest) + start := byteRange.Start + end := byteRange.End + rangeStr := fmt.Sprintf("%d-", start) + if end != nil { + rangeStr += fmt.Sprintf("%d", end) + } + + cap := inv.Capabilities()[0] + log := log.With("iss", inv.Issuer().DID(), "can", cap.Can(), "with", cap.With(), "digest", digestStr, "range", rangeStr) + + var getOpts []blobstore.GetOption + if start > 0 || end != nil { + getOpts = append(getOpts, blobstore.WithRange(start, end)) + } + + blob, err := blobs.Get(ctx, digest, getOpts...) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + log.Debugw("blob not found", "status", http.StatusNotFound) + notFoundErr := content.NewNotFoundError(fmt.Sprintf("blob not found: %s", digestStr)) + res := result.Error[content.RetrieveOk, failure.IPLDBuilderFailure](notFoundErr) + resp := retrieval.NewResponse(http.StatusNotFound, nil, nil) + return res, resp, nil + } else if errors.Is(err, blobstore.ErrRangeNotSatisfiable) { + log.Debugw("range not satisfiable", "status", http.StatusRequestedRangeNotSatisfiable) + rangeNotSatisfiableErr := content.NewRangeNotSatisfiableError(fmt.Sprintf("range not satisfiable: %d-%d", start, end)) + res := result.Error[content.RetrieveOk, failure.IPLDBuilderFailure](rangeNotSatisfiableErr) + resp := retrieval.NewResponse(http.StatusRequestedRangeNotSatisfiable, nil, nil) + return res, resp, nil + } + log.Errorw("getting blob", "error", err) + return nil, retrieval.Response{}, fmt.Errorf("getting blob: %w", err) + } + + if end == nil { + rend := uint64(blob.Size() - 1) + end = &rend + } + + res := result.Ok[content.RetrieveOk, failure.IPLDBuilderFailure](content.RetrieveOk{}) + status := http.StatusOK + contentLength := *end - start + 1 + headers := http.Header{} + headers.Set("Content-Length", fmt.Sprintf("%d", contentLength)) + headers.Set("Content-Type", "application/octet-stream") + headers.Set("Cache-Control", "public, max-age=29030400, immutable") + headers.Set("Etag", fmt.Sprintf(`"%s"`, digestStr)) + headers.Set("Vary", "Accept-Encoding") + if contentLength != uint64(blob.Size()) { + status = http.StatusPartialContent + headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, *end, blob.Size())) + headers.Add("Vary", "Range") + } + log.Debugw("serving bytes", "status", status, "size", contentLength) + resp := retrieval.NewResponse(status, headers, blob.Body()) + return res, resp, nil +} diff --git a/pkg/service/retrieval/ucan.go b/pkg/service/retrieval/ucan.go index ffdde74b..9fbfda41 100644 --- a/pkg/service/retrieval/ucan.go +++ b/pkg/service/retrieval/ucan.go @@ -9,6 +9,7 @@ import ( func NewUCANServer(retrievalService Service, options ...retrieval.Option) (server.ServerView[retrieval.Service], error) { options = append( options, + ucan.BlobRetrieve(retrievalService), ucan.SpaceContentRetrieve(retrievalService), ) diff --git a/pkg/service/retrieval/ucan/blob_retrieve.go b/pkg/service/retrieval/ucan/blob_retrieve.go new file mode 100644 index 00000000..2b7d255b --- /dev/null +++ b/pkg/service/retrieval/ucan/blob_retrieve.go @@ -0,0 +1,52 @@ +package ucan + +import ( + "context" + "fmt" + + "github.com/storacha/go-libstoracha/capabilities/blob" + "github.com/storacha/go-libstoracha/capabilities/space/content" + "github.com/storacha/go-ucanto/core/invocation" + "github.com/storacha/go-ucanto/core/receipt/fx" + "github.com/storacha/go-ucanto/core/result" + "github.com/storacha/go-ucanto/core/result/failure" + "github.com/storacha/go-ucanto/principal" + "github.com/storacha/go-ucanto/server" + "github.com/storacha/go-ucanto/server/retrieval" + "github.com/storacha/go-ucanto/ucan" + "github.com/storacha/piri/pkg/service/retrieval/handlers/spacecontent" + "github.com/storacha/piri/pkg/store/blobstore" +) + +// InvalidResourceErrorName is the name given to an error where the resource did +// not match the service DID. +const InvalidResourceErrorName = "InvalidResource" + +type BlobRetrievalService interface { + ID() principal.Signer + Blobs() blobstore.BlobGetter +} + +func BlobRetrieve(service BlobRetrievalService) retrieval.Option { + return retrieval.WithServiceMethod( + blob.RetrieveAbility, + retrieval.Provide( + blob.Retrieve, + func(ctx context.Context, cap ucan.Capability[blob.RetrieveCaveats], inv invocation.Invocation, iCtx server.InvocationContext, request retrieval.Request) (result.Result[blob.RetrieveOk, failure.IPLDBuilderFailure], fx.Effects, retrieval.Response, error) { + if cap.With() != service.ID().DID().String() { + return result.Error[blob.RetrieveOk, failure.IPLDBuilderFailure](blob.RetrieveError{ + ErrorName: InvalidResourceErrorName, + Message: fmt.Sprintf("resource is %s not %s", cap.With(), service.ID().DID()), + }), nil, retrieval.Response{}, nil + } + res, resp, err := spacecontent.Retrieve(ctx, service.Blobs(), inv, cap.Nb().Blob.Digest, blobstore.Range{}) + if err != nil { + return nil, nil, retrieval.Response{}, err + } + return result.MapOk(res, func(o content.RetrieveOk) blob.RetrieveOk { + return blob.RetrieveOk{} + }), nil, resp, nil + }, + ), + ) +} diff --git a/pkg/service/retrieval/ucan/blob_retrieve_test.go b/pkg/service/retrieval/ucan/blob_retrieve_test.go new file mode 100644 index 00000000..4838e030 --- /dev/null +++ b/pkg/service/retrieval/ucan/blob_retrieve_test.go @@ -0,0 +1,203 @@ +package ucan + +import ( + "bytes" + "io" + "net/http" + "testing" + + logging "github.com/ipfs/go-log/v2" + "github.com/multiformats/go-multihash" + blobcaps "github.com/storacha/go-libstoracha/capabilities/blob" + "github.com/storacha/go-libstoracha/capabilities/space/content" + "github.com/storacha/go-libstoracha/testutil" + "github.com/storacha/go-ucanto/client" + rclient "github.com/storacha/go-ucanto/client/retrieval" + "github.com/storacha/go-ucanto/core/delegation" + "github.com/storacha/go-ucanto/core/invocation" + "github.com/storacha/go-ucanto/core/ipld" + "github.com/storacha/go-ucanto/core/receipt" + "github.com/storacha/go-ucanto/core/result" + fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel" + "github.com/storacha/go-ucanto/principal" + "github.com/storacha/go-ucanto/server/retrieval" + "github.com/storacha/go-ucanto/transport/headercar" + "github.com/storacha/go-ucanto/ucan" + "github.com/storacha/piri/pkg/store/blobstore" + "github.com/stretchr/testify/require" +) + +type blobRetrievalService struct { + id principal.Signer + blobs blobstore.BlobGetter +} + +func (brs *blobRetrievalService) ID() principal.Signer { + return brs.id +} + +func (brs *blobRetrievalService) Blobs() blobstore.BlobGetter { + return brs.blobs +} + +func TestBlobRetrieve(t *testing.T) { + logging.SetLogLevel("retrieval/ucan", "DEBUG") + alice := testutil.Alice + proof, err := delegation.Delegate( + testutil.Service, + alice, + []ucan.Capability[ucan.NoCaveats]{ + ucan.NewCapability( + blobcaps.RetrieveAbility, + testutil.Service.DID().String(), + ucan.NoCaveats{}, + ), + }, + ) + require.NoError(t, err) + + randBytes := testutil.RandomBytes(t, 32) + blob := struct { + bytes []byte + digest multihash.Multihash + }{randBytes, testutil.MultihashFromBytes(t, randBytes)} + + testCases := []struct { + name string + agent ucan.Signer + resource ucan.Resource + proof delegation.Delegation + blobs [][]byte + caveats blobcaps.RetrieveCaveats + expectStatus int + expectHeaders http.Header + expectBody []byte + assertError func(ipld.Node) + }{ + { + name: "not found when missing blob", + agent: alice, + resource: testutil.Service.DID().String(), + proof: proof, + blobs: [][]byte{}, + caveats: blobcaps.RetrieveCaveats{ + Blob: blobcaps.Blob{Digest: blob.digest}, + }, + expectStatus: http.StatusNotFound, + expectBody: []byte{}, + assertError: func(n ipld.Node) { + x, err := ipld.Rebind[content.NotFoundError](n, content.NotFoundErrorType()) + require.NoError(t, err) + require.Equal(t, content.NotFoundErrorName, x.Name()) + }, + }, + { + name: "bad proof", + agent: alice, + resource: testutil.Service.DID().String(), + proof: testutil.Must( + delegation.Delegate( + testutil.Bob, + alice, + []ucan.Capability[ucan.NoCaveats]{ + ucan.NewCapability( + blobcaps.RetrieveAbility, + testutil.Service.DID().String(), + ucan.NoCaveats{}, + ), + }, + ), + )(t), + blobs: [][]byte{blob.bytes}, + caveats: blobcaps.RetrieveCaveats{ + Blob: blobcaps.Blob{Digest: blob.digest}, + }, + expectStatus: http.StatusOK, + expectBody: []byte{}, + assertError: func(n ipld.Node) { + x, err := ipld.Rebind[fdm.FailureModel](n, fdm.FailureType()) + require.NoError(t, err) + require.Equal(t, "Unauthorized", *x.Name) + }, + }, + { + name: "wrong resource", + agent: alice, + resource: testutil.Mallory.DID().String(), + proof: testutil.Must( + delegation.Delegate( + testutil.Mallory, + alice, + []ucan.Capability[ucan.NoCaveats]{ + ucan.NewCapability( + blobcaps.RetrieveAbility, + testutil.Mallory.DID().String(), + ucan.NoCaveats{}, + ), + }, + ), + )(t), + blobs: [][]byte{blob.bytes}, + caveats: blobcaps.RetrieveCaveats{ + Blob: blobcaps.Blob{Digest: blob.digest}, + }, + expectStatus: http.StatusOK, + expectBody: []byte{}, + assertError: func(n ipld.Node) { + x, err := ipld.Rebind[fdm.FailureModel](n, fdm.FailureType()) + require.NoError(t, err) + require.Equal(t, InvalidResourceErrorName, *x.Name) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + blobs := blobstore.NewMapBlobstore() + for _, b := range tc.blobs { + digest, err := multihash.Sum(b, multihash.SHA2_256, -1) + require.NoError(t, err) + err = blobs.Put(t.Context(), digest, uint64(len(b)), bytes.NewReader(b)) + require.NoError(t, err) + } + + service := blobRetrievalService{testutil.Service, blobs} + server, err := retrieval.NewServer(testutil.Service, BlobRetrieve(&service)) + require.NoError(t, err) + + inv, err := invocation.Invoke( + tc.agent, + testutil.Service, + blobcaps.Retrieve.New(tc.resource, tc.caveats), + delegation.WithProof(delegation.FromDelegation(tc.proof)), + ) + require.NoError(t, err) + + codecOpt := client.WithOutboundCodec(headercar.NewOutboundCodec()) + conn, err := client.NewConnection(testutil.Service, server, codecOpt) + require.NoError(t, err) + + xres, hres, err := rclient.Execute(t.Context(), inv, conn) + require.NoError(t, err) + + require.Equal(t, tc.expectStatus, hres.Status()) + for k, v := range tc.expectHeaders { + require.Equal(t, v, hres.Headers().Values(k)) + } + require.Equal(t, tc.expectBody, testutil.Must(io.ReadAll(hres.Body()))(t)) + + rcptLink, ok := xres.Get(inv.Link()) + require.True(t, ok) + + rcpt, err := receipt.NewAnyReceiptReader().Read(rcptLink, xres.Blocks()) + require.NoError(t, err) + + _, x := result.Unwrap(rcpt.Out()) + if tc.assertError != nil { + tc.assertError(x) + } else { + require.Nil(t, x) + } + }) + } +} diff --git a/pkg/service/retrieval/ucan/space_content_retrieve.go b/pkg/service/retrieval/ucan/space_content_retrieve.go index 7996d09b..669e1e8b 100644 --- a/pkg/service/retrieval/ucan/space_content_retrieve.go +++ b/pkg/service/retrieval/ucan/space_content_retrieve.go @@ -17,6 +17,7 @@ import ( "github.com/storacha/go-ucanto/server" "github.com/storacha/go-ucanto/server/retrieval" "github.com/storacha/go-ucanto/ucan" + "github.com/storacha/piri/pkg/service/retrieval/handlers/spacecontent" "github.com/storacha/piri/pkg/store" "github.com/storacha/piri/pkg/store/allocationstore" "github.com/storacha/piri/pkg/store/blobstore" @@ -47,8 +48,9 @@ func SpaceContentRetrieve(retrievalService SpaceContentRetrievalService) retriev end := nb.Range.End log := log.With( - "client", inv.Issuer().DID().String(), - "space", space.String(), + "iss", inv.Issuer().DID().String(), + "can", content.RetrieveAbility, + "with", space.String(), "digest", digestStr, "range", fmt.Sprintf("%d-%d", start, end), ) @@ -66,41 +68,10 @@ func SpaceContentRetrieve(retrievalService SpaceContentRetrievalService) retriev return nil, nil, retrieval.Response{}, fmt.Errorf("getting allocation: %w", err) } - blob, err := retrievalService.Blobs().Get(ctx, digest, blobstore.WithRange(start, &end)) + res, resp, err := spacecontent.Retrieve(ctx, retrievalService.Blobs(), inv, digest, blobstore.Range{Start: start, End: &end}) if err != nil { - if errors.Is(err, store.ErrNotFound) { - log.Debugw("blob not found", "status", http.StatusNotFound) - notFoundErr := content.NewNotFoundError(fmt.Sprintf("blob not found: %s", digestStr)) - res := result.Error[content.RetrieveOk, failure.IPLDBuilderFailure](notFoundErr) - resp := retrieval.NewResponse(http.StatusNotFound, nil, nil) - return res, nil, resp, nil - } else if errors.Is(err, blobstore.ErrRangeNotSatisfiable) { - log.Debugw("range not satisfiable", "status", http.StatusRequestedRangeNotSatisfiable) - rangeNotSatisfiableErr := content.NewRangeNotSatisfiableError(fmt.Sprintf("range not satisfiable: %d-%d", start, end)) - res := result.Error[content.RetrieveOk, failure.IPLDBuilderFailure](rangeNotSatisfiableErr) - resp := retrieval.NewResponse(http.StatusRequestedRangeNotSatisfiable, nil, nil) - return res, nil, resp, nil - } - log.Errorw("getting blob", "error", err) - return nil, nil, retrieval.Response{}, fmt.Errorf("getting blob: %w", err) - } - - res := result.Ok[content.RetrieveOk, failure.IPLDBuilderFailure](content.RetrieveOk{}) - status := http.StatusOK - contentLength := end - start + 1 - headers := http.Header{} - headers.Set("Content-Length", fmt.Sprintf("%d", contentLength)) - headers.Set("Content-Type", "application/octet-stream") - headers.Set("Cache-Control", "public, max-age=29030400, immutable") - headers.Set("Etag", fmt.Sprintf(`"%s"`, digestStr)) - headers.Set("Vary", "Accept-Encoding") - if contentLength != uint64(blob.Size()) { - status = http.StatusPartialContent - headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, blob.Size())) - headers.Add("Vary", "Range") + return nil, nil, retrieval.Response{}, err } - log.Debugw("serving bytes", "status", status, "size", contentLength) - resp := retrieval.NewResponse(status, headers, blob.Body()) return res, nil, resp, nil }, ), diff --git a/pkg/service/retrieval/ucan_fx_test.go b/pkg/service/retrieval/ucan_fx_test.go index 2dcacea6..b46f79b7 100644 --- a/pkg/service/retrieval/ucan_fx_test.go +++ b/pkg/service/retrieval/ucan_fx_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/ipfs/go-cid" + blobcaps "github.com/storacha/go-libstoracha/capabilities/blob" "github.com/storacha/go-libstoracha/capabilities/space/content" "github.com/storacha/go-libstoracha/testutil" retrievalclient "github.com/storacha/go-ucanto/client/retrieval" @@ -127,3 +128,92 @@ func TestFXSpaceContentRetrieve(t *testing.T) { require.Nil(t, x) }) } + +func TestFXBlobRetrieve(t *testing.T) { + var svc storage.Service + + appConfig := piritestutil.NewTestConfig(t, piritestutil.WithSigner(testutil.Alice)) + testApp := fxtest.New(t, + app.CommonModules(appConfig), + app.UCANModule, + fx.Populate(&svc), + ) + + testApp.RequireStart() + defer testApp.RequireStop() + + t.Run("blob/retrieve", func(t *testing.T) { + randBytes := testutil.RandomBytes(t, 256) + blob := struct { + bytes []byte + cid cid.Cid + }{randBytes, cid.NewCidV1(cid.Raw, testutil.MultihashFromBytes(t, randBytes))} + + err := svc.Blobs().Store().Put( + t.Context(), + blob.cid.Hash(), + uint64(len(blob.bytes)), + bytes.NewReader(blob.bytes), + ) + require.NoError(t, err) + + prf := delegation.FromDelegation( + testutil.Must( + delegation.Delegate( + testutil.Alice, + testutil.Bob, + []ucan.Capability[blobcaps.RetrieveCaveats]{ + ucan.NewCapability( + blobcaps.RetrieveAbility, + testutil.Alice.DID().String(), + blobcaps.RetrieveCaveats{ + Blob: blobcaps.Blob{Digest: blob.cid.Hash()}, + }, + ), + }, + ), + )(t), + ) + + url := appConfig.Server.PublicURL.JoinPath("piece", blob.cid.String()) + conn, err := retrievalclient.NewConnection(testutil.Alice, url) + require.NoError(t, err) + + inv, err := invocation.Invoke( + testutil.Bob, + testutil.Alice, + blobcaps.Retrieve.New( + testutil.Alice.DID().String(), + blobcaps.RetrieveCaveats{ + Blob: blobcaps.Blob{Digest: blob.cid.Hash()}, + }, + ), + delegation.WithProof(prf), + ) + require.NoError(t, err) + + xres, hres, err := retrievalclient.Execute(t.Context(), inv, conn) + require.NoError(t, err) + + expectStatus := http.StatusOK + expectHeaders := http.Header{ + http.CanonicalHeaderKey("Content-Length"): []string{fmt.Sprintf("%d", len(blob.bytes))}, + } + expectBody := blob.bytes + + require.Equal(t, expectStatus, hres.Status()) + for k, v := range expectHeaders { + require.Equal(t, v, hres.Headers().Values(k)) + } + require.Equal(t, expectBody, testutil.Must(io.ReadAll(hres.Body()))(t)) + + rcptLink, ok := xres.Get(inv.Link()) + require.True(t, ok) + + rcpt, err := receipt.NewAnyReceiptReader().Read(rcptLink, xres.Blocks()) + require.NoError(t, err) + + _, x := result.Unwrap(rcpt.Out()) + require.Nil(t, x) + }) +}