Skip to content

Commit 70ed4bf

Browse files
committed
apply suggestions
1 parent 22ef709 commit 70ed4bf

File tree

6 files changed

+85
-83
lines changed

6 files changed

+85
-83
lines changed

deps/config/doc_gen.go

Lines changed: 15 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deps/config/types.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,9 @@ func DefaultCurioConfig() *CurioConfig {
9595
ExpectedSnapSealDuration: Duration(2 * time.Hour),
9696
},
9797
IPNI: IPNIConfig{
98-
Enable: true,
99-
TopicName: "",
100-
WebHost: "https://cid.contact",
101-
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
98+
EntriesCacheCapacity: 4096,
99+
WebHost: "https://cid.contact",
100+
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
102101
},
103102
},
104103
},
@@ -668,17 +667,20 @@ type Libp2pConfig struct {
668667
}
669668

670669
type IPNIConfig struct {
671-
// Enable set whether to enable indexing announcement to the network and expose endpoints that
672-
// allow indexer nodes to process announcements. Enabled by default.
673-
Enable bool
674-
675-
// TopicName sets the topic name on which the changes to the advertised content are announced.
676-
// If not explicitly specified, the topic name is automatically inferred from the network name
677-
// in following format: '/indexer/ingest/<network-name>'
678-
// Defaults to empty, which implies the topic name is inferred from network name.
679-
TopicName string
670+
// Disable set whether to disable indexing announcement to the network and expose endpoints that
671+
// allow indexer nodes to process announcements. Default: False
672+
Disable bool
673+
674+
// EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement
675+
// entries. Defaults to 4096 if not specified. The cache is evicted using LRU policy. The
676+
// maximum storage used by the cache is a factor of EntriesCacheCapacity, EntriesChunkSize(16384) and
677+
// the length of multihashes being advertised. For example, advertising 128-bit long multihashes
678+
// with the default EntriesCacheCapacity, and EntriesChunkSize(16384) means the cache size can grow to
679+
// 1GiB when full.
680+
EntriesCacheCapacity int
680681

681682
// The network indexer host that the web UI should link to for published announcements
683+
// TODO: should we use this for checking published heas before publishing? Later commit
682684
WebHost string
683685

684686
// The list of URLs of indexing nodes to announce to.

documentation/en/configuration/default-curio-configuration.md

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -229,17 +229,13 @@ description: The default curio configuration
229229
# type: bool
230230
#EnableBatchSeal = false
231231

232-
<<<<<<< HEAD
233232
# EnableDealMarket enabled the deal market on the node. This would also enable libp2p on the node, if configured.
234233
#
235234
# type: bool
236235
#EnableDealMarket = false
237236

238237
# EnableCommP enables the commP task on te node. CommP is calculated before sending PublishDealMessage for a Mk12 deal
239238
# Must have EnableDealMarket = True
240-
=======
241-
# EnableCommP enabled the commP task on te node. CommP is calculated before sending PublishDealMessage for a Mk12 deal
242-
>>>>>>> 563f95c (ipni provider)
243239
#
244240
# type: bool
245241
#EnableCommP = false
@@ -436,21 +432,24 @@ description: The default curio configuration
436432
#InsertConcurrency = 8
437433

438434
[Market.StorageMarketConfig.IPNI]
439-
# Enable set whether to enable indexing announcement to the network and expose endpoints that
440-
# allow indexer nodes to process announcements. Enabled by default.
435+
# Disable set whether to disable indexing announcement to the network and expose endpoints that
436+
# allow indexer nodes to process announcements. Default: False
441437
#
442438
# type: bool
443-
#Enable = true
444-
445-
# TopicName sets the topic name on which the changes to the advertised content are announced.
446-
# If not explicitly specified, the topic name is automatically inferred from the network name
447-
# in following format: '/indexer/ingest/<network-name>'
448-
# Defaults to empty, which implies the topic name is inferred from network name.
439+
#Disable = false
440+
441+
# EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement
442+
# entries. Defaults to 4096 if not specified. The cache is evicted using LRU policy. The
443+
# maximum storage used by the cache is a factor of EntriesCacheCapacity, EntriesChunkSize(16384) and
444+
# the length of multihashes being advertised. For example, advertising 128-bit long multihashes
445+
# with the default EntriesCacheCapacity, and EntriesChunkSize(16384) means the cache size can grow to
446+
# 1GiB when full.
449447
#
450-
# type: string
451-
#TopicName = ""
448+
# type: int
449+
#EntriesCacheCapacity = 4096
452450

453451
# The network indexer host that the web UI should link to for published announcements
452+
# TODO: should we use this for checking published heas before publishing? Later commit
454453
#
455454
# type: string
456455
#WebHost = "https://cid.contact"

lib/ipni/chunker/chunker.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import (
2020

2121
var log = logging.Logger("chunker")
2222

23-
const EntriesChunkSize = 16384
24-
const EntriesCacheCapacity = 4096
23+
const entriesChunkSize = 16384
2524

2625
// Chunker chunks advertisement entries as a chained series of schema.EntryChunk nodes.
2726
// See: NewChunker
@@ -36,13 +35,10 @@ type Chunker struct {
3635
//
3736
// See: schema.EntryChunk.
3837
func NewChunker(cache *lru.Cache[ipld.Link, datamodel.Node]) *Chunker {
39-
chunker := &Chunker{
40-
chunkSize: EntriesChunkSize,
38+
return &Chunker{
39+
chunkSize: entriesChunkSize,
40+
cache: cache,
4141
}
42-
if cache != nil {
43-
chunker.cache = cache
44-
}
45-
return chunker
4642
}
4743

4844
// Chunk chunks all the mulithashes returned by the given iterator into a chain of schema.EntryChunk
@@ -63,7 +59,6 @@ func (ls *Chunker) Chunk(mhi SliceMhIterator) (ipld.Link, error) {
6359
return nil, err
6460
}
6561
mhs = append(mhs, mh)
66-
mhCount++
6762
if len(mhs) >= ls.chunkSize {
6863
cNode, err := newEntriesChunkNode(mhs, next)
6964
if err != nil {
@@ -77,6 +72,7 @@ func (ls *Chunker) Chunk(mhi SliceMhIterator) (ipld.Link, error) {
7772
ls.cache.Add(next, cNode)
7873
}
7974
chunkCount++
75+
mhCount += len(mhs)
8076
// NewLinkedListOfMhs makes it own copy, so safe to reuse mhs
8177
mhs = mhs[:0]
8278
}
@@ -91,6 +87,7 @@ func (ls *Chunker) Chunk(mhi SliceMhIterator) (ipld.Link, error) {
9187
return nil, err
9288
}
9389
chunkCount++
90+
mhCount += len(mhs)
9491
}
9592

9693
log.Infow("Generated linked chunks of multihashes", "totalMhCount", mhCount, "chunkCount", chunkCount)

lib/ipni/ipni-provider/ipni-provider.go

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
const IPNIRoutePath = "/ipni-provider"
4848
const IPNIPath = "/ipni/v1/ad/"
4949
const ProviderPath = "/ipni-provider"
50+
const publishInterval = 10 * time.Minute
5051

5152
var (
5253
log = logging.Logger("ipni-provider")
@@ -68,7 +69,6 @@ type Provider struct {
6869
pieceProvider *pieceprovider.PieceProvider
6970
entriesChunker *chunker.Chunker
7071
cache *lru.Cache[ipld.Link, datamodel.Node]
71-
topic string
7272
httpPrefix string
7373
keys map[string]*peerInfo // map[peerID String]Private_Key
7474
// announceURLs enables sending direct announcements via HTTP. This is
@@ -78,7 +78,7 @@ type Provider struct {
7878
}
7979

8080
func NewProvider(api ipniAPI, deps *deps.Deps) (*Provider, error) {
81-
c, err := lru.New[ipld.Link, datamodel.Node](chunker.EntriesCacheCapacity)
81+
c, err := lru.New[ipld.Link, datamodel.Node](deps.Cfg.Market.StorageMarketConfig.IPNI.EntriesCacheCapacity)
8282
if err != nil {
8383
return nil, xerrors.Errorf("creating new cache: %w", err)
8484
}
@@ -121,30 +121,19 @@ func NewProvider(api ipniAPI, deps *deps.Deps) (*Provider, error) {
121121
return nil, err
122122
}
123123

124-
nn, err := api.StateNetworkName(ctx)
125-
if err != nil {
126-
return nil, err
127-
}
128-
129-
topic := "/indexer/ingest/" + string(nn)
130-
131-
if deps.Cfg.Market.StorageMarketConfig.IPNI.TopicName != "" {
132-
topic = deps.Cfg.Market.StorageMarketConfig.IPNI.TopicName
133-
}
134-
135-
var announceURLs []*url.URL
124+
announceURLs := make([]*url.URL, 0, len(deps.Cfg.Market.StorageMarketConfig.IPNI.DirectAnnounceURLs))
136125

137-
for _, us := range deps.Cfg.Market.StorageMarketConfig.IPNI.DirectAnnounceURLs {
126+
for i, us := range deps.Cfg.Market.StorageMarketConfig.IPNI.DirectAnnounceURLs {
138127
u, err := url.Parse(us)
139128
if err != nil {
140129
return nil, err
141130
}
142-
announceURLs = append(announceURLs, u)
131+
announceURLs[i] = u
143132
}
144133

145-
var httpServerAddresses []multiaddr.Multiaddr
134+
httpServerAddresses := make([]multiaddr.Multiaddr, 0, len(deps.Cfg.Market.HTTP.AnnounceAddresses))
146135

147-
for _, a := range deps.Cfg.Market.HTTP.AnnounceAddresses {
136+
for i, a := range deps.Cfg.Market.HTTP.AnnounceAddresses {
148137
addr, err := urltomultiaddr.UrlToMultiaddr(a)
149138
if err != nil {
150139
return nil, err
@@ -153,7 +142,7 @@ func NewProvider(api ipniAPI, deps *deps.Deps) (*Provider, error) {
153142
if err != nil {
154143
return nil, err
155144
}
156-
httpServerAddresses = append(httpServerAddresses, addr)
145+
httpServerAddresses[i] = addr
157146
}
158147

159148
return &Provider{
@@ -162,7 +151,6 @@ func NewProvider(api ipniAPI, deps *deps.Deps) (*Provider, error) {
162151
pieceProvider: deps.PieceProvider,
163152
cache: c,
164153
keys: keyMap,
165-
topic: topic,
166154
announceURLs: announceURLs,
167155
httpServerAddresses: httpServerAddresses,
168156
}, nil
@@ -179,9 +167,17 @@ func (p *Provider) getAd(ctx context.Context, ad cid.Cid, provider string) (sche
179167
IsRm bool
180168
}
181169

182-
err := p.db.Select(ctx, &ads, `SELECT context_id,
183-
is_rm, previous, provider, addresses,
184-
signature, entries FROM ipni WHERE ad_cid = $1 AND provider = $2`, ad.String(), provider)
170+
err := p.db.Select(ctx, &ads, `SELECT
171+
context_id,
172+
is_rm,
173+
previous,
174+
provider,
175+
addresses,
176+
signature,
177+
entries
178+
FROM ipni
179+
WHERE ad_cid = $1
180+
AND provider = $2`, ad.String(), provider)
185181

186182
if err != nil {
187183
return schema.Advertisement{}, xerrors.Errorf("getting ad from DB: %w", err)
@@ -256,7 +252,7 @@ func (p *Provider) getHead(ctx context.Context, provider string) ([]byte, error)
256252
return nil, err
257253
}
258254

259-
signedHead, err := head.NewSignedHead(lnk.(cidlink.Link).Cid, p.topic, p.keys[provider].Key)
255+
signedHead, err := head.NewSignedHead(lnk.(cidlink.Link).Cid, "", p.keys[provider].Key)
260256
if err != nil {
261257
return nil, xerrors.Errorf("failed to generate signed head for peer %s: %w", provider, err)
262258
}
@@ -461,7 +457,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
461457
return
462458
}
463459
log.Errorf("failed to get signed head for peer %s: %w", providerID, err)
464-
http.Error(w, err.Error(), http.StatusInternalServerError)
460+
http.Error(w, "", http.StatusInternalServerError)
465461
}
466462
w.WriteHeader(http.StatusOK)
467463
_, err = w.Write(sh)
@@ -487,7 +483,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
487483
return
488484
}
489485
log.Errorf("failed to get entry %s for peer %s: %w", b.String(), providerID, err)
490-
http.Error(w, err.Error(), http.StatusInternalServerError)
486+
http.Error(w, "", http.StatusInternalServerError)
491487
return
492488
}
493489
w.WriteHeader(http.StatusOK)
@@ -498,21 +494,21 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
498494
return
499495
}
500496
log.Errorf("failed to get ad %s for peer %s: %w", b.String(), providerID, err)
501-
http.Error(w, err.Error(), http.StatusInternalServerError)
497+
http.Error(w, "", http.StatusInternalServerError)
502498
return
503499
}
504500
adn, err := ad.ToNode()
505501
if err != nil {
506502
log.Errorf("failed to convert ad %s for peer %s to IPLD node: %w", b.String(), providerID, err)
507-
http.Error(w, err.Error(), http.StatusInternalServerError)
503+
http.Error(w, "", http.StatusInternalServerError)
508504
return
509505
}
510506
// Use local buffer for better error handing
511507
resp := new(bytes.Buffer)
512508
err = dagjson.Encode(adn, resp)
513509
if err != nil {
514510
log.Errorf("failed to encode ad %s for peer %s: %w", b.String(), providerID, err)
515-
http.Error(w, err.Error(), http.StatusInternalServerError)
511+
http.Error(w, "", http.StatusInternalServerError)
516512
return
517513
}
518514
w.WriteHeader(http.StatusOK)
@@ -535,7 +531,7 @@ func Routes(r *mux.Router, p *Provider) {
535531
func (p *Provider) StartPublishing(ctx context.Context) {
536532
// A poller which publishes head for each provider
537533
// every 10 minutes
538-
ticker := time.NewTicker(10 * time.Minute)
534+
ticker := time.NewTicker(publishInterval)
539535
go func() {
540536
for {
541537
select {

0 commit comments

Comments
 (0)