diff --git a/README.md b/README.md index 493d0e6..0075608 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Here are a few examples that use the following commands: - `ads` Show advertisements on a chain from a specified publisher - `get` Show information about an advertisement from a specified publisher - `list` List advertisements from latest to earlier from a specified publisher + - `crawl` Crawl publisher's advertisements and show information for each advertisement - `dist` Determine the distance between two advertisements in a chain - `find` Find value by CID or multihash in indexer - `provider` Show information about providers known to an indexer @@ -69,6 +70,12 @@ cat ad-cids-list.txt | ipni add get /dns4/ads.example.com/tcp/24001/p2p/ syncSegmentSize { + prevAdCid := func(adCid cid.Cid) (cid.Cid, error) { + ad, err := c.store.loadAd(ctx, adCid) + if err != nil { + return cid.Undef, err + } + return ad.PreviousCid(), nil + } + opts = append(opts, dagsync.ScopedSegmentDepthLimit(syncSegmentSize)) + opts = append(opts, dagsync.ScopedBlockHook(dagsync.MakeGeneralBlockHook(prevAdCid))) + } + origOptsLen := len(opts) + + if n == 0 { + n = -1 + } + batch := batchSize + for n != 0 { + if n != -1 { + if n < batchSize { + batch = n + } + n -= batch + } + + opts = opts[:origOptsLen] + opts = append(opts, dagsync.WithHeadAdCid(latestCid), dagsync.ScopedDepthLimit(int64(batch))) + + var err error + latestCid, err = c.sub.SyncAdChain(ctx, c.publisher, opts...) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return err + } + + latestCid, err = c.store.crawl(ctx, latestCid, batch, ads) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return err + } + if latestCid == cid.Undef { + break + } + } + return nil +} + func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertisement, error) { // Sync the advertisement without entries first. 
adCid, err := c.syncAdWithRetry(ctx, adCid, c.sub)
@@ -116,12 +172,8 @@ func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertis
 		return nil, err
 	}
 
-	if ad.IsRemove {
-		return ad, nil
-	}
-
 	// Return the partially synced advertisement useful for output to client.
-	return ad, err
+	return ad, nil
 }
 
 func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid, sub *dagsync.Subscriber) (cid.Cid, error) {
diff --git a/pkg/adpub/client_store.go b/pkg/adpub/client_store.go
index 7c4ca34..cad8f03 100644
--- a/pkg/adpub/client_store.go
+++ b/pkg/adpub/client_store.go
@@ -192,3 +192,77 @@ func (s *ClientStore) list(ctx context.Context, nextCid cid.Cid, n int, w io.Wri
 	}
 	return nil
 }
+
+// crawl walks the advertisement chain held in the datastore, starting at
+// nextCid and following previous-advertisement links for at most n
+// advertisements, sending each decoded advertisement on ads.
+//
+// It returns the CID of the next advertisement to crawl, or cid.Undef when
+// the end of the chain was reached. If ctx is done while waiting to deliver
+// an advertisement, the context error is returned.
+func (s *ClientStore) crawl(ctx context.Context, nextCid cid.Cid, n int, ads chan<- *Advertisement) (cid.Cid, error) {
+	for i := 0; i < n; i++ {
+		val, err := s.Batching.Get(ctx, datastore.NewKey(nextCid.String()))
+		if err != nil {
+			return cid.Undef, err
+		}
+
+		nb := schema.AdvertisementPrototype.NewBuilder()
+		decoder, err := multicodec.LookupDecoder(nextCid.Prefix().Codec)
+		if err != nil {
+			return cid.Undef, err
+		}
+
+		err = decoder(nb, bytes.NewBuffer(val))
+		if err != nil {
+			return cid.Undef, err
+		}
+		node := nb.Build()
+
+		ad, err := schema.UnwrapAdvertisement(node)
+		if err != nil {
+			return cid.Undef, err
+		}
+
+		dprovid, err := peer.Decode(ad.Provider)
+		if err != nil {
+			return cid.Undef, err
+		}
+
+		prevCid := ad.PreviousCid()
+		a := &Advertisement{
+			ID:               nextCid,
+			ProviderID:       dprovid,
+			ContextID:        ad.ContextID,
+			Metadata:         ad.Metadata,
+			Addresses:        ad.Addresses,
+			PreviousID:       prevCid,
+			IsRemove:         ad.IsRm,
+			ExtendedProvider: ad.ExtendedProvider,
+		}
+
+		// Use a checked assertion so that an unexpected link implementation
+		// leaves the advertisement without entries instead of panicking.
+		if entriesLink, ok := ad.Entries.(cidlink.Link); ok && entriesLink.Cid != cid.Undef {
+			a.Entries = &EntriesIterator{
+				root:  entriesLink.Cid,
+				next:  entriesLink.Cid,
+				ctx:   ctx,
+				store: s,
+			}
+		}
+
+		select {
+		case ads <- a:
+		case <-ctx.Done():
+			return cid.Undef, ctx.Err()
+		}
+
+		if prevCid == cid.Undef {
+			// Reached the start of the chain.
+			return cid.Undef, nil
+		}
+		nextCid = prevCid
+	}
+	return nextCid, nil
+}
diff --git a/pkg/ads/ads.go b/pkg/ads/ads.go
index e9d441c..789f1be 100644
--- a/pkg/ads/ads.go
+++ b/pkg/ads/ads.go
@@ -10,6 +10,7 @@ var AdsCmd = &cli.Command{
 	Subcommands: []*cli.Command{
 		adsGetSubCmd,
 		adsListSubCmd,
+		adsCrawlSubCmd,
 		adsDistSubCmd,
 	},
 }
