Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Here are a few examples that use the following commands:
- `ads` Show advertisements on a chain from a specified publisher
- `get` Show information about an advertisement from a specified publisher
- `list` List advertisements from latest to earlier from a specified publisher
- `crawl` Crawl publisher's advertisements and show information for each advertisement
- `dist` Determine the distance between two advertisements in a chain
- `find` Find value by CID or multihash in indexer
- `provider` Show information about providers known to an indexer
Expand Down Expand Up @@ -69,6 +70,12 @@ cat ad-cids-list.txt | ipni add get /dns4/ads.example.com/tcp/24001/p2p/<publish
ipni ads list -n 10 --ai=/ip4/38.70.220.112/tcp/10201/p2p/12D3KooWEAcRJ5fYjuavKgAhu79juR7mgaznSZxsm2RRUBiWurv9
```

### `ads crawl`
- Crawl advertisements from a provider and stop after reading 1000 multihashes:
```sh
ipni ads crawl -stop-mhs 1000 --ai=/ip4/38.70.220.112/tcp/10201/p2p/12D3KooWEAcRJ5fYjuavKgAhu79juR7mgaznSZxsm2RRUBiWurv9
```

### `ads dist`
- Get distance from an advertisement to the head of the advertisement chain:
```sh
Expand Down
62 changes: 57 additions & 5 deletions pkg/adpub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Client interface {
GetAdvertisement(context.Context, cid.Cid) (*Advertisement, error)
Close() error
List(context.Context, cid.Cid, int, io.Writer) error
Crawl(context.Context, cid.Cid, int, chan<- *Advertisement) error
SyncEntriesWithRetry(context.Context, cid.Cid) error
}

Expand Down Expand Up @@ -103,6 +104,61 @@ func (c *client) List(ctx context.Context, latestCid cid.Cid, n int, w io.Writer
return c.store.list(ctx, latestCid, n, w)
}

func (c *client) Crawl(ctx context.Context, latestCid cid.Cid, n int, ads chan<- *Advertisement) error {
const batchSize = 10

var opts []dagsync.SyncOption
if n > syncSegmentSize {
prevAdCid := func(adCid cid.Cid) (cid.Cid, error) {
ad, err := c.store.loadAd(ctx, adCid)
if err != nil {
return cid.Undef, err
}
return ad.PreviousCid(), nil
}
opts = append(opts, dagsync.ScopedSegmentDepthLimit(syncSegmentSize))
opts = append(opts, dagsync.ScopedBlockHook(dagsync.MakeGeneralBlockHook(prevAdCid)))
}
origOptsLen := len(opts)

if n == 0 {
n = -1
}
batch := batchSize
for n != 0 {
if n != -1 {
if n < batchSize {
batch = n
}
n -= batch
}

opts = opts[:origOptsLen]
opts = append(opts, dagsync.WithHeadAdCid(latestCid), dagsync.ScopedDepthLimit(int64(batch)))

var err error
latestCid, err = c.sub.SyncAdChain(ctx, c.publisher, opts...)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

latestCid, err = c.store.crawl(ctx, latestCid, batch, ads)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}
if latestCid == cid.Undef {
break
}
}
return nil
}

func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertisement, error) {
// Sync the advertisement without entries first.
adCid, err := c.syncAdWithRetry(ctx, adCid, c.sub)
Expand All @@ -116,12 +172,8 @@ func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertis
return nil, err
}

if ad.IsRemove {
return ad, nil
}

// Return the partially synced advertisement useful for output to client.
return ad, err
return ad, nil
}

func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid, sub *dagsync.Subscriber) (cid.Cid, error) {
Expand Down
67 changes: 67 additions & 0 deletions pkg/adpub/client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,70 @@ func (s *ClientStore) list(ctx context.Context, nextCid cid.Cid, n int, w io.Wri
}
return nil
}

func (s *ClientStore) crawl(ctx context.Context, nextCid cid.Cid, n int, ads chan<- *Advertisement) (cid.Cid, error) {
for i := 0; i < n; i++ {
val, err := s.Batching.Get(ctx, datastore.NewKey(nextCid.String()))
if err != nil {
return cid.Undef, err
}

nb := schema.AdvertisementPrototype.NewBuilder()
decoder, err := multicodec.LookupDecoder(nextCid.Prefix().Codec)
if err != nil {
return cid.Undef, err
}

err = decoder(nb, bytes.NewBuffer(val))
if err != nil {
return cid.Undef, err
}
node := nb.Build()

ad, err := schema.UnwrapAdvertisement(node)
if err != nil {
return cid.Undef, err
}

dprovid, err := peer.Decode(ad.Provider)
if err != nil {
return cid.Undef, err
}

a := &Advertisement{
ID: nextCid,
ProviderID: dprovid,
ContextID: ad.ContextID,
Metadata: ad.Metadata,
Addresses: ad.Addresses,
PreviousID: ad.PreviousCid(),
IsRemove: ad.IsRm,
ExtendedProvider: ad.ExtendedProvider,
}

if ad.Entries != nil {
entriesCid := ad.Entries.(cidlink.Link).Cid
if entriesCid != cid.Undef {
a.Entries = &EntriesIterator{
root: entriesCid,
next: entriesCid,
ctx: ctx,
store: s,
}
}
}

select {
case ads <- a:
case <-ctx.Done():
return cid.Undef, nil
}

if ad.PreviousID == nil {
return cid.Undef, nil
}

nextCid = ad.PreviousID.(cidlink.Link).Cid
}
return nextCid, nil
}
1 change: 1 addition & 0 deletions pkg/ads/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var AdsCmd = &cli.Command{
Subcommands: []*cli.Command{
adsGetSubCmd,
adsListSubCmd,
adsCrawlSubCmd,
adsDistSubCmd,
},
}
Loading