From 91dd98c3deb4da1eb9a996a351288e17ff3f8e35 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 19 Nov 2025 17:55:23 +0100 Subject: [PATCH 1/4] feat(rpc): add fast-provide-root and fast-provide-wait to Add add FastProvideRoot and FastProvideWait options to UnixfsAddSettings, allowing RPC clients to control immediate DHT providing of root CIDs for faster content discovery these options default to server config (Import.FastProvideRoot and Import.FastProvideWait) when not explicitly set by the client --- client/rpc/unixfs.go | 8 ++++++++ core/coreiface/options/unixfs.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go index 316cc21a8ec..68586472b8d 100644 --- a/client/rpc/unixfs.go +++ b/client/rpc/unixfs.go @@ -56,6 +56,14 @@ func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.Unix req.Option("raw-leaves", options.RawLeaves) } + if options.FastProvideRootSet { + req.Option("fast-provide-root", options.FastProvideRoot) + } + + if options.FastProvideWaitSet { + req.Option("fast-provide-wait", options.FastProvideWait) + } + switch options.Layout { case caopts.BalancedLayout: // noop, default diff --git a/core/coreiface/options/unixfs.go b/core/coreiface/options/unixfs.go index 45e880ed1cb..b95e546216f 100644 --- a/core/coreiface/options/unixfs.go +++ b/core/coreiface/options/unixfs.go @@ -52,6 +52,11 @@ type UnixfsAddSettings struct { PreserveMtime bool Mode os.FileMode Mtime time.Time + + FastProvideRoot bool + FastProvideRootSet bool + FastProvideWait bool + FastProvideWaitSet bool } type UnixfsLsSettings struct { @@ -97,6 +102,11 @@ func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix, PreserveMtime: false, Mode: 0, Mtime: time.Time{}, + + FastProvideRoot: false, + FastProvideRootSet: false, + FastProvideWait: false, + FastProvideWaitSet: false, } for _, opt := range opts { @@ -396,3 +406,23 @@ func (unixfsOpts) Mtime(seconds int64, nsecs uint32) UnixfsAddOption { return nil } } + +// FastProvideRoot sets whether to immediately provide root CID to DHT for faster discovery. +// If not set, server uses Import.FastProvideRoot config value (default: true). +func (unixfsOpts) FastProvideRoot(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.FastProvideRoot = enable + settings.FastProvideRootSet = true + return nil + } +} + +// FastProvideWait sets whether to block until fast provide completes. +// If not set, server uses Import.FastProvideWait config value (default: false). 
+func (unixfsOpts) FastProvideWait(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.FastProvideWait = enable + settings.FastProvideWaitSet = true + return nil + } +} From 7c5db10169210e1b656f5f0df51ff7fbd8fe5aa2 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 19 Nov 2025 19:12:59 +0100 Subject: [PATCH 2/4] feat(rpc): add dag import with fast-provide support adds Import method to APIDagService interface and RPC client implementation - new DagImportResult, DagImportRoot, DagImportStats types in coreiface - DagImportOptions with uniform Set pattern for all params (PinRoots, Stats, FastProvideRoot, FastProvideWait) - streaming channel API for handling multiple roots and stats - tests covering basic import, stats, offline mode, and blocking wait --- client/rpc/dag.go | 69 +++++++++++++ client/rpc/dag_test.go | 177 ++++++++++++++++++++++++++++++++++ core/coreiface/dag.go | 30 ++++++ core/coreiface/options/dag.go | 80 +++++++++++++++ 4 files changed, 356 insertions(+) create mode 100644 client/rpc/dag_test.go create mode 100644 core/coreiface/options/dag.go diff --git a/client/rpc/dag.go b/client/rpc/dag.go index 63cac8f61ac..345502ebf11 100644 --- a/client/rpc/dag.go +++ b/client/rpc/dag.go @@ -3,13 +3,16 @@ package rpc import ( "bytes" "context" + "encoding/json" "fmt" "io" + "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" + iface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/options" multicodec "github.com/multiformats/go-multicodec" ) @@ -129,6 +132,72 @@ func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { return nil } +func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) { + options, err := options.DagImportOptions(opts...) 
+ if err != nil { + return nil, err + } + + req := api.core().Request("dag/import") + + if options.PinRootsSet { + req.Option("pin-roots", options.PinRoots) + } + + if options.StatsSet { + req.Option("stats", options.Stats) + } + + if options.FastProvideRootSet { + req.Option("fast-provide-root", options.FastProvideRoot) + } + + if options.FastProvideWaitSet { + req.Option("fast-provide-wait", options.FastProvideWait) + } + + req.Body(files.NewMultiFileReader(files.NewMapDirectory(map[string]files.Node{"": file}), false, false)) + + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + out := make(chan iface.DagImportResult) + + go func() { + defer resp.Close() + defer close(out) + + dec := json.NewDecoder(resp.Output) + + for { + var event iface.DagImportResult + + if err := dec.Decode(&event); err != nil { + if err != io.EOF { + select { + case out <- iface.DagImportResult{}: + case <-ctx.Done(): + } + } + return + } + + select { + case out <- event: + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + func (api *httpNodeAdder) core() *HttpApi { return (*HttpApi)(api) } diff --git a/client/rpc/dag_test.go b/client/rpc/dag_test.go new file mode 100644 index 00000000000..4e1151e43e4 --- /dev/null +++ b/client/rpc/dag_test.go @@ -0,0 +1,177 @@ +package rpc + +import ( + "context" + "os" + "testing" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/go-cid" + "github.com/ipfs/kubo/core/coreiface/options" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/require" +) + +func TestDagImport_Basic(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Open test fixture + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import CAR file + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile)) + require.NoError(t, err) + + // Collect results + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + require.Empty(t, result.Root.PinErrorMsg, "pin should succeed") + } + } + + // Verify we got exactly one root + require.Len(t, roots, 1, "should have exactly one root") + + // Verify the expected root CID + expectedRoot := "bafyreifrm6uf5o4dsaacuszf35zhibyojlqclabzrms7iak67pf62jygaq" + require.Equal(t, expectedRoot, roots[0].String()) +} + +func TestDagImport_WithStats(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with stats enabled + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.Stats(true)) + require.NoError(t, err) + + var roots []cid.Cid + var gotStats bool + var blockCount uint64 + + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + if result.Stats != nil { + gotStats = true + blockCount = result.Stats.BlockCount + } + } + + require.Len(t, roots, 1, "should have one 
root") + require.True(t, gotStats, "should receive stats") + require.Equal(t, uint64(4), blockCount, "TestDagStat.car has 4 blocks") +} + +func TestDagImport_OfflineWithFastProvide(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon("--offline=true") + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with fast-provide enabled in offline mode + // Should succeed gracefully (fast-provide silently skipped) + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.FastProvideRoot(true), + options.Dag.FastProvideWait(true)) + require.NoError(t, err) + + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + } + + require.Len(t, roots, 1, "import should succeed offline with fast-provide enabled") +} + +func TestDagImport_OnlineWithFastProvideWait(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with fast-provide wait enabled in online mode + // This tests that FastProvideWait actually blocks (not fire-and-forget). + // In isolated test environment (no DHT peers), the provide operation may: + // 1. Succeed trivially (announced to randomly discovered peers), or + // 2. Return an error (timeout/no peers) + // Both outcomes prove blocking behavior works correctly. + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.FastProvideRoot(true), + options.Dag.FastProvideWait(true)) + + if err != nil { + // Blocking wait detected provide failure (no DHT peers in isolated test) + // This proves FastProvideWait actually blocked and error propagated + require.Contains(t, err.Error(), "fast-provide", + "error should be from fast-provide operation") + return // Test passed - blocking wait worked and returned error + } + + // No error - provide succeeded, verify we got results + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + } + + require.Len(t, roots, 1, "should receive one root when provide succeeds") +} diff --git a/core/coreiface/dag.go b/core/coreiface/dag.go index 3cc3aeb4de2..3598e4528e4 100644 --- a/core/coreiface/dag.go +++ b/core/coreiface/dag.go @@ -1,13 +1,43 @@ package iface import ( + "context" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/kubo/core/coreiface/options" ) +// DagImportResult represents the result of importing roots or stats from CAR files. +// Each result has either Root or Stats set, never both. 
+type DagImportResult struct { + Root *DagImportRoot + Stats *DagImportStats +} + +// DagImportRoot represents a root CID from a CAR file header +type DagImportRoot struct { + Cid cid.Cid + PinErrorMsg string +} + +// DagImportStats contains statistics about the import operation +type DagImportStats struct { + BlockCount uint64 + BlockBytesCount uint64 +} + // APIDagService extends ipld.DAGService type APIDagService interface { ipld.DAGService // Pinning returns special NodeAdder which recursively pins added nodes Pinning() ipld.NodeAdder + + // Import imports data from CAR files. + // Returns a channel that streams results for each root CID found in CAR headers, + // and optionally stats at the end if requested via options. + // Supports importing multiple CAR files, each with multiple roots. + Import(context.Context, files.File, ...options.DagImportOption) (<-chan DagImportResult, error) } diff --git a/core/coreiface/options/dag.go b/core/coreiface/options/dag.go new file mode 100644 index 00000000000..90d4e1251a2 --- /dev/null +++ b/core/coreiface/options/dag.go @@ -0,0 +1,80 @@ +package options + +type DagImportSettings struct { + PinRoots bool + PinRootsSet bool + Stats bool + StatsSet bool + FastProvideRoot bool + FastProvideRootSet bool + FastProvideWait bool + FastProvideWaitSet bool +} + +type DagImportOption func(*DagImportSettings) error + +func DagImportOptions(opts ...DagImportOption) (*DagImportSettings, error) { + options := &DagImportSettings{ + PinRoots: false, + PinRootsSet: false, + Stats: false, + StatsSet: false, + FastProvideRoot: false, + FastProvideRootSet: false, + FastProvideWait: false, + FastProvideWaitSet: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +type dagOpts struct{} + +var Dag dagOpts + +// PinRoots sets whether to pin roots listed in CAR headers after importing. +// If not set, server uses command default (true). +func (dagOpts) PinRoots(pin bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.PinRoots = pin + settings.PinRootsSet = true + return nil + } +} + +// Stats enables output of import statistics (block count and bytes). +// If not set, server uses command default (false). +func (dagOpts) Stats(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.Stats = enable + settings.StatsSet = true + return nil + } +} + +// FastProvideRoot sets whether to immediately provide root CIDs to DHT for faster discovery. +// If not set, server uses Import.FastProvideRoot config value (default: true). +func (dagOpts) FastProvideRoot(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.FastProvideRoot = enable + settings.FastProvideRootSet = true + return nil + } +} + +// FastProvideWait sets whether to block until fast provide completes. +// If not set, server uses Import.FastProvideWait config value (default: false). 
+func (dagOpts) FastProvideWait(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.FastProvideWait = enable + settings.FastProvideWaitSet = true + return nil + } +} From 45d17e72b6b86eda412c499b9d25179db2ba6171 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 20 Nov 2025 05:25:55 +0100 Subject: [PATCH 3/4] feat(client/rpc): add provide stat and dag import support adds RPC client support for: - ipfs provide stat (with --lan flag for dual DHT) - ipfs dag import (with --fast-provide-root/--fast-provide-wait) client/rpc changes: - dag.go: add Import() method (~70 lines) - dag_test.go: 4 test cases for Import (new file) - routing.go: add ProvideStats() method (~25 lines) - routing_test.go: 3 test cases for ProvideStats (new file) to enable RPC client, refactored commands to use CoreAPI: - add ProvideStats() to RoutingAPI interface and implementation - add Import() to APIDagService interface and implementation - commands delegate to CoreAPI (provide.go, dag/import.go) --- client/rpc/routing.go | 26 +++ client/rpc/routing_test.go | 163 +++++++++++++++++++ core/commands/dag/import.go | 201 +++++------------------- core/commands/provide.go | 69 +++----- core/coreapi/dag.go | 253 ++++++++++++++++++++++++++++++ core/coreapi/routing.go | 61 +++++++ core/coreiface/dag.go | 1 + core/coreiface/options/routing.go | 31 ++++ core/coreiface/routing.go | 13 ++ test/cli/dag_test.go | 12 +- test/cli/provide_stats_test.go | 2 +- test/dependencies/go.mod | 1 + test/dependencies/go.sum | 2 + 13 files changed, 621 insertions(+), 214 deletions(-) create mode 100644 client/rpc/routing_test.go diff --git a/client/rpc/routing.go b/client/rpc/routing.go index 693f155c6b0..54e3f84b9a5 100644 --- a/client/rpc/routing.go +++ b/client/rpc/routing.go @@ -7,6 +7,7 @@ import ( "encoding/json" "github.com/ipfs/boxo/path" + iface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/options" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" @@ -156,6 +157,31 @@ func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options Exec(ctx, nil) } +func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...options.RoutingProvideStatOption) (*iface.ProvideStatsResponse, error) { + options, err := options.RoutingProvideStatOptions(opts...) + if err != nil { + return nil, err + } + + resp, err := api.core().Request("provide/stat"). + Option("lan", options.UseLAN). 
+ Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + defer resp.Close() + + var out iface.ProvideStatsResponse + if err := json.NewDecoder(resp.Output).Decode(&out); err != nil { + return nil, err + } + + return &out, nil +} + func (api *RoutingAPI) core() *HttpApi { return (*HttpApi)(api) } diff --git a/client/rpc/routing_test.go b/client/rpc/routing_test.go new file mode 100644 index 00000000000..7895e3b6b2a --- /dev/null +++ b/client/rpc/routing_test.go @@ -0,0 +1,163 @@ +package rpc + +import ( + "context" + "encoding/json" + "testing" + + boxoprovider "github.com/ipfs/boxo/provider" + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/ipfs/kubo/core/coreiface/options" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/libp2p/go-libp2p-kad-dht/provider/stats" + "github.com/stretchr/testify/require" +) + +// Compile-time check: ensure our response type is compatible with kubo's provideStats +// This verifies that JSON marshaling/unmarshaling will work correctly +var _ = func() { + // Create instance of command's provideStats structure + cmdStats := struct { + Sweep *stats.Stats `json:"Sweep,omitempty"` + Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"` + FullRT bool `json:"FullRT,omitempty"` + }{} + + // Marshal and unmarshal to verify compatibility + data, _ := json.Marshal(cmdStats) + var ifaceStats iface.ProvideStatsResponse + _ = json.Unmarshal(data, &ifaceStats) +} + +// testProvideStats mirrors the subset of fields we verify in tests. +// Intentionally independent from coreiface types to detect breaking changes. +type testProvideStats struct { + Sweep *struct { + Connectivity struct { + Status string `json:"status"` + } `json:"connectivity"` + Queues struct { + PendingKeyProvides int `json:"pending_key_provides"` + } `json:"queues"` + Schedule struct { + Keys int `json:"keys"` + } `json:"schedule"` + } `json:"Sweep,omitempty"` + Legacy *struct { + TotalReprovides uint64 `json:"TotalReprovides"` + } `json:"Legacy,omitempty"` +} + +func TestProvideStats_WithSweepProvider(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Explicitly enable Sweep provider (default in v0.39) + node.SetIPFSConfig("Provide.DHT.SweepEnabled", true) + node.SetIPFSConfig("Provide.Enabled", true) + + node.StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Get provide stats + result, err := api.Routing().ProvideStats(ctx) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify Sweep stats are present, Legacy is not + require.NotNil(t, result.Sweep, "Sweep provider should return Sweep stats") + require.Nil(t, result.Legacy, "Sweep provider should not return Legacy stats") + + // Marshal to JSON and unmarshal to test struct to verify structure + data, err := json.Marshal(result) + require.NoError(t, err) + + var testStats testProvideStats + err = json.Unmarshal(data, &testStats) + require.NoError(t, err) + + // Verify key fields exist and have reasonable values + require.NotNil(t, testStats.Sweep) + require.NotEmpty(t, testStats.Sweep.Connectivity.Status, "connectivity status should be present") + require.GreaterOrEqual(t, testStats.Sweep.Queues.PendingKeyProvides, 0, "queue size should be non-negative") + require.GreaterOrEqual(t, testStats.Sweep.Schedule.Keys, 0, "scheduled keys should be non-negative") +} + +func 
TestProvideStats_WithLegacyProvider(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Explicitly disable Sweep to use Legacy provider + node.SetIPFSConfig("Provide.DHT.SweepEnabled", false) + node.SetIPFSConfig("Provide.Enabled", true) + + node.StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Get provide stats + result, err := api.Routing().ProvideStats(ctx) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify Legacy stats are present, Sweep is not + require.Nil(t, result.Sweep, "Legacy provider should not return Sweep stats") + require.NotNil(t, result.Legacy, "Legacy provider should return Legacy stats") + + // Marshal to JSON and unmarshal to test struct to verify structure + data, err := json.Marshal(result) + require.NoError(t, err) + + var testStats testProvideStats + err = json.Unmarshal(data, &testStats) + require.NoError(t, err) + + // Verify Legacy field exists + require.NotNil(t, testStats.Legacy) + require.GreaterOrEqual(t, testStats.Legacy.TotalReprovides, uint64(0), "total reprovides should be non-negative") +} + +func TestProvideStats_LANFlagErrorWithLegacy(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Use Legacy provider - LAN flag should error + node.SetIPFSConfig("Provide.DHT.SweepEnabled", false) + node.SetIPFSConfig("Provide.Enabled", true) + + node.StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Try to get LAN stats with Legacy provider + // This should return an error + _, err = api.Routing().ProvideStats(ctx, options.Routing.UseLAN(true)) + require.Error(t, err, "LAN flag should error with Legacy provider") + require.Contains(t, err.Error(), "LAN stats only available for Sweep provider with Dual DHT", + "error should indicate LAN stats unavailable") +} diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go index 032b9e52a6c..3778c60ba9d 100644 --- a/core/commands/dag/import.go +++ b/core/commands/dag/import.go @@ -2,26 +2,14 @@ package dagcmd import ( "errors" - "fmt" - "io" "github.com/ipfs/boxo/files" - blocks "github.com/ipfs/go-block-format" - cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" - ipld "github.com/ipfs/go-ipld-format" - ipldlegacy "github.com/ipfs/go-ipld-legacy" - logging "github.com/ipfs/go-log/v2" "github.com/ipfs/kubo/config" - "github.com/ipfs/kubo/core/coreiface/options" - gocarv2 "github.com/ipld/go-car/v2" - "github.com/ipfs/kubo/core/commands/cmdenv" - "github.com/ipfs/kubo/core/commands/cmdutils" + "github.com/ipfs/kubo/core/coreiface/options" ) -var log = logging.Logger("core/commands") - func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { node, err := cmdenv.GetNode(env) if err != nil { @@ -38,62 +26,31 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment return err } - blockDecoder := ipldlegacy.NewDecoder() - - // on import ensure we do not reach out to the network for any reason - // if a pin based on what is imported + what is in the blockstore - // isn't possible: tough luck + // Ensure offline mode - import should not reach out to network api, err = api.WithOptions(options.Api.Offline(true)) if err != nil { return err } + // Parse 
options doPinRoots, _ := req.Options[pinRootsOptionName].(bool) - + doStats, _ := req.Options[statsOptionName].(bool) fastProvideRoot, fastProvideRootSet := req.Options[fastProvideRootOptionName].(bool) fastProvideWait, fastProvideWaitSet := req.Options[fastProvideWaitOptionName].(bool) + // Resolve fast-provide options from config if not explicitly set fastProvideRoot = config.ResolveBoolFromConfig(fastProvideRoot, fastProvideRootSet, cfg.Import.FastProvideRoot, config.DefaultFastProvideRoot) fastProvideWait = config.ResolveBoolFromConfig(fastProvideWait, fastProvideWaitSet, cfg.Import.FastProvideWait, config.DefaultFastProvideWait) - // grab a pinlock ( which doubles as a GC lock ) so that regardless of the - // size of the streamed-in cars nothing will disappear on us before we had - // a chance to roots that may show up at the very end - // This is especially important for use cases like dagger: - // ipfs dag import $( ... | ipfs-dagger --stdout=carfifos ) - // - if doPinRoots { - unlocker := node.Blockstore.PinLock(req.Context) - defer unlocker.Unlock(req.Context) - } - - // this is *not* a transaction - // it is simply a way to relieve pressure on the blockstore - // similar to pinner.Pin/pinner.Flush - batch := ipld.NewBatch(req.Context, api.Dag(), - // Default: 128. Means 128 file descriptors needed in flatfs - ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))), - // Default 100MiB. When setting block size to 1MiB, we can add - // ~100 nodes maximum. With default 256KiB block-size, we will - // hit the max nodes limit at 32MiB.p - ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))), - ) - - roots := cid.NewSet() - var blockCount, blockBytesCount uint64 - - // remember last valid block and provide a meaningful error message - // when a truncated/mangled CAR is being imported - importError := func(previous blocks.Block, current blocks.Block, err error) error { - if current != nil { - return fmt.Errorf("import failed at block %q: %w", current.Cid(), err) - } - if previous != nil { - return fmt.Errorf("import failed after block %q: %w", previous.Cid(), err) - } - return fmt.Errorf("import failed: %w", err) + // Build CoreAPI options + dagOpts := []options.DagImportOption{ + options.Dag.PinRoots(doPinRoots), + options.Dag.Stats(doStats), + options.Dag.FastProvideRoot(fastProvideRoot), + options.Dag.FastProvideWait(fastProvideWait), } + // Process each file it := req.Files.Entries() for it.Next() { file := files.FileFromEntry(it) @@ -101,118 +58,44 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment return errors.New("expected a file handle") } - // import blocks - err = func() error { - // wrap a defer-closer-scope - // - // every single file in it() is already open before we start - // just close here sooner rather than later for neatness - // and to surface potential errors writing on closed fifos - // this won't/can't help with not running out of handles - defer file.Close() - - var previous blocks.Block - - car, err := gocarv2.NewBlockReader(file) - if err != nil { - return err - } + // Call CoreAPI to import the file + resultChan, err := api.Dag().Import(req.Context, file, dagOpts...) 
+ if err != nil { + return err + } - for _, c := range car.Roots { - roots.Add(c) + // Stream results back to user + for result := range resultChan { + // Check for errors from CoreAPI + if result.Err != nil { + return result.Err } - for { - block, err := car.Next() - if err != nil && err != io.EOF { - return importError(previous, block, err) - } else if block == nil { - break - } - if err := cmdutils.CheckBlockSize(req, uint64(len(block.RawData()))); err != nil { - return importError(previous, block, err) - } - - // the double-decode is suboptimal, but we need it for batching - nd, err := blockDecoder.DecodeNode(req.Context, block) + // Emit root results + if result.Root != nil { + err := res.Emit(&CarImportOutput{ + Root: &RootMeta{ + Cid: result.Root.Cid, + PinErrorMsg: result.Root.PinErrorMsg, + }, + }) if err != nil { - return importError(previous, block, err) + return err } - - if err := batch.Add(req.Context, nd); err != nil { - return importError(previous, block, err) - } - blockCount++ - blockBytesCount += uint64(len(block.RawData())) - previous = block } - return nil - }() - if err != nil { - return err - } - } - - if err := batch.Commit(); err != nil { - return err - } - // It is not guaranteed that a root in a header is actually present in the same ( or any ) - // .car file. This is the case in version 1, and ideally in further versions too. - // Accumulate any root CID seen in a header, and supplement its actual node if/when encountered - // We will attempt a pin *only* at the end in case all car files were well-formed. - - // opportunistic pinning: try whatever sticks - if doPinRoots { - err = roots.ForEach(func(c cid.Cid) error { - ret := RootMeta{Cid: c} - - // This will trigger a full read of the DAG in the pinner, to make sure we have all blocks. - // Ideally we would do colloring of the pinning state while importing the blocks - // and ensure the gray bucket is empty at the end (or use the network to download missing blocks). 
- if block, err := node.Blockstore.Get(req.Context, c); err != nil { - ret.PinErrorMsg = err.Error() - } else if nd, err := blockDecoder.DecodeNode(req.Context, block); err != nil { - ret.PinErrorMsg = err.Error() - } else if err := node.Pinning.Pin(req.Context, nd, true, ""); err != nil { - ret.PinErrorMsg = err.Error() - } else if err := node.Pinning.Flush(req.Context); err != nil { - ret.PinErrorMsg = err.Error() + // Emit stats results + if result.Stats != nil { + err := res.Emit(&CarImportOutput{ + Stats: &CarImportStats{ + BlockCount: result.Stats.BlockCount, + BlockBytesCount: result.Stats.BlockBytesCount, + }, + }) + if err != nil { + return err + } } - - return res.Emit(&CarImportOutput{Root: &ret}) - }) - if err != nil { - return err - } - } - - stats, _ := req.Options[statsOptionName].(bool) - if stats { - err = res.Emit(&CarImportOutput{ - Stats: &CarImportStats{ - BlockCount: blockCount, - BlockBytesCount: blockBytesCount, - }, - }) - if err != nil { - return err - } - } - - // Fast-provide roots for faster discovery - if fastProvideRoot { - err = roots.ForEach(func(c cid.Cid) error { - return cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false) - }) - if err != nil { - return err - } - } else { - if fastProvideWait { - log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) - } else { - log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config") } } diff --git a/core/commands/provide.go b/core/commands/provide.go index c9d3954cfe9..0b756b27b36 100644 --- a/core/commands/provide.go +++ b/core/commands/provide.go @@ -15,10 +15,8 @@ import ( cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" + "github.com/ipfs/kubo/core/coreiface/options" "github.com/libp2p/go-libp2p-kad-dht/fullrt" - "github.com/libp2p/go-libp2p-kad-dht/provider" - "github.com/libp2p/go-libp2p-kad-dht/provider/buffered" - "github.com/libp2p/go-libp2p-kad-dht/provider/dual" "github.com/libp2p/go-libp2p-kad-dht/provider/stats" routing "github.com/libp2p/go-libp2p/core/routing" "github.com/probe-lab/go-libdht/kad/key" @@ -136,26 +134,6 @@ type provideStats struct { FullRT bool // only used for legacy stats } -// extractSweepingProvider extracts a SweepingProvider from the given provider interface. -// It handles unwrapping buffered and dual providers, selecting LAN or WAN as specified. -// Returns nil if the provider is not a sweeping provider type. 
-func extractSweepingProvider(prov any, useLAN bool) *provider.SweepingProvider { - switch p := prov.(type) { - case *provider.SweepingProvider: - return p - case *dual.SweepingProvider: - if useLAN { - return p.LAN - } - return p.WAN - case *buffered.SweepingProvider: - // Recursively extract from the inner provider - return extractSweepingProvider(p.Provider, useLAN) - default: - return nil - } -} - var provideStatCmd = &cmds.Command{ Status: cmds.Experimental, Helptext: cmds.HelpText{ @@ -234,41 +212,36 @@ NOTES: cmds.BoolOption(provideStatQueuesOptionName, "Display provide and reprovide queue sizes"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env, req) if err != nil { return err } - if !nd.IsOnline { - return ErrNotOnline - } - lanStats, _ := req.Options[provideLanOptionName].(bool) - // Handle legacy provider - if legacySys, ok := nd.Provider.(boxoprovider.System); ok { - if lanStats { - return errors.New("LAN stats only available for Sweep provider with Dual DHT") - } - stats, err := legacySys.Stat() - if err != nil { - return err - } - _, fullRT := nd.DHTClient.(*fullrt.FullRT) - return res.Emit(provideStats{Legacy: &stats, FullRT: fullRT}) + // Get stats from CoreAPI + opts := []options.RoutingProvideStatOption{} + if lanStats { + opts = append(opts, options.Routing.UseLAN(true)) } - // Extract sweeping provider (handles buffered and dual unwrapping) - sweepingProvider := extractSweepingProvider(nd.Provider, lanStats) - if sweepingProvider == nil { - if lanStats { - return errors.New("LAN stats only available for Sweep provider with Dual DHT") - } - return fmt.Errorf("stats not available with current routing system %T", nd.Provider) + result, err := api.Routing().ProvideStats(req.Context, opts...) + if err != nil { + return err + } + + // Set FullRT field for display (command-layer presentation concern) + nd, err := cmdenv.GetNode(env) + if err != nil { + return err } + _, fullRT := nd.DHTClient.(*fullrt.FullRT) - s := sweepingProvider.Stats() - return res.Emit(provideStats{Sweep: &s}) + return res.Emit(provideStats{ + Sweep: result.Sweep, + Legacy: result.Legacy, + FullRT: fullRT, + }) }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s provideStats) error { diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index 70686f62e03..329496ded25 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -2,11 +2,20 @@ package coreapi import ( "context" + "fmt" + "io" + "github.com/ipfs/boxo/files" dag "github.com/ipfs/boxo/ipld/merkledag" pin "github.com/ipfs/boxo/pinning/pinner" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + ipldlegacy "github.com/ipfs/go-ipld-legacy" + "github.com/ipfs/kubo/config" + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/ipfs/kubo/core/coreiface/options" + gocarv2 "github.com/ipld/go-car/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -68,6 +77,250 @@ func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter { return dag.NewSession(ctx, api.DAGService) } +func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) { + // Parse options + settings, err := options.DagImportOptions(opts...) 
+ if err != nil { + return nil, err + } + + // Get config for batch settings + cfg, err := api.core.repo.Config() + if err != nil { + return nil, err + } + + // Create block decoder for IPLD nodes + blockDecoder := ipldlegacy.NewDecoder() + + // Create batch for efficient block addition + // Uses config values for batch size tuning + batch := ipld.NewBatch(ctx, api.DAGService, + ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))), + ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))), + ) + + // Create output channel + out := make(chan iface.DagImportResult) + + // Process import in background + go func() { + defer close(out) + defer file.Close() + + // Acquire pinlock if pinning roots (also serves as GC lock) + if settings.PinRoots { + unlocker := api.core.blockstore.PinLock(ctx) + defer unlocker.Unlock(ctx) + } + + // Track roots from CAR headers and stats + roots := cid.NewSet() + var blockCount, blockBytesCount uint64 + + // Parse CAR file + car, err := gocarv2.NewBlockReader(file) + if err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to create CAR reader: %w", err)} + return + } + + // Collect roots from CAR header + for _, c := range car.Roots { + roots.Add(c) + } + + // Process all blocks from CAR file + var previous blocks.Block + for { + block, err := car.Next() + if err != nil { + if err != io.EOF { + if previous != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("error reading block after %s: %w", previous.Cid(), err)} + } else { + out <- iface.DagImportResult{Err: fmt.Errorf("error reading CAR blocks: %w", err)} + } + } + break + } + if block == nil { + break + } + + // Decode block into IPLD node + nd, err := blockDecoder.DecodeNode(ctx, block) + if err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to decode block %s: %w", block.Cid(), err)} + return + } + + // Add node to batch + if err := batch.Add(ctx, nd); err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to add block %s to batch: %w", nd.Cid(), err)} + return + } + + blockCount++ + blockBytesCount += uint64(len(block.RawData())) + previous = block + + // Check context cancellation + select { + case <-ctx.Done(): + out <- iface.DagImportResult{Err: ctx.Err()} + return + default: + } + } + + // Commit batch to blockstore + if err := batch.Commit(); err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to commit batch: %w", err)} + return + } + + // Emit all roots (with pin status if requested) + err = roots.ForEach(func(c cid.Cid) error { + result := iface.DagImportResult{ + Root: &iface.DagImportRoot{Cid: c}, + } + + // Attempt to pin if requested + if settings.PinRoots { + // Verify block exists in blockstore + block, err := api.core.blockstore.Get(ctx, c) + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("blockstore get: %v", err) + } else { + // Decode node for pinning + nd, err := blockDecoder.DecodeNode(ctx, block) + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("decode node: %v", err) + } else { + // Pin recursively + err = api.core.pinning.Pin(ctx, nd, true, "") + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("pin: %v", err) + } else { + // Flush pins to storage + err = api.core.pinning.Flush(ctx) + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("flush: %v", err) + } + } + } + } + } + + // Send root result + select { + case out <- result: + case <-ctx.Done(): + return ctx.Err() + } + + return nil + }) + if err != nil { + 
out <- iface.DagImportResult{Err: fmt.Errorf("error emitting roots: %w", err)} + return + } + + // Emit stats if requested + if settings.Stats { + select { + case out <- iface.DagImportResult{ + Stats: &iface.DagImportStats{ + BlockCount: blockCount, + BlockBytesCount: blockBytesCount, + }, + }: + case <-ctx.Done(): + return + } + } + + // Execute fast-provide (will check if enabled) + if err := api.executeFastProvide(ctx, cfg, roots, settings.FastProvideRoot, settings.FastProvideWait, settings.PinRoots, settings.PinRoots, false); err != nil { + select { + case out <- iface.DagImportResult{Err: err}: + case <-ctx.Done(): + } + } + }() + + return out, nil +} + +// executeFastProvide announces roots to the DHT for faster discovery +func (api *dagAPI) executeFastProvide(ctx context.Context, cfg *config.Config, roots *cid.Set, enabled bool, wait bool, isPinned bool, isPinnedRoot bool, isMFS bool) error { + // Check if fast-provide is enabled + if !enabled { + if wait { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) + } else { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config") + } + return nil + } + + log.Debugw("fast-provide-root: enabled", "wait", wait) + + // Check preconditions for providing + if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) { + log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false") + return nil + } + + if cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0 { + log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0") + return nil + } + + if !api.core.nd.HasActiveDHTClient() { + log.Debugw("fast-provide-root: skipped", "reason", "DHT not available") + return nil + } + + // Check provide strategy + strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy) + strategy := config.ParseProvideStrategy(strategyStr) + shouldProvide := config.ShouldProvideForStrategy(strategy, isPinned, isPinnedRoot, isMFS) + + if !shouldProvide { + log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS) + return nil + } + + // Provide each root + return roots.ForEach(func(c cid.Cid) error { + if wait { + // Synchronous mode: block until provide completes + log.Debugw("fast-provide-root: providing synchronously", "cid", c) + if err := api.core.nd.DHTClient.Provide(ctx, c, true); err != nil { + log.Warnw("fast-provide-root: sync provide failed", "cid", c, "error", err) + return fmt.Errorf("fast-provide: %w", err) + } + log.Debugw("fast-provide-root: sync provide completed", "cid", c) + } else { + // Asynchronous mode: fire-and-forget in goroutine + log.Debugw("fast-provide-root: providing asynchronously", "cid", c) + go func(rootCid cid.Cid) { + // Use detached context with timeout to prevent hanging + asyncCtx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout) + defer cancel() + if err := api.core.nd.DHTClient.Provide(asyncCtx, rootCid, true); err != nil { + log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) + } else { + log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) + } + }(c) + } + return nil + }) +} + var ( _ ipld.DAGService = (*dagAPI)(nil) _ dag.SessionMaker = (*dagAPI)(nil) diff --git a/core/coreapi/routing.go b/core/coreapi/routing.go index b9c25805622..2910acdf280 100644 --- 
a/core/coreapi/routing.go +++ b/core/coreapi/routing.go @@ -11,12 +11,16 @@ import ( offline "github.com/ipfs/boxo/exchange/offline" dag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/path" + boxoprovider "github.com/ipfs/boxo/provider" cid "github.com/ipfs/go-cid" cidutil "github.com/ipfs/go-cidutil" coreiface "github.com/ipfs/kubo/core/coreiface" caopts "github.com/ipfs/kubo/core/coreiface/options" "github.com/ipfs/kubo/core/node" "github.com/ipfs/kubo/tracing" + "github.com/libp2p/go-libp2p-kad-dht/provider" + "github.com/libp2p/go-libp2p-kad-dht/provider/buffered" + "github.com/libp2p/go-libp2p-kad-dht/provider/dual" peer "github.com/libp2p/go-libp2p/core/peer" mh "github.com/multiformats/go-multihash" "go.opentelemetry.io/otel/attribute" @@ -222,6 +226,63 @@ func provideKeysRec(ctx context.Context, prov node.DHTProvider, bs blockstore.Bl } } +// extractSweepingProvider extracts a SweepingProvider from the given provider interface. +// It handles unwrapping buffered and dual providers, selecting LAN or WAN as specified. +// Returns nil if the provider is not a sweeping provider type. +func extractSweepingProvider(prov any, useLAN bool) *provider.SweepingProvider { + switch p := prov.(type) { + case *provider.SweepingProvider: + return p + case *dual.SweepingProvider: + if useLAN { + return p.LAN + } + return p.WAN + case *buffered.SweepingProvider: + // Recursively extract from the inner provider + return extractSweepingProvider(p.Provider, useLAN) + default: + return nil + } +} + +func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...caopts.RoutingProvideStatOption) (*coreiface.ProvideStatsResponse, error) { + options, err := caopts.RoutingProvideStatOptions(opts...) + if err != nil { + return nil, err + } + + if !api.nd.IsOnline { + return nil, coreiface.ErrOffline + } + + // Handle legacy provider + if legacySys, ok := api.provider.(boxoprovider.System); ok { + if options.UseLAN { + return nil, errors.New("LAN stats only available for Sweep provider with Dual DHT") + } + stats, err := legacySys.Stat() + if err != nil { + return nil, err + } + // Note: FullRT field is not set here as we don't have access to nd.DHTClient + // This field is primarily for display purposes in the command + return &coreiface.ProvideStatsResponse{Legacy: &stats, FullRT: false}, nil + } + + // Extract sweeping provider (handles buffered and dual unwrapping) + sweepingProvider := extractSweepingProvider(api.provider, options.UseLAN) + if sweepingProvider == nil { + if options.UseLAN { + return nil, errors.New("LAN stats only available for Sweep provider with Dual DHT") + } + return nil, fmt.Errorf("stats not available with current routing system %T", api.provider) + } + + s := sweepingProvider.Stats() + return &coreiface.ProvideStatsResponse{Sweep: &s}, nil +} + func (api *RoutingAPI) core() coreiface.CoreAPI { return (*CoreAPI)(api) } diff --git a/core/coreiface/dag.go b/core/coreiface/dag.go index 3598e4528e4..c0cf5acd9d4 100644 --- a/core/coreiface/dag.go +++ b/core/coreiface/dag.go @@ -14,6 +14,7 @@ import ( type DagImportResult struct { Root *DagImportRoot Stats *DagImportStats + Err error } // DagImportRoot represents a root CID from a CAR file header diff --git a/core/coreiface/options/routing.go b/core/coreiface/options/routing.go index 8da7e7a1db2..55b054ce03c 100644 --- a/core/coreiface/options/routing.go +++ b/core/coreiface/options/routing.go @@ -96,3 +96,34 @@ func (routingOpts) AllowOffline(allow bool) RoutingPutOption { return nil } } + +type RoutingProvideStatSettings 
struct { + UseLAN bool +} + +type RoutingProvideStatOption func(*RoutingProvideStatSettings) error + +func RoutingProvideStatOptions(opts ...RoutingProvideStatOption) (*RoutingProvideStatSettings, error) { + options := &RoutingProvideStatSettings{ + UseLAN: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +// UseLAN is an option for [Routing.ProvideStats] which specifies whether to +// return stats for LAN DHT only (only valid for Sweep provider with Dual DHT). +// Default value is false (WAN DHT stats). +func (routingOpts) UseLAN(useLAN bool) RoutingProvideStatOption { + return func(settings *RoutingProvideStatSettings) error { + settings.UseLAN = useLAN + return nil + } +} diff --git a/core/coreiface/routing.go b/core/coreiface/routing.go index a17dfcad920..4476884c914 100644 --- a/core/coreiface/routing.go +++ b/core/coreiface/routing.go @@ -4,7 +4,9 @@ import ( "context" "github.com/ipfs/boxo/path" + boxoprovider "github.com/ipfs/boxo/provider" "github.com/ipfs/kubo/core/coreiface/options" + "github.com/libp2p/go-libp2p-kad-dht/provider/stats" "github.com/libp2p/go-libp2p/core/peer" ) @@ -26,4 +28,15 @@ type RoutingAPI interface { // Provide announces to the network that you are providing given values Provide(context.Context, path.Path, ...options.RoutingProvideOption) error + + // ProvideStats returns statistics about the provide system. + // Returns stats for either sweep provider (new default) or legacy provider. + ProvideStats(context.Context, ...options.RoutingProvideStatOption) (*ProvideStatsResponse, error) +} + +// ProvideStatsResponse contains statistics about the provide system +type ProvideStatsResponse struct { + Sweep *stats.Stats `json:"Sweep,omitempty"` + Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"` + FullRT bool `json:"FullRT,omitempty"` } diff --git a/test/cli/dag_test.go b/test/cli/dag_test.go index f6758a71037..858b61fd76f 100644 --- a/test/cli/dag_test.go +++ b/test/cli/dag_test.go @@ -120,7 +120,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -146,7 +146,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -183,7 +183,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -226,7 +226,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -254,7 +254,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + 
"GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -284,7 +284,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") diff --git a/test/cli/provide_stats_test.go b/test/cli/provide_stats_test.go index fede31c0fc3..13689a77a79 100644 --- a/test/cli/provide_stats_test.go +++ b/test/cli/provide_stats_test.go @@ -180,7 +180,7 @@ func TestProvideStatBasic(t *testing.T) { res := node.RunIPFS("provide", "stat") assert.Error(t, res.Err) - assert.Contains(t, res.Stderr.String(), "this command must be run in online mode") + assert.Contains(t, res.Stderr.String(), "this action must be run in online mode") }) } diff --git a/test/dependencies/go.mod b/test/dependencies/go.mod index 68c5a99c642..d5d8fda5045 100644 --- a/test/dependencies/go.mod +++ b/test/dependencies/go.mod @@ -249,6 +249,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/polyfloyd/go-errorlint v1.7.1 // indirect + github.com/probe-lab/go-libdht v0.4.0 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect diff --git a/test/dependencies/go.sum b/test/dependencies/go.sum index 005747af87a..01fb2868be5 100644 --- a/test/dependencies/go.sum +++ b/test/dependencies/go.sum @@ -650,6 +650,8 @@ github.com/polyfloyd/go-errorlint v1.7.1 h1:RyLVXIbosq1gBdk/pChWA8zWYLsq9UEw7a1L github.com/polyfloyd/go-errorlint v1.7.1/go.mod h1:aXjNb1x2TNhoLsk26iv1yl7a+zTnXPhwEMtEXukiLR8= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/probe-lab/go-libdht v0.4.0 h1:LAqHuko/owRW6+0cs5wmJXbHzg09EUMJEh5DI37yXqo= +github.com/probe-lab/go-libdht v0.4.0/go.mod h1:hamw22kI6YkPQFGy5P6BrWWDrgE9ety5Si8iWAyuDvc= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= From 8de1ace7884b7ff494a974ad7aa80f9bc18b2b8f Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Sun, 23 Nov 2025 17:04:13 +0100 Subject: [PATCH 4/4] refactor: fix error handling and race conditions in dag import address code review feedback for PR #11069: - fix: propagate decode errors in client/rpc dag import (was silently dropping errors) - fix: acquire pinlock before spawning goroutine to prevent race with GC - fix: update fast-provide test to always expect failure in isolated environment - test: add proper json compatibility test for provide stats (replaces compile-time check) - docs: add educational comments explaining batch config defaults - style: standardize error messages to use consistent "failed to X: %w" pattern the pinlock fix is critical - moving acquisition before goroutine spawn prevents blocks from being garbage collected before the lock is held. the error handling fix ensures RPC clients receive decode errors instead of empty results. 
--- client/rpc/dag.go | 2 +- client/rpc/dag_test.go | 37 ++++++++++++++++++------------------- client/rpc/routing_test.go | 28 +++++++++++++++++++++------- core/coreapi/dag.go | 26 ++++++++++++++++---------- 4 files changed, 56 insertions(+), 37 deletions(-) diff --git a/client/rpc/dag.go b/client/rpc/dag.go index 345502ebf11..ee059429435 100644 --- a/client/rpc/dag.go +++ b/client/rpc/dag.go @@ -180,7 +180,7 @@ func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...opt if err := dec.Decode(&event); err != nil { if err != io.EOF { select { - case out <- iface.DagImportResult{}: + case out <- iface.DagImportResult{Err: err}: case <-ctx.Done(): } } diff --git a/client/rpc/dag_test.go b/client/rpc/dag_test.go index 4e1151e43e4..e0d69d7c1cc 100644 --- a/client/rpc/dag_test.go +++ b/client/rpc/dag_test.go @@ -147,31 +147,30 @@ func TestDagImport_OnlineWithFastProvideWait(t *testing.T) { require.NoError(t, err) defer carFile.Close() - // Import with fast-provide wait enabled in online mode + // Import with fast-provide wait enabled in online mode. // This tests that FastProvideWait actually blocks (not fire-and-forget). - // In isolated test environment (no DHT peers), the provide operation may: - // 1. Succeed trivially (announced to randomly discovered peers), or - // 2. Return an error (timeout/no peers) - // Both outcomes prove blocking behavior works correctly. + // In isolated test environment with no DHT peers, the blocking provide + // operation should fail and propagate an error. results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), options.Dag.FastProvideRoot(true), options.Dag.FastProvideWait(true)) - if err != nil { - // Blocking wait detected provide failure (no DHT peers in isolated test) - // This proves FastProvideWait actually blocked and error propagated + // Initial call may succeed, but we should get error from results channel + if err == nil { + // Consume results until we hit the expected error + var gotError bool + for result := range results { + if result.Err != nil { + gotError = true + require.Contains(t, result.Err.Error(), "fast-provide", + "error should be from fast-provide operation") + break + } + } + require.True(t, gotError, "should receive fast-provide error in isolated test environment") + } else { + // Error returned directly (also acceptable) require.Contains(t, err.Error(), "fast-provide", "error should be from fast-provide operation") - return // Test passed - blocking wait worked and returned error } - - // No error - provide succeeded, verify we got results - var roots []cid.Cid - for result := range results { - if result.Root != nil { - roots = append(roots, result.Root.Cid) - } - } - - require.Len(t, roots, 1, "should receive one root when provide succeeds") } diff --git a/client/rpc/routing_test.go b/client/rpc/routing_test.go index 7895e3b6b2a..39299291286 100644 --- a/client/rpc/routing_test.go +++ b/client/rpc/routing_test.go @@ -13,20 +13,34 @@ import ( "github.com/stretchr/testify/require" ) -// Compile-time check: ensure our response type is compatible with kubo's provideStats -// This verifies that JSON marshaling/unmarshaling will work correctly -var _ = func() { +func TestProvideStats_JSONCompatibility(t *testing.T) { + // Verify that command's provideStats structure is compatible with + // iface.ProvideStatsResponse for JSON marshaling/unmarshaling. + // This ensures RPC client can correctly decode responses from the command. 
+ // Create instance of command's provideStats structure cmdStats := struct { Sweep *stats.Stats `json:"Sweep,omitempty"` Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"` FullRT bool `json:"FullRT,omitempty"` - }{} + }{ + Sweep: &stats.Stats{}, + FullRT: true, + } + + // Marshal command structure to JSON + data, err := json.Marshal(cmdStats) + require.NoError(t, err, "should marshal command stats") - // Marshal and unmarshal to verify compatibility - data, _ := json.Marshal(cmdStats) + // Unmarshal into interface type var ifaceStats iface.ProvideStatsResponse - _ = json.Unmarshal(data, &ifaceStats) + err = json.Unmarshal(data, &ifaceStats) + require.NoError(t, err, "should unmarshal into interface stats") + + // Verify fields transferred correctly + require.NotNil(t, ifaceStats.Sweep, "Sweep field should be present") + require.Nil(t, ifaceStats.Legacy, "Legacy field should be nil") + require.True(t, ifaceStats.FullRT, "FullRT field should be true") } // testProvideStats mirrors the subset of fields we verify in tests. diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index 329496ded25..9a0ffb600eb 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -93,8 +93,10 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options. // Create block decoder for IPLD nodes blockDecoder := ipldlegacy.NewDecoder() - // Create batch for efficient block addition - // Uses config values for batch size tuning + // Create batch for efficient block addition. + // Uses config values for batch size tuning: + // - MaxNodes: Default 128 nodes per batch (128 file descriptors in flatfs) + // - MaxSize: Default 100MiB per batch (with 256KiB blocks, hits node limit at ~32MiB) batch := ipld.NewBatch(ctx, api.DAGService, ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))), ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))), @@ -103,15 +105,19 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options. // Create output channel out := make(chan iface.DagImportResult) + // Acquire pinlock BEFORE spawning goroutine if pinning roots + // This prevents race condition with GC (lock serves as both pin and GC lock) + var pinUnlocker func(context.Context) + if settings.PinRoots { + pinUnlocker = api.core.blockstore.PinLock(ctx).Unlock + } + // Process import in background go func() { defer close(out) defer file.Close() - - // Acquire pinlock if pinning roots (also serves as GC lock) - if settings.PinRoots { - unlocker := api.core.blockstore.PinLock(ctx) - defer unlocker.Unlock(ctx) + if pinUnlocker != nil { + defer pinUnlocker(ctx) } // Track roots from CAR headers and stats @@ -137,9 +143,9 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options. if err != nil { if err != io.EOF { if previous != nil { - out <- iface.DagImportResult{Err: fmt.Errorf("error reading block after %s: %w", previous.Cid(), err)} + out <- iface.DagImportResult{Err: fmt.Errorf("failed to read block after %s: %w", previous.Cid(), err)} } else { - out <- iface.DagImportResult{Err: fmt.Errorf("error reading CAR blocks: %w", err)} + out <- iface.DagImportResult{Err: fmt.Errorf("failed to read CAR blocks: %w", err)} } } break @@ -223,7 +229,7 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options. 
return nil }) if err != nil { - out <- iface.DagImportResult{Err: fmt.Errorf("error emitting roots: %w", err)} + out <- iface.DagImportResult{Err: fmt.Errorf("failed to emit roots: %w", err)} return }
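
For reference, a minimal self-contained sketch of the RPC surface added in this series: Dag().Import with the fast-provide options and Routing().ProvideStats. This is illustrative only, not part of the patches; the daemon API address, the example.car path, and the panic-based error handling are placeholder assumptions.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/ipfs/boxo/files"
	rpc "github.com/ipfs/kubo/client/rpc"
	"github.com/ipfs/kubo/core/coreiface/options"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	ctx := context.Background()

	// Assumed local daemon RPC endpoint (placeholder).
	addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
	if err != nil {
		panic(err)
	}
	api, err := rpc.NewApi(addr)
	if err != nil {
		panic(err)
	}

	// DAG import over RPC with stats and fast-provide of roots (patches 1, 2, 4).
	carFile, err := os.Open("example.car") // placeholder CAR file
	if err != nil {
		panic(err)
	}
	defer carFile.Close()

	results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
		options.Dag.Stats(true),
		options.Dag.FastProvideRoot(true))
	if err != nil {
		panic(err)
	}
	for result := range results {
		switch {
		case result.Err != nil:
			// Decode and import errors are streamed on the channel (patch 4 fix).
			panic(result.Err)
		case result.Root != nil:
			fmt.Println("root:", result.Root.Cid, result.Root.PinErrorMsg)
		case result.Stats != nil:
			fmt.Println("blocks:", result.Stats.BlockCount, "bytes:", result.Stats.BlockBytesCount)
		}
	}

	// Provide stats over RPC (patch 3); pass options.Routing.UseLAN(true) only
	// when the Sweep provider runs with Dual DHT, per the option docs.
	stats, err := api.Routing().ProvideStats(ctx)
	if err != nil {
		panic(err)
	}
	switch {
	case stats.Sweep != nil:
		fmt.Println("sweep provider stats available")
	case stats.Legacy != nil:
		fmt.Println("legacy reprovider stats available, FullRT:", stats.FullRT)
	}
}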