Skip to content

Commit 134b0b6

Browse files
authored
Del after read (#139)
* Add option to remove items from store after reading them. Prevents OOM when reading large datasets and using in-memory datastore. * Set delAfterRead in client store. * Increase crawl batch size.
1 parent aec05a4 commit 134b0b6

File tree

5 files changed

+36
-49
lines changed

5 files changed

+36
-49
lines changed

pkg/adpub/client.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ import (
1818
"github.com/libp2p/go-libp2p/core/peer"
1919
)
2020

21-
const syncSegmentSize = 2048
21+
const (
22+
crawlBatchSize = 16
23+
syncSegmentSize = 2048
24+
)
2225

2326
type Client interface {
2427
GetAdvertisement(context.Context, cid.Cid) (*Advertisement, error)
@@ -72,7 +75,7 @@ func NewClient(addrInfo peer.AddrInfo, options ...Option) (Client, error) {
7275
ownsHost: ownsHost,
7376
topic: opts.topic,
7477

75-
store: newClientStore(),
78+
store: newClientStore(opts.delAfterRead),
7679
}
7780

7881
c.sub, err = dagsync.NewSubscriber(c.host, c.store.Batching, c.store.LinkSystem, c.topic, dagsync.HttpTimeout(opts.httpTimeout))
@@ -105,8 +108,6 @@ func (c *client) List(ctx context.Context, latestCid cid.Cid, n int, w io.Writer
105108
}
106109

107110
func (c *client) Crawl(ctx context.Context, latestCid cid.Cid, n int, ads chan<- *Advertisement) error {
108-
const batchSize = 10
109-
110111
var opts []dagsync.SyncOption
111112
if n > syncSegmentSize {
112113
prevAdCid := func(adCid cid.Cid) (cid.Cid, error) {
@@ -124,10 +125,10 @@ func (c *client) Crawl(ctx context.Context, latestCid cid.Cid, n int, ads chan<-
124125
if n == 0 {
125126
n = -1
126127
}
127-
batch := batchSize
128+
batch := crawlBatchSize
128129
for n != 0 {
129130
if n != -1 {
130-
if n < batchSize {
131+
if n < crawlBatchSize {
131132
batch = n
132133
}
133134
n -= batch

pkg/adpub/client_store.go

Lines changed: 16 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/ipld/go-ipld-prime"
1212
"github.com/ipld/go-ipld-prime/linking"
1313
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
14-
"github.com/ipld/go-ipld-prime/multicodec"
1514
"github.com/ipni/go-libipni/ingest/schema"
1615
"github.com/libp2p/go-libp2p/core/peer"
1716
"github.com/multiformats/go-multihash"
@@ -24,6 +23,8 @@ import (
2423
type ClientStore struct {
2524
datastore.Batching
2625
ipld.LinkSystem
26+
27+
delAfterRead bool
2728
}
2829

2930
// Advertisement contains information about a schema.Advertisement
@@ -47,7 +48,7 @@ func (a *Advertisement) HasEntries() bool {
4748
return a.Entries != nil && a.Entries.IsPresent()
4849
}
4950

50-
func newClientStore() *ClientStore {
51+
func newClientStore(delAfterRead bool) *ClientStore {
5152
store := dssync.MutexWrap(datastore.NewMapDatastore())
5253
lsys := cidlink.DefaultLinkSystem()
5354
lsys.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
@@ -66,8 +67,9 @@ func newClientStore() *ClientStore {
6667
}, nil
6768
}
6869
return &ClientStore{
69-
Batching: store,
70-
LinkSystem: lsys,
70+
Batching: store,
71+
LinkSystem: lsys,
72+
delAfterRead: delAfterRead,
7173
}
7274
}
7375

@@ -92,6 +94,9 @@ func (s *ClientStore) getEntriesChunk(ctx context.Context, target cid.Cid) (cid.
9294
if err != nil {
9395
return cid.Undef, nil, err
9496
}
97+
if s.delAfterRead {
98+
s.Batching.Delete(ctx, datastore.NewKey(target.String()))
99+
}
95100

96101
chunk, err := schema.UnwrapEntryChunk(n)
97102
if err != nil {
@@ -108,10 +113,14 @@ func (s *ClientStore) getEntriesChunk(ctx context.Context, target cid.Cid) (cid.
108113
}
109114

110115
func (s *ClientStore) loadAd(ctx context.Context, id cid.Cid) (schema.Advertisement, error) {
111-
val, err := s.Batching.Get(ctx, datastore.NewKey(id.String()))
116+
dsKey := datastore.NewKey(id.String())
117+
val, err := s.Batching.Get(ctx, dsKey)
112118
if err != nil {
113119
return schema.Advertisement{}, err
114120
}
121+
if s.delAfterRead {
122+
s.Batching.Delete(ctx, dsKey)
123+
}
115124
return schema.BytesToAdvertisement(id, val)
116125
}
117126

@@ -156,28 +165,10 @@ func (s *ClientStore) getAdvertisement(ctx context.Context, id cid.Cid) (*Advert
156165

157166
func (s *ClientStore) list(ctx context.Context, nextCid cid.Cid, n int, w io.Writer) error {
158167
for i := 0; i < n; i++ {
159-
val, err := s.Batching.Get(ctx, datastore.NewKey(nextCid.String()))
160-
if err != nil {
161-
return err
162-
}
163-
164-
nb := schema.AdvertisementPrototype.NewBuilder()
165-
decoder, err := multicodec.LookupDecoder(nextCid.Prefix().Codec)
166-
if err != nil {
167-
return err
168-
}
169-
170-
err = decoder(nb, bytes.NewBuffer(val))
171-
if err != nil {
172-
return err
173-
}
174-
node := nb.Build()
175-
176-
ad, err := schema.UnwrapAdvertisement(node)
168+
ad, err := s.loadAd(ctx, nextCid)
177169
if err != nil {
178170
return err
179171
}
180-
181172
if _, err = io.WriteString(w, nextCid.String()); err != nil {
182173
return err
183174
}
@@ -195,24 +186,7 @@ func (s *ClientStore) list(ctx context.Context, nextCid cid.Cid, n int, w io.Wri
195186

196187
func (s *ClientStore) crawl(ctx context.Context, nextCid cid.Cid, n int, ads chan<- *Advertisement) (cid.Cid, error) {
197188
for i := 0; i < n; i++ {
198-
val, err := s.Batching.Get(ctx, datastore.NewKey(nextCid.String()))
199-
if err != nil {
200-
return cid.Undef, err
201-
}
202-
203-
nb := schema.AdvertisementPrototype.NewBuilder()
204-
decoder, err := multicodec.LookupDecoder(nextCid.Prefix().Codec)
205-
if err != nil {
206-
return cid.Undef, err
207-
}
208-
209-
err = decoder(nb, bytes.NewBuffer(val))
210-
if err != nil {
211-
return cid.Undef, err
212-
}
213-
node := nb.Build()
214-
215-
ad, err := schema.UnwrapAdvertisement(node)
189+
ad, err := s.loadAd(ctx, nextCid)
216190
if err != nil {
217191
return cid.Undef, err
218192
}

pkg/adpub/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type config struct {
2020
p2pHost host.Host
2121
syncRetryBackoff time.Duration
2222
topic string
23+
delAfterRead bool
2324
}
2425

2526
// Option is a function that sets a value in a config.
@@ -98,3 +99,11 @@ func WithHttpTimeout(to time.Duration) Option {
9899
return nil
99100
}
100101
}
102+
103+
// WithDeleteAfterRead deletes items from the store after reading them.
104+
func WithDeleteAfterRead(del bool) Option {
105+
return func(c *config) error {
106+
c.delAfterRead = del
107+
return nil
108+
}
109+
}

pkg/ads/crawl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func adsCrawlAction(cctx *cli.Context) error {
7272
}
7373

7474
provClient, err := adpub.NewClient(*addrInfo,
75+
adpub.WithDeleteAfterRead(true),
7576
adpub.WithEntriesDepthLimit(0),
7677
adpub.WithTopicName(cctx.String("topic")),
7778
adpub.WithHttpTimeout(cctx.Duration("timeout")))

pkg/ads/list.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ func adsListAction(cctx *cli.Context) error {
4444
return fmt.Errorf("bad pub-addr-info: %w", err)
4545
}
4646

47-
provClient, err := adpub.NewClient(*addrInfo, adpub.WithTopicName(cctx.String("topic")),
47+
provClient, err := adpub.NewClient(*addrInfo,
48+
adpub.WithDeleteAfterRead(true),
49+
adpub.WithTopicName(cctx.String("topic")),
4850
adpub.WithHttpTimeout(cctx.Duration("timeout")))
4951
if err != nil {
5052
return err

0 commit comments

Comments
 (0)