Skip to content
Merged
21 changes: 18 additions & 3 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ var queryCmd = &cli.Command{
Usage: "type of query to perform ['standard' | 'location' | 'index_or_location']",
Value: "standard",
},
&cli.StringFlag{
Name: "delegation",
Aliases: []string{"d"},
Usage: "a delegation allowing the indexer to fetch content from the space",
},
},
Action: func(cCtx *cli.Context) error {
serviceURL, err := url.Parse(cCtx.String("url"))
Expand Down Expand Up @@ -91,10 +96,20 @@ var queryCmd = &cli.Command{
}
}

var delegations []delegation.Delegation
if cCtx.IsSet("delegation") {
d, err := delegation.Parse(cCtx.String("delegation"))
if err != nil {
return fmt.Errorf("parsing delegation: %w", err)
}
delegations = append(delegations, d)
}

qr, err := c.QueryClaims(cCtx.Context, types.Query{
Type: queryType,
Hashes: digests,
Match: types.Match{Subject: spaces},
Type: queryType,
Hashes: digests,
Match: types.Match{Subject: spaces},
Delegations: delegations,
})
if err != nil {
return fmt.Errorf("querying service: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/redis/go-redis/extra/redisotel/v9 v9.10.0
github.com/redis/go-redis/v9 v9.10.0
github.com/storacha/go-libstoracha v0.3.1
github.com/storacha/go-libstoracha v0.3.2
github.com/storacha/go-ucanto v0.6.5
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t6
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/storacha/go-libstoracha v0.3.1 h1:cjXEVfEJHdBLBcs2cbk94Gf5xoL/eMxWtY6o6RxnY/0=
github.com/storacha/go-libstoracha v0.3.1/go.mod h1:UF4t2uPwq7vhqqoRWVPnsTsvnWghd8uTJ5WW6QekjVA=
github.com/storacha/go-libstoracha v0.3.2 h1:GH/6Q+lJ/HmjI3/T+3ECqOdblLTYLPtZWqLB5AG6Dus=
github.com/storacha/go-libstoracha v0.3.2/go.mod h1:UF4t2uPwq7vhqqoRWVPnsTsvnWghd8uTJ5WW6QekjVA=
github.com/storacha/go-ucanto v0.6.5 h1:mxy1UkJDqszAGe6SkoT0N2SG9YJ62YX7fzU1Pg9lxnA=
github.com/storacha/go-ucanto v0.6.5/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
12 changes: 11 additions & 1 deletion pkg/construct/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/storacha/go-libstoracha/ipnipublisher/store"
"github.com/storacha/go-libstoracha/jobqueue"
"github.com/storacha/go-libstoracha/metadata"
ed25519 "github.com/storacha/go-ucanto/principal/ed25519/signer"
"github.com/storacha/indexing-service/pkg/redis"
"github.com/storacha/indexing-service/pkg/service"
"github.com/storacha/indexing-service/pkg/service/blobindexlookup"
Expand Down Expand Up @@ -498,10 +499,19 @@ func Construct(sc ServiceConfig, opts ...Option) (Service, error) {
publicAddrInfo.Addrs = append(publicAddrInfo.Addrs, addr)
}

skBytes, err := sc.PrivateKey.Raw()
if err != nil {
return nil, err
}
serviceID, err := ed25519.FromRaw(skBytes)
if err != nil {
return nil, err
}

// with concurrency will still get overridden if a different walker setting is used
serviceOpts := append([]service.Option{service.WithConcurrency(15)}, cfg.opts...)

s.IndexingService = service.NewIndexingService(blobIndexLookup, claims, publicAddrInfo, providerIndex, serviceOpts...)
s.IndexingService = service.NewIndexingService(serviceID, blobIndexLookup, claims, publicAddrInfo, providerIndex, serviceOpts...)

return s, nil
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
ed25519 "github.com/storacha/go-ucanto/principal/ed25519/signer"
"github.com/storacha/go-ucanto/principal/signer"
"github.com/storacha/go-ucanto/server"
hcmsg "github.com/storacha/go-ucanto/transport/headercar/message"
ucanhttp "github.com/storacha/go-ucanto/transport/http"
"github.com/storacha/indexing-service/pkg/build"
"github.com/storacha/indexing-service/pkg/service/contentclaims"
Expand Down Expand Up @@ -291,12 +292,36 @@ func GetClaimsHandler(service types.Querier) http.HandlerFunc {
spaces = append(spaces, space)
}

var dlgs []delegation.Delegation
agentMsgHeader := r.Header.Get(hcmsg.HeaderName)
if agentMsgHeader != "" {
msg, err := hcmsg.DecodeHeader(agentMsgHeader)
if err != nil {
http.Error(w, fmt.Sprintf("decoding agent message: %s", err.Error()), http.StatusBadRequest)
return
}

for _, root := range msg.Invocations() {
dlg, ok, err := msg.Invocation(root)
if err != nil {
log.Warnf("failed to extract delegation from agent message: %w", err)
continue
}
if !ok {
log.Warnf("delegation not found in agent message: %s", root.String())
continue
}
dlgs = append(dlgs, dlg)
}
}

qr, err := service.Query(ctx, types.Query{
Type: queryType,
Hashes: hashes,
Match: types.Match{
Subject: spaces,
},
Delegations: dlgs,
})
if err != nil {
http.Error(w, fmt.Sprintf("processing query: %s", err.Error()), http.StatusInternalServerError)
Expand Down
62 changes: 62 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ import (
"github.com/multiformats/go-multihash"
"github.com/storacha/go-libstoracha/blobindex"
"github.com/storacha/go-libstoracha/bytemap"
"github.com/storacha/go-libstoracha/capabilities/space/content"
"github.com/storacha/go-libstoracha/digestutil"
"github.com/storacha/go-libstoracha/testutil"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/message"
"github.com/storacha/go-ucanto/did"
"github.com/storacha/go-ucanto/principal/signer"
hcmsg "github.com/storacha/go-ucanto/transport/headercar/message"
"github.com/storacha/go-ucanto/ucan"
"github.com/storacha/indexing-service/pkg/internal/link"
"github.com/storacha/indexing-service/pkg/service/contentclaims"
"github.com/storacha/indexing-service/pkg/service/queryresult"
Expand Down Expand Up @@ -219,6 +224,63 @@ func TestGetClaimsHandler(t *testing.T) {
require.Equal(t, http.StatusOK, res.StatusCode)
})

t.Run("authorized retrieval from space", func(t *testing.T) {
mockService := types.NewMockService(t)

randomHash := testutil.RandomMultihash(t)
randomSubject := testutil.RandomPrincipal(t).DID()

dlg, err := delegation.Delegate(
testutil.Alice,
testutil.Service,
[]ucan.Capability[ucan.NoCaveats]{
ucan.NewCapability(content.RetrieveAbility, randomSubject.String(), ucan.NoCaveats{}),
},
)
require.NoError(t, err)

claims := map[cid.Cid]delegation.Delegation{}
indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](-1)
queryResult := testutil.Must(queryresult.Build(claims, indexes))(t)
mockService.EXPECT().Query(mock.Anything, mock.AnythingOfType("Query")).Return(queryResult, nil)

svr := httptest.NewServer(GetClaimsHandler(mockService))
defer svr.Close()

url := fmt.Sprintf("%s/claims?multihash=%s&spaces=%s", svr.URL, digestutil.Format(randomHash), randomSubject.String())
req, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err)

msg, err := message.Build([]invocation.Invocation{dlg}, nil)
require.NoError(t, err)

headerValue, err := hcmsg.EncodeHeader(msg)
require.NoError(t, err)

req.Header.Set(hcmsg.HeaderName, headerValue)

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)
})

