Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
eae8874
update pieceref schema and indexing task rough draft
ZenGround0 Sep 17, 2025
84d02c3
Bring in state view
ZenGround0 Sep 25, 2025
94a2db1
WIP
ZenGround0 Sep 30, 2025
77de42f
IPNI task done
ZenGround0 Oct 1, 2025
4c85996
Fixes
ZenGround0 Oct 1, 2025
c9cbc7b
Move alteration below creation
ZenGround0 Oct 1, 2025
13ea33e
Fix lint errors and actually use new tasks
ZenGround0 Oct 1, 2025
e9fb2f2
review response
ZenGround0 Oct 5, 2025
401b135
Fix calibnet curio bug
ZenGround0 Oct 5, 2025
b2879aa
Better errors to debug pdptool
ZenGround0 Oct 6, 2025
b726468
allow simple pdp service listener for testing
ZenGround0 Oct 6, 2025
286f80d
fix
ZenGround0 Oct 6, 2025
1d94cc4
Allow 0 listener
ZenGround0 Oct 6, 2025
42307e8
SimpleService
ZenGround0 Oct 7, 2025
940f49c
sql syntax error
ZenGround0 Oct 7, 2025
6bce99f
SQL missing AND keyword
ZenGround0 Oct 7, 2025
a61f473
Chill out ipni announce
ZenGround0 Oct 7, 2025
d934d38
More and less logging
ZenGround0 Oct 7, 2025
741f8f9
more debug
ZenGround0 Oct 7, 2025
927c000
Only publish new heads
ZenGround0 Oct 7, 2025
0499503
fix
ZenGround0 Oct 7, 2025
b7e8705
init
ZenGround0 Oct 7, 2025
4b0b5a4
Use filecoinpin.contact/announce as default direct announce url
ZenGround0 Oct 8, 2025
1462dcd
Review Response
ZenGround0 Oct 8, 2025
9ce1204
Fix inconsistent sp_id
ZenGround0 Oct 8, 2025
6ef7a2e
Whoops bad sql
ZenGround0 Oct 8, 2025
473955a
further fixes for negativevalue
ZenGround0 Oct 8, 2025
af95c60
less verbose before any publish
ZenGround0 Oct 8, 2025
c70bb9a
Review Response
ZenGround0 Oct 8, 2025
df43e4e
Update tasks/indexing/task_pdp_ipni.go
ZenGround0 Oct 8, 2025
6494641
Review Response
ZenGround0 Oct 8, 2025
6688934
Lint
ZenGround0 Oct 8, 2025
d4d3e51
Trying very small publish interval
ZenGround0 Oct 8, 2025
ff020e8
docsgen
ZenGround0 Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
return nil, err
}
var sdeps cuhttp.ServiceDeps
idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)

if cfg.Subsystems.EnablePDP {
es := getSenderEth()
Expand All @@ -299,11 +300,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
pdpNotifTask := pdp.NewPDPNotifyTask(db)
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask)
pdpIndexingTask := indexing.NewPDPIndexingTask(db, iStore, dependencies.CachedPieceReader, cfg, idxMax)
pdpIpniTask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask)
}

idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
activeTasks = append(activeTasks, ipniTask, indexingTask)
Expand Down
2 changes: 1 addition & 1 deletion deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func DefaultCurioConfig() *CurioConfig {
},
IPNI: IPNIConfig{
ServiceURL: []string{"https://cid.contact"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZenGround0 : I don't know what this does, but should we also add filecoinpin.contact?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the Curio docs, this is a Curio web UI thing and is not critical for data flowing from place to place. I won't change any defaults here; if Curio operators are seeing issues with their view of advertisements, they can easily make a config layer including filecoinpin.contact or others.

DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce", "https://filecoinpin.contact/announce"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the implications of having both in here? I'm guessing it's inviting both of them to hit us? a failure trying to talk to one won't stop us from trying the other?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is common config. So updating here is not an option without making it common between PoRep and PDP and publishing all existing PoRep chain to Filecoin pin. I plan to create a tool for basic PDP stuff in Curio, like an init after Curio init or maybe part of guided-setup. We can use that to insert values for PDP only clusters. A cleaner approach in my opinion and allows for much more complex config changes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concretely having both just means we tell both indexers about our latest IPNI head when publishing advertisements. Then I assume both will hit us when looking for retrieval endpoints.

a failure trying to talk to one won't stop us from trying the other?

From reading libipni code and observing a failure to propagate to filecoinpin.contact and success with cid.contact I'm confident that the answer is yes.

A cleaner approach in my opinion and allows for much more complex config changes.

I don't like this either, I'd rather have people put this into an ipni layer, but I've been told repeatedly to do this for now. We'll happily get rid of it when we go full main branch.

Copy link
Member

@BigLep BigLep Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is common config

Correct me if I'm wrong here @rvagg, but I think that is fine for now. I believe we are assuming someone who is participating in Filecoin Onchain Cloud at this point to be doing so with a Curio instance that is strictly for PDP. It would be unwise to also be doing mainnet PoRep with this instance. So for now, if this common place is the easiest way to specify multiple indexers, let's do it.

what are the implications of having both in here? I'm guessing it's inviting both of them to hit us? a failure trying to talk to one won't stop us from trying the other?

These are good questions. I assume @ZenGround0 or @LexLuthr can answer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(doh, sorry, I typed before seeing @ZenGround0. The key part is answered, in that the curio node is operationally ok with listing two indexers, even if one fails or is unresponsive).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no worries, classic collision

},
},
},
Expand Down
2 changes: 1 addition & 1 deletion harmony/harmonydb/sql/20240823-ipni.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ CREATE TABLE ipni_peerid (
CREATE TABLE ipni (
order_number BIGSERIAL PRIMARY KEY, -- Unique increasing order number
ad_cid TEXT NOT NULL,
context_id BYTEA NOT NULL, -- abi.PieceInfo in Curio
context_id BYTEA NOT NULL, -- abi.PieceInfo || PDPIPNIContext in Curio
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... Spark squealing in the distance ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already discussed with Spark about this. They agreed with the new contextID.

-- metadata column is not required as Curio only supports one type of metadata (HTTP)
is_rm BOOLEAN NOT NULL,

Expand Down
5 changes: 5 additions & 0 deletions harmony/harmonydb/sql/20251004-pdp-indexing.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- fields tracking indexing and ipni jobs over pdp pieces
-- Adds four columns to pdp_piecerefs: for each of the two job kinds
-- (indexing, ipni) a nullable BIGINT task id and a BOOLEAN flag defaulting to FALSE.
-- NOTE(review): presumably the *_task_id columns hold the id of the task working
-- on the piece and the needs_* flags mark pieces awaiting that job -- confirm against the task code.
ALTER TABLE pdp_piecerefs ADD COLUMN indexing_task_id BIGINT DEFAULT NULL;
ALTER TABLE pdp_piecerefs ADD COLUMN needs_indexing BOOLEAN DEFAULT FALSE;
ALTER TABLE pdp_piecerefs ADD COLUMN ipni_task_id BIGINT DEFAULT NULL;
ALTER TABLE pdp_piecerefs ADD COLUMN needs_ipni BOOLEAN DEFAULT FALSE;
40 changes: 34 additions & 6 deletions market/ipni/ipni-provider/ipni-provider.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we filter out cid.contact for anything not mainnet. We should remove that check as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it really matter if it's retrievable?
what's the GC like in cid.contact these days, will it figure out that the announcements are invalid after some period of time? does it periodically check endpoints for new chains and expire old ones if the SP isn't available anymore or the chain it's providing is entirely new?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might have misunderstood. There is a filter in publisher where we only Publish to cid.contact for mainnet build. We should probably remove that. Without that cid.contact will not get the ad chain at all. So, we publish for mainnet and calibnet.

If you meant why I am not publishing to cid.contact for testnet then mostly to avoid sending unnecessary data to it. Another reason is Curio's devnet spins up a local ipni instance which makes for much better testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wanting to make sure I'm following along...
For calibration, we need to support advertising to both cid.contact and filecoinpin.contact. If there is code change needed to support this, then we need to handle that.
We expect the number of CIDs here to be small initially as no one will be doing large production workflows on calibration, so I assume we're fine not to worry about cluttering cid.contact with calibration filecoin-pin CIDs.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/samber/lo"
"github.com/yugabyte/pgx/v5"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand All @@ -49,8 +50,8 @@ const IPNIRoutePath = "/ipni-provider/"
const IPNIPath = "/ipni/v1/ad/"

// publishInterval represents the time interval between each publishing operation.
// It is set to 10 minutes.
const publishInterval = 10 * time.Minute
// It is set to 30 seconds for the purposes of PDP index publishing
const publishInterval = 30 * time.Second
const publishProviderSpacing = 10 * time.Second

var (
Expand All @@ -73,6 +74,7 @@ type Provider struct {
indexStore *indexstore.IndexStore
sc *chunker.ServeChunker
keys map[string]*peerInfo // map[peerID String]Private_Key
latest map[string]cid.Cid // map[peerID String]last published head, used to avoid duplicate announce
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the changes in here wrt latest look like bugfixes, or is it just the fact that 30 seconds now make this task more onerous so it's worth being spare with what we provide?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a blend of both. It would have been good to do this with a 10-minute interval and non-real-time constraints, but now it's become important.

// announceURLs enables sending direct announcements via HTTP. This is
// the list of indexer URLs to send direct HTTP announce messages to.
announceURLs []*url.URL
Expand Down Expand Up @@ -103,8 +105,9 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
for rows.Next() && rows.Err() == nil {
var priv []byte
var peerID string
var sp int64
var spID abi.ActorID
err := rows.Scan(&priv, &peerID, &spID)
err := rows.Scan(&priv, &peerID, &sp)
if err != nil {
return nil, xerrors.Errorf("failed to scan the row: %w", err)
}
Expand All @@ -123,6 +126,10 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
return nil, xerrors.Errorf("peer ID mismatch: got %s (calculated), expected %s (DB)", id.String(), peerID)
}

if sp < 0 {
spID = abi.ActorID(0)
}

maddr, err := address.NewIDAddress(uint64(spID))
if err != nil {
return nil, xerrors.Errorf("parsing miner ID: %w", err)
Expand Down Expand Up @@ -193,6 +200,7 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
keys: keyMap,
announceURLs: announceURLs,
httpServerAddresses: httpServerAddresses,
latest: make(map[string]cid.Cid),
}, nil
}

Expand Down Expand Up @@ -364,7 +372,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
start := time.Now()

defer func() {
log.Infow("Served IPNI request", "path", r.URL.Path, "cid", reqCid, "providerId", providerID, "took", time.Since(start))
log.Infow("Served IPNI request", "path", r.URL.Path, "cid", reqCid, "providerId", providerID, "took", time.Since(start), "remote_addr", r.RemoteAddr)
}()

b, err := cid.Parse(reqCid)
Expand Down Expand Up @@ -477,7 +485,6 @@ func RemoveCidContact(slice []*url.URL) []*url.URL {
// StartPublishing starts a poller which publishes the head for each provider every 10 minutes.
func (p *Provider) StartPublishing(ctx context.Context) {
var ticker *time.Ticker

// A poller which publishes head for each provider
// every 10 minutes for mainnet build
if build.BuildType == build.BuildMainnet {
Expand All @@ -489,12 +496,22 @@ func (p *Provider) StartPublishing(ctx context.Context) {
return
}
log.Info("Starting IPNI provider publishing for testnet build")
ticker = time.NewTicker(publishInterval)
if build.BuildType != build.BuildCalibnet {
ticker = time.NewTicker(time.Second * 10)
log.Info("Resetting IPNI provider publishing ticker to 10 seconds for devnet build")
}
}

// Populate latest head cid from the ipni_head table
for provider := range p.keys {
c, err := p.getHeadCID(ctx, provider)
if err != nil {
log.Errorw("failed to get head CID", "provider", provider, "error", err)
continue
}
p.latest[provider] = c
}
go func(ticker *time.Ticker) {
for {
select {
Expand All @@ -518,6 +535,10 @@ func (p *Provider) StartPublishing(ctx context.Context) {
func (p *Provider) getHeadCID(ctx context.Context, provider string) (cid.Cid, error) {
var headStr string
err := p.db.QueryRow(ctx, `SELECT head FROM ipni_head WHERE provider = $1`, provider).Scan(&headStr)
if err == pgx.ErrNoRows {
log.Debugw("no head CID yet for provider", "provider", provider)
return cid.Undef, nil
}
if err != nil {
return cid.Undef, xerrors.Errorf("querying previous head: %w", err)
}
Expand All @@ -544,10 +565,17 @@ func (p *Provider) publishHead(ctx context.Context) {
log.Errorw("failed to get head CID", "provider", provider, "error", err)
continue
}
if _, ok := p.latest[provider]; ok && p.latest[provider] == c {
log.Debugw("Skipping duplicate announce for provider", "provider", provider, "cid", c.String())
continue
}

log.Infow("Publishing head for provider", "provider", provider, "cid", c.String())
err = p.publishhttp(ctx, c, provider)
if err != nil {
log.Errorw("failed to publish head for provide", "provider", provider, "error", err)
} else {
p.latest[provider] = c
}

i++
Expand All @@ -567,6 +595,7 @@ func (p *Provider) publishProviderSpacingWait() {
// It obtains the HTTP addresses for the peer and sends the announce message to those addresses.
func (p *Provider) publishhttp(ctx context.Context, adCid cid.Cid, peer string) error {
// Create the http announce sender.
log.Infow("Creating http announce sender", "urls", p.announceURLs)
httpSender, err := httpsender.New(p.announceURLs, p.keys[peer].ID)
if err != nil {
return fmt.Errorf("cannot create http announce sender: %w", err)
Expand All @@ -577,7 +606,6 @@ func (p *Provider) publishhttp(ctx context.Context, adCid cid.Cid, peer string)
return fmt.Errorf("cannot create provider http addresses: %w", err)
}

log.Infow("Announcing advertisements over HTTP", "urls", p.announceURLs)
return announce.Send(ctx, adCid, addrs, httpSender)
}

Expand Down
5 changes: 4 additions & 1 deletion market/ipni/ipni-provider/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (

func (p *Provider) updateSparkContract(ctx context.Context) error {
for _, pInfo := range p.keys {
pInfo := pInfo
if pInfo.SPID <= 0 {
log.Debugf("spark does not yet support pdp data")
continue
}
mInfo, err := p.full.StateMinerInfo(ctx, pInfo.Miner, types.EmptyTSK)
if err != nil {
return err
Expand Down
Loading
Loading