Skip to content
9 changes: 7 additions & 2 deletions pkg/fx/principalresolver/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package principalresolver

import (
"fmt"
"strings"
"time"

"github.com/storacha/go-ucanto/did"
Expand All @@ -27,10 +28,14 @@ var Module = fx.Module("principalresolver",
func NewPrincipalResolver(cfg app.AppConfig) (validator.PrincipalResolver, error) {
services := make([]did.DID, 0, 2)
if idxSvc := cfg.UCANService.Services.Indexer.Connection; idxSvc != nil {
services = append(services, idxSvc.ID().DID())
if strings.HasPrefix(idxSvc.ID().DID().String(), "did:web:") {
services = append(services, idxSvc.ID().DID())
}
}
if uplSvc := cfg.UCANService.Services.Upload.Connection; uplSvc != nil {
services = append(services, uplSvc.ID().DID())
if strings.HasPrefix(uplSvc.ID().DID().String(), "did:web:") {
services = append(services, uplSvc.ID().DID())
}
}
hr, err := principalresolver.NewHTTPResolver(services)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/fx/retrieval/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var Module = fx.Module("retrieval",
fx.Provide(
fx.Annotate(
NewRetrievalService,
fx.As(new(ucan.BlobRetrievalService)),
fx.As(new(ucan.SpaceContentRetrievalService)),
),
),
Expand Down
4 changes: 4 additions & 0 deletions pkg/fx/retrieval/ucan/handlers/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var log = logging.Logger("retrieval/ucan")

var Module = fx.Module("retrieval/ucan/handlers",
fx.Provide(
fx.Annotate(
ucan.BlobRetrieve,
fx.ResultTags(`group:"ucan_retrieval_options"`),
),
fx.Annotate(
ucan.SpaceContentRetrieve,
fx.ResultTags(`group:"ucan_retrieval_options"`),
Expand Down
7 changes: 6 additions & 1 deletion pkg/pdp/service/piece_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@ func (p *PDPService) ReadPiece(ctx context.Context, piece cid.Cid, options ...ty
}
}()

var getOptions []blobstore.GetOption
if cfg.ByteRange.Start > 0 || cfg.ByteRange.End != nil {
getOptions = append(getOptions, blobstore.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
}

// TODO(forrest): Nice to have in follow on is attempting to map the `piece` arg to a PieceCIDV2, then
// performing the query to blobstore with that CID. allowing the read pieces with the cid they allocated them using
obj, err := p.blobstore.Get(ctx, piece.Hash(), blobstore.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
obj, err := p.blobstore.Get(ctx, piece.Hash(), getOptions...)

if err != nil {
if errors.Is(err, store.ErrNotFound) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/pdp/store/adapter/blobgetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (bga *BlobGetterAdapter) Get(ctx context.Context, digest multihash.Multihas
if err != nil {
return nil, fmt.Errorf("finding piece link for %s: %w", digestutil.Format(digest), err)
}
res, err := bga.pieceReader.ReadPiece(ctx, pieceLink.Link().(cidlink.Link).Cid, types.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
var readOptions []types.ReadPieceOption
if cfg.ByteRange.Start > 0 || cfg.ByteRange.End != nil {
readOptions = append(readOptions, types.WithRange(cfg.ByteRange.Start, cfg.ByteRange.End))
}
res, err := bga.pieceReader.ReadPiece(ctx, pieceLink.Link().(cidlink.Link).Cid, readOptions...)
if err != nil {
return nil, fmt.Errorf("reading piece: %w", err)
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/service/retrieval/handlers/spacecontent/retrieve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package spacecontent

import (
"context"
"errors"
"fmt"
"net/http"

logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multihash"
"github.com/storacha/go-libstoracha/capabilities/space/content"
"github.com/storacha/go-libstoracha/digestutil"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/result"
"github.com/storacha/go-ucanto/core/result/failure"
"github.com/storacha/go-ucanto/server/retrieval"
"github.com/storacha/piri/pkg/store"
"github.com/storacha/piri/pkg/store/blobstore"
)

var log = logging.Logger("retrieval/handlers/spacecontent")

func Retrieve(
ctx context.Context,
blobs blobstore.BlobGetter,
inv invocation.Invocation,
digest multihash.Multihash,
byteRange blobstore.Range,
) (result.Result[content.RetrieveOk, failure.IPLDBuilderFailure], retrieval.Response, error) {
digestStr := digestutil.Format(digest)
start := byteRange.Start
end := byteRange.End
rangeStr := fmt.Sprintf("%d-", start)
if end != nil {
rangeStr += fmt.Sprintf("%d", end)
}

cap := inv.Capabilities()[0]
log := log.With("iss", inv.Issuer().DID(), "can", cap.Can(), "with", cap.With(), "digest", digestStr, "range", rangeStr)

var getOpts []blobstore.GetOption
if start > 0 || end != nil {
getOpts = append(getOpts, blobstore.WithRange(start, end))
}

blob, err := blobs.Get(ctx, digest, getOpts...)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
log.Debugw("blob not found", "status", http.StatusNotFound)
notFoundErr := content.NewNotFoundError(fmt.Sprintf("blob not found: %s", digestStr))
res := result.Error[content.RetrieveOk, failure.IPLDBuilderFailure](notFoundErr)
resp := retrieval.NewResponse(http.StatusNotFound, nil, nil)
return res, resp, nil
} else if errors.Is(err, blobstore.ErrRangeNotSatisfiable) {
log.Debugw("range not satisfiable", "status", http.StatusRequestedRangeNotSatisfiable)
rangeNotSatisfiableErr := content.NewRangeNotSatisfiableError(fmt.Sprintf("range not satisfiable: %d-%d", start, end))
res := result.Error[content.RetrieveOk, failure.IPLDBuilderFailure](rangeNotSatisfiableErr)
resp := retrieval.NewResponse(http.StatusRequestedRangeNotSatisfiable, nil, nil)
return res, resp, nil
}
log.Errorw("getting blob", "error", err)
return nil, retrieval.Response{}, fmt.Errorf("getting blob: %w", err)
}

if end == nil {
rend := uint64(blob.Size() - 1)
end = &rend
}

res := result.Ok[content.RetrieveOk, failure.IPLDBuilderFailure](content.RetrieveOk{})
status := http.StatusOK
contentLength := *end - start + 1
headers := http.Header{}
headers.Set("Content-Length", fmt.Sprintf("%d", contentLength))
headers.Set("Content-Type", "application/octet-stream")
headers.Set("Cache-Control", "public, max-age=29030400, immutable")
headers.Set("Etag", fmt.Sprintf(`"%s"`, digestStr))
headers.Set("Vary", "Accept-Encoding")
if contentLength != uint64(blob.Size()) {
status = http.StatusPartialContent
headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, *end, blob.Size()))
headers.Add("Vary", "Range")
}
log.Debugw("serving bytes", "status", status, "size", contentLength)
resp := retrieval.NewResponse(status, headers, blob.Body())
return res, resp, nil
}
1 change: 1 addition & 0 deletions pkg/service/retrieval/ucan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
func NewUCANServer(retrievalService Service, options ...retrieval.Option) (server.ServerView[retrieval.Service], error) {
options = append(
options,
ucan.BlobRetrieve(retrievalService),
ucan.SpaceContentRetrieve(retrievalService),
)

Expand Down
52 changes: 52 additions & 0 deletions pkg/service/retrieval/ucan/blob_retrieve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ucan

import (
"context"
"fmt"

"github.com/storacha/go-libstoracha/capabilities/blob"
"github.com/storacha/go-libstoracha/capabilities/space/content"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/receipt/fx"
"github.com/storacha/go-ucanto/core/result"
"github.com/storacha/go-ucanto/core/result/failure"
"github.com/storacha/go-ucanto/principal"
"github.com/storacha/go-ucanto/server"
"github.com/storacha/go-ucanto/server/retrieval"
"github.com/storacha/go-ucanto/ucan"
"github.com/storacha/piri/pkg/service/retrieval/handlers/spacecontent"
"github.com/storacha/piri/pkg/store/blobstore"
)

// InvalidResourceErrorName is the name given to an error where the resource did
// not match the service DID.
const InvalidResourceErrorName = "InvalidResource"

type BlobRetrievalService interface {
ID() principal.Signer
Blobs() blobstore.BlobGetter
}

func BlobRetrieve(service BlobRetrievalService) retrieval.Option {
return retrieval.WithServiceMethod(
blob.RetrieveAbility,
retrieval.Provide(
blob.Retrieve,
func(ctx context.Context, cap ucan.Capability[blob.RetrieveCaveats], inv invocation.Invocation, iCtx server.InvocationContext, request retrieval.Request) (result.Result[blob.RetrieveOk, failure.IPLDBuilderFailure], fx.Effects, retrieval.Response, error) {
if cap.With() != service.ID().DID().String() {
return result.Error[blob.RetrieveOk, failure.IPLDBuilderFailure](blob.RetrieveError{
ErrorName: InvalidResourceErrorName,
Message: fmt.Sprintf("resource is %s not %s", cap.With(), service.ID().DID()),
}), nil, retrieval.Response{}, nil
}
res, resp, err := spacecontent.Retrieve(ctx, service.Blobs(), inv, cap.Nb().Blob.Digest, blobstore.Range{})
if err != nil {
return nil, nil, retrieval.Response{}, err
}
return result.MapOk(res, func(o content.RetrieveOk) blob.RetrieveOk {
return blob.RetrieveOk{}
}), nil, resp, nil
},
),
)
}
Loading