Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
return nil, err
}
var sdeps cuhttp.ServiceDeps
idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)

if cfg.Subsystems.EnablePDP {
es := getSenderEth()
Expand All @@ -299,11 +300,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
pdpNotifTask := pdp.NewPDPNotifyTask(db)
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask)
pdpIndexingTask := indexing.NewPDPIndexingTask(db, iStore, dependencies.CachedPieceReader, cfg, idxMax)
pdpIpniTask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask)
}

idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
activeTasks = append(activeTasks, ipniTask, indexingTask)
Expand Down
2 changes: 1 addition & 1 deletion deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func DefaultCurioConfig() *CurioConfig {
},
IPNI: IPNIConfig{
ServiceURL: []string{"https://cid.contact"},
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce", "https://filecoinpin.contact/announce"},
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion harmony/harmonydb/sql/20240823-ipni.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ CREATE TABLE ipni_peerid (
CREATE TABLE ipni (
order_number BIGSERIAL PRIMARY KEY, -- Unique increasing order number
ad_cid TEXT NOT NULL,
context_id BYTEA NOT NULL, -- abi.PieceInfo in Curio
context_id BYTEA NOT NULL, -- abi.PieceInfo || PDPIPNIContext in Curio
-- metadata column in not required as Curio only supports one type of metadata(HTTP)
is_rm BOOLEAN NOT NULL,

Expand Down
5 changes: 5 additions & 0 deletions harmony/harmonydb/sql/20251004-pdp-indexing.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- fields tracking indexing and ipni jobs over pdp pieces
ALTER TABLE pdp_piecerefs ADD COLUMN indexing_task_id BIGINT DEFAULT NULL;
ALTER TABLE pdp_piecerefs ADD COLUMN needs_indexing BOOLEAN DEFAULT FALSE;
ALTER TABLE pdp_piecerefs ADD COLUMN ipni_task_id BIGINT DEFAULT NULL;
ALTER TABLE pdp_piecerefs ADD COLUMN needs_ipni BOOLEAN DEFAULT FALSE;
28 changes: 23 additions & 5 deletions market/ipni/ipni-provider/ipni-provider.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we filter out cid.contract for anything not mainnet. We should remove that check as well.

Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const IPNIRoutePath = "/ipni-provider/"
const IPNIPath = "/ipni/v1/ad/"

// publishInterval represents the time interval between each publishing operation.
// It is set to 10 minutes.
const publishInterval = 10 * time.Minute
// It is set to 30 seconds for the purposes of PDP index publishing
const publishInterval = 30 * time.Second
const publishProviderSpacing = 10 * time.Second

var (
Expand All @@ -73,6 +73,7 @@ type Provider struct {
indexStore *indexstore.IndexStore
sc *chunker.ServeChunker
keys map[string]*peerInfo // map[peerID String]Private_Key
latest map[string]cid.Cid // map[peerID String]last published head, used to avoid duplicate announce
// announceURLs enables sending direct announcements via HTTP. This is
// the list of indexer URLs to send direct HTTP announce messages to.
announceURLs []*url.URL
Expand Down Expand Up @@ -193,6 +194,7 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
keys: keyMap,
announceURLs: announceURLs,
httpServerAddresses: httpServerAddresses,
latest: make(map[string]cid.Cid),
}, nil
}

Expand Down Expand Up @@ -364,7 +366,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
start := time.Now()

defer func() {
log.Infow("Served IPNI request", "path", r.URL.Path, "cid", reqCid, "providerId", providerID, "took", time.Since(start))
log.Infow("Served IPNI request", "path", r.URL.Path, "cid", reqCid, "providerId", providerID, "took", time.Since(start), "remote_addr", r.RemoteAddr)
}()

b, err := cid.Parse(reqCid)
Expand Down Expand Up @@ -477,7 +479,6 @@ func RemoveCidContact(slice []*url.URL) []*url.URL {
// StartPublishing starts a poller which publishes the head for each provider every 10 minutes.
func (p *Provider) StartPublishing(ctx context.Context) {
var ticker *time.Ticker

// A poller which publishes head for each provider
// every 10 minutes for mainnet build
if build.BuildType == build.BuildMainnet {
Expand All @@ -489,12 +490,22 @@ func (p *Provider) StartPublishing(ctx context.Context) {
return
}
log.Info("Starting IPNI provider publishing for testnet build")
ticker = time.NewTicker(publishInterval)
if build.BuildType != build.BuildCalibnet {
ticker = time.NewTicker(time.Second * 10)
log.Info("Resetting IPNI provider publishing ticker to 10 seconds for devnet build")
}
}

// Populated latest head cid from the ipni_head table
for provider := range p.keys {
c, err := p.getHeadCID(ctx, provider)
if err != nil {
log.Errorw("failed to get head CID", "provider", provider, "error", err)
continue
}
p.latest[provider] = c
}
go func(ticker *time.Ticker) {
for {
select {
Expand Down Expand Up @@ -544,10 +555,17 @@ func (p *Provider) publishHead(ctx context.Context) {
log.Errorw("failed to get head CID", "provider", provider, "error", err)
continue
}
if _, ok := p.latest[provider]; ok && p.latest[provider] == c {
log.Debugw("Skipping duplicate announce for provider", "provider", provider, "cid", c.String())
continue
}

log.Infow("Publishing head for provider", "provider", provider, "cid", c.String())
err = p.publishhttp(ctx, c, provider)
if err != nil {
log.Errorw("failed to publish head for provide", "provider", provider, "error", err)
} else {
p.latest[provider] = c
}

i++
Expand All @@ -567,6 +585,7 @@ func (p *Provider) publishProviderSpacingWait() {
// It obtains the HTTP addresses for the peer and sends the announce message to those addresses.
func (p *Provider) publishhttp(ctx context.Context, adCid cid.Cid, peer string) error {
// Create the http announce sender.
log.Infow("Creating http announce sender", "urls", p.announceURLs)
httpSender, err := httpsender.New(p.announceURLs, p.keys[peer].ID)
if err != nil {
return fmt.Errorf("cannot create http announce sender: %w", err)
Expand All @@ -577,7 +596,6 @@ func (p *Provider) publishhttp(ctx context.Context, adCid cid.Cid, peer string)
return fmt.Errorf("cannot create provider http addresses: %w", err)
}

log.Infow("Announcing advertisements over HTTP", "urls", p.announceURLs)
return announce.Send(ctx, adCid, addrs, httpSender)
}

Expand Down
5 changes: 4 additions & 1 deletion market/ipni/ipni-provider/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (

func (p *Provider) updateSparkContract(ctx context.Context) error {
for _, pInfo := range p.keys {
pInfo := pInfo
if pInfo.SPID <= 0 {
log.Debugf("spark does not yet support pdp data")
continue
}
mInfo, err := p.full.StateMinerInfo(ctx, pInfo.Miner, types.EmptyTSK)
if err != nil {
return err
Expand Down
Loading