Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions cmd/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ var awsCmd = &cli.Command{
cacher.Start()
defer cacher.Stop()

srvOpts = append(srvOpts, server.WithIPNIPublisherStore(setupIPNIPublisherStore(cfg)))

return server.ListenAndServe(addr, indexer, srvOpts...)
},
}
Expand Down Expand Up @@ -130,10 +128,3 @@ func setupProviderCacher(cfg aws.Config) (*providercacher.CachingQueuePoller, er

return providercacher.NewCachingQueuePoller(cachingQueue, providerCacher)
}

func setupIPNIPublisherStore(cfg aws.Config) *store.AdStore {
ipniStore := aws.NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix)
chunkLinksTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName)
metadataTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName)
return store.NewPublisherStore(ipniStore, chunkLinksTable, metadataTable, store.WithMetadataContext(metadata.MetadataContext))
}
114 changes: 0 additions & 114 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package server

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -12,19 +10,14 @@ import (

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/find/model"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/metadata"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multibase"
"github.com/multiformats/go-multihash"
"github.com/storacha/go-libstoracha/capabilities/assert"
"github.com/storacha/go-libstoracha/ipnipublisher/store"
"github.com/storacha/go-ucanto/core/car"
"github.com/storacha/go-ucanto/core/dag/blockstore"
"github.com/storacha/go-ucanto/core/delegation"
Expand Down Expand Up @@ -53,7 +46,6 @@ type config struct {
contentClaimsOptions []server.Option
enableTelemetry bool
ipniConfig *ipniConfig
publisherStore store.PublisherStore
}

type Option func(*config) error
Expand Down Expand Up @@ -94,13 +86,6 @@ func WithIPNI(provider peer.AddrInfo, metadata metadata.Metadata) Option {
}
}

func WithIPNIPublisherStore(store store.PublisherStore) Option {
return func(c *config) error {
c.publisherStore = store
return nil
}
}

// ListenAndServe creates a new indexing service HTTP server, and starts it up.
func ListenAndServe(addr string, indexer types.Service, opts ...Option) error {
mux, err := NewServer(indexer, opts...)
Expand Down Expand Up @@ -154,14 +139,6 @@ func NewServer(indexer types.Service, opts ...Option) (*http.ServeMux, error) {
if c.ipniConfig != nil {
maybeInstrumentAndAdd(mux, "GET /cid/{cid}", GetIPNICIDHandler(indexer, c.ipniConfig), c.enableTelemetry)
}
// Temporary endpoint to publish an orphan advert to the indexer's IPNI chain
if c.publisherStore != nil {
sk, err := crypto.UnmarshalEd25519PrivateKey(c.id.Raw())
if err != nil {
return nil, err
}
mux.HandleFunc("POST /ad", PostAdHandler(sk, c.publisherStore))
}
return mux, nil
}

Expand Down Expand Up @@ -437,94 +414,3 @@ func GetDIDDocument(id principal.Signer) http.HandlerFunc {
w.Write(bytes)
}
}

func PostAdHandler(sk crypto.PrivKey, store store.PublisherStore) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad, err := decodeAdvert(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("decoding advert: %s", err.Error()), http.StatusBadRequest)
return
}

if err := validateAdvertSig(sk, ad); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

adlink, err := publishAdvert(r.Context(), sk, store, ad)
if err != nil {
http.Error(w, fmt.Sprintf("publishing advert: %s", err.Error()), http.StatusInternalServerError)
return
}

out, err := json.Marshal(adlink)
if err != nil {
http.Error(w, fmt.Sprintf("marshaling JSON: %s", err.Error()), http.StatusInternalServerError)
return
}
w.Write(out)
})
}

// ensures the advert came from this node originally
func validateAdvertSig(sk crypto.PrivKey, ad schema.Advertisement) error {
sigBytes := ad.Signature
err := ad.Sign(sk)
if err != nil {
return fmt.Errorf("signing advert: %w", err)
}
if !bytes.Equal(sigBytes, ad.Signature) {
return errors.New("advert was not created by this node")
}
return nil
}

// assumed in DAG-JSON encoding
func decodeAdvert(r io.Reader) (schema.Advertisement, error) {
advBytes, err := io.ReadAll(r)
if err != nil {
return schema.Advertisement{}, err
}

adLink, err := cid.V1Builder{
Codec: cid.DagJSON,
MhType: multihash.SHA2_256,
}.Sum(advBytes)
if err != nil {
return schema.Advertisement{}, err
}

return schema.BytesToAdvertisement(adLink, advBytes)
}

func publishAdvert(ctx context.Context, sk crypto.PrivKey, store store.PublisherStore, ad schema.Advertisement) (ipld.Link, error) {
prevHead, err := store.Head(ctx)
if err != nil {
return nil, err
}

ad.PreviousID = prevHead.Head

if err = ad.Sign(sk); err != nil {
return nil, fmt.Errorf("signing advert: %w", err)
}

if err := ad.Validate(); err != nil {
return nil, fmt.Errorf("validating advert: %w", err)
}

link, err := store.PutAdvert(ctx, ad)
if err != nil {
return nil, fmt.Errorf("putting advert: %w", err)
}

head, err := head.NewSignedHead(link.(cidlink.Link).Cid, "/indexer/ingest/mainnet", sk)
if err != nil {
return nil, fmt.Errorf("signing head: %w", err)
}
if _, err := store.ReplaceHead(ctx, prevHead, head); err != nil {
return nil, fmt.Errorf("replacing head: %w", err)
}

return link, nil
}