diff --git a/pkg/ads/crawl.go b/pkg/ads/crawl.go
new file mode 100644
index 0000000..758983a
--- /dev/null
+++ b/pkg/ads/crawl.go
@@ -0,0 +1,259 @@
+package ads
+
+import (
+	"context"
+	"encoding/base64"
+	"errors"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-datastore"
+	"github.com/ipni/go-libipni/metadata"
+	"github.com/ipni/ipni-cli/pkg/adpub"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/urfave/cli/v2"
+)
+
+// adsCrawlSubCmd is the "crawl" subcommand of the ads command.
+var adsCrawlSubCmd = &cli.Command{
+	Name:  "crawl",
+	Usage: "Crawl advertisements from latest to earlier from a specified publisher, printing information about each",
+	Description: `Crawl an advertisement chain, stopping at a specified number of multihashes or number of advertisements.
+Example Usage:
+
+	ipni ads crawl -n 10 --ai=/ip4/38.70.220.112/tcp/10201/p2p/12D3KooWEAcRJ5fYjuavKgAhu79juR7mgaznSZxsm2RRUBiWurv9
+`,
+	Flags:  adsCrawlFlags,
+	Action: adsCrawlAction,
+}
+
+// adsCrawlFlags are the command-line flags accepted by the crawl subcommand.
+var adsCrawlFlags = []cli.Flag{
+	addrInfoFlag,
+	&cli.StringFlag{
+		Name:  "latest",
+		Usage: "CID of latest advertisement in chain to start crawl from. If not specified, use latest advertisement in the chain",
+	},
+	&cli.IntFlag{
+		Name:    "number",
+		Usage:   "Number of advertisements to crawl. Specify 0 for all.",
+		Aliases: []string{"n"},
+	},
+	&cli.IntFlag{
+		Name:    "stop-mhs",
+		Usage:   "Stop after counting total number of multihashes",
+		Aliases: []string{"s"},
+	},
+	&cli.BoolFlag{
+		Name:  "skip-entries",
+		Usage: "Do not show or count entries",
+	},
+	&cli.BoolFlag{
+		Name:  "show-metadata",
+		Usage: "Show advertisement metadata",
+	},
+	&cli.BoolFlag{
+		Name:  "show-ext-providers",
+		Usage: "Show advertisement extended providers",
+	},
+	&cli.BoolFlag{
+		Name:    "quiet",
+		Usage:   "Only show advertisement ID and multihash count",
+		Aliases: []string{"q"},
+	},
+	timeoutFlag,
+	topicFlag,
+}
+
+// adsCrawlAction crawls a publisher's advertisement chain and prints
+// information about each advertisement, followed by summary totals. Unless
+// --skip-entries is given, the entries of each advertisement are synced and
+// their multihashes counted; the crawl stops early once --stop-mhs
+// multihashes have been counted.
+func adsCrawlAction(cctx *cli.Context) error {
+	addrInfo, err := peer.AddrInfoFromString(cctx.String("addr-info"))
+	if err != nil {
+		return fmt.Errorf("bad addr-info: %w", err)
+	}
+
+	provClient, err := adpub.NewClient(*addrInfo,
+		adpub.WithEntriesDepthLimit(0),
+		adpub.WithTopicName(cctx.String("topic")),
+		adpub.WithHttpTimeout(cctx.Duration("timeout")))
+	if err != nil {
+		return err
+	}
+
+	var latestCid cid.Cid
+	if cctx.String("latest") != "" {
+		latestCid, err = cid.Decode(cctx.String("latest"))
+		if err != nil {
+			return fmt.Errorf("bad cid: %w", err)
+		}
+	}
+
+	quiet := cctx.Bool("quiet")
+	skipEntries := cctx.Bool("skip-entries")
+	showMetadata := cctx.Bool("show-metadata")
+	showExtProviders := cctx.Bool("show-ext-providers")
+	stopMhs := cctx.Int("stop-mhs")
+
+	if skipEntries && stopMhs != 0 {
+		return errors.New("cannot use flag --skip-entries with --stop-mhs")
+	}
+
+	// ctx bounds the crawl goroutine; canceling it lets that goroutine exit
+	// even when this function stops reading advertisements early.
+	ctx, cancel := context.WithCancel(cctx.Context)
+	defer cancel()
+
+	ads := make(chan *adpub.Advertisement, 1)
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- provClient.Crawl(ctx, latestCid, cctx.Int("number"), ads)
+		close(ads)
+	}()
+
+	var activeMhs, totalMhs int
+	var removalAds, totalAds int
+	// removed holds the context IDs seen in removal advertisements. The chain
+	// is crawled from latest to earliest, so an advertisement whose context ID
+	// is already in this set was removed by a later advertisement.
+	removed := make(map[string]struct{})
+
+	for ad := range ads {
+		var prevCID string
+		if ad.PreviousID != cid.Undef {
+			prevCID = ad.PreviousID.String()
+		}
+		contextID := base64.StdEncoding.EncodeToString(ad.ContextID)
+		if !quiet {
+			fmt.Println()
+			fmt.Println("ID:", ad.ID)
+			fmt.Println("PreviousCID:", prevCID)
+			fmt.Println("ProviderID:", ad.ProviderID)
+			fmt.Println("ContextID:", contextID)
+			fmt.Println("Addresses:", ad.Addresses)
+			fmt.Println("Is Remove:", ad.IsRemove)
+			if showMetadata {
+				fmt.Print("Metadata: ")
+				if len(ad.Metadata) == 0 {
+					fmt.Println("none")
+				} else {
+					fmt.Println(base64.StdEncoding.EncodeToString(ad.Metadata))
+					var mdProtos []string
+					md := metadata.Default.New()
+					// Metadata that cannot be decoded is shown only in its
+					// base64 form above.
+					if err := md.UnmarshalBinary(ad.Metadata); err == nil {
+						for _, p := range md.Protocols() {
+							mdProtos = append(mdProtos, p.String())
+						}
+					}
+					if len(mdProtos) != 0 {
+						fmt.Print(" Protocols: ")
+						fmt.Println(strings.Join(mdProtos, " "))
+					}
+				}
+			}
+			if showExtProviders {
+				fmt.Println("Extended Providers:")
+				if ad.ExtendedProvider != nil {
+					fmt.Printf(" Override: %v\n", ad.ExtendedProvider.Override)
+					fmt.Println(" Providers:")
+					if len(ad.ExtendedProvider.Providers) != 0 {
+						for i, ep := range ad.ExtendedProvider.Providers {
+							fmt.Printf(" %d. ID: %v\n", i+1, ep.ID)
+							fmt.Printf(" Addresses: %v\n", ep.Addresses)
+							fmt.Printf(" Metadata: %v\n", base64.StdEncoding.EncodeToString(ep.Metadata))
+						}
+					} else {
+						fmt.Println(" None")
+					}
+				} else {
+					fmt.Println(" None")
+				}
+			}
+		}
+		totalAds++
+		if ad.IsRemove {
+			removed[contextID] = struct{}{}
+			removalAds++
+			continue
+		}
+		if !ad.HasEntries() {
+			if !quiet {
+				fmt.Println("No entries")
+			}
+			continue
+		}
+
+		_, wasRm := removed[contextID]
+		if wasRm && !quiet {
+			fmt.Println("Ad removed")
+		}
+
+		if skipEntries {
+			continue
+		}
+
+		// Sync with the same cancelable ctx the crawl uses, rather than
+		// cctx.Context, so all work started here is bound to one context.
+		err = provClient.SyncEntriesWithRetry(ctx, ad.Entries.Root())
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to sync entries for advertisement %s: %s\n", ad.ID, err)
+			continue
+		}
+
+		// A datastore.ErrNotFound from Drain is tolerated; whatever entries it
+		// returned are still counted.
+		entries, err := ad.Entries.Drain()
+		if err != nil && !errors.Is(err, datastore.ErrNotFound) {
+			return err
+		}
+		if !wasRm {
+			activeMhs += len(entries)
+		}
+		totalMhs += len(entries)
+
+		if quiet {
+			if wasRm {
+				fmt.Println(ad.ID, "Multihashes:", len(entries), "(removed)")
+			} else {
+				fmt.Println(ad.ID, "Multihashes:", len(entries), "\t\ttotal:", totalMhs)
+			}
+		} else {
+			fmt.Println("Entries:")
+			fmt.Println(" Chunk Count:", ad.Entries.ChunkCount())
+			fmt.Println(" Multihashes:", len(entries))
+			fmt.Println("Active mhs:", activeMhs, "Total mhs:", totalMhs)
+		}
+
+		if stopMhs != 0 && totalMhs >= stopMhs {
+			break
+		}
+	}
+	// Stop the crawl goroutine, which may still be producing advertisements
+	// if the loop above ended early, and then collect its result.
+	cancel()
+
+	err = <-errCh
+	if err != nil {
+		return err
+	}
+
+	fmt.Println()
+	fmt.Println("ads crawled: ", totalAds)
+	if totalAds == 0 {
+		return nil
+	}
+	fmt.Printf("removal ads: %d (%d%%)\n", removalAds, removalAds*100/totalAds)
+	if !skipEntries {
+		fmt.Println("active multihashes:", activeMhs)
+		fmt.Println("total multihashes: ", totalMhs)
+	}
+
+	return nil
+}