t.Run("invalid "+hcmsg.HeaderName, func(t *testing.T) {
mockService := types.NewMockService(t)

svr := httptest.NewServer(GetClaimsHandler(mockService))
defer svr.Close()

url := fmt.Sprintf("%s/claims?multihash=%s", svr.URL, digestutil.Format(testutil.RandomMultihash(t)))
req, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err)

req.Header.Set(hcmsg.HeaderName, "NOT AN AGENT MESSAGE")

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)
})

t.Run("invalid space", func(t *testing.T) {
mockService := types.NewMockService(t)

Expand Down
6 changes: 2 additions & 4 deletions pkg/service/blobindexlookup/cachinglookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"errors"
"fmt"
"net/url"

"github.com/ipni/go-libipni/find/model"
"github.com/storacha/go-libstoracha/blobindex"
"github.com/storacha/go-libstoracha/metadata"
"github.com/storacha/indexing-service/pkg/service/providercacher"
"github.com/storacha/indexing-service/pkg/types"
)
Expand All @@ -30,7 +28,7 @@ func WithCache(blobIndexLookup BlobIndexLookup, shardedDagIndexCache types.Shard
}
}

func (b *cachingLookup) Find(ctx context.Context, contextID types.EncodedContextID, provider model.ProviderResult, fetchURL *url.URL, rng *metadata.Range) (blobindex.ShardedDagIndexView, error) {
func (b *cachingLookup) Find(ctx context.Context, contextID types.EncodedContextID, provider model.ProviderResult, req types.RetrievalRequest) (blobindex.ShardedDagIndexView, error) {
// attempt to read index from cache and return it if succesful
index, err := b.shardDagIndexCache.Get(ctx, contextID)
if err == nil {
Expand All @@ -43,7 +41,7 @@ func (b *cachingLookup) Find(ctx context.Context, contextID types.EncodedContext
}

// attempt to fetch the index from the underlying blob index lookup
index, err = b.blobIndexLookup.Find(ctx, contextID, provider, fetchURL, rng)
index, err = b.blobIndexLookup.Find(ctx, contextID, provider, req)
if err != nil {
return nil, fmt.Errorf("fetching underlying index: %w", err)
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/service/blobindexlookup/cachinglookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"errors"
"fmt"
"net/url"
"testing"

"github.com/ipni/go-libipni/find/model"
"github.com/storacha/go-libstoracha/blobindex"
"github.com/storacha/go-libstoracha/metadata"
"github.com/storacha/go-libstoracha/testutil"
"github.com/storacha/indexing-service/pkg/service/blobindexlookup"
"github.com/storacha/indexing-service/pkg/service/providercacher"
Expand Down Expand Up @@ -124,7 +122,8 @@ func TestWithCache__Find(t *testing.T) {
// Create ClaimLookup instance
cl := blobindexlookup.WithCache(lookup, mockStore, providerCacher)

index, err := cl.Find(context.Background(), tc.contextID, provider, testutil.TestURL, nil)
req := types.NewRetrievalRequest(testutil.TestURL, nil, nil)
index, err := cl.Find(context.Background(), tc.contextID, provider, req)
if tc.expectedErr != nil {
require.EqualError(t, err, tc.expectedErr.Error())
} else {
Expand Down Expand Up @@ -181,7 +180,7 @@ type mockBlobIndexLookup struct {
err error
}

func (m *mockBlobIndexLookup) Find(ctx context.Context, contextID types.EncodedContextID, provider model.ProviderResult, fetchURL *url.URL, rng *metadata.Range) (blobindex.ShardedDagIndexView, error) {
func (m *mockBlobIndexLookup) Find(ctx context.Context, contextID types.EncodedContextID, provider model.ProviderResult, req types.RetrievalRequest) (blobindex.ShardedDagIndexView, error) {
return m.index, m.err
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/service/blobindexlookup/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package blobindexlookup

import (
"context"
"net/url"

"github.com/ipni/go-libipni/find/model"
"github.com/storacha/go-libstoracha/blobindex"
"github.com/storacha/go-libstoracha/metadata"
"github.com/storacha/indexing-service/pkg/types"
)

Expand All @@ -18,5 +16,5 @@ type BlobIndexLookup interface {
// 3. return the index
// 4. asyncronously, add records to the ProviderStore from the parsed blob index so that we can avoid future queries to IPNI for
// other multihashes in the index
Find(ctx context.Context, contextID types.EncodedContextID, provider model.ProviderResult, fetchURL *url.URL, rng *metadata.Range) (blobindex.ShardedDagIndexView, error)
Find(ctx context.Context, contextID types.EncodedContextID, provider model.ProviderResult, req types.RetrievalRequest) (blobindex.ShardedDagIndexView, error)
}
38 changes: 15 additions & 23 deletions pkg/service/blobindexlookup/mock_BlobIndexLookup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading